当前位置: 首页 > 知识库问答 >
问题:

使用pyspark查找两个数据帧上每个对应列的值之差

南宫嘉
2023-03-14

我想找出使用内部联接联接时两个数据帧的列值之间的差异。

df1有10列,即。key 1, key 2

df3=df1.join(df2,'df1.key1==df2.key1和df1.key2==df2.key2','内')

现在我想比较连接的df3中已经存在的两个数据帧df1和df2的对应列。

现在我对zip(df1.columns,df2.columns)中的每个x,y进行循环,并存储在list < code > un match list . append((df3 . select(df1 . x,df2.y))中。过滤器(df1.x

我能避免这个循环吗,因为它在这里大量使用内存。我还在做其他计算,但这只是我展示的一小段代码。这背后的想法是找出对应列中的不同值,以匹配两个数据帧的行。exceptAll不适用于此要求,因为它根据列的位置来查找差异。只有当两个数据帧的键匹配时,我才需要找出不同之处。

数据流1

key1 key2 col1 col2 col3 col4 col5

k11  k21   1    1    1    1    1

k12  k22   2    2    2    2    2

DF2

key1 key2 col1 col2 col3 col4 col5

k11  k21   1    1    2    1    1

k12  k22   2    3    2    3    4

我想要的最终输出是

key1 key2 col  val1 val2

k11  k21  col3 1    2

k12  k22  col2 2    3

k12  k22  col4 2    3

k12  k22  col5 2    4

val1是从df1获取,val2是从df2获取

共有1个答案

凌蕴藉
2023-03-14

这里的问题是,如果DataFrame中的列数很高,那么循环的性能就会下降。它进一步导致输出内存结果。

我们可以使用数据帧,并将每次迭代的结果存储(附加或插入)到某个hdfs位置或hive表中,而不是将结果存储在列表中。

for x,y in zip(df1.columns,df2.columns)
    outputDF=joinedDF.filter(col(x) <> col(y))
                 .withColumns('key1',lit(key1))
                 .withColumns('key2',lit(key2))
                 .withColumns('col',lit(x))
                 .withColumns('val1',col(x))
                 .withColumns('val2',col(y))

    outputDF.partitionBy(x).coalesce(1).write.mode('append').format('hive').saveAsTable('DB.Table')````

#Another approach can be if no of columns are less (10-15):#
    outputDF=outputDF.union(joinedDF.filter(col(x) <> col(y))
                 .withColumns('key1',lit(key1))
                 .withColumns('key2',lit(key2))
                 .withColumns('col',lit(x))
                 .withColumns('val1',col(x))
                 .withColumns('val2',col(y)))

outputDF.partitionBy(x).coalesce(1).write.mode('append').format('hive').saveAsTable('DB.Table')


 类似资料:
  • 我在dataframe中总共有100列。我试图比较两个数据帧,并找到列名不匹配的记录。我得到了以下代码的输出,但当我运行100列的代码时,作业被中止。 我正在为SCD类型2增量进程错误查找执行此操作。 请建议任何其他方式。

  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 我有两个数据帧df1和df2,其中df2是df1的子集。我如何获得一个新的数据帧(df3),它是两个数据帧之间的差值? 换句话说,一个数据帧,它包含了df1中所有的行/列,而不是DF2中的行/列?

  • 我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想将df1的所有列(我有两列以上)与客户ID上的df2连接值相乘

  • 我有两个数据帧,我需要连接一列,如果id包含在第二个数据帧的同一列中,则只从第一个数据帧中获取行: df1: 断续器: 期望输出: 我已经用df1.join(df2("id ")," left ")试过了,但是给我错误:“Dataframe”对象是不可调用的。

  • 我有两个数据帧,DF1和DF2,DF1是存储来自DF2的任何附加信息的主机。 假设DF1是以下格式, DF2包含DF1中已经存在的2个条目和两个新条目。(itemId和item被视为一个组,可以被视为连接的键) 我需要合并两个数据框,以便增加现有项目计数并插入新项目。 结果应该是这样的: 我有一种方法可以做到这一点,但不确定这种方法是否有效或正确