当前位置: 首页 > 知识库问答 >
问题:

有没有一种方法可以在没有魔法字节的情况下使用Kafka模式注册表?

申屠瀚海
2023-03-14

我试图使用confluent的模式注册表使我的应用程序工作,但此时我并不是生产者的完全控制者,你甚至可以将它们视为不绑定confluent产品的遗留应用程序

我正在查看融合信息,似乎所有消息都应该在有效负载中包含一个魔法字节和模式ID
https://docs.confluent.io/3.2.0/schema-registry/docs/serializer-formatter.html

或者,当我尝试使用它时,我会出现一个错误:

[2020-09-25 13:12:09,008] ERROR WorkerSinkTask{id=s3_parquet_connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:491)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:468)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:200)
            at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
            at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
            at java.util.concurrent.FutureTask.run(FutureTask.java:266)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic com.obj_pos to Protobuf: 
            at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:123)
            at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
            at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:491)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
            at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
            ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-25 13:12:09,010] ERROR WorkerSinkTask{id=s3_parquet_connector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)

我的问题是,是否有一种方法可以禁用这个神奇的字节检查,或者我可以创建一个Kafka流,只在初始消息中附加一个this 5字节,这样之后我就可以与连接到模式注册表的消费者一起使用它。

发生的事情是生产者超出了我的控制,所以我需要以某种方式能够反序列化不包含这5个字节的消息,因为它们是由不依赖于汇合序列化器/反序列化器的生产者产生的

共有1个答案

晁璞
2023-03-14

它们由不依赖合流连载器的制作人制作

那么问题不在注册处。

您不应该使用Confluent编写的转换器来使用这些消息,因为这些消息绑定到注册表,并且没有办法跳过它。

您可以使用BlueAporter(假设数据真的是protobuf),或者编写自己的转换器类。

 类似资料:
  • 我想在我的spring web应用程序中添加几个过滤器,但至少现在不会有任何关于安全性的内容。所以。没有spring-security我所能做的就是在web.xml中定义多个过滤器(定义过滤器的旧方法)。要能够使用spring过滤器链,我需要为我的项目添加spring-security作为依赖项,这似乎很奇怪。也许我做错了什么,而且确实有过滤器链可以在没有spring-security依赖的情况下

  • vanilla apache Avro和带有confluent schema registry的Avro的区别在于,当使用apache Avro时,我们在kafka主题中发送,而在confluent schema registry中,我们在kafka主题中发送?所以在这里,schema registry通过在registry中查找schema来帮助提高性能。使用confluent schema r

  • 所以我想要一个“Void Repository”,通过它可以访问不一定在实体上操作的存储过程。 但这当然不起作用,因为期望是一个实体。 有没有一种方法可以使用注释而无需创建虚拟实体,或者我是否坚持使用使用通过准备好的语句进行查询的已实现类? 因为老实说,这很难看:

  • 问题内容: 我想使用java.util.Preferences API,但是我不想让程序尝试读取或写入Windows注册表。我将如何处理? 问题答案: 我相信您已经使用Java读取了对Windows注册表的读/写操作,然后您希望在使用API 时拥有另一个不同于注册表的后端 如本文所述,您可以像Bernhard或Croft一样扩展API: 因为首选项API是后端中立的,所以您不必关心数据是存储在文件

  • 问题内容: 关于将行旋转为各种数据库的列,有很多文章。他们似乎分为两个阵营,使用案例陈述或使用数据库供应商的内置功能。我正在使用 MySQL ,到目前为止,在任何内置函数上都找不到任何东西,这将使我能够选择任意数量的行值(我想将其转换为列)。如果我不提前知道这些值,则无法构建经常出现在的CASE查询。我想知道在MySQL中是否有类似于其他数据库中称为交叉表或数据透视表的东西: -Postgresq

  • 问题内容: 如果通过创建缓冲区,则该内存将位于Java堆之外。有没有一种方法可以以跨平台的方式测量应用程序中此类内存的使用情况,类似于我可以使用and 来测量Java堆使用情况的方法? 问题答案: 您可以使用反射来获取Java 7的OpenJDK / HotSpot。没有独立于平台的方式,它仅通过ByteBuffer.allocateDirect()向您显示用法,而没有其他分配本地内存的方式。 另