当前位置: 首页 > 知识库问答 >
问题:

Spark:遍历每行中的列以创建新的数据表

梁丘琛
2023-03-14

假设我有这样一个数据frame:

+-----------+-----------+-----------+-----------+------------+--+
|   ColA    |   ColB    |   ColC    |   ColD    |    ColE    |  |
+-----------+-----------+-----------+-----------+------------+--+
| ''        | sample_1x | sample_1y | ''        | sample_1z  |  |
| sample2_x | sample2_y | ''        | ''        | ''         |  |
| sample3_x | ''        | ''        | ''        | sample3_y  |  |
| sample4_x | sample4_y | ''        | sample4_z | sample4_zz |  |
| sample5_x | ''        | ''        | ''        | ''         |  |
+-----------+-----------+-----------+-----------+------------+--+

我希望创建另一个dataframe,在每行中从左到右显示关系,同时跳过具有空值的列。此外,只有1个有效列记录的行将被排除。例如:

+-----------+------------+-----------+
|   From    |     To     |   Label   |
+-----------+------------+-----------+
| sample1_x | sample1_y  | ColB_ColC |
| sample1_y | sample1_z  | ColC_ColE |
| sample2_x | sample2_y  | ColA_ColB |
| sample3_x | sample3_y  | ColA_ColE |
| sample4_x | sample4_y  | ColA_ColB |
| sample4_y | sample4_z  | ColB_ColD |
| sample4_z | sample4_zz | ColD_ColE |
+-----------+------------+-----------+

我认为方法应该是编写一个包含此逻辑的UDF,但我不完全确定如何返回一个全新的DF,因为我习惯了UDF只是在同一个DF中创建另一列。或者是否有另一个spark函数可以比创建UDF更容易地处理这种情况?如果有关系,就使用pyspark。

共有1个答案

韦晟睿
2023-03-14

您可以使用接受数组参数并返回结构数组的udf,例如:

from pyspark.sql import functions as F

df.show()
+---------+---------+---------+---------+----------+
|     ColA|     ColB|     ColC|     ColD|      ColE|
+---------+---------+---------+---------+----------+
|     null|sample_1x|sample_1y|     null| sample_1z|
|sample2_x|sample2_y|     null|     null|      null|
|sample3_x|     null|     null|     null| sample3_y|
|sample4_x|sample4_y|     null|sample4_z|sample4_zz|
|sample5_x|     null|     null|     null|      null|
+---------+---------+---------+---------+----------+

# columns that get involved, will group them into an array using F.array(cols)
cols = df.columns

# defind function to convert array into array of structs
def find_route(arr, cols):
    d = [ (cols[i],e) for i,e in enumerate(arr) if e is not None ]
    return [ {'From':d[i][1], 'To':d[i+1][1], 'Label':d[i][0]+'_'+d[i+1][0]} for i in range(len(d)-1) ]

# set up the UDF and add cols as an extra argument
udf_find_route = F.udf(lambda a: find_route(a, cols), 'array<struct<From:string,To:string,Label:string>>')

# retrive the data from the array of structs after array-explode
df.select(F.explode(udf_find_route(F.array(cols))).alias('c1')).select('c1.*').show()
+---------+----------+---------+
|     From|        To|    Label|
+---------+----------+---------+
|sample_1x| sample_1y|ColB_ColC|
|sample_1y| sample_1z|ColC_ColE|
|sample2_x| sample2_y|ColA_ColB|
|sample3_x| sample3_y|ColA_ColE|
|sample4_x| sample4_y|ColA_ColB|
|sample4_y| sample4_z|ColB_ColD|
|sample4_z|sample4_zz|ColD_ColE|
+---------+----------+---------+
 类似资料:
  • 本文向大家介绍Yii遍历行下每列数据的方法,包括了Yii遍历行下每列数据的方法的使用技巧和注意事项,需要的朋友参考一下 本文实例讲述了Yii遍历行下每列数据的方法。分享给大家供大家参考,具体如下: 效果图如下: 控制器(1种): (2种): 视图层: 更多关于Yii相关内容感兴趣的读者可查看本站专题:《Yii框架入门及常用技巧总结》、《php优秀开发框架总结》、《smarty模板入门基础教程》、《

  • 我正在尝试查看我们是否可以使用 spark/scala 从 dataFrame 中某个列中的值创建新列。我有一个数据帧,其中包含以下数据 在上面的数据中,col1/col2/col3是列名,后跟它的值。列名和值由< code >,分隔。每组由< code>|分隔。 现在,我想做到这一点 感谢任何帮助。

  • 我有如下数据集: 这是我们数据库中每个帐户的备忘录集合。“1abc”中的1表示帐户ID,字母表示一些文本。总共有177列和1866行,但并非每行都有177列之前的值,有些行可能只有两列数据。 我需要每一列下拉到A列的底部,以便所有数据只占用A列。例如,B列中的所有数据都将插入A列中的最后一个值之后。C列中的数据将插入A列中的最后一个值之后,它已填充了来自B的数据,依此类推。 最后,它应该是这样的(

  • 问题内容: 我有一个熊猫DataFrame,其中包含多个列: 我想基于对数据框的每一行应用一个函数为此数据框创建两个新列。我不想多次调用该函数(例如,通过执行两次单独的调用),因为它占用大量计算资源。我尝试通过两种方式来执行此操作,但它们都不起作用: 使用: 我编写了一个函数,该函数接受a并返回我想要的值的元组: 尝试将此应用于DataFrame会出现错误: 然后,我将使用此问题中显示的方法将从返

  • 问题内容: 所以我有2张桌子,和。 具有列,具有呼叫者的ID和进行呼叫的时间。然后,我有另一台已,,,,,有一个人,他们的位置的ID ,而且它们在该位置的时间。 我想写一个看起来像这样的查询: 基本上,这是在寻找给定值的最接近值。之后,最终,我想根据数据查找呼叫的位置。 我该怎么做?我知道有一种基于集合的方法,但是我不想那样做。我调查了一下,但是我感到在那方面的表现很糟糕……所以循环中是否有这样做

  • 问题内容: 如何进行for循环或列表理解,以便每次迭代都给我两个元素? 输出: 问题答案: 你需要一个实施。 对于Python 2: 或更笼统地说: 在Python 3中,你可以替换为内置函数,然后删除import。 所有信贷蒂诺对他的回答到我的问题,我发现这是非常有效的,因为它只是在列表上循环一次,并在此过程中不会产生任何不必要的名单。 注意:不要将其与Python自己的文档中的pairwise