当前位置: 首页 > 知识库问答 >
问题:

Avro:将联合模式转换为记录模式

屠钊
2023-03-14

我为简单的类层次结构自动生成了Avro模式:

trait T {def name: String}
case class A(name: String, value: Int) extends T
case class B(name: String, history: Array[String]) extends T

看起来是这样的:

 [{
  "name": "org.example.schema.raw.A",
  "type": "record",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "value",
    "type": "int"
  }]
}, {
  "name": "org.example.schema.raw.B",
  "type": "record",
  "fields": [{
    "name": "name",
    "type": "string"
  }, {
    "name": "history",
    "type": {
      "type": "array",
      "items": "string"
    }
  }]
}]

此模式适用于使用普通Avro API将数据从JSON读取到GenericRecord。我尝试实现的下一件事是使用AvroParquetWriter将所有此类GenericRecord对象存储到单个拼花文件中:

val writer = new AvroParquetWriter[GenericRecord](file, schema)
writer.write(record)
writer.close()

此代码在第一行失败

java.lang.IllegalArgumentException: Avro schema must be a record.
at parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:96)
at parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:137)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:86)

难怪AvroSchemaConverter包含以下代码行:

if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
      throw new IllegalArgumentException("Avro schema must be a record.");
}

我的模式类型是UNION。非常感谢将此UNION模式映射(合并)到RECORD模式或任何其他建议的任何想法/帮助。

解决方案

1)使用union scheme将JSON从输入读取到< code>GenericRecord中2)获取或创建类型的< code>AvroParquetWriter:

val writer = writers.getOrElseUpdate(record.getType, new AvroParquetWriter[GenericRecord](getPath(record.getType), record.getShema)

3) 将记录写入文件:

writer.write(record)

4)当所有数据从输入中消耗时关闭所有写入器:

writers.values.foreach(_.close())

5)将目录中的数据加载到Spark SQL数据框架中:

sqlContext.option("mergeSchema", "true").parquet("/tmp/data/")

6)数据可以按原样处理或存储——它已经被Spark合并:

df.write.format("parquet").save("merged.parquet")

共有2个答案

欧阳智志
2023-03-14

您可以用case类包装您的trait,它是一个记录。

案例类Reord[K](键:K,值:T)

韩经武
2023-03-14

要回答您关于合并的问题:您可以使用以下< code>case类Merged(name: String,value: Option[Int],history:Option[Array[String]])并使用其生成的模式来写入数据。一般来说,如果您有一个对A和B都向前兼容的模式,它将正确地写这两者。

或者,既然如你所说,avro不会让你把所有数据都写在同一个文件里,也许你可以把输出按类型拆分,每种类型写一个文件?我知道在我能想到的大多数用例中,我可能会这样做,但可能对你不适用。

 类似资料:
  • 有一个网站这样做,但我想要一个图书馆或CLI。 谢了!

  • 如何使用用户指定的架构将dataframe转换为Avro格式?

  • Avro SpecificRecord(即生成的java类)是否与模式演化兼容?一、 e.如果我有一个Avro消息源(在我的例子中是Kafka),并且我想将这些消息反序列化为特定的记录,那么这样做是否安全? 我所看到的: 将字段添加到架构的末尾效果很好-可以将ok反序列化为specificrecord 在中间添加字段不会破坏现有客户机 即使消息兼容,这也是一个问题。 如果我能找到新的模式(例如使用

  • 我试图构建一个系统,从Kafka读取json数据(无模式),将其转换为avro并将其推送到s3。 我已经能够使用kstream和KSQL实现json到avro的转换。我想知道使用Kafka Connect的自定义转换是否可以实现同样的效果。 这是我迄今为止所尝试的: 其中avro_schema是avsc文件中指定的架构名称。 我不确定这是否是正确的方法,但我面临的问题是,当调用newRecord(

  • 大家好,我需要为下面的示例创建AVRO模式; 当我按照建议更改所有者对象时,avro-tool返回错误。 ]} 测试:

  • 对我来说,这似乎是如此基本和简单,但我在网上没有找到任何东西。 有许多关于如何获得JSON模式的示例,也有许多如何从以下对象创建mongoose模式的示例: 如果我试图直接放置JSON模式,我会得到一个错误 但是我在网上找不到从一种类型到另一种类型的转移。 以前有人有这个问题吗?