当前位置: 首页 > 知识库问答 >
问题:

无法解码Kafka中Avro消费者端的自定义对象

阎裕
2023-03-14

我有一个具体的类,我正在序列化字节数组,以发送到一个Kafka主题。对于序列化,我使用ReflectDatumWriter。在发送bytes[]之前,我在查看了一些在线教程之后,将模式ID放在前4个字节中。

./bin/kafka-avro-console-consumer--bootstrap-server 0:9092--property schema.stry.url=http://0:8081--property print.key=true--主题测试

"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000" 
"1" "\u0000"
    MParams ddb = new MParams();
    ddb.setKey("ss");

    for (int i = 0; i < 10; i++) {
        ProducerRecord record = new ProducerRecord<String, byte[]>("Test", "1", build(1, Producer.serialize(ddb)));
        Future resp = kafkaFullAckProducer.send(record);

        System.out.println("Success" + resp.get());
    }
}

public static <T> byte[] serialize(T data) {
    Schema schema = null;
    if (data == null) {
        throw new RuntimeException("Data cannot be null in AvroByteSerializer");
    }
    try {
        schema = ReflectData.get().getSchema(data.getClass());
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
        writer.write(data, new EncoderFactory().directBinaryEncoder(out, null));
        byte[] bytes = out.toByteArray();
        return bytes;
    } catch (java.io.IOException e) {
        throw new RuntimeException("Error serializing Avro message", e);
    }
}

public static byte[] build(Integer schemaId, byte[] data) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    out.write(0);
    try {
        out.write(ByteBuffer.allocate(4).putInt(schemaId).array());
        out.write(data);
        byte[] bytes = out.toByteArray();
        out.close();
        return bytes;
    } catch (IOException e) {
        throw new RuntimeException("Exception in avro record builder , msg :" + e.getMessage());
    }



@Data
public class MParams extends MetricParams{

    // POJO version

    @Nullable
    private String key;


}

@JsonTypeInfo(use = Id.CLASS, include = As.PROPERTY, property = "@c")
@Union(value= {MParams.class})
public abstract class MetricParams {

}

工作序列化程序段

public byte[] serialize(String topic, T record) {
        Schema schema;
        int id;
        try {
            schema = ReflectData.get().getSchema(record.getClass());
            id = client.register(topic + "-value", schema);
        } catch (IOException | RestClientException e) {
            throw new RuntimeException(e);
        }
        return serializeImpl(id, schema, record);
    }

    protected byte[] serializeImpl(int id, Schema schema, T object) throws SerializationException {
        if (object == null) {
            return null;
        }
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            out.write(0x0);
            out.write(ByteBuffer.allocate(4).putInt(id).array());

            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
            DatumWriter<T> writer = new ReflectDatumWriter<T>(schema);
            writer.write(object, encoder);
            encoder.flush();
            byte[] bytes = out.toByteArray();
            out.close();
            return bytes;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error serializing Avro message", e);
        }
    }

反序列化程序:

protected T deserialize(Schema schema, byte[] payload) throws SerializationException {
        // Even if the caller requests schema & version, if the payload is null
        // cannot include it. The caller must handle
        // this case.
        if (payload == null) {
            return null;
        }

        int id = -1;
        try {
            ByteBuffer buffer = getByteBuffer(payload);
            id = buffer.getInt();
            int length = buffer.limit() - 1 - 4;

            int start = buffer.position() + buffer.arrayOffset();
            DatumReader<T> reader = new ReflectDatumReader<T>(schema);
            T res = reader.read(null, new DecoderFactory().binaryDecoder(buffer.array(), start, length, null));
            return res;
        } catch (IOException | RuntimeException e) {
            throw new SerializationException("Error deserializing Avro message for id " + id, e);
        }
    }

    private ByteBuffer getByteBuffer(byte[] payload) {
        ByteBuffer buffer = ByteBuffer.wrap(payload);
        if (buffer.get() != 0x0) {
            throw new SerializationException("Unknown magic byte!");
        }
        return buffer;
    }

共有1个答案

松旭
2023-03-14

对于序列化,我使用ReflectDatumWriter。在发送bytes[]之前,我将模式ID放入带有模式ID的前4个字节中

不清楚您为什么试图绕过KafKaavroSerializer类的默认行为。(在您的情况下,请从该示例中删除schema.parser,并使用您的反射记录类型,而不是genericrecord)

您可以将您的具体类作为生产者的第二种类型,只要它实现了基本Avro类,它就应该正确地序列化(意思是正确计算ID,而不是您创建的某个数字,并转换为字节),注册到注册表,然后发送到Kafka

最重要的是,模式ID在注册表中不一定是1,通过设置该ID,控制台使用者可能试图不正确地反序列化您的消息,从而导致错误的输出

换句话说,试着

ProducerRecord<String, MParams> record = new ProducerRecord<>(...)
 类似资料:
  • 我一直在尝试为Spring引导Kafka骆驼Avro消费者寻找示例代码,但没有运气。我在以下URL找到了Spring Camel Kafka消费者和生产者示例: https://thysmichels.com/2015/09/04/apache-camel-kafka-spring-integration/ 我的具体问题是,一旦我的bean从Avro模式创建,并且我有了POJO类,我如何将上面的c

  • 一、消费者和消费者群组 在 Kafka 中,消费者通常是消费者群组的一部分,多个消费者群组共同读取同一个主题时,彼此之间互不影响。Kafka 之所以要引入消费者群组这个概念是因为 Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS ,或者进行耗时的计算,在这些情况下,单个消费者无法跟上数据生成的速度。此时可以增加更多的消费者,让它们分担负载,分别处理部分分区的消息,这就是

  • 我正在将Spark Scala应用程序Kafka API升级到0.10版。我曾经创建自定义方法来反序列化字节字符串格式的消息。 我已经意识到有一种方法可以将StringDeserializer或ByteArrayDeserializer作为参数传递给键或值。 但是,我找不到有关如何创建自定义Avro模式反序列化器的任何信息,以便我的kafkaStream在创建DirectStream和使用Kafk

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka