当前位置: 首页 > 面试题库 >

如何使用来自另一个数据框的新值更新pyspark数据框?

元嘉木
2023-03-14
问题内容

我有两个Spark数据框:

数据框A:

|col_1 | col_2 | ... | col_n |
|val_1 | val_2 | ... | val_n |

和数据框B:

|col_1 | col_2 | ... | col_m |
|val_1 | val_2 | ... | val_m |

数据框B可以包含来自数据框A的重复行,更新行和新行。我想在spark中编写操作,在其中可以创建一个新数据框,其中包含数据框A的行以及数据框B的更新行和新行。

我首先创建一个仅包含不可更新列的哈希列。这是唯一的ID。所以我们可以说col1,并col2可以改变值(可更新),但是col3,..,coln是唯一的。我创建了一个哈希函数为hash(col3,..,coln)

A=A.withColumn("hash", hash(*[col(colname) for colname in unique_cols_A]))
B=B.withColumn("hash", hash(*[col(colname) for colname in unique_cols_B]))

现在,我想编写一些火花代码,基本上从B中选择哈希值不在A中的 行(因此,新行和更新后的行) ,并将它们与A中的行一起加入新的数据帧中。
pyspark?

编辑:数据框B可以有来自数据框A的额外列,因此无法进行联合。

样例

数据框A:

+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|  www|
|    b|  eee|
|    c|  rrr|
+-----+-----+

数据框B:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    d|  yyy|    2|
|    c|  rer|    3|
+-----+-----+-----+

结果:数据框C:

+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    a|  wew|    1|
|    b|  eee| null|
|    c|  rer|    3|
|    d|  yyy|    2|
+-----+-----+-----+

问题答案:

这与用新值更新数据框列密切相关,除了您还想添加数据框B中的行。一种方法是首先执行链接的问题中概述的操作,然后将结果与数据框B合并并删除重复。

例如:

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        'col_1',
        f.when(
            ~f.isnull(f.col('b.col_2')),
            f.col('b.col_2')
        ).otherwise(f.col('a.col_2')).alias('col_2'),
        'b.col_3'
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()
#+-----+-----+-----+
#|col_1|col_2|col_3|
#+-----+-----+-----+
#|    a|  wew|    1|
#|    b|  eee| null|
#|    c|  rer|    3|
#|    d|  yyy|    2|
#+-----+-----+-----+

如果您有很多要替换的列并且不想对它们全部进行硬编码,则可以更一般地使用列表推导:

cols_to_update = ['col_2']

dfA.alias('a').join(dfB.alias('b'), on=['col_1'], how='left')\
    .select(
        *[
            ['col_1'] + 
            [
                f.when(
                    ~f.isnull(f.col('b.{}'.format(c))),
                    f.col('b.{}'.format(c))
                ).otherwise(f.col('a.{}'.format(c))).alias(c)
                for c in cols_to_update
            ] + 
            ['b.col_3']
        ]
    )\
    .union(dfB)\
    .dropDuplicates()\
    .sort('col_1')\
    .show()


 类似资料:
  • 如何使用Pandas更新/组合/合并数据帧(df1)和来自另一个数据帧(df2)的值,其中df1有一个新列(col3)和来自df2的值。可乐2?换句话说,df1是当前月份的值,我希望df1也有一个来自df2的列,它是上个月的值。 任何关于这方面的见解都是值得赞赏的;非常感谢你。 DF1: DF2: 所需df:

  • 问题内容: 是否可以在带有子选择的mysql 5.0上运行UPDATE命令。 我要运行的命令是这样的: ISBN13当前存储为字符串。 这应该更新10k +行。 谢谢, 威廉 问题答案: 只需更改一下即可:

  • 我有两个数据帧。我需要用第二列中的平均值更新第一列中的一列,并按索引分组。这里是示例df1(col1是索引) df2(col1是索引) 我需要df2的col2(a=2,d=3)的平均值,并且只更新col3=X的行的df1 我试过这个 只有在我不使用loc的情况下,它才有效。 我试图得到的结果是df1(col1是索引)

  • 问题内容: 我有两个数据框,第一个有1000行,看起来像: 该列具有不同的值,有时会重复,但通常大约有50个唯一值。 第二个数据框包含所有这50个唯一值(50行)以及与这些值关联的酒店: 我的目标是用第二个数据帧的列的相应值替换第一个数据帧的列中的值,或者用相应的值创建该列。当我尝试通过像 我有一个错误,即数据帧的大小不相等,因此无法进行比较 问题答案: 如果将索引设置为另一个df上的“组”列,则

  • 问题内容: 我遇到问题,找到了解决方案,但我觉得这是错误的方法。也许,有一种更“规范”的方式来做到这一点。 问题 我有两个要合并的数据框,而没有多余的列,也没有擦除现有的信息。范例: 现有数据框(df) 要合并的数据框(df2) 我想更新与是否列“A”和“A2”相对应。结果将是(: 这是我的解决方案,但我认为这不是一个很好的解决方案。 有谁有更好的方法吗?谢谢 ! 问题答案: 是的,无需合并即可完

  • 问题内容: 我有两个数据库,我想用另一个数据库表中的值更新一个表。我正在使用以下查询,但它不起作用。 我也尝试了以下查询,但它也不起作用: 问题答案: 更新1 根据您的评论,应成为联接的一部分。这是正确的: 您甚至可以添加来简化语句,