我为简单的类层次结构自动生成了Avro模式:
trait T {def name: String}
case class A(name: String, value: Int) extends T
case class B(name: String, history: Array[String]) extends T
看起来是这样的:
[{
"name": "org.example.schema.raw.A",
"type": "record",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "value",
"type": "int"
}]
}, {
"name": "org.example.schema.raw.B",
"type": "record",
"fields": [{
"name": "name",
"type": "string"
}, {
"name": "history",
"type": {
"type": "array",
"items": "string"
}
}]
}]
此模式适用于使用普通Avro API将数据从JSON读取到GenericRecord
。我尝试实现的下一件事是使用AvroParquetWriter
将所有此类GenericRecord
对象存储到单个拼花文件中:
val writer = new AvroParquetWriter[GenericRecord](file, schema)
writer.write(record)
writer.close()
此代码在第一行失败
java.lang.IllegalArgumentException: Avro schema must be a record.
at parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:96)
at parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:137)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:54)
at parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:86)
难怪AvroSchemaConverter包含以下代码行:
if (!avroSchema.getType().equals(Schema.Type.RECORD)) {
throw new IllegalArgumentException("Avro schema must be a record.");
}
我的模式类型是UNION。非常感谢将此UNION模式映射(合并)到RECORD模式或任何其他建议的任何想法/帮助。
解决方案
1)使用union scheme将JSON从输入读取到< code>GenericRecord中2)获取或创建类型的< code>AvroParquetWriter:
val writer = writers.getOrElseUpdate(record.getType, new AvroParquetWriter[GenericRecord](getPath(record.getType), record.getShema)
3) 将记录写入文件:
writer.write(record)
4)当所有数据从输入中消耗时关闭所有写入器:
writers.values.foreach(_.close())
5)将目录中的数据加载到Spark SQL数据框架中:
sqlContext.option("mergeSchema", "true").parquet("/tmp/data/")
6)数据可以按原样处理或存储——它已经被Spark合并:
df.write.format("parquet").save("merged.parquet")
您可以用case类包装您的trait,它是一个记录。
案例类Reord[K](键:K,值:T)
要回答您关于合并的问题:您可以使用以下< code>case类Merged(name: String,value: Option[Int],history:Option[Array[String]])并使用其生成的模式来写入数据。一般来说,如果您有一个对A和B都向前兼容的模式,它将正确地写这两者。
或者,既然如你所说,avro不会让你把所有数据都写在同一个文件里,也许你可以把输出按类型拆分,每种类型写一个文件?我知道在我能想到的大多数用例中,我可能会这样做,但可能对你不适用。
有一个网站这样做,但我想要一个图书馆或CLI。 谢了!
如何使用用户指定的架构将dataframe转换为Avro格式?
Avro SpecificRecord(即生成的java类)是否与模式演化兼容?一、 e.如果我有一个Avro消息源(在我的例子中是Kafka),并且我想将这些消息反序列化为特定的记录,那么这样做是否安全? 我所看到的: 将字段添加到架构的末尾效果很好-可以将ok反序列化为specificrecord 在中间添加字段不会破坏现有客户机 即使消息兼容,这也是一个问题。 如果我能找到新的模式(例如使用
我试图构建一个系统,从Kafka读取json数据(无模式),将其转换为avro并将其推送到s3。 我已经能够使用kstream和KSQL实现json到avro的转换。我想知道使用Kafka Connect的自定义转换是否可以实现同样的效果。 这是我迄今为止所尝试的: 其中avro_schema是avsc文件中指定的架构名称。 我不确定这是否是正确的方法,但我面临的问题是,当调用newRecord(
大家好,我需要为下面的示例创建AVRO模式; 当我按照建议更改所有者对象时,avro-tool返回错误。 ]} 测试:
对我来说,这似乎是如此基本和简单,但我在网上没有找到任何东西。 有许多关于如何获得JSON模式的示例,也有许多如何从以下对象创建mongoose模式的示例: 如果我试图直接放置JSON模式,我会得到一个错误 但是我在网上找不到从一种类型到另一种类型的转移。 以前有人有这个问题吗?