当前位置: 首页 > 知识库问答 >
问题:

无合流模式注册表的反序列化:Avro序列化数据不包含Avro模式

艾泰
2023-03-14

我一直在尝试将avro通用记录进行串行化,并生成avro串行化的数据发送给Kafka。主要目标是不使用合并模式注册表存储模式,而是将模式与序列化数据一起发送,以便从kafka主题中提取并反序列化。

下面是AvroSerializer用于生成Avro数据的部分。


  @Override
  public byte[] serialize(String topic, T data) {
    try {
      byte[] result = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);


        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
            EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);
        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.setSchema(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close(); 
        result = byteArrayOutputStream.toByteArray();


      }

      return result;
    } catch (IOException ex) {
      throw new SerializationException(
          "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }

Kafka中出现的序列化数据如下所示。

  @Override
  public T deserialize(String topic, byte[] data) {

    GenericRecord person = null;

    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));

        Schema schema = Schema.parse(schemaString);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);

 
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);
        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);
      }

      return result;

    } catch (Exception ex) {
      throw new SerializationException(
          "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }


public class KafkaAvroProducerUtil {


    public  Future<RecordMetadata> produceTokafka(GenericRecord object) throws IOException {


        Properties properties = new Properties();
        // normal producer
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("acks", "all");
        properties.setProperty("retries", "10");
        // avro part

        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", AvroSerializer.class.getName());
 


        String topic = "avro";

        Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(properties);
        ProducerRecord<String, GenericRecord> producerRecord = new ProducerRecord<String, GenericRecord>(
                topic, object
        );

        Future<RecordMetadata> data = producer.send(producerRecord, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println(metadata);
                } else {
                    exception.printStackTrace();
                }
            }
        });

        producer.flush();
        producer.close();


        return data;
    }

共有1个答案

巫英纵
2023-03-14

编辑:根据@onecricketeer和@chinhuang的建议,我用两种方法找到了答案。

下文对这两种方法进行了解释。但头方法的答案如下所示。

方法1:将模式与数据一起发送

  1. 它肯定是无效的
  2. 如果定界符出现在模式中,整个方法就会中断

方法2:在头中发送模式

在这里,不是将模式与数据一起发送,而是将模式发送到报头中。


  @Override
  public byte[] serialize(String topic, T data) {


   return null;
 
}

  public  byte[] serialize(String topic, Headers headers, T data) {


    try {

      byte[] result = null;
      byte[] payload = null;
      if (data != null) {
        LOGGER.debug("data='{}'", data);


        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        BinaryEncoder binaryEncoder =
                EncoderFactory.get().binaryEncoder(byteArrayOutputStream, null);

        byte[] schemaBytes = data.getSchema().toString().getBytes();

        DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(data.getSchema());
        datumWriter.setSchema(data.getSchema());
        datumWriter.write(data, binaryEncoder);

        binaryEncoder.flush();
        byteArrayOutputStream.close();


        result = byteArrayOutputStream.toByteArray();

        ByteArrayOutputStream outputStream2 = new ByteArrayOutputStream( );
        outputStream2.write( result );
        payload =  outputStream2.toByteArray( );

        headers.add("schema",schemaBytes);

      }

      LOGGER.info("headers added");
      return payload;
    } catch (IOException ex) {
      throw new SerializationException(
              "Can't serialize data='" + data + "' for topic='" + topic + "'", ex);
    }
  }

反序列化器方法如下所示。



  @Override
  public T deserialize(String topic, byte[] data) {

      return  null


   }
  public T deserialize(String topic, Headers headers, byte[] data) {


    try {
      T result = null;

      if (data != null) {
        LOGGER.debug("data='{}'", DatatypeConverter.printHexBinary(data));
        Header header = headers.lastHeader("schema");

        String schemaString2 = new String(header.value());
  
        Schema schema = Schema.parse(schemaString2);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
        DataFileReader<GenericRecord> dataFileReader = null;

        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);

        result = (T) datumReader.read(null, decoder);

        LOGGER.debug(result.getSchema().toString());
        LOGGER.debug("deserialized data='{}'", result);

      }

      return (T) result;

    } catch (Exception ex) {
      throw new SerializationException(
              "Can't deserialize data '" + Arrays.toString(data) + "' from topic '" + topic + "'", ex);
    }
  }

 类似资料:
  • 我是flink和kafka的新手。我正在尝试使用合流模式注册表对avro数据进行反序列化。我已经在ec2机器上安装了flink和kafka。此外,在运行代码之前已经创建了“测试”主题。 代码路径:https://gist.github.com/mandar2174/5dc13350b296abf127b92d0697c320f2 作为实现的一部分,该代码执行以下操作: 运行flink可执行jar的

  • 我在两个独立的AVCS模式文件中定义了记录的两个版本。我使用命名空间来区分版本SimpleV1.avsc 示例JSON 版本2只是有一个带有默认值的附加描述字段。 SimpleV2.avsc 示例JSON 这两个模式都序列化为Java类。在我的示例中,我将测试向后兼容性。V1写入的记录应由使用V2的读取器读取。我希望看到插入默认值。只要我不使用枚举,这就可以工作。 检查读者作家兼容性方法确认模式是

  • 我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式

  • 我试图使用Confluent Kafka REST Proxy从我的一个主题中检索Avro格式的数据,但不幸的是,我得到了一个反序列化错误。我使用以下命令查询Kafka REST代理 我得到的回应是 Kafka Rest Proxy服务器上的日志如下: 数据是使用KafkaAvroSerializer生成的,模式在模式注册表中。还请注意,在CLI上使用avro console consumer可以

  • 我试图构建一个流,它获得一个Avro主题,做一个简单的转换,然后以Avro格式再次将其发送回另一个主题,我有点卡在最后的序列化部分。 我创建了一个AVRO模式,我正在导入它并使用它创建特定的AVRO Serde。但是我不知道如何使用这个serde将电影对象序列化回AVRO。 这是流类: 谢谢