我在dataframe中总共有100列。我试图比较两个数据帧,并找到列名不匹配的记录。我得到了以下代码的输出,但当我运行100列的代码时,作业被中止。
我正在为SCD类型2增量进程错误查找执行此操作。
from pyspark.sql.types import *
from pyspark.sql.functions import *
d2 = sc.parallelize([("A1", 500,1005) ,("A2", 700,10007)])
dataFrame1 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])
d2 = sc.parallelize([("A1", 600,1005),("A2", 700,10007)])
dataFrame2 = sqlContext.createDataFrame(d2, ["ID", "VALUE1", "VALUE2"])
key_id_col_name="ID"
key_id_value="A1"
dataFrame1.select("ID","VALUE1").subtract(dataFrame2.select("ID",col("VALUE1").alias("value"))).show()
def unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value):
chk_fst=True
dataFrame1 = dataFrame1.where(dataFrame1[key_id_col_name] == key_id_value)
dataFrame2 = dataFrame2.where(dataFrame2[key_id_col_name] == key_id_value)
col_names = list(set(dataFrame1.columns).intersection(dataFrame2.columns))
col_names.remove(key_id_col_name)
for col_name in col_names:
if chk_fst == True:
df_tmp = dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name))
chk_fst = False
else:
df_tmp = df_tmp.unionAll(dataFrame1.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE")).subtract(dataFrame2.select(col(key_id_col_name).alias("KEY_ID"),col(col_name).alias("VALUE"))).withColumn("COL_NAME",lit(col_name)))
return df_tmp
res_df = unequalColumnValuesTwoDF(dataFrame1,dataFrame2,key_id_col_name,key_id_value)
res_df.show()
>>> dataFrame1.show()
+---+------+------+
| ID|VALUE1|VALUE2|
+---+------+------+
| A1| 500| 1005|
| A2| 700| 10007|
+---+------+------+
>>> dataFrame2.show()
+---+------+------+
| ID|VALUE1|VALUE2|
+---+------+------+
| A1| 600| 1005|
| A2| 700| 10007|
+---+------+------+
>>> res_df.show()
+------+-----+--------+
|KEY_ID|VALUE|COL_NAME|
+------+-----+--------+
| A1| 500| VALUE1|
+------+-----+--------+
请建议任何其他方式。
这里有另一种方法:
ID
列联接两个数据帧。 pyspark
.sql.函数创建此新列作为键值对映射。1pyspark.sql.functions.when()
,如果两个数据帧之间存在差异,请将该值设置为 dataFrame1
中的相应值(因为从示例中可以看出,这似乎是您想要的)。否则,我们将该值设置为 None
。示例:
import pyspark.sql.functions as f
columns = [c for c in dataFrame1.columns if c != 'ID']
dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
.withColumn(
'diffs',
f.create_map(
*reduce(
list.__add__,
[
[
f.lit(c),
f.when(
f.col('r.'+c) != f.col('l.'+c),
f.col('r.'+c)
).otherwise(None)
]
for c in columns
]
)
)
)\
.select([f.col('ID'), f.explode('diffs')])\
.where(~f.isnull(f.col('value')))\
.select(
f.col('ID').alias('KEY_ID'),
f.col('value').alias('VALUE'),
f.col('key').alias('COL_NAME')
)\
.show(truncate=False)
#+------+-----+--------+
#|KEY_ID|VALUE|COL_NAME|
#+------+-----+--------+
#|A1 |500 |VALUE1 |
#+------+-----+--------+
笔记
1 语法 *reduce(list.__add__, [[f.lit(c), ...] 表示列中的 c])
作为create_map() 的
参数是一些有助于动态创建映射的 python-fu。
create_map()
需要偶数个参数-它假设每对中的第一个参数是键,第二个参数是值。为了将参数按顺序排列,列表理解为每个迭代生成一个列表。我们使用<code>list.将列表列表列表缩减为平面列表。__add__。
最后,*
操作符用于解包列表。
下面是中间输出,这可能会使逻辑更清晰:
dataFrame1.alias('r').join(dataFrame2.alias('l'), on='ID')\
.withColumn(
'diffs',
f.create_map(
*reduce(
list.__add__,
[
[
f.lit(c),
f.when(
f.col('r.'+c) != f.col('l.'+c),
f.col('r.'+c)
).otherwise(None)
]
for c in columns
]
)
)
)\
.select('ID', 'diffs').show(truncate=False)
#+---+-----------------------------------+
#|ID |diffs |
#+---+-----------------------------------+
#|A2 |Map(VALUE1 -> null, VALUE2 -> null)|
#|A1 |Map(VALUE1 -> 500, VALUE2 -> null) |
#+---+-----------------------------------+
我想找出使用内部联接联接时两个数据帧的列值之间的差异。 df1有10列,即。key 1, key 2 现在我想比较连接的df3中已经存在的两个数据帧df1和df2的对应列。 现在我对zip(df1.columns,df2.columns)中的每个x,y进行循环,并存储在list < code > un match list . append((df3 . select(df1 . x,df2.y)
我有两个数据帧df1和df2,其中df2是df1的子集。我如何获得一个新的数据帧(df3),它是两个数据帧之间的差值? 换句话说,一个数据帧,它包含了df1中所有的行/列,而不是DF2中的行/列?
我需要按行比较两个不同大小的数据帧,并打印出不匹配的行。让我们看以下两个例子: 在df2上按行打印并打印出不在df1中的行的最有效方法是什么。 重要提示:我不希望有行: 包括在差异中: 我已经尝试过了:逐行比较两个不同长度的数据帧,为每行添加相等值的列,比较两个数据帧,并排输出它们的差异 但是这些和我的问题不匹配。
假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:
基本上,它应该在步骤中找到指标为43且步骤=1的行,然后将该值放在新列中,在这种情况下,它将是“Gross value Added”。任何帮助都将非常感谢!
我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想将df1的所有列(我有两列以上)与客户ID上的df2连接值相乘