当前位置: 首页 > 知识库问答 >
问题:

流式应用程序,正在注册的模式与主题的早期模式不兼容

步衡
2023-03-14

我有一个源连接器,可以推送到新创建的主题。“key.converter”和“value.converter”未配置,因此转换是使用Avro完成的。

"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",

当Kafka Connect填充此主题时,它会在注册表中创建一个主题名为“主题名键”和“主题名值”的模式。例如,此模式可能具有如下配置的字段

"fields: [{"name" : "user_id", "type" : "long"}]

但当记录被拉入流媒体应用程序时,Kafka的代码(Kafka流:3.0.1)有一个模式伴随着数据。有时(并非总是)关联的模式具有不同的数据类型(user\u id是int而不是long)。

"fields: [{"name" : "user_id", "type" : "int"}]

抛出异常,并显示以下消息“正在注册的模式与主题的早期模式不兼容”。

问题:

  1. 当schemas.enable=false时,为什么模式伴随Avro消息?
  2. 为什么Kafka不使用关联的模式,而不是从模式注册表中查找它,比较注册表版本,并抱怨不兼容?

错误:

org.apache.kafka.streams.errors。StreamsException:进程中捕获到异常。taskId=0\u 1,processor=KSTREAM-SOURCE-0000000000,topic=member\u device\u 5,partition=1,offset=0,stacktrace=org.apache.kafka.common.errors。InvalidConfigurationException:正在注册的架构与主题“foo-campaign-KSTREAM-JOINOTHER-000000000 6-store-changelog-key”的早期架构不兼容;错误代码:409

共有1个答案

幸越泽
2023-03-14

为什么模式在运行模式时会伴随Avro消息。启用=false?

因为Avro总是有一个模式。它不能被禁用。该属性仅适用于JSONConverter(查看源代码)。

为什么Kafka不使用关联的模式而不是从模式注册表中查找它,

Kafka不在乎。您自己应用程序中的Serde在乎。如果您使用了特定的模式,您需要修改StreamsConfig属性映射以启用“specific.avro.reader”。否则,它将在反序列化期间使用注册表中的模式。

其次,foo-proxing-KSTREAM-JOINO-0000000006-store-change elog-key是注册表中自己的模式,与连接器写入Kafka的主题没有直接关系。

最重要的是,Avro long类型不能降级为int类型

 类似资料:
  • 我有一个关于使用Kafka和主题(Kafka代理)和主题(Schema注册表)的不同名称设置流处理器的问题。 首先,任何操作似乎都可以与 Kafka 代理和模式注册表一起工作,但是如果处理器收到该事件,则模式注册表将魔术开始。 而不是将abc作为主题发送到模式注册表abc。bla将被发送。架构注册表的回答为“未找到”。 预期:localhost:8081/subjects/ABC/versions

  • 我正在了解Confluent的模式注册表,以满足所有模式管理需求。 我不太理解他们的版本控制方法...有一个的概念,我将其视为一个名称空间。据我所知,subject在模式注册表中必须是唯一。 然后是模式id,或者只是,它也是唯一的。 最后,还有一个。 以下是文档中的片段: :此主题的架构版本,每个主题从1开始 :全局唯一的架构版本id,在所有主题中的所有架构中都是唯一的 因此,一旦我想修改特定主题

  • 我现在一直在查看Spring Cloud模式注册表和汇合模式注册表。我可以看到一些区别,例如Spring Cloud模式注册表将模式保存在普通数据库中,默认情况下保存在h2中,而汇合模式注册表保存在kafka主题中。 spring云模式注册表的这种方法是否会对性能产生任何影响。据我所知,即使数据保留在主题上,以防汇合,查询它时仍然会有延迟。但会有重大影响吗? 我还可以看到,spring云模式注册表

  • 我试图从REST代理发布json模式,但遇到异常 curl-k-x post-h“content-type:application/vnd.schemaregistry.v1+json”--数据“{”schema“:”{“type”:“object”,“properties”:{“firstname”:{“type”:“string”},“lastname”:{“type”:“string”},“

  • 我试图用函数编程(和spring cloud stream)转换来自输入主题的输入AVRO消息,并在输出主题上发布新消息。下面是我的转换函数: 我的spring boot应用程序是以这种方式声明的,并激活了模式注册表客户机: 谢谢你能给我带来的任何帮助。 视CG

  • 我正在使用从oracle db获取数据,并按下(两个键 我有一个Kafka流收听这个主题,并有avro Genericrecord。当我启动流时,我开始得到<code>ClassCastException:java.lang.Long不能强制转换为org.apache.avro.generic。GenericRecordconnect生成的架构具有数据类型为“long”的字段 有人对如何解决这个问