当前位置: 首页 > 知识库问答 >
问题:

发送字节数组并将其反序列化为带有kafka的Avro记录

师成弘
2023-03-14

我正在向kafka发送对应于Avro记录的字节数组。

生产商:

props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro ClientOrderRequest Producer");
props.put("schema.registry.url", "schema-registry address");

KafkaProducer<String, ClientOrderRequest> producerRequest = new KafkaProducer<>(props);

        
while (true) {
    ClientOrderRequest clientOrderRequest = createClientOrderRequest();
    byte[] bytes = toByteArray(clientOrderRequest);
    final ProducerRecord<String, byte[]> producerOrderRequest = new ProducerRecord<>("client-order-request",
                "ClientOrderRequest-" + calendar.getTimeInMillis(), bytes);
    producerRequest.send(producerOrderRequest);
    producerRequest.flush();
    System.out.println("Produced 1 record.");
    Thread.sleep(2000);
}

消费者:

props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.put("schema.registry.url", "schema-registry address");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "events-group-3");

KafkaConsumer<String, ClientOrderRequest> clientOrderRequestConsumer = new KafkaConsumer<>(props);
        clientOrderRequestConsumer.subscribe(Collections.singletonList("client-order-request"));

while (true) {
    ConsumerRecords<String, ClientOrderRequest> records = clientOrderRequestConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, ClientOrderRequest> record : records) {
        String key = record.key();
        ClientOrderRequest value = record.value();
            System.out.println(key);
            System.out.println(value);
        }
    }
}

生产者能够将字节数组发送到主题。但消费者无法将其反序列化。我有一个错误:

无法将B转换为com。瑞士报价。埃弗雷克斯。生成。阿夫罗。交易。客户端请求

当我生成消息时,客户端订单请求值主题已在模式注册表上注册。我知道我发送了一个字节数组,并且kafka/模式注册表期望一个Avro记录,但我希望它能够反序列化它。

如果不能使用简单的AvroKafkaSerializer,我应该实现自己的序列化程序吗?

共有1个答案

李星辰
2023-03-14

什么是toByteArray?

您不需要手动序列化数据。这就是serializer类的全部要点。不清楚您为什么将此更改为之前的问题

ClientOrderRequest clientOrderRequest = createClientOrderRequest();
final ProducerRecord<String, ClientOrderRequest> producerOrderRequest = new ProducerRecord<>("client-order-request",
                "ClientOrderRequest-" + calendar.getTimeInMillis(), clientOrderRequest);
producerRequest.send(producerOrderRequest);
producerRequest.flush();

当您使用KafkaAvroSerializer.class发送字节数组时,它会发送{"type":"bytes"}的Avro对象,而不是您的自定义记录

如果确实要直接发送字节数组,可以使用ByteArraySerializer,但这会绕过架构注册表

 类似资料:
  • 问题内容: 我目前无法在KSTREAM APP 中 反序列化avro PRIMITIVE密钥 用avro模式编码的密钥(已在模式注册表中注册), 当我使用kafka-avro-console-consumer时,我可以看到密钥已正确反序列化 但是不可能使其在KSTREAM应用程序中工作 密钥的avro模式是主要的: 我已经关注了合流的文档 它对于该值工作得很好,但是该键将是一个字符串,该字符串包含

  • 我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在

  • 问题内容: 我正在尝试使用Gson反序列化从Web服务返回的json字符串 该结构将以返回。 哪里像 和ItemDTO就像 当我按如下方式调用代码时 对象内部的所有内容均为空 但是,如果我使用 并将它们从org.json罐子中逐段拉出,效果很好,并相应地填充了字段。 关于我在做什么错的任何想法吗?Gson非常快吗?还是我最好坚持自己已经做的工作​​? 谢谢大卫 问题答案: 原始问题中的示例Java

  • 我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式

  • 我正在用Kafka、星火和朱皮特笔记本做概念验证,我遇到了一个奇怪的问题。我正在试着阅读从Kafka到Pyspark的Avro记录。我正在使用汇合模式注册表获取模式以反序列化avro消息。反序列化spark dataframe中的avro消息后,结果列为空,没有任何错误。列应该包含数据,因为当强制转换为字符串时,某些avro字段是可读的。 我也尝试过在Scala中的spark-shell(没有ju