Using the Avro schemas and test code defined below, I have several questions about Avro schema evolution: specifically, how to store data written with the first version of a schema and later retrieve it using the second version. In my example, Person.avsc represents the first version and PersonWithMiddleName.avsc represents the second version, in which we add a middleName field.
Java test code
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import org.apache.avro.AvroTypeException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SchemaEvolutionTest {

  Logger log = LoggerFactory.getLogger(this.getClass());

  @Test
  public void createAndReadPerson() {
    // Create the Person using the Person schema
    var person = new Person();
    person.setFirstName("Joe");
    person.setLastName("Cool");
    log.info("Person has been created: {}", person);
    SpecificDatumWriter<Person> personSpecificDatumWriter =
        new SpecificDatumWriter<Person>(Person.class);
    DataFileWriter<Person> dataFileWriter = new DataFileWriter<Person>(personSpecificDatumWriter);
    try {
      dataFileWriter.create(person.getSchema(), new File("person.avro"));
      dataFileWriter.append(person);
      dataFileWriter.close();
    } catch (IOException e) {
      Assertions.fail();
    }
    log.info("Person has been written to an Avro file");

    // ******************************************************************************************************
    // Next, read as Person from the Avro file using the Person schema
    DatumReader<Person> personDatumReader =
        new SpecificDatumReader<Person>(Person.getClassSchema());
    var personAvroFile = new File("person.avro");
    DataFileReader<Person> personDataFileReader = null;
    try {
      personDataFileReader = new DataFileReader<Person>(personAvroFile, personDatumReader);
    } catch (IOException e1) {
      Assertions.fail();
    }
    Person personReadFromFile = null;
    while (personDataFileReader.hasNext()) {
      // Reuse object by passing it to next(). This saves us from
      // allocating and garbage collecting many objects for files with
      // many items.
      try {
        personReadFromFile = personDataFileReader.next(person);
      } catch (IOException e) {
        Assertions.fail();
      }
    }
    log.info("Person read from the file: {}", personReadFromFile.toString());

    // ******************************************************************************************************
    // Read the Person from the Person file as PersonWithMiddleName using only the
    // PersonWithMiddleName schema
    DatumReader<PersonWithMiddleName> personWithMiddleNameDatumReader =
        new SpecificDatumReader<PersonWithMiddleName>(PersonWithMiddleName.getClassSchema());
    DataFileReader<PersonWithMiddleName> personWithMiddleNameDataFileReader = null;
    try {
      personWithMiddleNameDataFileReader =
          new DataFileReader<PersonWithMiddleName>(personAvroFile, personWithMiddleNameDatumReader);
    } catch (IOException e1) {
      Assertions.fail();
    }
    PersonWithMiddleName personWithMiddleName = null;
    while (personWithMiddleNameDataFileReader.hasNext()) {
      // Reuse object by passing it to next(). This saves us from
      // allocating and garbage collecting many objects for files with
      // many items.
      try {
        personWithMiddleName = personWithMiddleNameDataFileReader.next(personWithMiddleName);
      } catch (IOException e) {
        Assertions.fail();
      }
    }
    log.info(
        "Now a PersonWithMiddleName has been read from the file that was written as a Person: {}",
        personWithMiddleName.toString());

    // ******************************************************************************************************
    // Serialize the Person to a byte array
    byte[] personByteArray = new byte[0];
    ByteArrayOutputStream personByteArrayOutputStream = new ByteArrayOutputStream();
    Encoder encoder = null;
    try {
      encoder = EncoderFactory.get().binaryEncoder(personByteArrayOutputStream, null);
      personSpecificDatumWriter.write(person, encoder);
      encoder.flush();
      personByteArray = personByteArrayOutputStream.toByteArray();
    } catch (IOException e) {
      log.error("Serialization error: {}", e.getMessage());
    }
    log.info("The Person is now serialized to a byte array: {}", new String(personByteArray));

    // ******************************************************************************************************
    // Deserialize the Person byte array into a Person object
    BinaryDecoder binaryDecoder = null;
    Person decodedPerson = null;
    try {
      binaryDecoder = DecoderFactory.get().binaryDecoder(personByteArray, null);
      decodedPerson = personDatumReader.read(null, binaryDecoder);
    } catch (IOException e) {
      log.error("Deserialization error: {}", e.getMessage());
    }
    log.info("Decoded Person from byte array {}", decodedPerson.toString());

    // ******************************************************************************************************
    // Deserialize the Person byte array into a PersonWithMiddleName object
    PersonWithMiddleName decodedPersonWithMiddleName = null;
    try {
      binaryDecoder = DecoderFactory.get().binaryDecoder(personByteArray, null);
      decodedPersonWithMiddleName = personWithMiddleNameDatumReader.read(null, binaryDecoder);
    } catch (IOException e) {
      log.error("Deserialization error: {}", e.getMessage());
    }
    log.info(
        "Decoded PersonWithMiddleName from byte array {}", decodedPersonWithMiddleName.toString());

    // ******************************************************************************************************
    // Serialize the Person to JSON
    byte[] jsonByteArray = new byte[0];
    personByteArrayOutputStream = new ByteArrayOutputStream();
    Encoder jsonEncoder = null;
    try {
      jsonEncoder =
          EncoderFactory.get().jsonEncoder(Person.getClassSchema(), personByteArrayOutputStream);
      personSpecificDatumWriter.write(person, jsonEncoder);
      jsonEncoder.flush();
      jsonByteArray = personByteArrayOutputStream.toByteArray();
    } catch (IOException e) {
      log.error("Serialization error: {}", e.getMessage());
    }
    log.info("The Person is now serialized to JSON: {}", new String(jsonByteArray));

    // ******************************************************************************************************
    // Deserialize the Person JSON into a Person object
    JsonDecoder jsonDecoder = null;
    try {
      jsonDecoder =
          DecoderFactory.get().jsonDecoder(Person.getClassSchema(), new String(jsonByteArray));
      decodedPerson = personDatumReader.read(null, jsonDecoder);
    } catch (IOException e) {
      log.error("Deserialization error: {}", e.getMessage());
    }
    log.info("Decoded Person from JSON: {}", decodedPerson.toString());

    // ******************************************************************************************************
    // Deserialize the Person JSON into a PersonWithMiddleName object
    try {
      jsonDecoder =
          DecoderFactory.get()
              .jsonDecoder(PersonWithMiddleName.getClassSchema(), new String(jsonByteArray));
      decodedPersonWithMiddleName = personWithMiddleNameDatumReader.read(null, jsonDecoder);
    } catch (AvroTypeException ae) {
      // We expect this: the JSON was written with the Person schema, so the
      // middleName field the PersonWithMiddleName schema looks for is missing.
      log.error(
          "An AvroTypeException occurred trying to deserialize Person JSON back into a PersonWithMiddleName. Here's the exception: {}",
          ae.getMessage());
    } catch (Exception e) {
      log.error("Deserialization error: {}", e.getMessage());
    }
  }
}
Person.avsc
{
  "type": "record",
  "namespace": "org.acme.avro_testing",
  "name": "Person",
  "fields": [
    {
      "name": "firstName",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "lastName",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
PersonWithMiddleName.avsc
{
  "type": "record",
  "namespace": "org.acme.avro_testing",
  "name": "PersonWithMiddleName",
  "fields": [
    {
      "name": "firstName",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "middleName",
      "type": ["null", "string"],
      "default": null
    },
    {
      "name": "lastName",
      "type": ["null", "string"],
      "default": null
    }
  ]
}
Test output
Person has been created: {"firstName": "Joe", "lastName": "Cool"}
Person has been written to an Avro file
Person read from the file: {"firstName": "Joe", "lastName": "Cool"}
Now a PersonWithMiddleName has been read from the file that was written as a Person: {"firstName": "Joe", "middleName": null, "lastName": "Cool"}
The Person is now serialized to a byte array: JoeCool
Decoded Person from byte array {"firstName": "Joe", "lastName": "Cool"}
Decoded PersonWithMiddleName from byte array {"firstName": "Joe", "middleName": null, "lastName": "Cool"}
The Person is now serialized to JSON: {"firstName":{"string":"Joe"},"lastName":{"string":"Cool"}}
Decoded Person from JSON: {"firstName": "Joe", "lastName": "Cool"}
An AvroTypeException occurred trying to deserialize Person JSON back into a PersonWithMiddleName. Here's the exception: Expected field name not found: middleName
person.avro
Objavro.schema�{"type":"record","name":"Person","namespace":"org.acme.avro_testing","fields":[{"name":"firstName","type":["null","string"],"default":null},{"name":"lastName","type":["null","string"],"default":null}]}
For question one: I'm not a Java expert, but in Python, rather than writing to an actual file, there is the concept of a file-like object, which has the same interface as a file but just writes to a byte buffer. For example, instead of doing this:
file = open(file_name, "wb")
# use avro library to write to file
file.close()
you can do this:
from io import BytesIO
bytes_interface = BytesIO()
# use bytes_interface the same way you would the previous "file" object
byte_output = bytes_interface.getvalue()
The final byte_output will be the bytes that would normally have been written to the file, but now they're just a byte buffer you can store anywhere. Does Java have a concept like this? Or, if you absolutely must go through the process of writing an actual temporary file, I assume Java has some way of reading the file contents back into a byte buffer.
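From what I can tell, Java does have an equivalent: ByteArrayOutputStream plays the same role as Python's BytesIO, and DataFileWriter.create accepts an OutputStream directly, so no temporary file is needed. Here is a minimal sketch assuming the generated Person class from the question (the class name InMemoryAvroExample is just for illustration); reading back uses SeekableByteArrayInput, which adapts a byte array to the seekable input DataFileReader expects:
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;

public class InMemoryAvroExample {
  public static void main(String[] args) throws IOException {
    var person = new Person();
    person.setFirstName("Joe");
    person.setLastName("Cool");

    // Write the Avro container "file" into an in-memory buffer instead of a File.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (DataFileWriter<Person> writer =
        new DataFileWriter<>(new SpecificDatumWriter<>(Person.class))) {
      writer.create(person.getSchema(), out); // OutputStream overload, no File involved
      writer.append(person);
    }
    // The same bytes that would have landed in person.avro, including the
    // embedded writer schema and sync markers.
    byte[] avroBytes = out.toByteArray();

    // Read the buffer back; SeekableByteArrayInput provides the random access
    // that DataFileReader requires.
    try (DataFileReader<Person> reader =
        new DataFileReader<>(
            new SeekableByteArrayInput(avroBytes),
            new SpecificDatumReader<>(Person.getClassSchema()))) {
      while (reader.hasNext()) {
        System.out.println(reader.next());
      }
    }
  }
}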
For question two, I believe you're hitting the same problem described in this Jira ticket: https://issues.apache.org/jira/browse/AVRO-2890. Currently, the JSON decoder requires the schema the data was written with, and it cannot perform any kind of schema evolution using a schema different from the one used when the data was written.
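Until that's fixed, one workaround (a sketch, not an official recommendation) is to keep giving the JsonDecoder the writer's schema and let the DatumReader perform the writer-to-reader resolution via the two-schema SpecificDatumReader constructor. Applied to the last block of the test above, it could look like:
// Decode the JSON with the schema it was actually written with (Person),
// and let the reader resolve it to the evolved PersonWithMiddleName schema.
JsonDecoder resolvingJsonDecoder =
    DecoderFactory.get().jsonDecoder(Person.getClassSchema(), new String(jsonByteArray));
SpecificDatumReader<PersonWithMiddleName> resolvingReader =
    new SpecificDatumReader<>(
        Person.getClassSchema(),                // writer schema: what the JSON contains
        PersonWithMiddleName.getClassSchema()); // reader schema: what we want back
PersonWithMiddleName resolved = resolvingReader.read(null, resolvingJsonDecoder);
// middleName falls back to its schema default (null) because the JSON has no such field.
This only helps when the writer's schema is still available at read time, which is exactly what the container-file format (and schema registries) provide.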