当前位置: 首页 > 知识库问答 >
问题:

聚合,同时在pyspark中删除重复项

左宁
2023-03-14

我想通过聚合pyspark数据帧来分组,同时基于此数据帧的另一列删除重复项(保留最后一个值)。

总之,我想将dropDuplicates应用于GroupeData对象。所以,对于每个组,我只能动态地保留一行。

对于下面的数据帧,直接的组聚合是:

from pyspark.sql import functions

dataframe = spark.createDataFrame(
    [
        (1, "2020-01-01", 1, 1),
        (2, "2020-01-01", 2, 1),
        (3, "2020-01-02", 1, 1),
        (2, "2020-01-02", 1, 1)
    ],
    ("id", "ts", "feature", "h3")
).withColumn("ts", functions.col("ts").cast("timestamp"))

# +---+-------------------+-------+---+
# | id|                 ts|feature| h3|
# +---+-------------------+-------+---+
# |  1|2020-01-01 00:00:00|      1|  1|
# |  2|2020-01-01 00:00:00|      2|  1|
# |  3|2020-01-02 00:00:00|      1|  1|
# |  2|2020-01-02 00:00:00|      1|  1|
# +---+-------------------+-------+---+

aggregated = dataframe.groupby("h3",
  functions.window(
    timeColumn="ts",
    windowDuration="3 days",
    slideDuration="1 day",
  )
).agg(
  functions.sum("feature")
)
aggregated.show(truncate=False)

导致以下数据帧:

+---+------------------------------------------+------------+
|h3 |window                                    |sum(feature)|
+---+------------------------------------------+------------+
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3           |
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|5           |
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|5           |
|1  |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2           |
+---+------------------------------------------+------------+

我希望聚合只使用每个id的最新状态。在这种情况下,id=2已在ts=2020-01-02 00:00:00更新为feature=1,因此当id=2时,所有基本时间戳大于2020-01-02 00:00:00的聚合应仅对列功能使用此状态。预期的聚合数据帧是:

+---+------------------------------------------+------------+
|h3 |window                                    |sum(feature)|
+---+------------------------------------------+------------+
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3           |
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3           |
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|3           |
|1  |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2           |
+---+------------------------------------------+------------+

我怎样才能用pyspark做到这一点?

我假设MapType变量在Spark中不应该有重复的键。有了这个假设,我想我可以聚合这个列,创建一个mapid-

所以我做到了:

aggregated = dataframe.groupby("h3",
  functions.window(
    timeColumn="ts",
    windowDuration="3 days",
    slideDuration="1 day",
  )
).agg(
  functions.map_from_entries(
    functions.collect_list(
      functions.struct("id","feature")
    )
  ).alias("id_feature")
)
aggregated.show(truncate=False)

但后来我发现地图可能有重复的键:

+---+------------------------------------------+--------------------------------+
|h3 |window                                    |id_feature                      |
+---+------------------------------------------+--------------------------------+
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|[1 -> 1, 2 -> 2]                |
|1  |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|[3 -> 1, 2 -> 1]                |
+---+------------------------------------------+--------------------------------+

所以这解决不了我的问题。相反,我只是发现了另一个问题。在数据库记事本中使用显示功能时,它显示MapType列而不显示重复的键。


共有2个答案

葛玉堂
2023-03-14

由于您使用的是Spark 2.4,您可以尝试使用Spark SQL聚合函数,请参见以下内容:

aggregated = dataframe.groupby("h3",
   functions.window( 
     timeColumn="ts", 
     windowDuration="3 days", 
     slideDuration="1 day", 
   ) 
 ).agg( 
     functions.sort_array(functions.collect_list( 
       functions.struct("ts", "id", "feature") 
     ), False).alias("id_feature") 
 )   

我将ts字段添加到函数生成的结构数组中。收集清单。使用函数。sort_array可按ts降序对列表进行排序(如果存在重复记录,则保留最新记录)。在下面的聚合函数中,我们使用包含两个字段的命名结构设置零值:ids(MapType)缓存所有已处理的id,total仅在缓存的ids中不存在新id时进行求和。

aggregated.selectExpr("h3", "window", """
  aggregate(
    id_feature,
    /* zero_value */
    (map() as ids, 0L as total), 
    /* merge */
    (acc, y) -> named_struct(
      /* add y.id into the ids map */
      'ids', map_concat(acc.ids, map(y.id,1)), 
      /* sum to total only when y.id doesn't exist in acc.ids map */
      'total', acc.total + IF(acc.ids[y.id] is null,y.feature,0)
    ), 
    /* finish, take only acc.total, discard acc.ids map */
    acc -> acc.total
  ) as id_features

""").show()
+---+--------------------+----------+
| h3|              window|id_feature|
+---+--------------------+----------+
|  1|[2020-01-01 00:00...|         3|
|  1|[2019-12-31 00:00...|         3|
|  1|[2019-12-30 00:00...|         3|
|  1|[2020-01-02 00:00...|         2|
+---+--------------------+----------+
方献
2023-03-14

首先,您可以找到每个id和时间窗口的最新记录,然后使用最新记录与原始数据帧连接。

time_window = window(timeColumn="ts", windowDuration="3 days", slideDuration="1 day")

df2 = df.groupBy("h3", time_window, "id").agg(max("ts").alias("latest"))

df2.alias("a").join(df.alias("b"), (col("a.id") == col("b.id")) & (col("a.latest") == col("b.ts")), "left") \
   .select("a.*", "feature") \
   .groupBy("h3", "window") \
   .agg(sum("feature")) \
   .orderBy("window") \
   .show(truncate=False)

然后,结果和你预期的一样。

+---+------------------------------------------+------------+
|h3 |window                                    |sum(feature)|
+---+------------------------------------------+------------+
|1  |[2019-12-29 00:00:00, 2020-01-01 00:00:00]|3           |
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3           |
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3           |
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|2           |
+---+------------------------------------------+------------+
 类似资料:
  • 从这里开始,根据RDD/Spark DataFrame中的特定列从行中删除重复项,我们学习了如何根据一些特定变量删除重复的观察。如果我想以RDD的形式保存这些重复的观察结果,我该怎么做?我想如果RDD包含数十亿个观察值可能效率不高。那么除了使用之外,还有其他方法吗?

  • 问题内容: 我在本地弄乱了pyspark 1.4中的数据帧,并且在使dropplicates方法起作用时遇到了问题。不断返回错误。我不太确定为什么这样做,因为我似乎遵循最新文档中的语法。似乎我缺少该功能的导入。 问题答案: 这不是导入问题。您只是调用了错误的对象。虽然类为,但应用后为纯Python ,列表不提供方法。您想要的是这样的:

  • 我正在使用MongoDB聚合框架展开一个数组,该数组有重复项,我需要在进一步分组时忽略这些重复项。 我如何才能做到这一点?

  • 我正在使用读取NetCDF文件,其中包含重复的时间。对于每一次重复,我只想保留第一次,而放弃第二次(它将永远不会出现得更频繁)。这个问题与这个熊猫问题非常相似,但这里提供的解决方案似乎都不适用于Xarray。 要重现问题,请执行以下操作: 生成的

  • 问题内容: 我有这样的桌子 我想执行一个查询,以除去所有最新的重复项。我希望你有个主意吗? 例如,查询后的表必须是这样的 问题答案: 语法可能需要调整,但是应该做到这一点。此外,您可能希望将子查询预查询到其自己的表FIRST中,然后对该结果集运行DELETE FROM。

  • 我试图通过合并排序对数组进行排序,并在排序时删除我认为相等的元素。我递归调用合并排序,然后合并。 到了这一点,我发现a和c是重复的。 我根据特定的标准决定我想要哪一个,我选择c。我递增右手计数器和左手计数器,比较b和d。假设我选择d,然后我选择b。我希望我的最终列表只有元素 但是,发生的事情是在下一个递归调用中,和是0和3,因此d在下一次调用时在数组中列出两次。合并过程使用的数组是: 这是代码。提