我一直在尝试将kafka-avro-console-consumer从Confluent连接到我们的遗留Kafka集群,该集群是在没有Confluent模式注册表的情况下部署的。我使用以下属性显式提供了模式:
kafka-console-consumer --bootstrap-server kafka02.internal:9092 \
--topic test \
--from-beginning \
--property key.schema='{"type":"long"}' \
--property value.schema='{"type":"long"}'
但我得到的是‘未知的魔法字节!’org.apache.kafka.common.errors.serializationexception
出错
是否可以使用Confluent kafka-avro-console-consumer(未使用Confluent的AvroSerializer序列化)和Schema Registry来使用来自Kafka的Avro消息?
汇合模式注册表序列化器/反序列化器使用有线格式,该格式在消息的初始字节中包含关于模式ID等的信息。
如果您的消息没有使用架构注册表序列化程序序列化,那么您将无法用它反序列化它,并且将得到unknown magic byte!
错误。
因此,您需要编写一个使用者来提取消息,使用Avro avsc模式执行反序列化,然后假设您希望保留数据,使用模式注册表序列化器重新序列化它
编辑:我最近写了一篇文章,更深入地解释了这整件事:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explaned
我有一个问题反序列化来自Kafka主题的消息。这些消息已经使用spring-cloud-stream和Apache Avro序列化。我正在用斯普林斯·Kafka阅读它们,并试图反序列化它们。如果我使用spring-cloud来生成和使用消息,那么我就可以很好地反序列化消息。问题是当我用Spring Kafka消费它们,然后试图反序列化。 我正在使用一个模式注册表(用于开发的spring-boot模
我用本地安装的Confluent4.0.0尝试了官方模式-注册表-汇合示例(Consumer/Producer),它可以在发送post请求和在listener接收时发送“Sensor”avro消息,但当我使用Confluent4.0.0附带的kafka-avro-console-consumer工具查看发送的avro消息时,该工具引发了以下错误(a)。我还尝试使用kafka-avro-consol
我需要编写外壳脚本来读取特定主题上的kafka消息,使用“bin/kafka-console-consumer.sh--主题快速启动-事件--from-开始--引导服务器localhost:9092”,只需要一定的时间,比如10秒。使用选项--time out 10s没有帮助,因为如果特定主题上的消息连续出现,该过程不会在10s之后停止。请建议如何做同样的事情。
我正在尝试使用镜头从MQTT到Kafka的消息。ioReactor。流式反应器的最新版本 Kafka/融合版 预期行为:avro主题应打印在控制台上 org.apache.kafka.common.errors。序列化异常:未知的魔法字节! 附加细节 连接器属性配置(我的连接器属性) avro json文件 MQTT代理消息 完整日志 还在模式注册表上注册了模式 我会做错什么?
我正在编写一个REST代理,就像合流REST代理一样。它接受JSON负载、模式主题和id,然后将JSON负载作为Avro对象写入流中。当我使用kafka avro控制台消费者阅读消息时,我收到了“未知魔法字节”错误。 这是我的Kafka制作人配置: 这就是REST控制器如何将传入的JSON转换为Avro 这是toAvro方法的实现: 然后将此对象传递给我使用上面给出的属性配置的SchemaVali
我正在尝试用3个代理&Zookeeper来测试运行一个单独的Kafka节点。我希望使用控制台工具进行测试。我是这样管理制作人的: 然后我以这样的方式运行消费者: 我可以在生产者中输入消息,并在消费者中看到它们,这是预期的。但是,当我使用bootstrap-server运行消费者的更新版本时,我什么也得不到。例如: 当我有一个代理在端口9092上运行时,这工作得很好,所以我彻底搞糊涂了。有没有办法让