当前位置: 首页 > 知识库问答 >
问题:

Flinkkafka消费者/制作人

公羊宇定
2023-03-14

大家好,我正在努力将一个简单的avro模式与模式注册表一起序列化。

设置:

  • 两个用java编写的Flink jobs(一个消费者,一个生产者)

目标:生产者应该发送一条用ConfluentRegistryAvroSerializationSchema序列化的消息,其中包括更新和验证模式。

然后,使用者应将消息反序列化为具有接收到的模式的对象。使用ConfluentRegistryAvroDeserializationSchema

到目前为止还不错:如果我将架构注册表上的主题配置为FORWARD-兼容,生产者将正确的avro架构写入注册表,但它以错误结束(即使我完全永久地删除主题第一):

Failed to send data to Kafka: Schema being registered is incompatible with an earlier schema for subject "my.awesome.MyExampleEntity-value"

已成功编写架构:

{
        "subject": "my.awesome.MyExampleEntity-value",
        "version": 1,
        "id": 100028,
        "schema": "{\"type\":\"record\",\"name\":\"input\",\"namespace\":\"my.awesome.MyExampleEntity\",\"fields\":[{\"name\":\"serialNumber\",\"type\":\"string\"},{\"name\":\"editingDate\",\"type\":\"int\",\"logicalType\":\"date\"}]}"
}

接下来,我可以尝试将可兼容性设置为NONE

如果我这样做,我可以在Kafka上生成数据,但是:模式注册表有一个新版本的我的模式,如下所示:

{
        "subject": "my.awesome.MyExampleEntity-value",
        "version": 2,
        "id": 100031,
        "schema": "\"bytes\""
}

现在,我可以生成数据,但消费者无法反序列化此模式,并发出以下错误:

Caused by: org.apache.avro.AvroTypeException: Found bytes, expecting my.awesome.MyExampleEntity
...

我目前不确定问题到底出在哪里。即使我完全永久地删除主题(包括模式),我的生产者也应该从头开始正常工作,用模式注册一个完整的“新”主题。另一方面,如果我将兼容性设置为“NONE”,模式交换无论如何都应该通过注册一个可以被消费者读取的模式来工作。

有人能帮我吗?

共有2个答案

丁恩
2023-03-14

序列化的整个问题是关于kafka配置中以下标志的使用:

"schema.registry.url"
"key.serializer"
"key.deserializer"
"value.serializer"
"value.deserializer"

在flink中设置这些标志,即使它们在逻辑上是正确的,也会导致不可调试的模式验证和序列化混乱。省略了所有这些标志,效果很好。注册表url只需在ConfluentRegistryAvro(De)serializationSchema中设置。

韦澄邈
2023-03-14

根据最新的合流文档NONE:模式兼容性检查被禁用文档:

 类似资料:
  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

  • 我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我正在Java中实现一个简单的Kafka消费者。代码如下: 我在网上查看的任何文档都给出了range或roundrobin作为可能的分配策略,据我所知,groupId是一个自定义名称。不确定这里什么是正确的配置值。