当前位置: 首页 > 知识库问答 >
问题:

AVRO-反序列化POJOs

卜存
2023-03-14

我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。

让我来解释一下我想要达到的目标:

在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。

在web上找到的大多数示例中,Avro记录都是使用已知的模式(或数据类型)生成的,但在这种情况下我不知道序列化数据时将使用哪种模式。因此我动态地获取消息类型(通过CXF拦截器)并以这种方式序列化:

// get unmarshaled POJO
MessageContentsList objs = MessageContentsList.getContentsList(message);
Object obj = objs.get(0);

EncoderFactory factory = EncoderFactory.get();
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = factory.directBinaryEncoder(out, null);

// getting schema from class name (first approach)
String scName = obj.getClass().getSimpleName();
InputStream avroRes = this.getClass().getClassLoader().getResourceAsStream(scName);
Schema schema = new Schema.Parser().parse(avroRes);

ReflectDatumWriter<Object> writer = new ReflectDatumWriter<Object>(schema);
writer.write(obj, encoder);
encoder.flush();
out.close();

KeyedMessage< String, byte[]> kMessage = new KeyedMessage<String, byte[]>("mytopic", out.toByteArray());
producer.send(kMessage);

这样,我可以发送关于我的主题的数据,但我不能从传入的消息中获取模式。

有没有办法:

  • 从Kafka主题中读取消息并获取用于序列化的架构?
  • 在消耗和反序列化时将泛型记录映射到POJO?

当数据类型未知时,在Kafka主题上发送Avro记录的“最佳”实践是什么?

也许我在阅读Avro文档时错过了什么,没有按预期使用它。

谢谢你的帮助...

共有1个答案

魏安宁
2023-03-14

发送到Kafka主题的消息应该对模式和Avro记录进行编码。如果在每条消息中发送模式的开销太大,则改为发送该模式的标识符。消息使用者可以使用标识符从架构注册表检索完整的架构定义。例如,序列化Kafka消息的代码将模式标识符写入消息的前几个字节:

ByteArrayOutputStream out = new ByteArrayOutputStream();

schema = getSchema(object);
int id = schemaRegistry.register(subject, schema);
out.write(MAGIC_BYTE);
out.write(ByteBuffer.allocate(idSize).putInt(id).array());

BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
DatumWriter<Object> writer;
if (object instanceof SpecificRecord) {
  writer = new SpecificDatumWriter<Object>(schema);
} else {
  writer = new GenericDatumWriter<Object>(schema);
}
writer.write(object, encoder);
encoder.flush();

byte[] bytes = out.toByteArray();
out.close();
return bytes;
 类似资料:
  • 目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。

  • 我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以

  • 我在两个独立的AVCS模式文件中定义了记录的两个版本。我使用命名空间来区分版本SimpleV1.avsc 示例JSON 版本2只是有一个带有默认值的附加描述字段。 SimpleV2.avsc 示例JSON 这两个模式都序列化为Java类。在我的示例中,我将测试向后兼容性。V1写入的记录应由使用V2的读取器读取。我希望看到插入默认值。只要我不使用枚举,这就可以工作。 检查读者作家兼容性方法确认模式是

  • 主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示

  • 我试图将Kafka中的Avro消息反序列化为从Avro模式生成的POJO。我正在使用Kafkaavroderializer进行此转换。 我可以在