当前位置: 首页 > 知识库问答 >
问题:

分解Spark SQL表中的多列

方焱
2023-03-14

这里有一个关于这个问题的问题:

爆炸(移调?)Spark SQL表中的多个列

假设我们有如下额外的列:

**userId    someString      varA     varB      varC    varD**
   1        "example1"    [0,2,5]   [1,2,9]    [a,b,c] [red,green,yellow]
   2        "example2"    [1,20,5]  [9,null,6] [d,e,f] [white,black,cyan]

结束如下输出:

userId    someString      varA     varB   varC     varD
   1      "example1"       0         1     a       red
   1      "example1"       2         2     b       green
   1      "example1"       5         9     c       yellow
   2      "example2"       1         9     d       white
   2      "example2"       20       null   e       black
   2      "example2"       5         6     f       Cyan

答案是将< code>udf定义为:

val zip = udf((xs: Seq[Long], ys: Seq[Long]) => xs.zip(ys))

并定义“与列”。

df.withColumn("vars", explode(zip($"varA", $"varB"))).select(
   $"userId", $"someString",
   $"vars._1".alias("varA"), $"vars._2".alias("varB")).show

如果我们需要扩展上面的答案,有更多的栏目,修改上面代码的最简单方法是什么。任何帮助请。

共有3个答案

乜坚成
2023-03-14

如果要将 UDF 扩展到更多列,请执行以下操作:

val zip = udf((xs: Seq[String], ys: Seq[String], zs: Seq[String]) =>
  for (((xs,ys),zs) <- xs zip ys zip zs) yield (xs,ys,zs))

df.withColumn("vars", explode(zip($"varA", $"varB", $"varC"))).select(
  $"userId", $"someString", $"vars._1".alias("varA"),
  $"vars._2".alias("varB"),$"vars._3".alias("varC")).show

该逻辑可以根据需要应用于n列。

公良向阳
2023-03-14

我假设varA、varB、varC、varD的大小与您的示例保持相同。

scala> case class Input(user_id : Integer,someString : String, varA : Array[Integer],varB : Array[Integer],varC : Array[String], varD : Array[String])
defined class Input

scala> case class Result(user_id : Integer,someString : String , varA : Integer,varB : Integer,varC : String, varD : String)
defined class Result

scala> val obj1 = Input(1,"example1",Array(0,2,5),Array(1,2,9),Array("a","b","c"),Array("red","green","yellow"))
obj1: Input = Input(1,example1,[Ljava.lang.Integer;@77c43ec2,[Ljava.lang.Integer;@3a332d08,[Ljava.lang.String;@5c1222da,[Ljava.lang.String;@114e051a)

scala> val obj2 = Input(2,"example2",Array(1,20,5),Array(9,null,6),Array("d","e","f"),Array("white","black","cyan"))
obj2: Input = Input(2,example2,[Ljava.lang.Integer;@326db38,[Ljava.lang.Integer;@50914458,[Ljava.lang.String;@339b73ae,[Ljava.lang.String;@1567ee0a)

scala> val input_df = sc.parallelize(Seq(obj1,obj2)).toDS
input_df: org.apache.spark.sql.Dataset[Input] = [user_id: int, someString: string ... 4 more fields]

scala> input_df.show
+-------+----------+----------+------------+---------+--------------------+
|user_id|someString|      varA|        varB|     varC|                varD|
+-------+----------+----------+------------+---------+--------------------+
|      1|  example1| [0, 2, 5]|   [1, 2, 9]|[a, b, c]|[red, green, yellow]|
|      2|  example2|[1, 20, 5]|[9, null, 6]|[d, e, f]|[white, black, cyan]|
+-------+----------+----------+------------+---------+--------------------+

scala> def getResult(row : Input) : Iterable[Result] = {
     |             val user_id = row.user_id
     |             val someString = row.someString
     |             val varA = row.varA
     |             val varB = row.varB
     |             val varC = row.varC
     |             val varD = row.varD
     |             val seq = for( i <- 0 until varA.size) yield {Result(user_id,someString,varA(i),varB(i),varC(i),varD(i))}
     |             seq.toSeq
     |         }
getResult: (row: Input)Iterable[Result]

scala> val resdf = input_df.flatMap{row => getResult(row)}
resdf: org.apache.spark.sql.Dataset[Result] = [user_id: int, someString: string ... 4 more fields]

scala> resdf.show
+-------+----------+----+----+----+------+
|user_id|someString|varA|varB|varC|  varD|
+-------+----------+----+----+----+------+
|      1|  example1|   0|   1|   a|   red|
|      1|  example1|   2|   2|   b| green|
|      1|  example1|   5|   9|   c|yellow|
|      2|  example2|   1|   9|   d| white|
|      2|  example2|  20|null|   e| black|
|      2|  example2|   5|   6|   f|  cyan|
+-------+----------+----+----+----+------+

如果列的大小变量 A、varB、varC 或 varD 不同,则需要处理这些方案。

如果值不存在于任何列中,您可以通过处理异常遍历最大大小并输出空值。

顾永福
2023-03-14

使用 zip udf 的方法似乎没问题,但是如果用于更多集合,则需要扩展。不幸的是,没有真正好的方法来压缩4 Seqs,但这应该有效:

def assertSameSize(arrs:Seq[_]*) = {
 assert(arrs.map(_.size).distinct.size==1,"sizes differ") 
}

val zip4 = udf((xa:Seq[Long],xb:Seq[Long],xc:Seq[String],xd:Seq[String]) => {
    assertSameSize(xa,xb,xc,xd)
    xa.indices.map(i=> (xa(i),xb(i),xc(i),xd(i)))
  }
)
 类似资料:
  • 这些查询是:select*from t1 where col1='123'[t1由col1 bucketted]select*from t1 where col2='123'[col2不是bucketting列]我的问题是 > 如何确定在查询执行期间正在进行全表扫描还是正在进行相关的部分表扫描? 我能从DAG或物理计划中得到任何信息吗?我两个都看过,但我看不出有什么不同,就像我在物理计划中看到的那

  • <code>Spark</code>版本为1.3.0。 来自< code > sqlcontext . Scala (https://github . com/Apache/spark/blob/master/SQL/core/src/main/Scala/org/Apache/spark/SQL/sqlcontext . Scala)的源代码: 我真的不能理解上面的代码。 是如何工作的? (_)

  • 问题内容: 我将RDD [myClass]转换为数据框,然后将其注册为SQL表 该表是可调用的,可以用以下命令演示 但是下一步给出了错误,说表未找到:my_rdd Spark的新手。 不明白为什么会这样。有人可以帮我吗? 问题答案: 确保从相同的SQLContext导入hidden._。临时表在一个特定的SQLContext中保留在内存中。

  • 我将RDD[myClass]转换为dataframe,然后将其注册为SQL表 此表是可调用的,可以使用以下命令演示 对Spark来说是个新手。不明白为什么会这样。有谁能帮我摆脱这一切吗?

  • 我这里有一段简单的代码: 我收到一个错误,上面写着: 文件 根据提供的解决方案,我尽了最大努力。有趣的是,我在另一个表上有另一个查询,效果很好。非常感谢您的帮助。提前谢谢。 这是表的架构: ;

  • 问题内容: 我有一个关于将数据框列中的列表分成多行的问题。 假设我有这个数据框: 我想要数字的每个单一组合,因此最终结果将是: 因为现在我得到以下结果: 为了得到上面的结果,我做了: 问题答案: 与斯科特·波士顿(Scott Boston)的建议类似,我建议您分别展开各列,然后将它们合并在一起。 例如,对于“职位”: 并且,一起: