当前位置: 首页 > 知识库问答 >
问题:

字段顺序存在Avro模式Java deepcopy问题

徐皓君
2023-03-14

我目前正在寻找意外行为的解决方案,这些意外行为是在使用Java处理特定的AVRO模式演化场景时出现的,我还在消费者中做了一次深度复制,将GenericRecord类解析为从AVRO模式生成的特定类。

为了解释正在发生的事情,我将使用一个简化的模式示例:

{
  "name":"SimpleEvent",
  "type":"record",
  "namespace":"com.simple.schemas",
  "fields":[
     {
        "name":"firstfield",
        "type":"string",
        "default":""
     },
     {
        "name":"secondfield",
        "type":"string",
        "default":""
     },
     {
        "name":"thirdfield",
        "type":"string",
        "default":""
     }
  ]
}

这只是一个简单的模式,有三个字符串字段,都是可选的,因为它们都有默认值。假设在某个时候我想添加另一个字符串字段,并删除一个字段,因为不再需要它,您最终会得到这样的结果:

{
  "name":"SimpleEvent",
  "type":"record",
  "namespace":"com.simple.schemas",
  "fields":[
     {
        "name":"firstfield",
        "type":"string",
        "default":""
     },
     {
        "name":"secondfield",
        "type":"string",
        "default":""
     },
     {
        "name":"newfield",
        "type":"string",
        "default":""
     }
  ]
}

这不应该破坏根据模式演化规则所做的更改。然而,当生产者开始使用较新的模式生成事件时,下游消费者中会发生一些奇怪的事情。

事实证明,生成的Java类(我使用Gradle avro插件生成类,但maven插件和avro工具命令行代码生成产生相同的输出)只查看字段顺序,而不是根据它们的名称映射字段。

这意味着字段“newfield”的值由使用旧版本的架构读取数据的下游使用者映射到“第三字段”。

我发现了一些基于名称执行手动映射的工作,但是,这对嵌套对象不起作用。

通过一些局部实验,我还发现了另一种可以正确解决架构差异的方法:

    Schema readerSchema = SimpleEvent.getClassSchema();
    Schema writerSchema = request.getSchema();

    if (readerSchema.equals(writerSchema)){
        return (SimpleEvent)SpecificData.get().deepCopy(writerSchema, request);
    }

    DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(writerSchema);
    BinaryEncoder encoder = null;
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    encoder = EncoderFactory.get().binaryEncoder(stream, encoder);

    writer.write(request, encoder);
    encoder.flush();

    byte[] recordBytes = stream.toByteArray();

    Decoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null);

    SpecificDatumReader<SimpleEvent> specificDatumReader = new SpecificDatumReader(writerSchema, readerSchema);
    SimpleEvent result = specificDatumReader.read(null, decoder);
    return result;

但是,这似乎是一种相当浪费/不优雅的方法,因为您首先必须将GenericRecord转换为byteArray,然后使用IcitaryDatumReader再次读取它。

deepcopy和datumreader类之间的区别在于,datumReader类似乎可以适应编写器模式不同于读取器模式的情况。

我觉得应该/可以有一个更好、更优雅的方法来处理这个问题。我真的很感激任何帮助/提示。

感谢提前:)

奥斯卡

共有1个答案

董昕
2023-03-14

在进一步挖掘和研究了我们之前在侦听器中使用的Kafkavrodeserializer之后,我注意到AbstractKafkarvodeserialiser有一个反序列化函数,可以在读取器模式中传递。这看起来不错,但它的工作!

package com.oskar.generic.consumer.demo;

import com.simple.schemas;

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

public class SimpleEventDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {

private boolean isKey;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
    this.isKey = isKey;
    configure(new KafkaAvroDeserializerConfig(configs));
}

@Override
public Object deserialize(String s, byte[] bytes) {
    return super.deserialize(bytes, SimpleEvent.getClassSchema());
}

@Override
public void close() {

}
}

然后在消费工厂中使用,如下所示:

@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29095");
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "one");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SimpleEventDeserializer.class);

    return new DefaultKafkaConsumerFactory<>(props);
}

侦听器代码本身如下所示:

 @KafkaListener(topics = "my-topic")
public GenericRecord listen(@Payload GenericRecord request, @Headers MessageHeaders headers) throws IOException {
    SimpleEvent event = (SimpleEvent) SpecificData.get().deepCopy(request.getSchema(), request);
    return request;
}
 类似资料:
  • 我们定义了一个avro模式,并使用代码生成器在Java和 这是我们用来从avro模式生成java类的代码生成器 https://plugins.gradle.org/plugin/com.commercehub.gradle.plugin.avro 和使用 现在,我尝试使用c#生产者将消息发布到Kafka上,并尝试使用java消费者消费它。这适用于大多数数据类型,但以下合同“OptionalCom

  • 我有一个简单的案例类: 我正在添加字段“name” java.util.NoSuchelementException:scala.collection.immutable.stream$empt$.head(stream.scala:1104)在scala.collection.immutable.stream$empt$.head(stream.scala:1102)在test.consumer

  • 我正在使用Java将JSON转换为Avro,并使用Google DataFlow将其存储到GCS。Avro模式是使用SchemaBuilder在运行时创建的。 我在模式中定义的字段之一是可选的LONG字段,它是这样定义的: 现在,当我使用上面的模式创建GenericRecord时,并且“key1”未设置,当将结果GenericRecord放在我的DoFn的上下文中时:我得到以下错误: 异常在线程"

  • 使用以下定义的Avro模式和测试代码,在考虑Avro模式演变以及如何存储Avro数据的第一个版本并随后使用模式的第二个版本检索时,我有几个问题。在我的示例中,表示第一个版本,表示第二个版本,其中我们添加了属性。 < li >有没有办法在Java中将Avro模式和二进制编码数据存储为字节数组?我们希望将Avro对象存储到DynamoDB中,并且希望将Avro数据存储为一个blob,模式存储在它旁边(

  • 我试图在Avro模式中创建Union字段,并用它发送相应的JSON消息,但要有一个字段-。 https://avro.apache.org/docs/1.8.2/spec.html#工会 具有相应JSON数据的最简单联合类型(avro模式)的示例是什么?(尝试制作不含NULL/空数据的示例和含NULL/空数据的示例)。

  • 问题内容: 我有一些带有名称字段的文档。我正在使用名称字段的分析版本进行搜索和排序。排序是在一个级别上进行的,即名称首先是按字母顺序排序的。但是在字母列表中,名称是按字典顺序而不是按字母顺序排序的。这是我使用的映射: 谁能提供相同的解决方案? 问题答案: 深入研究Elasticsearch文档,我偶然发现了这一点: 排序和排序规则 不区分大小写的排序 假设我们有三个用户文档,其名称字段分别包含Bo