我有一个Cassandra表,为简单起见,它类似于:
key: text
jsonData: text
blobData: blob
我可以使用spark和spark-cassandra-connector为此创建一个基本的数据框架:
val df = sqlContext.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "ks1"))
.load()
我正在努力将JSON数据扩展为其基础结构。我最终希望能够基于json字符串中的属性进行过滤并返回blob数据。类似于jsonData.foo =“ bar”并返回blobData。目前有可能吗?
Spark > = 2.4
如果需要,可以使用schema_of_json函数来确定模式(请注意,这假定任意行都是该模式的有效代表)。
import org.apache.spark.sql.functions.{lit, schema_of_json, from_json}
import collection.JavaConverters._
val schema = schema_of_json(lit(df.select($"jsonData").as[String].first))
df.withColumn("jsonData", from_json($"jsonData", schema, Map[String, String]().asJava))
Spark > = 2.1
您可以使用from_json功能:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("k", StringType, true), StructField("v", DoubleType, true)
))
df.withColumn("jsonData", from_json($"jsonData", schema))
Spark > = 1.6
您可以使用get_json_object带列和路径的方法:
import org.apache.spark.sql.functions.get_json_object
val exprs = Seq("k", "v").map(
c => get_json_object($"jsonData", s"$$.$c").alias(c))
df.select($"*" +: exprs: _*)
并将字段提取为单个字符串,然后可以将其进一步转换为预期的类型。
该path参数使用点语法$.表示,并以引号表示文档根目录(因为上面的代码使用字符串插值法$,因此必须转义$$.)。
Spark <= 1.5:
目前有可能吗?
据我所知,这不是直接可能的。您可以尝试类似以下操作:
val df = sc.parallelize(Seq(
("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"),
("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2")
)).toDF("key", "jsonData", "blobData")
我假设该blob字段不能用JSON表示。否则,您将忽略拆分和合并:
import org.apache.spark.sql.Row
val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey")
val jsons = sqlContext.read.json(df.drop("blobData").map{
case Row(key: String, json: String) =>
s"""{"key": "$key", "jsonData": $json}"""
})
val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey")
parsed.printSchema
// root
// |-- jsonData: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: double (nullable = true)
// |-- key: long (nullable = true)
// |-- blobData: string (nullable = true)
另一种(更便宜的方法,虽然更复杂)方法是使用UDF解析JSON并输出structor或mapcolumn。例如这样的事情:
import net.liftweb.json.parse
case class KV(k: String, v: Int)
val parseJson = udf((s: String) => {
implicit val formats = net.liftweb.json.DefaultFormats
parse(s).extract[KV]
})
val parsed = df.withColumn("parsedJSON", parseJson($"jsonData"))
parsed.show
// +---+--------------------+------------------+----------+
// |key| jsonData| blobData|parsedJSON|
// +---+--------------------+------------------+----------+
// | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]|
// | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]|
// +---+--------------------+------------------+----------+
parsed.printSchema
// root
// |-- key: string (nullable = true)
// |-- jsonData: string (nullable = true)
// |-- blobData: string (nullable = true)
// |-- parsedJSON: struct (nullable = true)
// | |-- k: string (nullable = true)
// | |-- v: integer (nullable = false)
我有一个卡桑德拉表,为了简单起见,看起来像: 我可以使用spark和spark cassandra连接器为此创建一个基本数据框,使用: 不过,我正在努力将JSON数据扩展到它的底层结构中。我最终希望能够根据json字符串中的属性进行筛选并返回blob数据。类似于jsonData.foo="bar"并返回blob数据。这目前可能吗?
问题内容: 新手提出的另一个问题。我有一个php变量,用于查询数据库的值。它存储在变量$ publish中,当用户单击超链接时,它的值(在数据库中)将更改。 后台发生的事情是我正在查询数据库表中存储在$ publish变量中的某些数据。如果$ publish为空,则会在弹出窗口中添加publish.html的链接。弹出窗口将处理一个表单,并将数据添加到数据库,这意味着$ publish不再为空。我
如果我在mysql数据库的列中有如下json 如何在mysql中选择同一对象名为'John'且checked=true的所有行。 json对象可能有更多的键,并且键可能没有特定的顺序。
我是JSON新手,正在尝试寻找一种查询JSON数据的方法,并将信息导入适当的
我的准备好的语句有问题,但我无法找出错误所在。我正在尝试将URI链接插入数据库。 错误
想知道如何看待mysql json专栏? category_ids例如) [19, 102, 108] 如果我想搜索包含类别id 102的产品列表,如何使用queryDSL进行查询? 我尝试了JsonNode类型,但它不起作用。