当前位置: 首页 > 知识库问答 >
问题:

Avro模式演化与Enum–反序列化崩溃

彭星津
2023-03-14

我在两个独立的AVCS模式文件中定义了记录的两个版本。我使用命名空间来区分版本SimpleV1.avsc

{
  "type" : "record",
  "name" : "Simple",
  "namespace" : "test.simple.v1",
  "fields" : [ 
      {
        "name" : "name",
        "type" : "string"
      }, 
      {
        "name" : "status",
        "type" : {
          "type" : "enum",
          "name" : "Status",
          "symbols" : [ "ON", "OFF" ]
        },
        "default" : "ON"
      }
   ]
}

示例JSON

{"name":"A","status":"ON"}

版本2只是有一个带有默认值的附加描述字段。

SimpleV2.avsc

{
  "type" : "record",
  "name" : "Simple",
  "namespace" : "test.simple.v2",
  "fields" : [ 
      {
        "name" : "name",
        "type" : "string"
      }, 
      {
        "name" : "description",
        "type" : "string",
        "default" : ""
      }, 
      {
        "name" : "status",
        "type" : {
          "type" : "enum",
          "name" : "Status",
          "symbols" : [ "ON", "OFF" ]
        },
        "default" : "ON"
      }
   ]
}

示例JSON

{"name":"B","description":"b","status":"ON"}

这两个模式都序列化为Java类。在我的示例中,我将测试向后兼容性。V1写入的记录应由使用V2的读取器读取。我希望看到插入默认值。只要我不使用枚举,这就可以工作。

public class EnumEvolutionExample {

    public static void main(String[] args) throws IOException {
        Schema schemaV1 = new org.apache.avro.Schema.Parser().parse(new File("./src/main/resources/SimpleV1.avsc"));
        //works as well
        //Schema schemaV1 = test.simple.v1.Simple.getClassSchema();
        Schema schemaV2 = new org.apache.avro.Schema.Parser().parse(new File("./src/main/resources/SimpleV2.avsc"));

        test.simple.v1.Simple simpleV1 = test.simple.v1.Simple.newBuilder()
                .setName("A")
                .setStatus(test.simple.v1.Status.ON)
                .build();
        
        
        SchemaPairCompatibility schemaCompatibility = SchemaCompatibility.checkReaderWriterCompatibility(
                schemaV2,
                schemaV1);
        //Checks that writing v1 and reading v2 schemas is compatible
        Assert.assertEquals(SchemaCompatibilityType.COMPATIBLE, schemaCompatibility.getType());
        
        byte[] binaryV1 = serealizeBinary(simpleV1);
        
        //Crashes with: AvroTypeException: Found test.simple.v1.Status, expecting test.simple.v2.Status
        test.simple.v2.Simple v2 = deSerealizeBinary(binaryV1, new test.simple.v2.Simple(), schemaV1);
        
    }
    
    public static byte[] serealizeBinary(SpecificRecord record) {
        DatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(record.getSchema());
        byte[] data = new byte[0];
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Encoder binaryEncoder = EncoderFactory.get()
            .binaryEncoder(stream, null);
        try {
            writer.write(record, binaryEncoder);
            binaryEncoder.flush();
            data = stream.toByteArray();
        } catch (IOException e) {
            System.out.println("Serialization error " + e.getMessage());
        }

        return data;
    }
    
    public static <T extends SpecificRecord> T deSerealizeBinary(byte[] data, T reuse, Schema writer) {
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        DatumReader<T> datumReader = new SpecificDatumReader<>(writer, reuse.getSchema());
        try {
            T datum = datumReader.read(null, decoder);
            return datum;
        } catch (IOException e) {
            System.out.println("Deserialization error" + e.getMessage());
        }
        return null;
    }

}

检查读者作家兼容性方法确认模式是兼容的。但是当我反序列化时,我得到了以下异常

Exception in thread "main" org.apache.avro.AvroTypeException: Found test.simple.v1.Status, expecting test.simple.v2.Status
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:309)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
    at org.apache.avro.io.ResolvingDecoder.readEnum(ResolvingDecoder.java:260)
    at org.apache.avro.generic.GenericDatumReader.readEnum(GenericDatumReader.java:267)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:136)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at test.EnumEvolutionExample.deSerealizeBinary(EnumEvolutionExample.java:70)
    at test.EnumEvolutionExample.main(EnumEvolutionExample.java:45)

我不明白为什么Avro认为它是v1.Status。名称空间不是编码的一部分。这是一个bug还是有人知道如何运行它?

共有2个答案

戚英逸
2023-03-14

找到了一个解决方法。我将枚举移动到“未版本化”命名空间。所以它在两个版本中都是一样的。但实际上对我来说它看起来像一个错误。转换记录不是问题,但枚举不起作用。两者都是Avro中的复杂类型。

{
  "type" : "record",
  "name" : "Simple",
  "namespace" : "test.simple.v1",
  "fields" : [ 
      {
        "name" : "name",
        "type" : "string"
      }, 
      {
        "name" : "status",
        "type" : {
          "type" : "enum",
          "name" : "Status",
          "namespace" : "test.model.unversioned",
          "symbols" : [ "ON", "OFF" ]
        },
        "default" : "ON"
      }
   ]
}
淳于宏伯
2023-03-14

尝试添加@别名。

例如:

v1

{
  "type" : "record",
  "name" : "Simple",
  "namespace" : "test.simple.v1",
  "fields" : [ 
      {
        "name" : "name",
        "type" : "string"
      }, 
      {
        "name" : "status",
        "type" : {
          "type" : "enum",
          "name" : "Status",
          "symbols" : [ "ON", "OFF" ]
        },
        "default" : "ON"
      }
   ]
}

v2

{
  "type" : "record",
  "name" : "Simple",
  "namespace" : "test.simple.v2",
  "fields" : [ 
      {
        "name" : "name",
        "type" : "string"
      }, 
      {
        "name" : "description",
        "type" : "string",
        "default" : ""
      }, 
      {
        "name" : "status",
        "type" : {
          "type" : "enum",
          "name" : "Status",
          "aliases" : [ "test.simple.v1.Status" ]
          "symbols" : [ "ON", "OFF" ]
        },
        "default" : "ON"
      }
   ]
}
 类似资料:
  • 我有两个问题: > 我曾尝试使用模式V1编写记录,并使用模式V2读取记录,但出现以下错误: org.apache.avro。AvroTypeException:找到foo,应为foo 我使用avro-1.7.3和: 以下是这两种模式的示例(我也尝试过添加命名空间,但没有成功)。 架构V1: 架构V2: 提前谢谢。

  • 我正试图了解更多关于我们在Kafka主题中使用的Avro模式的信息,我对这一点相对来说比较陌生。 我想知道是否有一种方法可以在特定情况下发展模式。我们用一个不能为null的新字段或任何默认值来更新模式,因为这些新字段是标识符。解决这个问题的方法是创建新主题,但是有没有更好的方法来改进现有模式?

  • 我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式

  • 我一直在尝试将avro通用记录进行串行化,并生成avro串行化的数据发送给Kafka。主要目标是不使用合并模式注册表存储模式,而是将模式与序列化数据一起发送,以便从kafka主题中提取并反序列化。 下面是AvroSerializer用于生成Avro数据的部分。 Kafka中出现的序列化数据如下所示。

  • 我尝试使用avro-python3(向后兼容性)重新创建一个模式演变案例。 我有两个模式: 第二个模式没有字段,但有两个附加字段:和。 根据avro模式演化规则,如果我用schema_v1写入avro记录: …我可以使用schema_v2读取它,前提是不存在字段有默认值 但我得到了以下错误: 我知道这在Java中有效。这是一个视频课程的示例。有没有办法让它在python中工作?

  • 我有2个模式: 模式1(旧模式): 我用一个布尔字段更新了模式: 方案2(新方案): kafka主题包含属于旧模式(schema1)的消息。更新使用者模式后,即使更新字段中存在默认值,使用者也无法反序列化旧模式消息。 根据Avro文档: 阿夫罗博士 我得到以下错误,而反序列化: 当记录缺少字段时,为什么默认值未应用于使用者?任何帮助都非常感谢。提前致谢!