我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。
让我来解释一下我想要达到的目标:
在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。
在web上找到的大多数示例中,Avro记录都是使用已知的模式(或数据类型)生成的,但在这种情况下我不知道序列化数据时将使用哪种模式。因此我动态地获取消息类型(通过CXF拦截器)并以这种方式序列化:
// get unmarshaled POJO
MessageContentsList objs = MessageContentsList.getContentsList(message);
Object obj = objs.get(0);
EncoderFactory factory = EncoderFactory.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = factory.directBinaryEncoder(out, null);
// getting schema from class name (first approach)
String scName = obj.getClass().getSimpleName();
InputStream avroRes = this.getClass().getClassLoader().getResourceAsStream(scName);
Schema schema = new Schema.Parser().parse(avroRes);
ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(schema);
writer.write(obj, encoder);
encoder.flush();
out.close();
KeyedMessage< String, byte[]> kMessage = new KeyedMessage<String, byte[]>("mytopic", out.toByteArray());
producer.send(kMessage);
这样,我可以发送关于我的主题的数据,但我不能从传入的消息中获取模式。
有没有办法:
当数据类型未知时,在Kafka主题上发送Avro记录的“最佳”实践是什么?
也许我在阅读Avro文档时错过了什么,没有按预期使用它。
谢谢你的帮助...
发送到Kafka主题的消息应该对模式和Avro记录进行编码。如果在每条消息中发送模式的开销太大,则改为发送该模式的标识符。消息使用者可以使用标识符从架构注册表检索完整的架构定义。例如,序列化Kafka消息的代码将模式标识符写入消息的前几个字节:
ByteArrayOutputStream out = new ByteArrayOutputStream();
schema = getSchema(object);
int id = schemaRegistry.register(subject, schema);
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(idSize).putInt(id).array());
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
DatumWriter<Object> writer;
if (object instanceof SpecificRecord) {
writer = new SpecificDatumWriter<Object>(schema);
} else {
writer = new GenericDatumWriter<Object>(schema);
}
writer.write(object, encoder);
encoder.flush();
byte[] bytes = out.toByteArray();
out.close();
return bytes;
目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。
我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以
我在两个独立的AVCS模式文件中定义了记录的两个版本。我使用命名空间来区分版本SimpleV1.avsc 示例JSON 版本2只是有一个带有默认值的附加描述字段。 SimpleV2.avsc 示例JSON 这两个模式都序列化为Java类。在我的示例中,我将测试向后兼容性。V1写入的记录应由使用V2的读取器读取。我希望看到插入默认值。只要我不使用枚举,这就可以工作。 检查读者作家兼容性方法确认模式是
主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示
我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在