当前位置: 首页 > 知识库问答 >
问题:

如何从kafka中接收到的消息中获取Avro模式对象?

慕兴平
2023-03-14

我尝试向kafka发布/使用我的java对象。我使用Avro模式。

我的基本程序运行良好。在我的程序中,我在生产者(用于编码)和消费者(用于解码)中使用我的模式。

如果我在接收者处将不同的对象发布到不同的主题(例如:100个主题),我不知道我收到了什么类型的消息?...我想从接收到的字节中获取avro模式,并想将其用于解码...我的理解正确吗?如果是这样,我如何从接收到的对象中检索?

共有1个答案

田德运
2023-03-14

您不会在收到的字节中收到Avro模式,而且您也不想这样做。Avro的整个想法是将模式与记录分离,使其成为一种更加紧凑的格式。按照我的方式,我有一个主题叫做模式。Kafka消费者流程要做的第一件事是从一开始就听这个主题,并解析所有模式。

Avro模式只是JSON字符串对象——您可以在Schema主题中为每条记录存储一个模式。

至于找出哪个模式与哪个主题相匹配,正如我在前面的回答中所说的,每个主题只需要一个模式,而不是更多。所以,当您解析来自特定主题的消息时,您确切地知道模式适用于什么,因为只有一个模式。

如果从不重复使用该模式,则可以将该模式命名为与主题相同的名称。然而,在实践中,您可能会在多个主题上使用相同的模式。在这种情况下,您希望有一个单独的主题,将架构映射到主题。您可以创建如下Avro模式:

{"name":"SchemaMapping", "type":"record", "fields":[
  {"name":"schemaName", "type":"string"},
  {"name":"topicName", "type":"string"}
]}

您可以使用Avro编码的映射将每个主题发布一条记录到一个特殊的主题中,例如称为“模式映射”(SchemaMapping),并且在从一开始就使用了模式主题之后,消费者将收听模式映射(SchemaMapping),之后它将确切地知道为每个主题应用哪个模式。

 类似资料:
  • 我正在运行一个简单的Kafka streams应用程序,它将使用Node JS记录的信息带到一个Kafka主题。 还需要注意的是,时间戳只是一个数字,表示自1970年6月以来的秒数。 我使用scala中的Kafka流来使用这些数据。 例如。 然而,我不确定如何将时间戳(我从nodeJS发送的)提取到这个流中。 例如,如果我尝试做这样的事情 这会导致错误“无法解析符号流”。我在想我该怎么解决这个问题

  • 我正在使用弹性搜索Kafka连接在独立模式下。我不困惑使用哪种配置来启动Kafka连接并从最后一个故障点开始。 例如,生产者将继续推动记录进入Kafka和消费者,因为弹性搜索接收器连接器正在消费,现在我的由于某种原因我的消费者下降了,但我的骄傲将继续推动信息进入Kafka。现在,当我修复了ES sink连接器端的问题后,如果我重新启动ES sink连接器,它应该从上次故障中选择,而不是从开始或最近

  • 我有一个avro架构,我想从中提取所有字段名称。有什么办法可以做到这一点吗? 测试架构是这样的: 这是代码: 以上是这样打印出来的: 但我希望它只返回数组列表中的“左”和“右”,而不返回其他内容。现在,它还返回类型和pos,我不需要它们。有什么办法可以做到吗?

  • 我从以下链接获得所有信息: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaConsumer.html 当我们运行consumer时,我们没有从consumer端得到任何通知。请给我一点主意。

  • 我正在使用kafka从源接收数据,我正在使用用< code>Node.js编写的消费者应用程序,并使用< code>kafka-node连接到kafka服务器。另一方面,生产者是用< code>Java编写的,他们使用一些kafka流库来产生带有模式的avro消息。我可以接收消息,但它们是avro序列化的,下面是我接收的序列化消息格式- 我正在尝试反序列化它,但无法使用 npm模块,因为avsc只