当前位置: 首页 > 知识库问答 >
问题:

Spark from_json-StructType和ArrayType

仲孙鸿飞
2023-03-14

我有一个以XML形式出现的数据集,其中一个节点包含JSON。Spark将其作为StringType读入,因此我尝试使用from_json()将json转换为数据帧。

我可以将字符串转换为JSON,但如何编写模式来处理数组?

没有数组的字符串-工作得很好

import org.apache.spark.sql.functions._

val schemaExample = new StructType()
          .add("FirstName", StringType)
          .add("Surname", StringType)

val dfExample = spark.sql("""select "{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }" as theJson""")

val dfICanWorkWith = dfExample.select(from_json($"theJson", schemaExample))

dfICanWorkWith.collect()

// Results \\
res19: Array[org.apache.spark.sql.Row] = Array([[Johnny,Boy]])

带数组的字符串 - 无法弄清楚这个

import org.apache.spark.sql.functions._

val schemaExample2 = new StructType()
                              .add("", ArrayType(new StructType()
                                                          .add("FirstName", StringType)
                                                          .add("Surname", StringType)
                                                )
                                  )

val dfExample2= spark.sql("""select "[{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }, { \"FirstName\":\"Franky\", \"Surname\":\"Man\" }" as theJson""")

val dfICanWorkWith = dfExample2.select(from_json($"theJson", schemaExample2))

dfICanWorkWith.collect()

// Result \\
res22: Array[org.apache.spark.sql.Row] = Array([null])

共有2个答案

商高谊
2023-03-14

作为火花2.4的schema_of_json功能帮助:

> SELECT schema_of_json('[{"col":0}]');
  array<struct<col:int>>

在您的情况下,您可以使用下面的代码来解析son对象的数组:

scala> spark.sql("""select from_json("[{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }, { \"FirstName\":\"Franky\", \"Surname\":\"Man\" }]", 'array<struct<FirstName:string,Surname:string>>' ) as theJson""").show(false)
+------------------------------+
|theJson                       |
+------------------------------+
|[[Johnny, Boy], [Franky, Man]]|
+------------------------------+
秦跃
2023-03-14

问题是你没有一个完全合格的json。您的json缺少一些东西:

  • 首先,您缺少完成 json 的周围 {}
  • 其次,您缺少变量值(您将其设置为“”,但未添加它)
  • 最后,你错过了结束 ]

尝试将其替换为:

val dfExample2= spark.sql("""select "{\"\":[{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }, { \"FirstName\":\"Franky\", \"Surname\":\"Man\" }]}" as theJson""")

您将获得:

scala> dfICanWorkWith.collect()
res12: Array[org.apache.spark.sql.Row] = Array([[WrappedArray([Johnny,Boy], [Franky,Man])]])
 类似资料:
  • 我有一个类似这样的JSON: 我正在尝试将此结构映射到 Spark 架构。我已经创建了以下内容;但是它不起作用。我还尝试在值字段映射中移除。 另外,请注意,它们“key1”和“key2”是动态字段,将使用唯一标识符生成。也可以有两个以上的键。有没有人能够将数组类型映射到结构类型?

  • 我有一个具有以下模式的数据帧: 我想使用一个UDF,它将user_loans_arr和new_loan作为输入,并将new_loan结构添加到现有的user_loans_arr中。然后,从user_loans_arr中删除loan_date超过12个月的所有元素。 提前谢谢。

  • 我知道,Case类是最小的正则类,而StructType是一种spark数据类型,它是StructFields的集合。 但是我们可以使用Case类和StructType以类似的方式创建数据帧和其他用例。 想要理解 在什么情况下,我们应该选择其中一种,为什么

  • 这实际上与我之前的问题相同,但使用Avro而不是JSON作为数据格式。 我正在使用一个Spark数据框架,它可以从几个不同的模式版本之一加载数据: 我正在使用Spark Avro加载数据。 它可能是版本一文件或版本二文件。但是我希望能够以相同的方式处理它,将未知值设置为“null”。我之前的问题中的建议是设置模式,但是我不想重复自己在文件中编写模式,也不想重复自己在和朋友中编写模式。如何将avro

  • 如果我想从中创建一个(即),有没有一种方法可以在不创建的情况下实现它?我很容易做到: 但是,当我想要的只是架构时,实际创建似乎有些过头了。 (如果您很好奇,问题背后的原因是我正在定义一个,为此,您要覆盖几个返回的方法,并且我用例类。)

  • 您将如何在avro架构中描述spark 数据类型?我正在生成一个 Parquet 文件,其格式在 avro 架构中进行了描述。然后,将此文件从 S3 加载到 spark 中。有一个和数据类型,但它们与不对应。