当前位置: 首页 > 知识库问答 >
问题:

如何从JSON编码的列中提取值?[重复]

薄高懿
2023-03-14

我有一个具有以下模式的Spark数据帧。

[{ "map": {
    "completed-stages": 1,
    "total-stages": 1 },
    "rec": "test-plan",
    "status": {
        "state": "SUCCESS"
    }
  },
  { "map": {
    "completed-stages": 1,
    "total-stages": 1 },
    "rec": "test-proc",
    "status": {
        "state": "FAILED"
  }
}]

我想将其转换为另一个具有以下模式的DF[{"rec":"test-plan","status":"SUCCESS"},{"rec":"test-pROC","status":"FAILED"}]

我已经编写了以下代码,但它不编译,并抱怨编码错误。

val fdf = DF.map(f => {
        val listCommands = f.get(0).asInstanceOf[WrappedArray[Map[String, Any]]]
        val m = listCommands.map(h => {
            var rec = "none"
            var status = "none"

            if(h.exists("status" == "state" -> _)) {
                status = (h.get("status") match {
                    case Some(x) => x.asInstanceOf[HashMap[String, String]].getOrElse("state", "none")
                    case _ => "none"
                })

                if(h.contains("rec")) {
                    rec = (h.get("rec") match {
                        case Some(x: String) => x
                        case _ => "none"
                    })
                }
            }

          Map("status"->status, "rec"->rec)
        })

      val rm = m.flatten
      rm
    })

请建议正确的方法。

共有2个答案

郎正初
2023-03-14

您没有为df提供模式,因此下面的内容可能不适合您。我将json示例保存在test.json文件中,并用val < code > df = spark . read . option(" multiLine ",true)读取它。json("test.json")在这种情况下,您只需< code>df.select($"rec ",$"status.state ")、write.json("test1.json")

缑嘉玉
2023-03-14

这将会很棘手,因为JSONs的顶级元素是不一样的,也就是说,您有< code>map1和< code>map2,因此模式是不一致的。我将与“数据生产者”交谈并请求更改,以便用单独的元素来描述命令的名称。

给定数据帧的模式如下:

scala> commands.printSchema
root
 |-- commands: array (nullable = true)
 |    |-- element: string (containsNull = true)

以及其中元素(行)的数量:

scala> commands.count
res1: Long = 1

您必须首先分解元素的< code>commands数组,然后访问感兴趣的字段。

// 1. Explode the array
val commandsExploded = commands.select(explode($"commands") as "command")
scala> commandsExploded.count
res2: Long = 2

让我们创建JSON编码记录的模式。可以如下所示。

// Note that it accepts map1 and map2 fields
import org.apache.spark.sql.types._
val schema = StructType(
  StructField("map1",
    StructType(
      StructField("completed-stages", LongType, true) ::
      StructField("total-stages", LongType, true) :: Nil), true) ::
  StructField("map2",
    StructType(
      StructField("completed-stages", LongType, true) ::
      StructField("total-stages", LongType, true) :: Nil), true) ::
  StructField("rec", StringType,true) ::
  StructField("status", StructType(
    StructField("state", StringType, true) :: Nil), true
  ) :: Nil)

因此,您应该使用from_json标准函数,该函数接受包含json编码字符串和模式的列。

val commands = commandsExploded.select(from_json($"command", schema) as "command")
scala> commands.show(truncate = false)
+-------------------------------+
|command                        |
+-------------------------------+
|[[1, 1],, test-plan, [SUCCESS]]|
|[, [1, 1], test-proc, [FAILED]]|
+-------------------------------+

让我们看看命令数据集的架构。

scala> commands.printSchema
root
 |-- command: struct (nullable = true)
 |    |-- map1: struct (nullable = true)
 |    |    |-- completed-stages: long (nullable = true)
 |    |    |-- total-stages: long (nullable = true)
 |    |-- map2: struct (nullable = true)
 |    |    |-- completed-stages: long (nullable = true)
 |    |    |-- total-stages: long (nullable = true)
 |    |-- rec: string (nullable = true)
 |    |-- status: struct (nullable = true)
 |    |    |-- state: string (nullable = true)

复杂字段如<code>rec

val recs = commands.select(
  $"command.rec" as "rec",
  $"command.status.state" as "status")

scala> recs.show
+---------+-------+
|      rec| status|
+---------+-------+
|test-plan|SUCCESS|
|test-proc| FAILED|
+---------+-------+

将其转换为单记录JSON编码数据集需要Dataset.toJSONcollect_list标准函数。

val result = recs.toJSON.agg(collect_list("value"))
scala> result.show(truncate = false)
+-------------------------------------------------------------------------------+
|collect_list(value)                                                            |
+-------------------------------------------------------------------------------+
|[{"rec":"test-plan","status":"SUCCESS"}, {"rec":"test-proc","status":"FAILED"}]|
+-------------------------------------------------------------------------------+
 类似资料:
  • 问题内容: 我有一个传递参数的url使用json_encode每个值,如下所示: 返回的结果为: 我可以用来解析每个值以进行进一步的数据处理吗? 谢谢。 问题答案: 如果第二个值为true,将返回一个对象或数组:

  • 问题内容: 我从如下服务器获取响应字符串 我想获取名字的值。我怎样才能做到这一点?提前致谢。 问题答案: 看到此代码是我在应用程序中使用的代码 我像这样找回

  • 问题内容: 首先,我将自由地让自己成为一个笨拙的文科专家,他完全可以自学此脚本。就是说,我正在尝试使用以下代码从USGS水数据服务获取值: 尽管我找到了一些有关如何从JSON响应中提取所需值的教程,但大多数教程都非常简单。我遇到的困难是从该服务返回的看起来非常复杂的响应中提取出来的。查看响应,我可以看到我想要的是来自两个不同部分的值和一个时间值。因此,我可以查看响应并查看所需的内容,但我一生无法解

  • 这是我的 mongodb 集合的架构的一部分: 我已经获取了集合并将其存储在Spark数据框中,现在正在尝试提取变量列中的最内层值。 这工作得非常好,我能够得到数据结构的内部结构。 然而,一旦我尝试进一步深入: 我得到一个无效的语法错误。 df _ temp = df1 . select(df1 . variables . actives . data . 0 . active)< br > ^

  • 我正在以JSON字符串的形式从数据库中获取数据: 如何从给定的JSON字符串中提取公司名称?

  • 我在Cassandra列族的动态列中插入了字符串和整数值。当我查询CQL的值时,它们显示为十六进制编码的位。 我能以某种方式告诉查询将值解码为字符串或整数吗? 如果这更容易,我也很乐意在CLI中执行此操作。在那里,我看到你可以指定