当前位置: 首页 > 知识库问答 >
问题:

用avro模式(avsc)在Java Spark作业中向s3写入avro数据

印劲
2023-03-14
{
  "type" : "record",
  "name" : "name1",
  "namespace" : "com.data"
  "fields" : [
  {
    "name" : "id",
    "type" : "string"
  },
  {
    "name" : "count",
    "type" : "int"
  },
  {
    "name" : "val_type",
    "type" : {
      "type" : "enum",
      "name" : "ValType"
      "symbols" : [ "s1", "s2" ]
    }
  }
  ]
}

当我尝试基于avro模式将avro数据写入s3时

DF数据类型:

root
 |-- id: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- val_type: string (nullable = true)

finaldf.write().option(“avroschema”,string.valueof(inAvroSchema)).format(“com.databricks.spark.avro”).mode(“overwrite”).save(“target_s3_path”);

我得到了错误:

User class threw exception: org.apache.spark.SparkException: Job aborted.
    ......
    Caused by: org.apache.avro.AvroRuntimeException: **Not a union: "string"**
        at org.apache.avro.Schema.getTypes(Schema.java:299)
        at 
org.apache.spark.sql.avro.AvroSerializer.org$apache$spark$sql$avro$AvroSerializer$$resolveNullableType(AvroSerializer.scala:229)

共有1个答案

司徒志强
2023-03-14

我在附近挖了一些东西,发现了一些有趣的东西,

case class Name1(id: String, count: Int, val_type: String)

val schema = """{
                   |  "type" : "record",
                   |  "name" : "name1",
                   |  "namespace" : "com.data",
                   |  "fields" : [
                   |  {
                   |    "name" : "id",
                   |    "type" : "string"
                   |  },
                   |  {
                   |    "name" : "count",
                   |    "type" : "int"
                   |  },
                   |  {
                   |    "name" : "val_type",
                   |    "type" : {
                   |      "type" : "enum",
                   |      "name" : "ValType",
                   |      "symbols" : [ "s1", "s2" ]
                   |    }
                   |  }
                   |  ]
                   |}""".stripMargin


val d = Seq(Name1("1",2,"s1"),Name1("1",3,"s2"),Name1("1",4,"s2"),Name1("11",2,"s1")).toDF()

d.write.mode(SaveMode.Overwrite).format("avro").option("avroSchema",schema).save("data/tes2/")

当我使用Spark2.4.x执行代码时,上面的代码失败了,但是当我使用新的Spark3.0.0运行相同的代码时,代码成功了,数据被成功写入。

val df = spark.read.format("avro").load("data/tes2/")
df.printSchema()
df.show(10)

root
 |-- id: string (nullable = true)
 |-- count: integer (nullable = true)
 |-- val_type: string (nullable = true)

+---+-----+--------+
| id|count|val_type|
+---+-----+--------+
| 11|    2|      s1|
|  1|    4|      s2|
|  1|    3|      s2|
|  1|    2|      s1|
+---+-----+--------+

我想最好的办法是升级spark版本或更改avro模式定义。

 类似资料:
  • 我想从kafka主题中读取流数据,并以avro或parquet格式写入S3。数据流看起来像是json字符串,但我无法以avro或parquet格式转换并写入S3。 val stream=env.AddSource(myConsumerSource).AddSink(sink) 请帮忙,谢谢!

  • 根据Avro文档中“default”属性的定义:“此字段的默认值,用于读取缺少此字段的实例(可选)。” 这意味着,如果缺少相应的字段,则采用默认值。 但事实似乎并非如此。考虑下面的<代码>学生<代码>模式: 模式表示:如果“年龄”字段丢失,则将值视为-1。“名称”字段也是如此。 现在,如果我尝试从以下JSON构建学生模型: 我得到一个例外: 看起来默认设置未按预期工作。那么,违约的作用到底是什么呢

  • 显然它无法解码数据。有什么想法吗?

  • 我有两个问题: > 我曾尝试使用模式V1编写记录,并使用模式V2读取记录,但出现以下错误: org.apache.avro。AvroTypeException:找到foo,应为foo 我使用avro-1.7.3和: 以下是这两种模式的示例(我也尝试过添加命名空间,但没有成功)。 架构V1: 架构V2: 提前谢谢。

  • 根据Avro模式规范(适用于接头):https://avro.apache.org/docs/current/spec.html 如上所述,Unions使用JSON数组表示。例如,["null","string"]声明一个模式,该模式可以是null或string。 ( 请注意,当为类型为联合的记录字段指定默认值时,默认值的类型必须与联合的第一个元素匹配。 因此,对于包含“null”的联合,通常首先

  • 我正在从Cloudera包裹中运行带有Spark 0.9.0的CDH 4.4。 我有一堆Avro文件是通过Pig的AvroStorage UDF创建的。我想在 Spark 中加载这些文件,使用通用记录或载入 Avro 文件的架构。到目前为止,我已经尝试过这个: 这适用于一个文件,但它不能扩展——我将所有数据加载到本地RAM中,然后从那里跨spark节点分发。