我一直在尝试将avro通用记录进行串行化,并生成avro串行化的数据发送给Kafka。主要目标是不使用合并模式注册表存储模式,而是将模式与序列化数据一起发送,以便从kafka主题中提取并反序列化。
下面是AvroSerializer用于生成Avro数据的部分。
@Override
public byte[] serialize(String topic, T data) {
try {
byte[] result = null;
if (data != null) {
LOGGER.debug("data='{}'", data);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder =
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
datumWriter.setSchema(data.getSchema());
datumWriter.write(data, binaryEncoder);
binaryEncoder.flush();
byteArrayOutputStream.close();
result = byteArrayOutputStream.toByteArray();
}
return result;
} catch (IOException ex) {
throw new SerializationException(
"Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
}
}
Kafka中出现的序列化数据如下所示。
@Override
public T deserialize(String topic, byte[] data) {
GenericRecord person = null;
try {
T result = null;
if (data != null) {
LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
Schema schema = Schema.parse(schemaString);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
result = (T) datumReader.read(null, decoder);
LOGGER.debug(result.getSchema().toString());
LOGGER.debug("deserialized data='{}'", result);
}
return result;
} catch (Exception ex) {
throw new SerializationException(
"Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
}
}
public class KafkaAvroProducerUtil {
public Future<RecordMetadata> produceTokafka(GenericRecord object) throws IOException {
Properties properties = new Properties();
// normal producer
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("acks", "all");
properties.setProperty("retries", "10");
// avro part
properties.setProperty("key.serializer", StringSerializer.class.getName());
properties.setProperty("value.serializer", AvroSerializer.class.getName());
String topic = "avro";
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties);
ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<String, GenericRecord>(
topic, object
);
Future<RecordMetadata> data = producer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
System.out.println(metadata);
} else {
exception.printStackTrace();
}
}
});
producer.flush();
producer.close();
return data;
}
编辑:根据@onecricketeer和@chinhuang的建议,我用两种方法找到了答案。
下文对这两种方法进行了解释。但头方法的答案如下所示。
方法1:将模式与数据一起发送
方法2:在头中发送模式
在这里,不是将模式与数据一起发送,而是将模式发送到报头中。
@Override
public byte[] serialize(String topic, T data) {
return null;
}
public byte[] serialize(String topic, Headers headers, T data) {
try {
byte[] result = null;
byte[] payload = null;
if (data != null) {
LOGGER.debug("data='{}'", data);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
BinaryEncoder binaryEncoder =
EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
byte[] schemaBytes = data.getSchema().toString().getBytes();
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
datumWriter.setSchema(data.getSchema());
datumWriter.write(data, binaryEncoder);
binaryEncoder.flush();
byteArrayOutputStream.close();
result = byteArrayOutputStream.toByteArray();
ByteArrayOutputStream outputStream2 = new ByteArrayOutputStream( );
outputStream2.write( result );
payload = outputStream2.toByteArray( );
headers.add("schema",schemaBytes);
}
LOGGER.info("headers added");
return payload;
} catch (IOException ex) {
throw new SerializationException(
"Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
}
}
反序列化器方法如下所示。
@Override
public T deserialize(String topic, byte[] data) {
return null
}
public T deserialize(String topic, Headers headers, byte[] data) {
try {
T result = null;
if (data != null) {
LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
Header header = headers.lastHeader("schema");
String schemaString2 = new String(header.value());
Schema schema = Schema.parse(schemaString2);
DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
DataFileReader<GenericRecord> dataFileReader = null;
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
result = (T) datumReader.read(null, decoder);
LOGGER.debug(result.getSchema().toString());
LOGGER.debug("deserialized data='{}'", result);
}
return (T) result;
} catch (Exception ex) {
throw new SerializationException(
"Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
}
}
我是flink和kafka的新手。我正在尝试使用合流模式注册表对avro数据进行反序列化。我已经在ec2机器上安装了flink和kafka。此外,在运行代码之前已经创建了“测试”主题。 代码路径:https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2 作为实现的一部分,该代码执行以下操作: 运行flink可执行jar的
我在两个独立的AVCS模式文件中定义了记录的两个版本。我使用命名空间来区分版本SimpleV1.avsc 示例JSON 版本2只是有一个带有默认值的附加描述字段。 SimpleV2.avsc 示例JSON 这两个模式都序列化为Java类。在我的示例中,我将测试向后兼容性。V1写入的记录应由使用V2的读取器读取。我希望看到插入默认值。只要我不使用枚举,这就可以工作。 检查读者作家兼容性方法确认模式是
我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式
我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以
我试图构建一个流,它获得一个Avro主题,做一个简单的转换,然后以Avro格式再次将其发送回另一个主题,我有点卡在最后的序列化部分。 我创建了一个AVRO模式,我正在导入它并使用它创建特定的AVRO Serde。但是我不知道如何使用这个serde将电影对象序列化回AVRO。 这是流类: 谢谢