当前位置: 首页 > 知识库问答 >
问题:

将Spark中的多个ArrayType列合并为一个ArrayType列

慕高格
2023-03-14

我想合并火花中的多个数组类型[字符串类型]列以创建一个数组类型[字符串类型]。为了组合两列,我在这里找到了解决方案:

将Array[string]类型的两个spark sql列合并成一个新的Array[string]列

但是如果我不知道编译时的列数,我该如何进行组合呢?在运行时,我将知道所有要组合的列的名称。

一种选择是使用上面stackoverflow问题中定义的UDF,在循环中多次添加两列。但这涉及对整个数据框架的多次读取。有没有办法一次完成?

+------+------+---------+
| col1 | col2 | combined|
+------+------+---------+
| [a,b]| [i,j]|[a,b,i,j]|
| [c,d]| [k,l]|[c,d,k,l]|
| [e,f]| [m,n]|[e,f,m,n]|
| [g,h]| [o,p]|[g,h,o,p]|
+------+----+-----------+

共有2个答案

乌甫
2023-03-14

> < li>

处理dataframe架构并获取< code > array type[string type]类型的所有列。

使用函数创建新的数据帧。前两列的array_union

遍历其余列并将它们中的每一个添加到组合列中

>>>from pyspark import Row
>>>from pyspark.sql.functions import array_union
>>>df = spark.createDataFrame([Row(col1=['aa1', 'bb1'], 
                                col2=['aa2', 'bb2'],
                                col3=['aa3', 'bb3'], 
                                col4= ['a', 'ee'], foo="bar"
                               )])
>>>df.show()
+----------+----------+----------+-------+---+
|      col1|      col2|      col3|   col4|foo|
+----------+----------+----------+-------+---+
|[aa1, bb1]|[aa2, bb2]|[aa3, bb3]|[a, ee]|bar|
+----------+----------+----------+-------+---+
>>>cols = [col_.name for col_ in df.schema 
...       if col_.dataType == ArrayType(StringType()) 
...        or col_.dataType == ArrayType(StringType(), False)
...       ]
>>>print(cols)
['col1', 'col2', 'col3', 'col4']
>>>
>>>final_df = df.withColumn("combined", array_union(cols[:2][0], cols[:2][1]))
>>>
>>>for col_ in cols[2:]:
...    final_df = final_df.withColumn("combined", array_union(col('combined'), col(col_)))
>>>
>>>final_df.select("combined").show(truncate=False)
+-------------------------------------+
|combined                             |
+-------------------------------------+
|[aa1, bb1, aa2, bb2, aa3, bb3, a, ee]|
+-------------------------------------+ 
沈思博
2023-03-14
val arrStr: Array[String] = Array("col1", "col2")

val arrCol: Array[Column] = arrString.map(c => df(c))

val assembleFunc = udf { r: Row => assemble(r.toSeq: _*)}

val outputDf = df.select(col("*"), assembleFunc(struct(arrCol: 
_*)).as("combined"))

def assemble(rowEntity: Any*): 
                    collection.mutable.WrappedArray[String] = {

 var outputArray = 
 rowEntity(0).asInstanceOf[collection.mutable.WrappedArray[String]]

  rowEntity.drop(1).foreach {
    case v: collection.mutable.WrappedArray[String] =>
      outputArray ++= v
    case null =>
      throw new SparkException("Values to assemble cannot be 
      null.")
    case o =>
      throw new SparkException(s"$o of type ${o.getClass.getName} 
      is not supported.")
 }

outputArray
}

outputDf.show(false)    
 类似资料:
  • 问题内容: 我有一个带有2 ArrayType字段的PySpark DataFrame: 我想将它们合并为一个ArrayType字段: 适用于字符串的语法在这里似乎不起作用: 谢谢! 问题答案: 火花 > = 2.4 您可以使用功能(SPARK-23736): 要保留其中一个值时的数据,可以使用: 火花 <2.4 不幸的是,一般情况下要串联列,您将需要一个UDF,例如: 可以用作:

  • 我在pyspark数据帧中有一个StringType()列和一个ArrayType(StringType())列。我想将StringType()列与ArrayType(StringType())列的每个元素合并 示例: 谢谢:)

  • 我在Scala的Spark数据框架中有一列,它是使用 我想将此列传递给UDF,以便进一步处理,以处理此聚合列中的一个索引。 当我将参数传递给我的UDF时: UDF-类型为Seq[Row]:val removeUnstableActivations:UserDefinedFunction=UDF((xyz:java.util.Date,def:Seq[Row]) 我收到错误: 我应该如何传递这些列,

  • 我有一个以XML形式出现的数据集,其中一个节点包含JSON。Spark将其作为StringType读入,因此我尝试使用from_json()将json转换为数据帧。 我可以将字符串转换为JSON,但如何编写模式来处理数组? 没有数组的字符串-工作得很好 带数组的字符串 - 无法弄清楚这个

  • 是否可以将StringType列强制转换为spark dataframe中的ArrayType列? A:数组(nullable=true)

  • 我试图在我的数据集上运行PySpark中的FPGrowth算法。 我得到以下错误: 我的数据帧df格式如下: 如果“名称”列中的数据形式为: 如何在这个形式中从StringType转换为ArrayType 我从我的RDD形成了Dataframe: