当前位置: 首页 > 知识库问答 >
问题:

Pyspark数据帧:找出两个数据帧(值和列名)之间的差异

左宁
2023-03-14

我在dataframe中总共有100列。我试图比较两个数据帧,并找到列名不匹配的记录。我得到了以下代码的输出,但当我运行100列的代码时,作业被中止。

我正在为SCD类型2增量进程错误查找执行此操作。

from pyspark.sql.types import *
from pyspark.sql.functions import *

d2 = sc.parallelize([("A1", 500,1005) ,("A2", 700,10007)])
dataFrame1 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])

d2 = sc.parallelize([("A1", 600,1005),("A2", 700,10007)])
dataFrame2 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])

key_id_col_name="ID"
key_id_value="A1"
dataFrame1.select("ID","VALUE1").subtract(dataFrame2.select("ID",col("VALUE1").alias("value"))).show()

def unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value):
    chk_fst=True
    dataFrame1 = dataFrame1.where(dataFrame1[key_id_col_name] == key_id_value)
    dataFrame2 = dataFrame2.where(dataFrame2[key_id_col_name] == key_id_value)
    col_names = list(set(dataFrame1.columns).intersection(dataFrame2.columns))
    col_names.remove(key_id_col_name)
    for col_name in col_names:
        if chk_fst == True:
            df_tmp = dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name))
            chk_fst = False
        else:
            df_tmp = df_tmp.unionAll(dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name)))
    return df_tmp

res_df = unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value)

res_df.show() 

   >>> dataFrame1.show()
    +---+------+------+
    | ID|VALUE1|VALUE2|
    +---+------+------+
    | A1|   500|  1005|
    | A2|   700| 10007|
    +---+------+------+

    >>> dataFrame2.show()
    +---+------+------+
    | ID|VALUE1|VALUE2|
    +---+------+------+
    | A1|   600|  1005|
    | A2|   700| 10007|
    +---+------+------+

    >>> res_df.show()
    +------+-----+--------+
    |KEY_ID|VALUE|COL_NAME|
    +------+-----+--------+
    |    A1|  500|  VALUE1|
    +------+-----+--------+

请建议任何其他方式。

共有1个答案

武卓
2023-03-14

这里有另一种方法:

  • 使用 ID 列联接两个数据帧。
  • 然后,为每一行创建一个新列,其中包含存在差异的列。
    • 使用 pyspark.sql.函数创建此新列作为键值对映射。1
    • 映射的键将是列名。
    • 使用 pyspark.sql.functions.when(),如果两个数据帧之间存在差异,请将该值设置为 dataFrame1 中的相应值(因为从示例中可以看出,这似乎是您想要的)。否则,我们将该值设置为 None

    示例:

    import pyspark.sql.functions as f
    columns = [c for c in dataFrame1.columns if c != 'ID']
    dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
        .withColumn(
            'diffs',
            f.create_map(
                *reduce(
                    list.__add__,
                    [
                        [
                            f.lit(c),
                            f.when(
                                f.col('r.'+c) != f.col('l.'+c),
                                f.col('r.'+c)
                            ).otherwise(None)
                        ] 
                     for c in columns
                    ]
                )
            )
        )\
        .select([f.col('ID'), f.explode('diffs')])\
        .where(~f.isnull(f.col('value')))\
        .select(
            f.col('ID').alias('KEY_ID'),
            f.col('value').alias('VALUE'),
            f.col('key').alias('COL_NAME')
        )\
        .show(truncate=False)
    #+------+-----+--------+
    #|KEY_ID|VALUE|COL_NAME|
    #+------+-----+--------+
    #|A1    |500  |VALUE1  |
    #+------+-----+--------+
    

    笔记

    1 语法 *reduce(list.__add__, [[f.lit(c), ...] 表示列中的 c]) 作为create_map() 的参数是一些有助于动态创建映射的 python-fu。

    create_map()需要偶数个参数-它假设每对中的第一个参数是键,第二个参数是值。为了将参数按顺序排列,列表理解为每个迭代生成一个列表。我们使用<code>list.将列表列表列表缩减为平面列表。__add__。

    最后,*操作符用于解包列表。

    下面是中间输出,这可能会使逻辑更清晰:

    dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
        .withColumn(
            'diffs',
            f.create_map(
                *reduce(
                    list.__add__,
                    [
                        [
                            f.lit(c),
                            f.when(
                                f.col('r.'+c) != f.col('l.'+c),
                                f.col('r.'+c)
                            ).otherwise(None)
                         ] 
                         for c in columns
                    ]
                )
            )
        )\
        .select('ID', 'diffs').show(truncate=False)
    #+---+-----------------------------------+
    #|ID |diffs                              |
    #+---+-----------------------------------+
    #|A2 |Map(VALUE1 -> null, VALUE2 -> null)|
    #|A1 |Map(VALUE1 -> 500, VALUE2 -> null) |
    #+---+-----------------------------------+
    

 类似资料:
  • 我想找出使用内部联接联接时两个数据帧的列值之间的差异。 df1有10列,即。key 1, key 2 现在我想比较连接的df3中已经存在的两个数据帧df1和df2的对应列。 现在我对zip(df1.columns,df2.columns)中的每个x,y进行循环,并存储在list < code > un match list . append((df3 . select(df1 . x,df2.y)

  • 我有两个数据帧df1和df2,其中df2是df1的子集。我如何获得一个新的数据帧(df3),它是两个数据帧之间的差值? 换句话说,一个数据帧,它包含了df1中所有的行/列,而不是DF2中的行/列?

  • 我需要按行比较两个不同大小的数据帧,并打印出不匹配的行。让我们看以下两个例子: 在df2上按行打印并打印出不在df1中的行的最有效方法是什么。 重要提示:我不希望有行: 包括在差异中: 我已经尝试过了:逐行比较两个不同长度的数据帧,为每行添加相等值的列,比较两个数据帧,并排输出它们的差异 但是这些和我的问题不匹配。

  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 基本上,它应该在步骤中找到指标为43且步骤=1的行,然后将该值放在新列中,在这种情况下,它将是“Gross value Added”。任何帮助都将非常感谢!

  • 我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想将df1的所有列(我有两列以上)与客户ID上的df2连接值相乘