当前位置: 首页 > 知识库问答 >
问题:

如何使用嵌入式模式从Kafka反序列化Avro

孙元明
2023-03-14

我收到Kafka主题中的二进制Avro文件,我必须对它们进行反序列化。在Kafka收到的消息中,我可以在每条消息的开头看到一个模式。我知道不嵌入模式并将其与实际的Avro文件分离是一种更好的做法,但我无法控制生产者,也无法更改。

我的代码在Apache Storm上运行。首先,我创建一个读卡器:

mDatumReader = new GenericDatumReader<GenericRecord>();

然后,我尝试反序列化消息,但不声明架构:

Decoder decoder = DecoderFactory.get().binaryDecoder(messageBytes, null);
GenericRecord payload = mDatumReader.read(null, decoder);

但当消息到达时,我会收到一个错误:

Caused by: java.lang.NullPointerException: writer cannot be null!
at org.apache.avro.io.ResolvingDecoder.resolve(ResolvingDecoder.java:77) ~[stormjar.jar:?]
at org.apache.avro.io.ResolvingDecoder.<init>(ResolvingDecoder.java:46) ~[stormjar.jar:?]
at org.apache.avro.io.DecoderFactory.resolvingDecoder(DecoderFactory.java:307) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.getResolver(GenericDatumReader.java:122) ~[stormjar.jar:?]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:137) ~[stormjar.jar:?]

我看到的所有答案都是关于使用其他格式、更改传递给Kafka或其他东西的消息。我无法控制这些事情。

我的问题是,给定一条二进制消息中嵌入模式的字节消息,如何在不声明模式的情况下反序列化Avro文件,以便我可以读取它。

共有2个答案

景鹏飞
2023-03-14

>

  • 添加Maven依赖性

    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.9.1</version>
        <type>maven-plugin</type>
    </dependency>
    

    创建如下文件

     {"namespace": "tachyonis.space",
       "type": "record",
       "name": "Avro",
       "fields": [
          {"name": "Id", "type": "string"},
        ]
      }
    

    以上另存为Avro。src/main/resources中的avsc。

    在Eclipse或任何IDE运行中

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL_CONFIG);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true); 
    KafkaConsumer<String, Avro> consumer = new KafkaConsumer<>(props);
    

    消费者/生产者必须在同一台机器上运行。否则,您需要在Windows/Linux中配置主机文件,并将所有组件配置属性从localhost更改为映射到实际IP地址,以便广播给生产者/消费者。否则会出现网络连接问题等错误

    Connection to node -3 (/127.0.0.1:9092) could not be established. Broker may not be available
    

  • 元胡媚
    2023-03-14

    对于DatumReader/Writer,没有像嵌入式模式这样的东西。是我在看Avro时的误解

    DataFileWriter在文件的开头编写模式,然后使用GenericDatumWriter添加GenericRecords。

    既然您刚开始说有一个模式,我假设您可以读取它,将其转换为模式对象,然后将其传递给GenericDatumReader(模式)构造函数。想知道消息是如何序列化的。也许DataFileWriter用于写入字节[],而不是实际的文件,那么您可以使用DataFileReader来反序列化数据?

     类似资料:
    • 我从一个远程服务器接收到Python中的Kafka Avro消息(使用Confluent Kafka Python库的使用者),这些消息用json字典表示clickstream数据,其中包含用户代理、位置、url等字段。下面是消息的样子: 如何解码?我尝试了bson解码,但字符串没有被识别为UTF-8,因为我猜它是一种特定的Avro编码。我找到https://github.com/verisign

    • 我正在使用kafka从源接收数据,我正在使用用< code>Node.js编写的消费者应用程序,并使用< code>kafka-node连接到kafka服务器。另一方面,生产者是用< code>Java编写的,他们使用一些kafka流库来产生带有模式的avro消息。我可以接收消息,但它们是avro序列化的,下面是我接收的序列化消息格式- 我正在尝试反序列化它,但无法使用 npm模块,因为avsc只

    • 我正在学习Kafka,对我来说,使用Avro有一个Kafka主题和图式是有意义的。 但是当涉及到将模式定义放在何处时,我缺少了一些东西: 当序列化和反序列化消息时,有什么方法可以从模式定义中获益吗? 我找不到任何在两端都这样做的例子,特别是使用模式注册表。

    • 我正在用Kafka、星火和朱皮特笔记本做概念验证,我遇到了一个奇怪的问题。我正在试着阅读从Kafka到Pyspark的Avro记录。我正在使用汇合模式注册表获取模式以反序列化avro消息。反序列化spark dataframe中的avro消息后,结果列为空,没有任何错误。列应该包含数据,因为当强制转换为字符串时,某些avro字段是可读的。 我也尝试过在Scala中的spark-shell(没有ju

    • 我在两个独立的AVCS模式文件中定义了记录的两个版本。我使用命名空间来区分版本SimpleV1.avsc 示例JSON 版本2只是有一个带有默认值的附加描述字段。 SimpleV2.avsc 示例JSON 这两个模式都序列化为Java类。在我的示例中,我将测试向后兼容性。V1写入的记录应由使用V2的读取器读取。我希望看到插入默认值。只要我不使用枚举,这就可以工作。 检查读者作家兼容性方法确认模式是

    • 如何配置RESTTemplate来使用Jackson的@JsonDeserialize反序列化响应Json? 我的基于Builder模式的域类和Jackson的@JSONdeserialize marshall和unmarshall fine在单元测试中都很好。但是,当与Spring的RESTTemplate(Spring 3.1)结合使用时,它会失败。