我想使用Avro来序列化我的Kafka消息的数据,并想将其与Avro模式存储库一起使用,这样我就不必将模式包含在每条消息中。
将Avro与Kafka结合使用似乎是一件很流行的事情,许多博客/堆栈溢出问题/用户组等都提到了将模式Id与消息一起发送,但我找不到一个实际的示例来说明它应该去哪里。
我想它应该放在Kafka消息头的某个地方,但我找不到一个明显的地方。如果它在Avro消息中,则必须根据模式对其进行解码,以获取消息内容并显示需要解码的模式,这显然存在问题。
我正在使用C#客户端,但任何语言的示例都很好。message类具有以下字段:
public MessageMetadata Meta { get; set; }
public byte MagicNumber { get; set; }
public byte Attribute { get; set; }
public byte[] Key { get; set; }
public byte[] Value { get; set; }
但这些似乎都不正确。MessageMetaData只有Offset和PartitionId。
那么,Avro模式Id应该放在哪里呢?
模式id实际上是在avro消息本身中编码的。看看这个,看看编码器/解码器是如何实现的。
一般来说,当您向Kafka发送Avro消息时会发生什么:
0x0
字节,模式ID是一个4字节的整数值其余的是实际编码的消息。当您解码信息时,会发生以下情况:
如果您的密钥是Avro编码的,那么您的密钥将采用上述格式。值也是如此。这样,您的键和值可能都是Avro值并使用不同的模式。
编辑以回答评论中的问题:
实际的模式存储在模式存储库中(这实际上是模式存储库的整个点,用于存储模式:)。Avro对象容器文件格式与上述格式无关。Kafka夫罗编码器/解码器使用略有不同的消息格式(但实际消息的编码方式完全相同)。
这些格式之间的主要区别在于,对象容器文件携带实际的模式,并且可能包含与该模式对应的多条消息,而上面描述的格式只携带模式id,并且只携带与该模式对应的一条消息。
传递对象容器文件编码的消息可能不容易理解/维护,因为一条Kafka消息将包含多条Avro消息。或者您可以确保一条Kafka消息只包含一条Avro消息,但这会导致每个消息都带有模式。
Avro模式可能非常大(我见过600 KB或更大的模式),将模式与每条消息一起携带会非常昂贵和浪费,因此这就是模式存储库的作用所在-模式只提取一次,并在本地缓存,所有其他查找都只是快速的映射查找。
因此,我们计划使用Avro在融合的Kafka生态系统上进行交流。我目前对Avro的理解是,每条消息都有自己的模式。如果是这样的话,我们需要模式注册表来解决版本更新吗? 我问,因为在每条消息中携带模式可以防止需要像模式注册表这样的东西来将消息ID映射到模式。还是我在这里错过了什么?
我正在使用带有Avro和汇流模式注册表的Spring云流。我正在为所有服务使用一个单独的DLQ主题,因此具有不同模式的消息可能会落在这个主题中。我已禁用动态架构注册,以确保不传递错误消息()。 然而,问题是由于dlq上缺少模式,我可能会在进入这个主题时丢失一条消息。因此,我希望能够以JSON格式向dlq生成消息,并在管道的其余部分使用Avro。如果有人能帮助我如何做到这一点,或者能为我指出这件事的
不知何故,除了使用case类将AVRO消息转换为DataFrame之外,我找不到其他方法。是否有可能改用模式?我使用的是和。 完整的代码,如果你感兴趣的话。
我使用hikaricp作为我的数据库连接池。当我完成我的SQL语句时,我将关闭连接,在连接上调用关闭,我相信您应该将连接代理返回到池。然而,我看到以下警告(不是错误)消息,我不得不怀疑这是否是一个需要解决的问题,因为我没有正确清理我的连接资源。我不是使用资源尝试,而是使用尝试捕获最后(我在最后关闭连接
我已经创建了要将它们连接在一起的kstream。两个流的输出如下所示: 流1: 流2: 我想创建这两个Stream的连接流(内连接),所以我创建了以下KStream: 在这个KStream中,我只使用了一个连接,我正在更改输出消息的格式,仅此而已。 通过一个例子,我将解释我想做什么: 在窗口内发布以下消息: 流1 流2 加入流 出版的是什么 我想出版什么 总之,我只想在窗口中发布最新消息,而不是所