当前位置: 首页 > 知识库问答 >
问题:

在PySpark中合并两个数据帧

万德海
2023-03-14

我有两个数据帧,DF1和DF2,DF1是存储来自DF2的任何附加信息的主机。

假设DF1是以下格式,

Item Id | item      | count
---------------------------
1       | item 1    | 2
2       | item 2    | 3
1       | item 3    | 2
3       | item 4    | 5

DF2包含DF1中已经存在的2个条目和两个新条目。(itemId和item被视为一个组,可以被视为连接的键)

Item Id | item      | count
---------------------------
1       | item 1    | 2
3       | item 4    | 2
4       | item 4    | 4
5       | item 5    | 2

我需要合并两个数据框,以便增加现有项目计数并插入新项目。

结果应该是这样的:

Item Id | item      | count
---------------------------
1       | item 1    | 4
2       | item 2    | 3
1       | item 3    | 2
3       | item 4    | 7
4       | item 4    | 4
5       | item 5    | 2

我有一种方法可以做到这一点,但不确定这种方法是否有效或正确

temp1 = df1.join(temp,['item_id','item'],'full_outer') \
    .na.fill(0)

temp1\
    .groupby("item_id", "item")\
    .agg(F.sum(temp1["count"] + temp1["newcount"]))\
    .show()

共有3个答案

虞展
2023-03-14

@wandermonk的解决方案是推荐的,因为它不使用连接。尽可能避免连接,因为这会触发洗牌(也称为广泛转换,并导致网络上的数据搬迁,这既昂贵又缓慢)

您还必须查看数据大小(两个表都是大的或一个小的,一个大的等等),因此您可以调整它的性能方面。

我尝试通过使用SparkSQL的解决方案显示组,因为它们做同样的事情,但更容易理解和操作。

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

list_1 = [[1,"item 1" , 2],[2 ,"item 2", 3],[1 ,"item 3" ,2],[3 ,"item 4" , 5]]
list_2 = [[1,"item 1",2],[3 ,"item 4",2],[4 ,"item 4",4],[5 ,"item 5",2]]

my_schema = StructType([StructField("Item_ID",IntegerType(), True),StructField("Item_Name",StringType(), True ),StructField("Quantity",IntegerType(), True)])
df1 = spark.createDataFrame(list_1, my_schema)
df2 = spark.createDataFrame(list_2, my_schema)

df1.createOrReplaceTempView("df1")
df1.createOrReplaceTempView("df2")

df3 = df2.union(df1)
df3.createOrReplaceTempView("df3")
df4 = spark.sql("select Item_ID, Item_Name, sum(Quantity) as Quantity from df3 group by Item_ID, Item_Name")
df4.show(10)

现在,如果您查看SparkUI,您可以看到如此小的数据集,随机操作和阶段#。

我也推荐看看SQL方案,了解一下成本,交易所在这里代表洗牌。

== Physical Plan ==
*(2) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, Quantity#32L])
+- Exchange hashpartitioning(Item_ID#6, Item_Name#7, 200)
   +- *(1) HashAggregate(keys=[Item_ID#6, Item_Name#7], functions=[partial_sum(cast(Quantity#8 as bigint))], output=[Item_ID#6, Item_Name#7, sum#38L])
      +- Union
         :- Scan ExistingRDD[Item_ID#6,Item_Name#7,Quantity#8]
         +- Scan ExistingRDD[Item_ID#0,Item_Name#1,Quantity#2]
颜祖鹤
2023-03-14

有几种方法可以做到这一点。

根据你所描述的,最直接的解决方案是使用RDD - SparkContext.union

rdd1 = sc.parallelize(DF1)
rdd2 = sc.parallelize(DF2)

union_rdd = sc.union([rdd1, rdd2])

另一种解决方案是使用< code>pyspark.sql中的< code>DataFrame.union

注意:我之前建议过unionAll,但在Spark 2.0中已弃用

卢皓轩
2023-03-14

因为两个数据帧的模式是相同的,所以您可以执行< code>union,然后执行< code>groupby id和< code>aggregate计数。

step1: df3 = df1.union(df2);
step2: df3.groupBy("Item Id", "item").agg(sum("count").as("count"));
 类似资料:
  • 我有两个数据帧,我需要连接一列,如果id包含在第二个数据帧的同一列中,则只从第一个数据帧中获取行: df1: 断续器: 期望输出: 我已经用df1.join(df2("id ")," left ")试过了,但是给我错误:“Dataframe”对象是不可调用的。

  • 问题内容: 我正在使用下面的代码合并两个csv(数据帧): 我有以下CSV文件 文件1: 文件2: 合并后 如果您注意到student_id的开头附加了0,应该将其视为文本,但是在合并并使用函数后,它将其转换为数字并删除了前导0。 即使在to_csv之后,如何将列保持为“文本”? 我认为它的to_csv函数可以再次保存为数字添加了dtype = {‘student_id’:str}。 问题答案:

  • 假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:

  • 在索引上合并是不好的做法吗?不可能吗?如果是,如何将索引转换为名为“index”的新列?

  • 问题内容: 我在加入熊猫方面遇到问题,并且试图找出问题所在。假设我有一个x: 我应该能够通过简单的连接命令在y = x上将y与索引上的y联接,除了同名具有+2。 我希望决赛对双方都有1941个非值。我也尝试过合并,但是我有同样的问题。 我以为正确的答案是pandas.concat([x,y]),但这也不符合我的预期。 编辑:如果您在加入方面遇到问题,请阅读下面的韦斯答案。我有一个重复的时间戳。 问

  • 我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想将df1的所有列(我有两列以上)与客户ID上的df2连接值相乘