当前位置: 首页 > 知识库问答 >
问题:

无法通过kafka avro控制台读取avro消息消费者(最终目标通过spark streaming读取)

燕宏胜
2023-03-14

(最终目标)在尝试是否最终可以从Confluent平台读取avro数据usng spark stream之前,如这里所述:将spark结构化流与Confluent Schema Registry集成

我要验证是否可以使用以下命令来读取它们:

$ kafka-avro-console-consumer \
> --topic my-topic-produced-using-file-pulse-xml \
> --from-beginning \
> --bootstrap-server localhost:9092 \
> --property schema.registry.url=http://localhost:8081

我收到这个错误消息,未知的魔法字节

Processed a total of 1 messages
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2020-09-10 12:59:54,795] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:76)
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

注意,可以这样读取消息(使用console consumer而不是avro console consumer):

kafka-console-consumer \
--bootstrap-server localhost:9092 --group my-group-console \
--from-beginning \
--topic my-topic-produced-using-file-pulse-xml

该消息是使用conFluent connect file-Pulch(1.5.2)读取xml文件(Stremoth/kafka-Connect-file-Pulp)生成的

请在此处提供帮助:我是否错误地使用了kafka-avro-console-消费者?我尝试了此处描述的“反序列化器”属性选项:https://stackoverflow.com/a/57703102/4582240,没有帮助

我不想勇敢地开始火花流读取数据。

我使用的文件pulse 1.5.2属性如下所示,于2020年9月11日添加完成。

name=connect-file-pulse-xml
connector.class=io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector
topic= my-topic-produced-using-file-pulse-xml
tasks.max=1

# File types
fs.scan.filters=io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter
file.filter.regex.pattern=.*\\.xml$
task.reader.class=io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader
force.array.on.fields=sometagNameInXml

# File scanning
fs.cleanup.policy.class=io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy
fs.scanner.class=io.streamthoughts.kafka.connect.filepulse.scanner.local.LocalFSDirectoryWalker

fs.scan.directory.path=/tmp/kafka-connect/xml/
fs.scan.interval.ms=10000

# Internal Reporting
internal.kafka.reporter.bootstrap.servers=localhost:9092
internal.kafka.reporter.id=connect-file-pulse-xml
internal.kafka.reporter.topic=connect-file-pulse-status

# Track file by name
offset.strategy=name

共有1个答案

夏俊人
2023-03-14

如果您向消费者获取未知魔法字节,则生产者没有使用ConFluent AvroSerializer,并且可能推送了不使用Schema注册表的Avro数据。

如果看不到生产者代码或使用和检查二进制格式的数据,很难知道是哪种情况。

消息是使用汇合连接文件脉冲生成的

您是否使用了值。AvroConverter类的转换器?

 类似资料:
  • 我正在编写一个REST代理,就像合流REST代理一样。它接受JSON负载、模式主题和id,然后将JSON负载作为Avro对象写入流中。当我使用kafka avro控制台消费者阅读消息时,我收到了“未知魔法字节”错误。 这是我的Kafka制作人配置: 这就是REST控制器如何将传入的JSON转换为Avro 这是toAvro方法的实现: 然后将此对象传递给我使用上面给出的属性配置的SchemaVali

  • 有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示:

  • 我有行车记录仪公司卡,用于在允许客户通过远程下载方式下载行车记录仪数据之前,对客户进行身份验证。下面代码中使用的APDU命令是行车记录仪和公司卡之间成功认证的命令。 双方之间的联系如下: 行车记录仪 如上所述的通信运行良好,应用用户正在进行身份验证。现在我试图读取一些信息直接从公司卡没有应用程序如下: 我的客户端程序 在我的客户端程序中,我使用的是从Android应用程序发送到公司卡的相同APDU

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 我们有一个Kafka制作人,偶尔会制作一些信息。 我写了一个消费者来消费这些消息。问题是,只有当两个消息叠加时,它们才会被使用。例如,如果消息是在13:00产生的,消费者不做任何事情。如果另一条消息是在13:01生成的,则消费者会使用这两条消息。在kafkaTool中,在消费者属性中有一个名为LAG的列,当消息未被消费时,该列为1。我缺少的这个东西有什么配置吗? 消费者配置:

  • 我正在为Kafka0.9.0.0做Kafka快速入门。 我让zookeeper在监听,因为我运行了 只有一个代理在处侦听,因为我运行了 我有一个制作人在主题“测试”上发帖,因为我跑了 当我运行旧的API使用者时,它通过运行 但是,当我运行新的API使用者时,我在运行时没有得到任何东西 是否可以使用新的API从控制台使用者订阅主题?我该怎么修好它?