我想合并火花中的多个数组类型[字符串类型]列以创建一个数组类型[字符串类型]。为了组合两列,我在这里找到了解决方案:
将Array[string]类型的两个spark sql列合并成一个新的Array[string]列
但是如果我不知道编译时的列数,我该如何进行组合呢?在运行时,我将知道所有要组合的列的名称。
一种选择是使用上面stackoverflow问题中定义的UDF,在循环中多次添加两列。但这涉及对整个数据框架的多次读取。有没有办法一次完成?
+------+------+---------+
| col1 | col2 | combined|
+------+------+---------+
| [a,b]| [i,j]|[a,b,i,j]|
| [c,d]| [k,l]|[c,d,k,l]|
| [e,f]| [m,n]|[e,f,m,n]|
| [g,h]| [o,p]|[g,h,o,p]|
+------+----+-----------+
> < li>
处理dataframe架构并获取< code > array type[string type]类型的所有列。
使用函数创建新的数据帧。前两列的array_union
遍历其余列并将它们中的每一个添加到组合列中
>>>from pyspark import Row
>>>from pyspark.sql.functions import array_union
>>>df = spark.createDataFrame([Row(col1=['aa1', 'bb1'],
col2=['aa2', 'bb2'],
col3=['aa3', 'bb3'],
col4= ['a', 'ee'], foo="bar"
)])
>>>df.show()
+----------+----------+----------+-------+---+
| col1| col2| col3| col4|foo|
+----------+----------+----------+-------+---+
|[aa1, bb1]|[aa2, bb2]|[aa3, bb3]|[a, ee]|bar|
+----------+----------+----------+-------+---+
>>>cols = [col_.name for col_ in df.schema
... if col_.dataType == ArrayType(StringType())
... or col_.dataType == ArrayType(StringType(), False)
... ]
>>>print(cols)
['col1', 'col2', 'col3', 'col4']
>>>
>>>final_df = df.withColumn("combined", array_union(cols[:2][0], cols[:2][1]))
>>>
>>>for col_ in cols[2:]:
... final_df = final_df.withColumn("combined", array_union(col('combined'), col(col_)))
>>>
>>>final_df.select("combined").show(truncate=False)
+-------------------------------------+
|combined |
+-------------------------------------+
|[aa1, bb1, aa2, bb2, aa3, bb3, a, ee]|
+-------------------------------------+
val arrStr: Array[String] = Array("col1", "col2")
val arrCol: Array[Column] = arrString.map(c => df(c))
val assembleFunc = udf { r: Row => assemble(r.toSeq: _*)}
val outputDf = df.select(col("*"), assembleFunc(struct(arrCol:
_*)).as("combined"))
def assemble(rowEntity: Any*):
collection.mutable.WrappedArray[String] = {
var outputArray =
rowEntity(0).asInstanceOf[collection.mutable.WrappedArray[String]]
rowEntity.drop(1).foreach {
case v: collection.mutable.WrappedArray[String] =>
outputArray ++= v
case null =>
throw new SparkException("Values to assemble cannot be
null.")
case o =>
throw new SparkException(s"$o of type ${o.getClass.getName}
is not supported.")
}
outputArray
}
outputDf.show(false)
问题内容: 我有一个带有2 ArrayType字段的PySpark DataFrame: 我想将它们合并为一个ArrayType字段: 适用于字符串的语法在这里似乎不起作用: 谢谢! 问题答案: 火花 > = 2.4 您可以使用功能(SPARK-23736): 要保留其中一个值时的数据,可以使用: 火花 <2.4 不幸的是,一般情况下要串联列,您将需要一个UDF,例如: 可以用作:
我在pyspark数据帧中有一个StringType()列和一个ArrayType(StringType())列。我想将StringType()列与ArrayType(StringType())列的每个元素合并 示例: 谢谢:)
我在Scala的Spark数据框架中有一列,它是使用 我想将此列传递给UDF,以便进一步处理,以处理此聚合列中的一个索引。 当我将参数传递给我的UDF时: UDF-类型为Seq[Row]:val removeUnstableActivations:UserDefinedFunction=UDF((xyz:java.util.Date,def:Seq[Row]) 我收到错误: 我应该如何传递这些列,
我有一个以XML形式出现的数据集,其中一个节点包含JSON。Spark将其作为StringType读入,因此我尝试使用from_json()将json转换为数据帧。 我可以将字符串转换为JSON,但如何编写模式来处理数组? 没有数组的字符串-工作得很好 带数组的字符串 - 无法弄清楚这个
是否可以将StringType列强制转换为spark dataframe中的ArrayType列? A:数组(nullable=true)
我试图在我的数据集上运行PySpark中的FPGrowth算法。 我得到以下错误: 我的数据帧df格式如下: 如果“名称”列中的数据形式为: 如何在这个形式中从StringType转换为ArrayType 我从我的RDD形成了Dataframe: