当前位置: 首页 > 知识库问答 >
问题:

Pyspark:从Struct中识别arrayType列,并调用udf将数组转换为字符串

邹嘉荣
2023-03-14

我正在创建一个加速器,将数据从源迁移到目标。例如,我将从API中选择数据,并将数据迁移到csv。在将数据转换为csv时,我遇到了处理arraytype的问题。我使用了withColumn和concat_ws方法(即df1=df.WithColum('薄膜',F.concat_ ws(':',F.col('薄膜“))薄膜是arraytype列)进行转换,并且它起了作用。现在我希望这是动态发生的。我的意思是,在不指定列名的情况下,是否有一种方法可以从具有arraytype的结构中选择列名,然后调用udf?

感谢您的宝贵时间!

共有1个答案

督灿
2023-03-14

您可以使用df.schema获取列的类型。根据列的类型,您可以应用concat_ws,也可以不应用:

data = [["test1", "test2", [1,2,3], ["a","b","c"]]]
schema= ["col1", "col2", "arr1", "arr2"]
df = spark.createDataFrame(data, schema)

array_cols = [F.concat_ws(":", c.name).alias(c.name) \
    for c in df.schema if isinstance(c.dataType, T.ArrayType) ]
other_cols = [F.col(c.name) \
    for c in df.schema if not isinstance(c.dataType, T.ArrayType) ]

df = df.select(other_cols + array_cols)

结果:

+-----+-----+-----+-----+
| col1| col2| arr1| arr2|
+-----+-----+-----+-----+
|test1|test2|1:2:3|a:b:c|
+-----+-----+-----+-----+
 类似资料:
  • 我试图在我的数据集上运行PySpark中的FPGrowth算法。 我得到以下错误: 我的数据帧df格式如下: 如果“名称”列中的数据形式为: 如何在这个形式中从StringType转换为ArrayType 我从我的RDD形成了Dataframe:

  • 问题内容: 我有一个带有2 ArrayType字段的PySpark DataFrame: 我想将它们合并为一个ArrayType字段: 适用于字符串的语法在这里似乎不起作用: 谢谢! 问题答案: 火花 > = 2.4 您可以使用功能(SPARK-23736): 要保留其中一个值时的数据,可以使用: 火花 <2.4 不幸的是,一般情况下要串联列,您将需要一个UDF,例如: 可以用作:

  • 问题内容: 我有以下代码,我试图通过测试,但似乎无法理解Java世界中各种编码形式。 我想我的问题是:将任意字节的字节数组转换为Java字符串,然后将同一Java String转换为另一个字节数组的正确方法是什么,该字节数组将具有与原始字节相同的长度和相同的内容数组? 问题答案: 尝试特定的编码: ideone链接

  • 问题内容: 如何将已作为字符串读取的列转换为数组列?即从下面的模式转换 至: 如果可能,请同时共享scala和python实现。在相关说明中,从文件本身读取时如何处理它?我有约450列的数据,我想以这种格式指定的列很少。目前,我正在pyspark中阅读以下内容: 谢谢。 问题答案: 有各种各样的方法, 最好的方法是使用函数并强制转换为 您也可以创建简单的udf来转换值 希望这可以帮助!

  • 问题内容: 我想得到一个表示像这样的json的字符串: {“投票”:{“ option_A”:“ 3”}} 并在其中包含一个“计数”键, 这样结束: {“投票”:{“ option_A”:“ 3”},“ count”:“ 1”} 这就是为什么我计划将其转换为json以便可以添加计数然后再次将其设置为字符串的原因。 问题是我不知道该json的结构 ,所以我无法使用,因为该结构有所不同。我怎样才能做到

  • 是否可以将StringType列强制转换为spark dataframe中的ArrayType列? A:数组(nullable=true)