当前位置: 首页 > 知识库问答 >
问题:

如何联接两个DataFrame并更改缺少值的列?

何乐
2023-03-14
val df1 = sc.parallelize(Seq(
   ("a1",10,"ACTIVE","ds1"),
   ("a1",20,"ACTIVE","ds1"),
   ("a2",50,"ACTIVE","ds1"),
   ("a3",60,"ACTIVE","ds1"))
).toDF("c1","c2","c3","c4")`

val df2 = sc.parallelize(Seq(
   ("a1",10,"ACTIVE","ds2"),
   ("a1",20,"ACTIVE","ds2"),
   ("a1",30,"ACTIVE","ds2"),
   ("a1",40,"ACTIVE","ds2"),
   ("a4",20,"ACTIVE","ds2"))
).toDF("c1","c2","c3","c5")`


df1.show()

// +---+---+------+---+
// | c1| c2|    c3| c4|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds1|
// | a1| 20|ACTIVE|ds1|
// | a2| 50|ACTIVE|ds1|
// | a3| 60|ACTIVE|ds1|
// +---+---+------+---+

df2.show()
// +---+---+------+---+
// | c1| c2|    c3| c5|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds2|
// | a1| 20|ACTIVE|ds2|
// | a1| 30|ACTIVE|ds2|
// | a1| 40|ACTIVE|ds2|
// | a4| 20|ACTIVE|ds2|
// +---+---+------+---+

我的要求是:我需要连接两个数据帧。我的输出数据帧应该包含来自df1的所有记录,以及来自df2的不在df1中的记录,只用于匹配“c1”。我从df2中提取的记录应该在“c3”列更新为“非活动”。

在此示例中,只有匹配的“c1”值是 a1。所以我需要从 df2 中提取 c2=30 和 40 条记录并使它们处于非活动状态。

这是输出。

df_output.show()

// +---+---+--------+---+
// | c1| c2|    c3  | c4|
// +---+---+--------+---+
// | a1| 10|ACTIVE  |ds1|
// | a1| 20|ACTIVE  |ds1|
// | a2| 50|ACTIVE  |ds1|
// | a3| 60|ACTIVE  |ds1|
// | a1| 30|INACTIVE|ds1|
// | a1| 40|INACTIVE|ds1|
// +---+---+--------+---+

有人能帮我做这个吗?

共有3个答案

习斌
2023-03-14

享受挑战,这是我的解决方案

val c1keys = df1.select("c1").distinct
val df2_in_df1 = df2.join(c1keys, Seq("c1"), "inner")
val df2inactive = df2_in_df1.join(df1, Seq("c1", "c2"), "leftanti").withColumn("c3", lit("INACTIVE"))
scala> df1.union(df2inactive).show
+---+---+--------+---+
| c1| c2|      c3| c4|
+---+---+--------+---+
| a1| 10|  ACTIVE|ds1|
| a1| 20|  ACTIVE|ds1|
| a2| 50|  ACTIVE|ds1|
| a3| 60|  ACTIVE|ds1|
| a1| 30|INACTIVE|ds2|
| a1| 40|INACTIVE|ds2|
+---+---+--------+---+
杨星纬
2023-03-14

这是肮脏的解决方案 -

from pyspark.sql import functions as F


# find the rows from df2 that have matching key c1 in df2
df3 = df1.join(df2,df1.c1==df2.c1)\
.select(df2.c1,df2.c2,df2.c3,df2.c5.alias('c4'))\
.dropDuplicates()

df3.show()

:

+---+---+------+---+
| c1| c2|    c3| c4|
+---+---+------+---+
| a1| 10|ACTIVE|ds2|
| a1| 20|ACTIVE|ds2|
| a1| 30|ACTIVE|ds2|
| a1| 40|ACTIVE|ds2|
+---+---+------+---+

:

# Union df3 with df1 and change columns c3 and c4 if c4 value is 'ds2'

df1.union(df3).dropDuplicates(['c1','c2'])\
.select('c1','c2',\
        F.when(df1.c4=='ds2','INACTIVE').otherwise('ACTIVE').alias('c3'),
        F.when(df1.c4=='ds2','ds1').otherwise('ds1').alias('c4')
       )\
.orderBy('c1','c2')\
.show()

:

+---+---+--------+---+
| c1| c2|      c3| c4|
+---+---+--------+---+
| a1| 10|  ACTIVE|ds1|
| a1| 20|  ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
| a2| 50|  ACTIVE|ds1|
| a3| 60|  ACTIVE|ds1|
+---+---+--------+---+
何兴邦
2023-03-14

首先,一件小事。我对df2中的列使用不同的名称:

val df2 = sc.parallelize(...).toDF("d1","d2","d3","d4")

没什么大不了的,但这让我更容易推理。

现在是有趣的事情。为了清楚起见,我会说得详细一点:

val join = df1
.join(df2, df1("c1") === df2("d1"), "inner")
.select($"d1", $"d2", $"d3", lit("ds1").as("d4"))
.dropDuplicates

在这里,我做了以下工作:

    < li >在< code>c1和< code>d1列上的< code>df1和< code>df2之间的内部联接 < li >选择< code>df2列,只需在最后一列中“硬编码”< code>ds1即可替换< code>ds2 < li >删除重复项

这基本上只是过滤掉在< code>df1中的< code>c1中没有对应键的< code>df2中的所有内容。

接下来我不同了:

val diff = join
.except(df1)
.select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4")

这是一个基本的集合操作,查找< code>join中不在< code>df1中的所有内容。这些是要停用的项目,所以我选择所有的列,但是用硬编码的< code>INACTIVE值替换第三列。

剩下的就是把它们放在一起:

df1.union(diff)

这只是将< code>df1与我们之前计算的停用值表相结合,以产生最终结果:

+---+---+--------+---+
| c1| c2|      c3| c4|
+---+---+--------+---+
| a1| 10|  ACTIVE|ds1|
| a1| 20|  ACTIVE|ds1|
| a2| 50|  ACTIVE|ds1|
| a3| 60|  ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
+---+---+--------+---+

同样,您不需要所有这些中间值。我只是啰嗦地帮助追踪整个过程。

 类似资料:
  • 本文向大家介绍如何更改 pandas dataframe 中两列的位置,包括了如何更改 pandas dataframe 中两列的位置的使用技巧和注意事项,需要的朋友参考一下 如何更改 pandas dataframe 中两列的位置: 把其中的某列移到第一列的位置。 原来的 df 是: 要将 Mid 这一列移动到第一列? 解决办法:(使用 ix ) 法一: 法二: 最终的处理结果: 以上这篇如何更

  • 问题内容: 尝试更改列的数据类型并设置新的默认值时遇到以下错误: 错误1064(42000):您的SQL语法有错误;检查与您的MySQL服务器版本相对应的手册,以在第1行的’VARCHAR(255)NOT NULL SET DEFAULT’{}’‘附近使用正确的语法 问题答案: 同样的第二种可能性(感谢juergen_d):

  • 在尝试更改列的数据类型并设置新的默认值时,我遇到以下错误: 错误1064(42000):您的SQL语法中有错误;查看与您的MySQL server版本相对应的手册,以了解第1行“varchar(255)NOT NULL SET DEFAULT”{}“附近使用的正确语法

  • 我正在使用齐柏林飞艇0.6.2和火花2.0。 我尝试在循环中执行查询,但效果不是很好。 我需要循环一个数据帧的每一行,大约5000行,并执行一个查询,这将在另一个数据帧中增加一个值。 以下是我的尝试: 我试着从两个数据帧中提取一小部分,但仍然很慢。我觉得我做得不对。 知道如何快速更新数据帧吗?

  • 我有一堆CSV文件,它们是作为数据流读取的。对于每个dataframe,我希望更改一些列名,如果某个dataframe中存在特定列: column_name_update_map={'aa':'xx';'bb':'yy'}

  • 我有以下(): 我按分配添加更多列: 如何将列移到前面,即将其设置为第一列,其他列的顺序保持不变?