我有两个数据帧,DF1和DF2,DF1是存储来自DF2的任何附加信息的主机。
假设DF1是以下格式,
Item Id | item | count
---------------------------
1 | item 1 | 2
2 | item 2 | 3
1 | item 3 | 2
3 | item 4 | 5
DF2包含DF1中已经存在的2个条目和两个新条目。(itemId和item被视为一个组,可以被视为连接的键)
Item Id | item | count
---------------------------
1 | item 1 | 2
3 | item 4 | 2
4 | item 4 | 4
5 | item 5 | 2
我需要合并两个数据框,以便增加现有项目计数并插入新项目。
结果应该是这样的:
Item Id | item | count
---------------------------
1 | item 1 | 4
2 | item 2 | 3
1 | item 3 | 2
3 | item 4 | 7
4 | item 4 | 4
5 | item 5 | 2
我有一种方法可以做到这一点,但不确定这种方法是否有效或正确
temp1 = df1.join(temp,['item_id','item'],'full_outer') \
.na.fill(0)
temp1\
.groupby("item_id", "item")\
.agg(F.sum(temp1["count"] + temp1["newcount"]))\
.show()
@wandermonk的解决方案是推荐的,因为它不使用连接。尽可能避免连接,因为这会触发洗牌(也称为广泛转换,并导致网络上的数据搬迁,这既昂贵又缓慢)
您还必须查看数据大小(两个表都是大的或一个小的,一个大的等等),因此您可以调整它的性能方面。
我尝试通过使用SparkSQL的解决方案显示组,因为它们做同样的事情,但更容易理解和操作。
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
list_1 = [[1,"item 1" , 2],[2 ,"item 2", 3],[1 ,"item 3" ,2],[3 ,"item 4" , 5]]
list_2 = [[1,"item 1",2],[3 ,"item 4",2],[4 ,"item 4",4],[5 ,"item 5",2]]
my_schema = StructType([StructField("Item_ID",IntegerType(), True),StructField("Item_Name",StringType(), True ),StructField("Quantity",IntegerType(), True)])
df1 = spark.createDataFrame(list_1, my_schema)
df2 = spark.createDataFrame(list_2, my_schema)
df1.createOrReplaceTempView("df1")
df1.createOrReplaceTempView("df2")
df3 = df2.union(df1)
df3.createOrReplaceTempView("df3")
df4 = spark.sql("select Item_ID, Item_Name, sum(Quantity) as Quantity from df3 group by Item_ID, Item_Name")
df4.show(10)
现在,如果您查看SparkUI,您可以看到如此小的数据集,随机操作和阶段#。
我也推荐看看SQL方案,了解一下成本,交易所在这里代表洗牌。
== Physical Plan ==
*(2) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, Quantity#32L])
+- Exchange hashpartitioning(Item_ID#6, Item_Name#7, 200)
+- *(1) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[partial_sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, sum#38L])
+- Union
:- Scan ExistingRDD[Item_ID#6,Item_Name#7,Quantity#8]
+- Scan ExistingRDD[Item_ID#0,Item_Name#1,Quantity#2]
有几种方法可以做到这一点。
根据你所描述的,最直接的解决方案是使用RDD - SparkContext.union
:
rdd1 = sc.parallelize(DF1)
rdd2 = sc.parallelize(DF2)
union_rdd = sc.union([rdd1, rdd2])
另一种解决方案是使用< code>pyspark.sql中的< code>DataFrame.union
注意:我之前建议过unionAll
,但在Spark 2.0中已弃用
因为两个数据帧的模式是相同的,所以您可以执行< code>union,然后执行< code>groupby id和< code>aggregate计数。
step1: df3 = df1.union(df2);
step2: df3.groupBy("Item Id", "item").agg(sum("count").as("count"));
我有两个数据帧,我需要连接一列,如果id包含在第二个数据帧的同一列中,则只从第一个数据帧中获取行: df1: 断续器: 期望输出: 我已经用df1.join(df2("id ")," left ")试过了,但是给我错误:“Dataframe”对象是不可调用的。
问题内容: 我正在使用下面的代码合并两个csv(数据帧): 我有以下CSV文件 文件1: 文件2: 合并后 如果您注意到student_id的开头附加了0,应该将其视为文本,但是在合并并使用函数后,它将其转换为数字并删除了前导0。 即使在to_csv之后,如何将列保持为“文本”? 我认为它的to_csv函数可以再次保存为数字添加了dtype = {‘student_id’:str}。 问题答案:
假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:
在索引上合并是不好的做法吗?不可能吗?如果是,如何将索引转换为名为“index”的新列?
问题内容: 我在加入熊猫方面遇到问题,并且试图找出问题所在。假设我有一个x: 我应该能够通过简单的连接命令在y = x上将y与索引上的y联接,除了同名具有+2。 我希望决赛对双方都有1941个非值。我也尝试过合并,但是我有同样的问题。 我以为正确的答案是pandas.concat([x,y]),但这也不符合我的预期。 编辑:如果您在加入方面遇到问题,请阅读下面的韦斯答案。我有一个重复的时间戳。 问
我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想将df1的所有列(我有两列以上)与客户ID上的df2连接值相乘