val df1 = sc.parallelize(Seq(
("a1",10,"ACTIVE","ds1"),
("a1",20,"ACTIVE","ds1"),
("a2",50,"ACTIVE","ds1"),
("a3",60,"ACTIVE","ds1"))
).toDF("c1","c2","c3","c4")`
val df2 = sc.parallelize(Seq(
("a1",10,"ACTIVE","ds2"),
("a1",20,"ACTIVE","ds2"),
("a1",30,"ACTIVE","ds2"),
("a1",40,"ACTIVE","ds2"),
("a4",20,"ACTIVE","ds2"))
).toDF("c1","c2","c3","c5")`
df1.show()
// +---+---+------+---+
// | c1| c2| c3| c4|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds1|
// | a1| 20|ACTIVE|ds1|
// | a2| 50|ACTIVE|ds1|
// | a3| 60|ACTIVE|ds1|
// +---+---+------+---+
df2.show()
// +---+---+------+---+
// | c1| c2| c3| c5|
// +---+---+------+---+
// | a1| 10|ACTIVE|ds2|
// | a1| 20|ACTIVE|ds2|
// | a1| 30|ACTIVE|ds2|
// | a1| 40|ACTIVE|ds2|
// | a4| 20|ACTIVE|ds2|
// +---+---+------+---+
我的要求是:我需要连接两个数据帧。我的输出数据帧应该包含来自df1的所有记录,以及来自df2的不在df1中的记录,只用于匹配“c1”。我从df2中提取的记录应该在“c3”列更新为“非活动”。
在此示例中,只有匹配的“c1”值是 a1。所以我需要从 df2 中提取 c2=30 和 40 条记录并使它们处于非活动状态。
这是输出。
df_output.show()
// +---+---+--------+---+
// | c1| c2| c3 | c4|
// +---+---+--------+---+
// | a1| 10|ACTIVE |ds1|
// | a1| 20|ACTIVE |ds1|
// | a2| 50|ACTIVE |ds1|
// | a3| 60|ACTIVE |ds1|
// | a1| 30|INACTIVE|ds1|
// | a1| 40|INACTIVE|ds1|
// +---+---+--------+---+
有人能帮我做这个吗?
享受挑战,这是我的解决方案。
val c1keys = df1.select("c1").distinct
val df2_in_df1 = df2.join(c1keys, Seq("c1"), "inner")
val df2inactive = df2_in_df1.join(df1, Seq("c1", "c2"), "leftanti").withColumn("c3", lit("INACTIVE"))
scala> df1.union(df2inactive).show
+---+---+--------+---+
| c1| c2| c3| c4|
+---+---+--------+---+
| a1| 10| ACTIVE|ds1|
| a1| 20| ACTIVE|ds1|
| a2| 50| ACTIVE|ds1|
| a3| 60| ACTIVE|ds1|
| a1| 30|INACTIVE|ds2|
| a1| 40|INACTIVE|ds2|
+---+---+--------+---+
这是肮脏的解决方案 -
from pyspark.sql import functions as F
# find the rows from df2 that have matching key c1 in df2
df3 = df1.join(df2,df1.c1==df2.c1)\
.select(df2.c1,df2.c2,df2.c3,df2.c5.alias('c4'))\
.dropDuplicates()
df3.show()
:
+---+---+------+---+
| c1| c2| c3| c4|
+---+---+------+---+
| a1| 10|ACTIVE|ds2|
| a1| 20|ACTIVE|ds2|
| a1| 30|ACTIVE|ds2|
| a1| 40|ACTIVE|ds2|
+---+---+------+---+
:
# Union df3 with df1 and change columns c3 and c4 if c4 value is 'ds2'
df1.union(df3).dropDuplicates(['c1','c2'])\
.select('c1','c2',\
F.when(df1.c4=='ds2','INACTIVE').otherwise('ACTIVE').alias('c3'),
F.when(df1.c4=='ds2','ds1').otherwise('ds1').alias('c4')
)\
.orderBy('c1','c2')\
.show()
:
+---+---+--------+---+
| c1| c2| c3| c4|
+---+---+--------+---+
| a1| 10| ACTIVE|ds1|
| a1| 20| ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
| a2| 50| ACTIVE|ds1|
| a3| 60| ACTIVE|ds1|
+---+---+--------+---+
首先,一件小事。我对df2
中的列使用不同的名称:
val df2 = sc.parallelize(...).toDF("d1","d2","d3","d4")
没什么大不了的,但这让我更容易推理。
现在是有趣的事情。为了清楚起见,我会说得详细一点:
val join = df1
.join(df2, df1("c1") === df2("d1"), "inner")
.select($"d1", $"d2", $"d3", lit("ds1").as("d4"))
.dropDuplicates
在这里,我做了以下工作:
这基本上只是过滤掉在< code>df1中的< code>c1中没有对应键的< code>df2中的所有内容。
接下来我不同了:
val diff = join
.except(df1)
.select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4")
这是一个基本的集合操作,查找< code>join中不在< code>df1中的所有内容。这些是要停用的项目,所以我选择所有的列,但是用硬编码的< code>INACTIVE值替换第三列。
剩下的就是把它们放在一起:
df1.union(diff)
这只是将< code>df1与我们之前计算的停用值表相结合,以产生最终结果:
+---+---+--------+---+
| c1| c2| c3| c4|
+---+---+--------+---+
| a1| 10| ACTIVE|ds1|
| a1| 20| ACTIVE|ds1|
| a2| 50| ACTIVE|ds1|
| a3| 60| ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
+---+---+--------+---+
同样,您不需要所有这些中间值。我只是啰嗦地帮助追踪整个过程。
本文向大家介绍如何更改 pandas dataframe 中两列的位置,包括了如何更改 pandas dataframe 中两列的位置的使用技巧和注意事项,需要的朋友参考一下 如何更改 pandas dataframe 中两列的位置: 把其中的某列移到第一列的位置。 原来的 df 是: 要将 Mid 这一列移动到第一列? 解决办法:(使用 ix ) 法一: 法二: 最终的处理结果: 以上这篇如何更
问题内容: 尝试更改列的数据类型并设置新的默认值时遇到以下错误: 错误1064(42000):您的SQL语法有错误;检查与您的MySQL服务器版本相对应的手册,以在第1行的’VARCHAR(255)NOT NULL SET DEFAULT’{}’‘附近使用正确的语法 问题答案: 同样的第二种可能性(感谢juergen_d):
在尝试更改列的数据类型并设置新的默认值时,我遇到以下错误: 错误1064(42000):您的SQL语法中有错误;查看与您的MySQL server版本相对应的手册,以了解第1行“varchar(255)NOT NULL SET DEFAULT”{}“附近使用的正确语法
我正在使用齐柏林飞艇0.6.2和火花2.0。 我尝试在循环中执行查询,但效果不是很好。 我需要循环一个数据帧的每一行,大约5000行,并执行一个查询,这将在另一个数据帧中增加一个值。 以下是我的尝试: 我试着从两个数据帧中提取一小部分,但仍然很慢。我觉得我做得不对。 知道如何快速更新数据帧吗?
我有一堆CSV文件,它们是作为数据流读取的。对于每个dataframe,我希望更改一些列名,如果某个dataframe中存在特定列: column_name_update_map={'aa':'xx';'bb':'yy'}
我有以下(): 我按分配添加更多列: 如何将列移到前面,即将其设置为第一列,其他列的顺序保持不变?