我有两个Avro模式V1和V2,在spark中读取如下:
import org.apache.spark.sql.avro.functions._
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/V1.avsc")))
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
val output = df
.select(from_avro($"value", jsonFormatSchema) as $"avroFields")
V1有两个字段“一”和“二”
{
"name": "test",
"namespace": "foo.bar",
"type": "record",
"fields": [
{
"name": "one",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "two",
"type": [
"null",
"string"
],
"default": null
}
]
}
V2 与新字段:“三”
{
"name": "test",
"namespace": "foo.bar",
"type": "record",
"fields": [
{
"name": "one",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "two",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "three",
"type": [
"null",
"string"
],
"default": null
}
]
}
场景:编写器使用 V1 进行写入,读取器使用 V2 对 avro 记录进行解码。我的期望是看到字段3填充了默认值,即null。但是我在spark工作中遇到了以下异常。
我是不是错过了什么?我的理解是avro支持向后兼容。
Exception in thread "main" java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
您总是必须使用写入的确切模式来解码Avro。这是因为Avro使用未标记的数据来更紧凑,并且要求编写器模式在解码时出现。
因此,当您使用V2模式阅读时,它会查找字段<code>三个<code>(或者该字段的空标记),并抛出一个错误。
您可以做的是将解码数据(用writers模式解码)映射到阅读器模式,Java有一个API:<code>SpecificDatumReader(模式编写器,模式读取器)</code>。
协议缓冲区或节俭做你想要的,是标记的格式。Avro 希望架构与数据一起传输,例如在 Avro 文件中。
我想使用查找从一个集合中获取一些数据并将其放入另一个集合中。 在localfield或foreignfield中写什么都不重要,因为它从player_game_stats中获取所有数据并将其插入player集合中的每个文档中。我想检查localfield和foreignField是否相等,但lookup不检查这一点。我对mongodb使用NoSqlBooster
最后是持久性上下文配置: 我很感谢你的帮助。
我有一个avro模式定义,比如- 上线后,我们增加了另一个领域- CusterType被定义为null, string。即使在向合流注册表注册架构时,我们也会收到错误-正在注册的架构与早期架构不兼容。 如果有什么原因,请告诉我们。我们通过显式地将customerType默认为null来解决这个问题, Union{null, string}CusterType=null; 但不知何故,我觉得这不是必
我们的系统由多个微服务组成,这些微服务发出并使用以avro格式编码的事件(参见底部的模式)。一个特定的用例如下:服务A在主题T1上发出一个事件(类型为InvoiceEvents),服务B和C(不同的开发团队)在T1上消费。例如,服务B是税务团队的一部分,而服务C是产品履行团队的一部分。 我本以为以下是真的(但似乎不是真的): 通过添加新的联合类型(即为字段“payload”创建的InvoiceCr
我正在使用wedriveri o 4.5: 我需要等到某个元素存在,如果它不存在,处理这种情况。 例如: 但如果页面上不存在元素,webdriver会将我的测试标记为失败,并显示消息:“超时10000毫秒。”。尝试减少运行时间或增加测试规格的超时时间(http://webdriver.io/guide/testrunner/timeouts.html); 如果回复promise,确保其得到解决 >
ngrok承诺有关其接口的兼容性和稳定性,以便您可以自信地构建集成顶部,知道在升级到较新版本时期望的更改。 兼容性承诺 Point Release (2.0.0 -> 2.0.1) - ngrok承诺在点发布之间没有突破性的变化 Minor Version Change (2.0 -> 2.1) - ngrok可能会进行小的更改,打破兼容性的次要版本更改。 ngrok承诺,任何破坏性更改将由一个版