当前位置: 首页 > 知识库问答 >
问题:

PySpark:使用timeseries数据进行滚动平均

严正初
2023-03-14
%pyspark
import datetime
from pyspark.sql import functions as F

df1 = sc.parallelize([(17,"2017-03-11T15:27:18+00:00"), (13,"2017-03-11T12:27:18+00:00"), (21,"2017-03-17T11:27:18+00:00")]).toDF(["dollars", "datestring"])
df2 = df1.withColumn('timestampGMT', df1.datestring.cast('timestamp'))

w = df2.groupBy(F.window("timestampGMT", "7 days")).agg(F.avg("dollars").alias('avg'))
w.select(w.window.start.cast("string").alias("start"), w.window.end.cast("string").alias("end"), "avg").collect()

这将产生两个记录:

|        start        |          end         | avg |
|---------------------|----------------------|-----|
|'2017-03-16 00:00:00'| '2017-03-23 00:00:00'| 21.0|
|---------------------|----------------------|-----|
|'2017-03-09 00:00:00'| '2017-03-16 00:00:00'| 15.0|
|---------------------|----------------------|-----|

窗口函数对时间序列数据进行分类,而不是执行滚动平均。

是否有一种方法来执行滚动平均值,在此方法中,我将为每行返回一个周平均值,时间周期结束于该行的timestampGMT?

%pyspark
from pyspark.sql import functions as F
df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                        (13, "2017-03-15T12:27:18+00:00"),
                        (25, "2017-03-18T11:27:18+00:00")],
                        ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))
df = df.withColumn('rolling_average', F.avg("dollars").over(Window.partitionBy(F.window("timestampGMT", "7 days"))))
dollars timestampGMT            rolling_average
25      2017-03-18 11:27:18.0   25
17      2017-03-10 15:27:18.0   15
13      2017-03-15 12:27:18.0   15
dollars timestampGMT            rolling_average
17      2017-03-10 15:27:18.0   17
13      2017-03-15 12:27:18.0   15
25      2017-03-18 11:27:18.0   19

在上面的结果中,2017-03-10的rolling_average是17,因为没有之前的记录。2017-03-15的滚动平均值是15,因为它是2017-03-15的13和2017-03-10的17的平均值,后者与前7天窗口相一致。2017-03-18的滚动平均值是19,因为它是2017-03-18的25和2017-03-10的13的平均值,该平均值与前7天窗口相比有所下降,它不包括2017-03-10的17,因为该平均值与前7天窗口相比没有下降。

有没有一种方法可以做到这一点,而不是每周窗口不重叠的装箱窗口?

共有1个答案

韩景胜
2023-03-14

我找到了使用StackOverflow计算移动/滚动平均值的正确方法:

火花窗口函数-日期之间的范围

基本思想是将timestamp列转换为seconds,然后可以使用Pyspark.sql.Window类中的rangeBetween函数在窗口中包含正确的行。

%pyspark
from pyspark.sql import functions as F
from pyspark.sql.window import Window


#function to calculate number of seconds from number of days
days = lambda i: i * 86400

df = spark.createDataFrame([(17, "2017-03-10T15:27:18+00:00"),
                        (13, "2017-03-15T12:27:18+00:00"),
                        (25, "2017-03-18T11:27:18+00:00")],
                        ["dollars", "timestampGMT"])
df = df.withColumn('timestampGMT', df.timestampGMT.cast('timestamp'))

#create window by casting timestamp to long (number of seconds)
w = (Window.orderBy(F.col("timestampGMT").cast('long')).rangeBetween(-days(7), 0))

df = df.withColumn('rolling_average', F.avg("dollars").over(w))
dollars   timestampGMT            rolling_average
17        2017-03-10 15:27:18.0   17.0
13        2017-03-15 12:27:18.0   15.0
25        2017-03-18 11:27:18.0   19.0
 类似资料:
  • 我可以在JavaFX中创建一个仅水平滚动的滚动窗格,如下所示: 但是,鼠标滚轮在这种情况下仍然尝试垂直滚动而不是水平滚动(除非我专门在水平滚动条上滚动。)

  • 我试图在一个libgdx游戏中实现触摸滚动。我有一个很宽的图像,是一个房间的全景。我希望能够滚动图像,让用户可以看到房间的四周。我有它,所以我可以滚动一定的距离,但当一个新的触摸拖动事件被注册的图像被移回到原来的位置。 这就是我实现它的方式 } 在InputProcessor中 在这个问题LibGdx如何使用OrthographicCamera?滚动的帮助下,我做到了这一步?。然而,这并没有真正解

  • 我们在Glassfish 3.1.2集群上部署了一个Java EE应用程序,该集群使用JAX-RS提供REST API。我们定期通过将EAR部署到重复的集群实例来部署新版本的应用程序,然后更新HTTP负载平衡器,以将流量发送到更新的实例,而不是旧实例。 这使我们能够在不损失可用性的情况下进行升级,如下所述:http://docs.oracle.com/cd/E18930_01/html/821-2

  • 问题内容: 我一直在做一个项目,内容已经完成。但是对于设计,我正在考虑使用视差滚动技术。 但是,我所能找到的全部还是JavaScript或Jquery,而我只精通CSS3。 可以仅使用CSS3(如果需要使用HTML5)而不是使用jquery插件来实现视差滚动吗?如果我能指出一些相同的教程,那就太好了。 注意:这接近于我想要产生的效果 问题答案: 要产生非常基本的视差滚动效果,下面的示例就足够了。

  • 是否可以混合事务程序化和基于注释的管理?默认情况下,@Transactional会在任何运行时进行回滚并重新抛出它。 我不想重播它,但返回可选。空()有可能吗?使用事务编程管理很容易实现:(我从Spring文档中获取了示例) 有可能以一种好的方式将它们结合起来吗?让我们说: 你认为,混合使用两种管理事务的方法是一种代码气味吗? 谢谢你。

  • 我想根据最近3天的数字计算每个customer_id和日期的移动平均值。为了计算5月4日的移动平均数,我们需要计算5月1-3日的平均购买量 输出火花DF