当前位置: 首页 > 知识库问答 >
问题:

Spark Scala-将结构数组拆分为数据帧列

阎单鹗
2023-03-14

我有一个包含结构数组的嵌套源json文件。结构的数量因行而异,我想使用Spark(scala)从结构的键/值动态创建新的数据框架列,其中键是列名,值是列值。

{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}
scala> val df = spark.read.json("file:///tmp/nested_test.json")
root
 |-- key1: struct (nullable = true)
 |    |-- key2: struct (nullable = true)
 |    |    |-- key3: string (nullable = true)
 |    |    |-- key4: string (nullable = true)
 |    |    |-- key5: struct (nullable = true)
 |    |    |    |-- key6: string (nullable = true)
 |    |    |    |-- key7: string (nullable = true)
 |    |    |    |-- values: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)
df.select(
    ($"key1.key2.key3").as("key3"),
    ($"key1.key2.key4").as("key4"),
    ($"key1.key2.key5.key6").as("key6"),
    ($"key1.key2.key5.key7").as("key7"),
    ($"key1.key2.key5.values").as("values")).
    show(truncate=false)

+----+----+----+----+----------------------------------------------------------------------------+
|key3|key4|key6|key7|values                                                                      |
+----+----+----+----+----------------------------------------------------------------------------+
|AK  |EU  |001 |N   |[[valuesColumn1, 9.876], [valuesColumn2, 1.2345], [valuesColumn3, 8.675309]]|
+----+----+----+----+----------------------------------------------------------------------------+

这里有一个由3个结构组成的数组,但这3个结构需要动态地拆分为3个单独的列(3个的数量可能会有很大的变化),我不知道如何做到这一点。

请注意,数组中的每个数组元素都产生了3个新列。

+----+----+----+----+-----------------------------------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-----------------------------------------+
|AK  |EU  |001 |N   |9.876        |1.2345        |8.675309    |
+----+----+----+----+-----------------------------------------+

我认为理想的解决方案与本SO帖子中讨论的类似,但有两个主要区别:

  1. 在SO post中,列数硬编码为3,但在我的情况下,数组元素数未知
  2. 列名需要由名称列驱动,列值需要由值驱动
...
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)

共有2个答案

米俊喆
2023-03-14

我发现这种方法的性能要好得多,使用分解和透视更容易理解:

val json = """{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}"""

val df = spark.read.json(Seq(json).toDS())

// schema
df.printSchema
root
 |-- key1: struct (nullable = true)
 |    |-- key2: struct (nullable = true)
 |    |    |-- key3: string (nullable = true)
 |    |    |-- key4: string (nullable = true)
 |    |    |-- key5: struct (nullable = true)
 |    |    |    |-- key6: string (nullable = true)
 |    |    |    |-- key7: string (nullable = true)
 |    |    |    |-- values: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- value: string (nullable = true)

// create final df
val finalDf = df.
    select(
      $"key1.key2.key3".as("key3"),
      $"key1.key2.key4".as("key4"),
      $"key1.key2.key5.key6".as("key6"),
      $"key1.key2.key5.key7".as("key7"),
      explode($"key1.key2.key5.values").as("values")
    ).
    groupBy(
      $"key3", $"key4", $"key6", $"key7"
    ).
    pivot("values.name").
    agg(min("values.value")).alias("values.name")

// result
finalDf.show
+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|  AK|  EU| 001|   N|        9.876|       1.2345|     8.675309|
+----+----+----+----+-------------+-------------+-------------+
夏侯承恩
2023-03-14

您可以这样做:

val sac = new SparkContext("local[*]", " first Program");
val sqlc = new SQLContext(sac);
import sqlc.implicits._;
import org.apache.spark.sql.functions.split
import scala.math._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{ min, max }

val json = """{"key1":{"key2":{"key3":"AK","key4":"EU","key5":{"key6":"001","key7":"N","values":[{"name":"valuesColumn1","value":"9.876"},{"name":"valuesColumn2","value":"1.2345"},{"name":"valuesColumn3","value":"8.675309"}]}}}}"""

val df1 = sqlc.read.json(Seq(json).toDS())

val df2 = df1.select(
    ($"key1.key2.key3").as("key3"),
    ($"key1.key2.key4").as("key4"),
    ($"key1.key2.key5.key6").as("key6"),
    ($"key1.key2.key5.key7").as("key7"),
    ($"key1.key2.key5.values").as("values")
)

val numColsVal = df2
    .withColumn("values_size", size($"values"))
    .agg(max($"values_size"))
    .head()
    .getInt(0)

val finalDFColumns = df2.select(explode($"values").as("values")).select("values.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null))).columns
val finalDF = df2.select($"*" +: (0 until numColsVal).map(i => $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)
finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)

最终输出结果为:

+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|AK  |EU  |001 |N   |9.876        |1.2345       |8.675309     |
+----+----+----+----+-------------+-------------+-------------+

希望我答对了你的问题!

-----------带说明编辑----------

此块获取要为数组结构创建的列数。

val numColsVal = df2
        .withColumn("values_size", size($"values"))
        .agg(max($"values_size"))
        .head()
        .getInt(0)

finalDFColumns是创建的DF,所有预期列都作为空值的输出。

块返回需要从数组结构创建的不同列。

df2.select(explode($"values").as("values")).select("values.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect

下面的块将上述新列与df2中的其他列组合在一起,并用空/空值初始化。

foldLeft(df2.limit(0))((cdf, c) => cdf.withColumn(c, lit(null)))

如果您打印将获得的输出,则组合这两个块:

+----+----+----+----+------+-------------+-------------+-------------+
|key3|key4|key6|key7|values|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+------+-------------+-------------+-------------+
+----+----+----+----+------+-------------+-------------+-------------+

现在我们已经准备好了结构。这里需要相应列的值。下面的块为我们提供了值:

df2.select($"*" +: (0 until numColsVal).map(i => $"values".getItem(i)("value").as($"values".getItem(i)("name").toString)): _*)

结果如下:

+----+----+----+----+--------------------+---------------+---------------+---------------+
|key3|key4|key6|key7|              values|values[0][name]|values[1][name]|values[2][name]|
+----+----+----+----+--------------------+---------------+---------------+---------------+
|  AK|  EU| 001|   N|[[valuesColumn1, ...|          9.876|         1.2345|       8.675309|
+----+----+----+----+--------------------+---------------+---------------+---------------+

现在我们需要像上面第一个块中那样重命名这些列。因此,我们将使用zip函数合并列,然后使用foldLeft方法重命名输出列,如下所示:

finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).show(false)

这将导致以下结构:

+----+----+----+----+--------------------+-------------+-------------+-------------+
|key3|key4|key6|key7|              values|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+--------------------+-------------+-------------+-------------+
|  AK|  EU| 001|   N|[[valuesColumn1, ...|        9.876|       1.2345|     8.675309|
+----+----+----+----+--------------------+-------------+-------------+-------------+

我们就快到了。我们现在只需要删除不需要的值,如下所示:

finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf, column) => fdf.withColumnRenamed(column._1, column._2)).drop($"values").show(false)

从而产生如下预期输出-

+----+----+----+----+-------------+-------------+-------------+
|key3|key4|key6|key7|valuesColumn1|valuesColumn2|valuesColumn3|
+----+----+----+----+-------------+-------------+-------------+
|AK  |EU  |001 |N   |9.876        |1.2345       |8.675309     |
+----+----+----+----+-------------+-------------+-------------+

我不确定我是否能解释清楚。但是,如果您尝试打破上述语句/代码并尝试打印它,您将了解我们是如何达到输出的。您可以在internet上找到此逻辑中使用的不同函数的示例说明。

 类似资料:
  • 我有一个包含(我认为是)对的数据帧。 它看起来像这样: < code>Col2曾经包含一个< code>Map[String,String],我在上面做了一个< code>toList(),然后做了< code>explode()以获得原始Map中每个映射的一行。 我想将< code>Col2分成2列,并获得以下数据帧: 有谁知道如何做到这一点? 或者,有人知道如何将一个映射分解成多行(每个映射一

  • 我有两个数据帧df1和df2。df1就像一个具有以下值的字典 df2具有以下值: 我想基于df1数据帧中的,将df2拆分为3个新的数据帧。 日期,TLRA_权益栏应位于数据框 预期产出: > 数据帧 消费者,非周期性数据帧 请让我知道如何有效地做。我想做的是连接列名,例如,然后根据列名的前半部分分割数据帧。 代码: 但这很复杂。需要更好的解决方案。

  • 我有列。 如何根据值将其拆分为2? 第一个将包含

  • 我有下面的spark数据框架。 我必须将上面的数据帧列拆分为多个列,如下所示。 我尝试使用分隔符进行拆分;和限制。但是它也将主题拆分为不同的列。姓名和年龄被组合在一起成一列。我要求所有主题在一列中,只有姓名和年龄在单独的列中。 这在Pyspark有可能实现吗?

  • 问题内容: 想象一下,我有一个这样的JS数组: 我想要的是将该数组拆分为N个较小的数组。例如: 对于Python,我有这个: 对于JS,我可以提出的最佳解决方案是递归函数,但我不喜欢它,因为它既复杂又丑陋。这个内部函数返回一个像这样的数组[1,2,3,null,4,5,6,null,7,8],然后我必须再次循环并手动拆分它。(我的第一次尝试是返回此:[1、2、3,[4、5、6,[7、8、9]]],

  • 问题内容: 假设我有一个如下所示的Javascript数组: 什么方法适合将数组分块(拆分)为更小的数组,最多可以有10个元素? 问题答案: 该array.slice方法可以从一开始,中间,或自己需要的任何目的数组的结束提取切片,在不改变原来的数组。