我正在向kafka发送对应于Avro记录的字节数组。
生产商:
props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "Kafka Avro ClientOrderRequest Producer");
props.put("schema.registry.url", "schema-registry address");
KafkaProducer<String, ClientOrderRequest> producerRequest = new KafkaProducer<>(props);
while (true) {
ClientOrderRequest clientOrderRequest = createClientOrderRequest();
byte[] bytes = toByteArray(clientOrderRequest);
final ProducerRecord<String, byte[]> producerOrderRequest = new ProducerRecord<>("client-order-request",
"ClientOrderRequest-" + calendar.getTimeInMillis(), bytes);
producerRequest.send(producerOrderRequest);
producerRequest.flush();
System.out.println("Produced 1 record.");
Thread.sleep(2000);
}
消费者:
props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-address");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
props.put("schema.registry.url", "schema-registry address");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "events-group-3");
KafkaConsumer<String, ClientOrderRequest> clientOrderRequestConsumer = new KafkaConsumer<>(props);
clientOrderRequestConsumer.subscribe(Collections.singletonList("client-order-request"));
while (true) {
ConsumerRecords<String, ClientOrderRequest> records = clientOrderRequestConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, ClientOrderRequest> record : records) {
String key = record.key();
ClientOrderRequest value = record.value();
System.out.println(key);
System.out.println(value);
}
}
}
生产者能够将字节数组发送到主题。但消费者无法将其反序列化。我有一个错误:
无法将B转换为com。瑞士报价。埃弗雷克斯。生成。阿夫罗。交易。客户端请求
当我生成消息时,客户端订单请求值主题已在模式注册表上注册。我知道我发送了一个字节数组,并且kafka/模式注册表期望一个Avro记录,但我希望它能够反序列化它。
如果不能使用简单的AvroKafkaSerializer,我应该实现自己的序列化程序吗?
什么是toByteArray?
您不需要手动序列化数据。这就是serializer类的全部要点。不清楚您为什么将此更改为之前的问题
ClientOrderRequest clientOrderRequest = createClientOrderRequest();
final ProducerRecord<String, ClientOrderRequest> producerOrderRequest = new ProducerRecord<>("client-order-request",
"ClientOrderRequest-" + calendar.getTimeInMillis(), clientOrderRequest);
producerRequest.send(producerOrderRequest);
producerRequest.flush();
当您使用KafkaAvroSerializer.class发送字节数组时,它会发送{"type":"bytes"}
的Avro对象,而不是您的自定义记录
如果确实要直接发送字节数组,可以使用ByteArraySerializer,但这会绕过架构注册表
问题内容: 我目前无法在KSTREAM APP 中 反序列化avro PRIMITIVE密钥 用avro模式编码的密钥(已在模式注册表中注册), 当我使用kafka-avro-console-consumer时,我可以看到密钥已正确反序列化 但是不可能使其在KSTREAM应用程序中工作 密钥的avro模式是主要的: 我已经关注了合流的文档 它对于该值工作得很好,但是该键将是一个字符串,该字符串包含
我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在
问题内容: 我正在尝试使用Gson反序列化从Web服务返回的json字符串 该结构将以返回。 哪里像 和ItemDTO就像 当我按如下方式调用代码时 对象内部的所有内容均为空 但是,如果我使用 并将它们从org.json罐子中逐段拉出,效果很好,并相应地填充了字段。 关于我在做什么错的任何想法吗?Gson非常快吗?还是我最好坚持自己已经做的工作? 谢谢大卫 问题答案: 原始问题中的示例Java
我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式
我正在用Kafka、星火和朱皮特笔记本做概念验证,我遇到了一个奇怪的问题。我正在试着阅读从Kafka到Pyspark的Avro记录。我正在使用汇合模式注册表获取模式以反序列化avro消息。反序列化spark dataframe中的avro消息后,结果列为空,没有任何错误。列应该包含数据,因为当强制转换为字符串时,某些avro字段是可读的。 我也尝试过在Scala中的spark-shell(没有ju