我想通过聚合pyspark数据帧来分组,同时基于此数据帧的另一列删除重复项(保留最后一个值)。
总之,我想将dropDuplicates应用于GroupeData对象。所以,对于每个组,我只能动态地保留一行。
对于下面的数据帧,直接的组聚合是:
from pyspark.sql import functions
dataframe = spark.createDataFrame(
[
(1, "2020-01-01", 1, 1),
(2, "2020-01-01", 2, 1),
(3, "2020-01-02", 1, 1),
(2, "2020-01-02", 1, 1)
],
("id", "ts", "feature", "h3")
).withColumn("ts", functions.col("ts").cast("timestamp"))
# +---+-------------------+-------+---+
# | id| ts|feature| h3|
# +---+-------------------+-------+---+
# | 1|2020-01-01 00:00:00| 1| 1|
# | 2|2020-01-01 00:00:00| 2| 1|
# | 3|2020-01-02 00:00:00| 1| 1|
# | 2|2020-01-02 00:00:00| 1| 1|
# +---+-------------------+-------+---+
aggregated = dataframe.groupby("h3",
functions.window(
timeColumn="ts",
windowDuration="3 days",
slideDuration="1 day",
)
).agg(
functions.sum("feature")
)
aggregated.show(truncate=False)
导致以下数据帧:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|5 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|5 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
我希望聚合只使用每个id
的最新状态。在这种情况下,id=2
已在ts=2020-01-02 00:00:00
更新为feature=1
,因此当id=2
时,所有基本时间戳大于2020-01-02 00:00:00
的聚合应仅对列功能使用此状态。预期的聚合数据帧是:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|3 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
我怎样才能用pyspark做到这一点?
我假设MapType变量在Spark中不应该有重复的键。有了这个假设,我想我可以聚合这个列,创建一个mapid-
所以我做到了:
aggregated = dataframe.groupby("h3",
functions.window(
timeColumn="ts",
windowDuration="3 days",
slideDuration="1 day",
)
).agg(
functions.map_from_entries(
functions.collect_list(
functions.struct("id","feature")
)
).alias("id_feature")
)
aggregated.show(truncate=False)
但后来我发现地图可能有重复的键:
+---+------------------------------------------+--------------------------------+
|h3 |window |id_feature |
+---+------------------------------------------+--------------------------------+
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|[1 -> 1, 2 -> 2] |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|[3 -> 1, 2 -> 1] |
+---+------------------------------------------+--------------------------------+
所以这解决不了我的问题。相反,我只是发现了另一个问题。在数据库记事本中使用显示功能时,它显示MapType列而不显示重复的键。
由于您使用的是Spark 2.4,您可以尝试使用Spark SQL聚合函数,请参见以下内容:
aggregated = dataframe.groupby("h3",
functions.window(
timeColumn="ts",
windowDuration="3 days",
slideDuration="1 day",
)
).agg(
functions.sort_array(functions.collect_list(
functions.struct("ts", "id", "feature")
), False).alias("id_feature")
)
我将ts
字段添加到函数生成的结构数组中。收集清单。使用函数。sort_array可按ts
降序对列表进行排序(如果存在重复记录,则保留最新记录)。在下面的聚合函数中,我们使用包含两个字段的命名结构设置零值:ids(MapType)缓存所有已处理的id,total仅在缓存的ids
中不存在新id时进行求和。
aggregated.selectExpr("h3", "window", """
aggregate(
id_feature,
/* zero_value */
(map() as ids, 0L as total),
/* merge */
(acc, y) -> named_struct(
/* add y.id into the ids map */
'ids', map_concat(acc.ids, map(y.id,1)),
/* sum to total only when y.id doesn't exist in acc.ids map */
'total', acc.total + IF(acc.ids[y.id] is null,y.feature,0)
),
/* finish, take only acc.total, discard acc.ids map */
acc -> acc.total
) as id_features
""").show()
+---+--------------------+----------+
| h3| window|id_feature|
+---+--------------------+----------+
| 1|[2020-01-01 00:00...| 3|
| 1|[2019-12-31 00:00...| 3|
| 1|[2019-12-30 00:00...| 3|
| 1|[2020-01-02 00:00...| 2|
+---+--------------------+----------+
首先,您可以找到每个id和时间窗口的最新记录,然后使用最新记录与原始数据帧连接。
time_window = window(timeColumn="ts", windowDuration="3 days", slideDuration="1 day")
df2 = df.groupBy("h3", time_window, "id").agg(max("ts").alias("latest"))
df2.alias("a").join(df.alias("b"), (col("a.id") == col("b.id")) & (col("a.latest") == col("b.ts")), "left") \
.select("a.*", "feature") \
.groupBy("h3", "window") \
.agg(sum("feature")) \
.orderBy("window") \
.show(truncate=False)
然后,结果和你预期的一样。
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-29 00:00:00, 2020-01-01 00:00:00]|3 |
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|2 |
+---+------------------------------------------+------------+
从这里开始,根据RDD/Spark DataFrame中的特定列从行中删除重复项,我们学习了如何根据一些特定变量删除重复的观察。如果我想以RDD的形式保存这些重复的观察结果,我该怎么做?我想如果RDD包含数十亿个观察值可能效率不高。那么除了使用之外,还有其他方法吗?
问题内容: 我在本地弄乱了pyspark 1.4中的数据帧,并且在使dropplicates方法起作用时遇到了问题。不断返回错误。我不太确定为什么这样做,因为我似乎遵循最新文档中的语法。似乎我缺少该功能的导入。 问题答案: 这不是导入问题。您只是调用了错误的对象。虽然类为,但应用后为纯Python ,列表不提供方法。您想要的是这样的:
我正在使用MongoDB聚合框架展开一个数组,该数组有重复项,我需要在进一步分组时忽略这些重复项。 我如何才能做到这一点?
我正在使用读取NetCDF文件,其中包含重复的时间。对于每一次重复,我只想保留第一次,而放弃第二次(它将永远不会出现得更频繁)。这个问题与这个熊猫问题非常相似,但这里提供的解决方案似乎都不适用于Xarray。 要重现问题,请执行以下操作: 生成的
问题内容: 我有这样的桌子 我想执行一个查询,以除去所有最新的重复项。我希望你有个主意吗? 例如,查询后的表必须是这样的 问题答案: 语法可能需要调整,但是应该做到这一点。此外,您可能希望将子查询预查询到其自己的表FIRST中,然后对该结果集运行DELETE FROM。
我试图通过合并排序对数组进行排序,并在排序时删除我认为相等的元素。我递归调用合并排序,然后合并。 到了这一点,我发现a和c是重复的。 我根据特定的标准决定我想要哪一个,我选择c。我递增右手计数器和左手计数器,比较b和d。假设我选择d,然后我选择b。我希望我的最终列表只有元素 但是,发生的事情是在下一个递归调用中,和是0和3,因此d在下一次调用时在数组中列出两次。合并过程使用的数组是: 这是代码。提