当前位置: 首页 > 知识库问答 >
问题:

爆炸(移调?)Spark SQL表中的多个列

岳枫
2023-03-14

我正在使用Spark SQL(我提到它是在Spark中,以防影响SQL语法-我还不太熟悉,还不能确定),我有一个表,我正在尝试重新构造,但我在尝试同时转置多个列时遇到了麻烦。

基本上,我有看起来像这样的数据:

userId    someString      varA     varB
   1      "example1"    [0,2,5]   [1,2,9]
   2      "example2"    [1,20,5]  [9,null,6]

并且我想同时分解varA和varB(长度将始终保持一致) - 因此最终输出如下所示:

userId    someString      varA     varB
   1      "example1"       0         1
   1      "example1"       2         2
   1      "example1"       5         9
   2      "example2"       1         9
   2      "example2"       20       null
   2      "example2"       5         6

但是我似乎只能让一个explee(var)语句在一个命令中工作,如果我尝试链接它们(即在第一个分解命令之后创建一个临时表),那么我显然会得到大量重复的,不必要的行。

非常感谢!

共有3个答案

汝和裕
2023-03-14

这将工作,即使我们有超过3列

case class Input(user_id: Integer, someString: String, varA: Array[Integer], varB: Array[Integer], varC: Array[String], varD: Array[String])

val obj1 = Input(1, "example1", Array(0,2,5), Array(1,2,9), Array("a","b","c"), Array("red","green","yellow"))
val obj2 = Input(2, "example2", Array(1,20,5), Array(9,null,6), Array("d","e","f"), Array("white","black","cyan"))
val obj3 = Input(3, "example3", Array(10,11,12), Array(5,8,7), Array("g","h","i"), Array("blue","pink","brown"))

val input_df = sc.parallelize(Seq(obj1, obj2, obj3)).toDS
input_df.show()

val zip = udf((a: Seq[String], b: Seq[String], c: Seq[String], d: Seq[String]) => {a.indices.map(i=> (a(i), b(i), c(i), d(i)))})

val output_df = input_df.withColumn("vars", explode(zip($"varA", $"varB", $"varC", $"varD"))).
                         select($"user_id", $"someString", $"vars._1".alias("varA"), $"vars._2".alias("varB"), $"vars._3".alias("varC"), $"vars._4".alias("varD"))
output_df.show()
弓明亮
2023-03-14

你也可以试试

case class Input(
 userId: Integer,
 someString: String,
 varA: Array[Integer],
 varB: Array[Integer])

case class Result(
 userId: Integer,
 someString: String,
 varA: Integer,
 varB: Integer)

def getResult(row : Input) : Iterable[Result] = {
 val user_id = row.user_id
 val someString = row.someString
 val varA = row.varA
 val varB = row.varB
 val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i))}
 seq
 }

val obj1 = Input(1, "string1", Array(0, 2, 5), Array(1, 2, 9))
val obj2 = Input(2, "string2", Array(1, 3, 6), Array(2, 3, 10))
val input_df = sc.parallelize(Seq(obj1, obj2)).toDS

val res = input_df.flatMap{ row => getResult(row) }
res.show
// +------+----------+----+-----+
// |userId|someString|varA|varB |
// +------+----------+----+-----+
// |     1|  string1 |   0|   1 |
// |     1|  string1 |   2|   2 |
// |     1|  string1 |   5|   9 |
// |     2|  string2 |   1|   2 |
// |     2|  string2 |   3|   3 |
// |     2|  string2 |   6|   10|
// +------+----------+----+-----+
侯和惬
2023-03-14

火花

您可以跳过 zip udf 并使用arrays_zip功能

df.withColumn("vars", explode(arrays_zip($"varA", $"varB"))).select(
  $"userId", $"someString",
  $"vars.varA", $"vars.varB").show

火花

没有定制的UDF,你想要的是不可能的。在Scala中,你可以这样做:

val data = sc.parallelize(Seq(
    """{"userId": 1, "someString": "example1",
        "varA": [0, 2, 5], "varB": [1, 2, 9]}""",
    """{"userId": 2, "someString": "example2",
        "varA": [1, 20, 5], "varB": [9, null, 6]}"""
))

val df = spark.read.json(data)

df.printSchema
// root
//  |-- someString: string (nullable = true)
//  |-- userId: long (nullable = true)
//  |-- varA: array (nullable = true)
//  |    |-- element: long (containsNull = true)
//  |-- varB: array (nullable = true)
//  |    |-- element: long (containsNull = true)

现在我们可以定义zipudf:

import org.apache.spark.sql.functions.{udf, explode}

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
   $"userId", $"someString",
   $"vars._1".alias("varA"), $"vars._2".alias("varB")).show

// +------+----------+----+----+
// |userId|someString|varA|varB|
// +------+----------+----+----+
// |     1|  example1|   0|   1|
// |     1|  example1|   2|   2|
// |     1|  example1|   5|   9|
// |     2|  example2|   1|   9|
// |     2|  example2|  20|null|
// |     2|  example2|   5|   6|
// +------+----------+----+----+

SQL:

sqlContext.udf.register("zip", (xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))
df.registerTempTable("df")

sqlContext.sql(
  """SELECT userId, someString, explode(zip(varA, varB)) AS vars FROM df""")
 类似资料:
  • 我有一个模式,其中每行包含多个数组列,我想独立于每个数组列爆炸。 假设我们有列: 我想要一个输出: 想法? (哦,我正在尝试这样做,所以我不必随着模式的变化而更新代码,也因为实际的模式有点大…) PS -支持这个非常相似但不同的问题,我无耻地从这个问题中窃取了示例数据。 编辑:@oliik赢了,但是,如果能用<code>df来实现这一点,那也太棒了。flatMap(主要是因为我仍然不去摸索<cod

  • 问题内容: 我有一个问题,我有一个字符串数组,并且我想在其他定界符中爆炸。例如 我需要一个在@或vs中爆炸的数组。 我已经写了一个解决方案,但是如果每个人都有更好的解决方案,请在此处发布。 问题答案: 怎么用

  • 问题内容: 我想从包含单词列表的DataFrame转换为每个单词都在其自己行中的DataFrame。 如何在DataFrame中的列上爆炸? 这是我尝试的一些示例,您可以在其中取消注释每个代码行并获取以下注释中列出的错误。我在带有Spark 1.6.1的Python 2.7中使用PySpark。 请指教 问题答案: 和是SQL函数。两者都在SQL上运行。将Java正则表达式作为第二个参数。如果要在

  • 问题内容: 我正在将多个JSON对象读取到一个DataFrame中。问题在于某些列是列表。而且,数据非常大,因此我无法使用互联网上可用的解决方案。它们非常慢并且内存效率低下 这是我的数据的样子: 这就是我的数据的形状:(441079,12) 我想要的输出是: 编辑:标记为重复后,我想强调一个事实,在这个问题中,我正在寻找一种爆炸多列的 有效 方法。因此,批准的答案能够有效地爆炸非常大的数据集上的任

  • 问题内容: 如何用一个或多个空格或制表符爆炸字符串? 例: 我想使它成为一个数组。 问题答案:

  • 我一直在努力研究如何使用maven overlay插件将项目排除在爆炸战争之外。 我有以下内容: xml和applicationcontext.xml可以很好地排除,但它们位于:${basedir}/src/main/webapp/web-inf/下 无论我尝试什么,这些文件是静止的,覆盖,尽管排除。