{
"type" : "record",
"name" : "name1",
"namespace" : "com.data"
"fields" : [
{
"name" : "id",
"type" : "string"
},
{
"name" : "count",
"type" : "int"
},
{
"name" : "val_type",
"type" : {
"type" : "enum",
"name" : "ValType"
"symbols" : [ "s1", "s2" ]
}
}
]
}
当我尝试基于avro模式将avro数据写入s3时
DF数据类型:
root
|-- id: string (nullable = true)
|-- count: integer (nullable = true)
|-- val_type: string (nullable = true)
finaldf.write().option(“avroschema”,string.valueof(inAvroSchema)).format(“com.databricks.spark.avro”).mode(“overwrite”).save(“target_s3_path”);
我得到了错误:
User class threw exception: org.apache.spark.SparkException: Job aborted.
......
Caused by: org.apache.avro.AvroRuntimeException: **Not a union: "string"**
at org.apache.avro.Schema.getTypes(Schema.java:299)
at
org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)
我在附近挖了一些东西,发现了一些有趣的东西,
case class Name1(id: String, count: Int, val_type: String)
val schema = """{
| "type" : "record",
| "name" : "name1",
| "namespace" : "com.data",
| "fields" : [
| {
| "name" : "id",
| "type" : "string"
| },
| {
| "name" : "count",
| "type" : "int"
| },
| {
| "name" : "val_type",
| "type" : {
| "type" : "enum",
| "name" : "ValType",
| "symbols" : [ "s1", "s2" ]
| }
| }
| ]
|}""".stripMargin
val d = Seq(Name1("1",2,"s1"),Name1("1",3,"s2"),Name1("1",4,"s2"),Name1("11",2,"s1")).toDF()
d.write.mode(SaveMode.Overwrite).format("avro").option("avroSchema",schema).save("data/tes2/")
当我使用Spark2.4.x执行代码时,上面的代码失败了,但是当我使用新的Spark3.0.0运行相同的代码时,代码成功了,数据被成功写入。
val df = spark.read.format("avro").load("data/tes2/")
df.printSchema()
df.show(10)
root
|-- id: string (nullable = true)
|-- count: integer (nullable = true)
|-- val_type: string (nullable = true)
+---+-----+--------+
| id|count|val_type|
+---+-----+--------+
| 11| 2| s1|
| 1| 4| s2|
| 1| 3| s2|
| 1| 2| s1|
+---+-----+--------+
我想最好的办法是升级spark版本或更改avro模式定义。
我想从kafka主题中读取流数据,并以avro或parquet格式写入S3。数据流看起来像是json字符串,但我无法以avro或parquet格式转换并写入S3。 val stream=env.AddSource(myConsumerSource).AddSink(sink) 请帮忙,谢谢!
根据Avro文档中“default”属性的定义:“此字段的默认值,用于读取缺少此字段的实例(可选)。” 这意味着,如果缺少相应的字段,则采用默认值。 但事实似乎并非如此。考虑下面的<代码>学生<代码>模式: 模式表示:如果“年龄”字段丢失,则将值视为-1。“名称”字段也是如此。 现在,如果我尝试从以下JSON构建学生模型: 我得到一个例外: 看起来默认设置未按预期工作。那么,违约的作用到底是什么呢
显然它无法解码数据。有什么想法吗?
我有两个问题: > 我曾尝试使用模式V1编写记录,并使用模式V2读取记录,但出现以下错误: org.apache.avro。AvroTypeException:找到foo,应为foo 我使用avro-1.7.3和: 以下是这两种模式的示例(我也尝试过添加命名空间,但没有成功)。 架构V1: 架构V2: 提前谢谢。
根据Avro模式规范(适用于接头):https://avro.apache.org/docs/current/spec.html 如上所述,Unions使用JSON数组表示。例如,["null","string"]声明一个模式,该模式可以是null或string。 ( 请注意,当为类型为联合的记录字段指定默认值时,默认值的类型必须与联合的第一个元素匹配。 因此,对于包含“null”的联合,通常首先
我正在从Cloudera包裹中运行带有Spark 0.9.0的CDH 4.4。 我有一堆Avro文件是通过Pig的AvroStorage UDF创建的。我想在 Spark 中加载这些文件,使用通用记录或载入 Avro 文件的架构。到目前为止,我已经尝试过这个: 这适用于一个文件,但它不能扩展——我将所有数据加载到本地RAM中,然后从那里跨spark节点分发。