当前位置: 首页 > 知识库问答 >
问题:

无法读取Kafka-Avro架构消息

施默
2023-03-14

有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。

以下是技术堆栈:

  1. LogStash 2.3-当前生产版本
  2. 汇流3.0。
  3. 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。
  4. 动物园管理员:3.4.6
  5. Kafka:0.10.0.0

Logstash配置文件如下所示:

input {
stdin{}
}

filter {
mutate {
remove_field => ["@timestamp","@version"]
  }
}

output {
  kafka {
topic_id => 'logstash_logs14'

codec => avro  { 
schema_uri => "/opt/logstash/bin/schema.avsc"
    }
  }
}

schema.avsc文件如下所示:

{
    "type":"record",
    "name":"myrecord",
    "fields":[
        {"name":"message","type":"string"},
        {"name":"host","type":"string"}
        ]
}

2在自己的终端启动Kafka

./bin/kafka-server-start ./etc/kafka/server.properties

3在自己的终端中启动架构注册表

./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

4从logstash目录中,运行以下命令

bin/logstash -f ./bin/logstash.conf
./bin/kafka-avro-console-consumer --zookeeper localhost:2181 --topic logstash_logs14 --from-beginning
_While consuming we get the following error:_

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/kafka-serde-tools/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/root/confluent-3.0.0/share/java/schema-registry/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Processed a total of 1 messages
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
[2016-06-08 18:42:41,627] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$:103)
org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

请告诉我如何解决这个问题

谢谢,乌彭德拉

共有1个答案

贺自明
2023-03-14

你是如何写/出版Kafaka的?您看到SerializationException是因为数据不是使用schema-registry(或KafkaAvroSerializer)编写的,但是当您使用schema-registry时,kafka-avro-console-consumer在内部使用schema-registry(或KafkaAvroDeserializer),它希望数据具有某种格式(特别是 )。如果您使用kafka-avro-console-producer来编写avro数据,那么您不应该得到这个异常,或者您可以在键和值序列化器的生成器属性中设置KafkaAvroSerializer,还可以设置schema-registry-url。

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
 类似资料:
  • Debezium连接器的Kafka connect事件是Avro编码的。 在传递给Kafka connect standalone服务的connect-standalone.properties中提到了以下内容。 使用这些属性配置Kafka使用者代码: 在消费者实现中,下面是读取键和值组件的代码。我使用REST从模式注册表中获取键和值的模式。 解析密钥工作正常。在解析消息的值部分时,我得到了Arr

  • (最终目标)在尝试是否最终可以从Confluent平台读取avro数据usng spark stream之前,如这里所述:将spark结构化流与Confluent Schema Registry集成 我要验证是否可以使用以下命令来读取它们: 我收到这个错误消息,未知的魔法字节 注意,可以这样读取消息(使用console consumer而不是avro console consumer): 该消息是

  • 我正在编写一个REST代理,就像合流REST代理一样。它接受JSON负载、模式主题和id,然后将JSON负载作为Avro对象写入流中。当我使用kafka avro控制台消费者阅读消息时,我收到了“未知魔法字节”错误。 这是我的Kafka制作人配置: 这就是REST控制器如何将传入的JSON转换为Avro 这是toAvro方法的实现: 然后将此对象传递给我使用上面给出的属性配置的SchemaVali

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我有一个Kafka集群(版本:0.10.1.0),有9个代理和10个分区。 我尝试使用camel kafka从java应用程序中获取消息。这是我的pom。xml 这只是我使用的与骆驼Kafka相关的依赖项。下面是骆驼Kafka消费者代码。 我正在使用文档中指定的KafkaURIhttps://camel.apache.org/components/latest/kafka-component.ht

  • 我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https: