当前位置: 首页 > 知识库问答 >
问题:

如何使用Spark DataFrames查询JSON数据列?

汝和裕
2023-03-14

我有一个卡桑德拉表,为了简单起见,看起来像:

key: text
jsonData: text
blobData: blob

我可以使用spark和spark cassandra连接器为此创建一个基本数据框,使用:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

不过,我正在努力将JSON数据扩展到它的底层结构中。我最终希望能够根据json字符串中的属性进行筛选并返回blob数据。类似于jsonData.foo="bar"并返回blob数据。这目前可能吗?

共有3个答案

莫宝
2023-03-14

来自_json的函数正是您想要的。您的代码将类似于:

val df = sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "mytable", "keyspace" -> "ks1"))
  .load()

//You can define whatever struct type that your json states
val schema = StructType(Seq(
  StructField("key", StringType, true), 
  StructField("value", DoubleType, true)
))

df.withColumn("jsonData", from_json(col("jsonData"), schema))

简宏义
2023-03-14
匿名用户

zero323的回答是彻底的,但遗漏了Spark 2.1中可用的一种方法,它比使用json()的模式更简单、更健壮。:

import org.apache.spark.sql.functions.from_json

val json_schema = spark.read.json(df.select("jsonData").as[String]).schema
df.withColumn("jsonData", from_json($"jsonData", json_schema))

以下是Python的等价物:

from pyspark.sql.functions import from_json

json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
df.withColumn("jsonData", from_json("jsonData", json_schema))

正如zero323所指出的那样,json()的模式的问题在于它检查单个字符串并从中导出模式。如果JSON数据具有不同的模式,那么从schema_of_JSON()返回的模式将不会反映如果将所有JSON数据的模式合并到数据帧中会得到什么。使用来自_json()的解析该数据将产生大量的null或空值,其中_json()schema_返回的模式与数据不匹配。

通过使用Spark从JSON字符串的RDD派生出全面的JSON模式的能力,我们可以保证所有JSON数据都可以被解析。

下面是一个示例(在Python中,Scala的代码非常类似),说明了使用schema\u of_json()从单个元素派生模式与使用spark从所有数据派生模式之间的区别。阅读json()

>>> df = spark.createDataFrame(
...     [
...         (1, '{"a": true}'),
...         (2, '{"a": "hello"}'),
...         (3, '{"b": 22}'),
...     ],
...     schema=['id', 'jsonData'],
... )

a在一行中有一个布尔值,在另一行中有一个字符串值。a的合并模式将其类型设置为字符串。b将是一个整数。

让我们看看不同的方法是如何比较的。首先,json()的schema\u方法:

python prettyprint-override">>>> json_schema = schema_of_json(df.select("jsonData").take(1)[0][0])
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: boolean (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1|  [true]|
|  2|    null|
|  3|      []|
+---+--------+

如您所见,我们派生的JSON模式非常有限<代码>“a”:“hello”无法被解析为布尔值并返回null“b”:22刚刚被删除,因为它不在我们的架构中。

现在使用spark。阅读json()

>>> json_schema = spark.read.json(df.select("jsonData").rdd.map(lambda x: x[0])).schema
>>> parsed_json_df = df.withColumn("jsonData", from_json("jsonData", json_schema))
>>> parsed_json_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- jsonData: struct (nullable = true)
 |    |-- a: string (nullable = true)
 |    |-- b: long (nullable = true)

>>> parsed_json_df.show()
+---+--------+
| id|jsonData|
+---+--------+
|  1| [true,]|
|  2|[hello,]|
|  3|  [, 22]|
+---+--------+

在这里,我们保留了所有的数据,并有一个全面的模式来解释所有的数据“a”:true被转换为字符串,以匹配“a”:“hello”的架构。

使用spark.read.json()的主要缺点是Spark将扫描所有数据以导出模式。根据您拥有的数据量,这种开销可能很大。如果您知道所有JSON数据都有一个一致的模式,那么可以直接使用schema_of_json()来针对单个元素。如果您有模式可变性,但不想扫描所有数据,可以在调用spark.read.json()来查看数据的子集时,将samplingRatio设置为小于1.0

以下是spark的文档。阅读json():Scala API/Python API

闻人昕
2023-03-14

火花

如果需要,可以使用_json函数的schema_来确定模式(请注意,这假设任意一行是模式的有效代表)。

import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._

val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))

火花

您可以使用_json函数中的

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("k", StringType, true), StructField("v", DoubleType, true)
))

df.withColumn("jsonData", from_json($"jsonData", schema))

火花

您可以使用get_json_object,它接受一列和一个路径:

import org.apache.spark.sql.functions.get_json_object

val exprs = Seq("k", "v").map(
  c => get_json_object($"jsonData", s"$$.$c").alias(c))

df.select($"*" +: exprs: _*)

并将字段提取到单个字符串,这些字符串可以进一步转换为预期的类型。

path参数使用点语法表示,前导为$ 表示文档根(因为上面的代码使用字符串插值$必须转义,因此$$。)。

火花

目前是否可能?

据我所知不是直接可以的。你可以试试类似的:

val df = sc.parallelize(Seq(
  ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
  ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")

我假设blob字段不能用JSON表示。否则,将忽略拆分和合并:

import org.apache.spark.sql.Row

val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
  case Row(key: String, json: String) =>
    s"""{"key": "$key", "jsonData": $json}"""
}) 

val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema

// root
//  |-- jsonData: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: double (nullable = true)
//  |-- key: long (nullable = true)
//  |-- blobData: string (nullable = true)

另一种方法(更便宜,但更复杂)是使用UDF解析JSON并输出结构map列。例如:

import net.liftweb.json.parse

case class KV(k: String, v: Int)

val parseJson = udf((s: String) => {
  implicit val formats = net.liftweb.json.DefaultFormats
  parse(s).extract[KV]
})

val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show

// +---+--------------------+------------------+----------+
// |key|            jsonData|          blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// |  1|{"k": "foo", "v":...|some_other_field_1|   [foo,1]|
// |  2|{"k": "bar", "v":...|some_other_field_2|   [bar,3]|
// +---+--------------------+------------------+----------+

parsed.printSchema

// root
//  |-- key: string (nullable = true)
//  |-- jsonData: string (nullable = true)
//  |-- blobData: string (nullable = true)
//  |-- parsedJSON: struct (nullable = true)
//  |    |-- k: string (nullable = true)
//  |    |-- v: integer (nullable = false)

 类似资料:
  • 问题内容: 我有一个Cassandra表,为简单起见,它类似于: 我可以使用spark和spark-cassandra-connector为此创建一个基本的数据框架: 我正在努力将JSON数据扩展为其基础结构。我最终希望能够基于json字符串中的属性进行过滤并返回blob数据。类似于jsonData.foo =“ bar”并返回blobData。目前有可能吗? 问题答案: Spark > = 2.

  • 问题内容: 新手提出的另一个问题。我有一个php变量,用于查询数据库的值。它存储在变量$ publish中,当用户单击超链接时,它的值(在数据库中)将更改。 后台发生的事情是我正在查询数据库表中存储在$ publish变量中的某些数据。如果$ publish为空,则会在弹出窗口中添加publish.html的链接。弹出窗口将处理一个表单,并将数据添加到数据库,这意味着$ publish不再为空。我

  • 如果我在mysql数据库的列中有如下json 如何在mysql中选择同一对象名为'John'且checked=true的所有行。 json对象可能有更多的键,并且键可能没有特定的顺序。

  • 我是JSON新手,正在尝试寻找一种查询JSON数据的方法,并将信息导入适当的

  • 我的准备好的语句有问题,但我无法找出错误所在。我正在尝试将URI链接插入数据库。 错误

  • 想知道如何看待mysql json专栏? category_ids例如) [19, 102, 108] 如果我想搜索包含类别id 102的产品列表,如何使用queryDSL进行查询? 我尝试了JsonNode类型,但它不起作用。