当前位置: 首页 > 知识库问答 >
问题:

Apache Spark-处理时态RDDs上的滑动窗口

暴才俊
2023-03-14

我最初的想法是简单地迭代,如果我们想要平均每X天,X次,每次只需按日期分组元素,并有一个偏移量。

所以如果我们有这样的场景:

天数:1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

第2组:(6,F)(7,G)(8,H)(9,I)(10,J)

第3组:(11,K)(12,L)(13,M)(14,N)(15,O)

第二次迭代:

共有1个答案

周鸿光
2023-03-14

如果您转换为DataFrame,这将变得简单得多--您只需将数据自联接到自身并找到平均值即可。假设我有这样一系列数据:

tsDF.show
date       amount
1970-01-01 10.0
1970-01-01 5.0
1970-01-01 7.0
1970-01-02 14.0
1970-01-02 13.9
1970-01-03 1.0
1970-01-03 5.0
1970-01-03 9.0
1970-01-04 9.0
1970-01-04 5.8
1970-01-04 2.8
1970-01-04 8.9
1970-01-05 8.1
1970-01-05 2.1
1970-01-05 2.78
1970-01-05 20.78

它卷起为:

tsDF.groupBy($"date").agg($"date", sum($"amount"), count($"date")).show
date       SUM(amount) COUNT(date)
1970-01-01 22.0        3
1970-01-02 27.9        2
1970-01-03 15.0        3
1970-01-04 26.5        4
1970-01-05 33.76       4

然后,我将需要创建一个UDF来为联接条件移位日期(注意,我使用偏移量=-2),仅使用2天窗口:

def dateShift(myDate: java.sql.Date): java.sql.Date = {
  val offset = -2;
  val cal = Calendar.getInstance;
  cal.setTime(myDate);
  cal.add(Calendar.DATE, offset);
  new java.sql.Date(cal.getTime.getTime)
}
val udfDateShift = udf[java.sql.Date,java.sql.Date](dateShift)
val windowDF = tsDF.select($"date")
  .groupBy($"date")
  .agg($"date")
  .join(
    tsDF.select($"date" as "r_date", $"amount" as "r_amount"),
    $"r_date" > udfDateShift($"date") and $"r_date" <= $"date"
  )
  .groupBy($"date")
  .agg($"date",avg($"r_amount") as "2 day avg amount / record")

val windowDF.show
date       2 day avg amount / record
1970-01-01 7.333333333333333
1970-01-02 9.98
1970-01-03 8.58
1970-01-04 5.928571428571429
1970-01-05 7.5325
 类似资料:
  • 我正在制作一个应用程序,其中通过每5秒一次的间隔HTTP请求收集来自虚拟机的与CPU使用、内存使用、磁盘使用等相关的数据。收集的数据如下所示: 我在Drools Fusion上创建了一些规则,试图看到以下内容:例如,当CPU使用率在过去10秒内达到10%以上时,然后在屏幕上打印一些东西,但我的问题是,即使我在规则中输入了命令,即使尚未通过,规则仍然被触发。这是CPU使用率的规则: 是从HTTP响应

  • 假设我有一个每1分钟开始的2小时窗口。下一步是应用GroupBy转换。 如果能解释这一点,我将不胜感激。无法真正找到相关信息

  • 我正在尝试flink的一些网络监控工作。我的目标是计算每个的不同。 我下面的代码工作,但性能真的很糟糕。似乎每个滑动窗口都重新计算所有事件,但这不应该是必要的。 例如,我们有时间秒1-600的事件。Flink可以得到每秒的累加器,所以我们每秒有600个累加器。当第一个滑动窗口过期时,flink只合并1-300的累加器,并销毁第二个1的累加器。此窗口还可以在最后一秒前预合并1-299。当第二个滑动窗

  • 问题内容: 我有一张图片。 我想为图像中的每个像素获得一个3x3的窗口(相邻像素)。 我有这个Python代码: mask5是3x3窗口。分是中心像素。 有没有一种更有效的方法可以做到这一点-即使用地图,迭代器-除了我使用的两个嵌套循环以外的任何方法? 问题答案: 可以通过重塑和交换轴,然后在所有内核元素上重复这样来更快地完成操作,如下所示: 这为您提供了一个3 * 3的瓷砖阵列,这些瓷砖在整个表

  • 目前准备用redis实现分布式限流策略, 抛开那些redis插件,我觉得zset实现理论上可行, 具体逻辑可以是这样: 抽象限流逻辑: 针对某个action,需要在一段时间(即为窗口),只能容许N个操作 redis伪代码 问题: 假设某个行为有大量的不同样本或者不同实例在调用, 也就是key不一样, 假设有百万以上,但是每个key的行为发生次数有限,例如就一次, 这个时候就会出现很大冷数据 处理该

  • 介绍 将TCP与UDP这样的简单传输协议区分开来的是它传输数据的质量。TCP对于发送数据进行跟踪,这种数据管理需要协议有以下两大关键功能: 可靠性:保证数据确实到达目的地。如果未到达,能够发现并重传。 数据流控:管理数据的发送速率,以使接收设备不致于过载。 要完成这些任务,整个协议操作是围绕滑动窗口确认机制来进行的。因此,理解了滑动窗口,也就是理解了TCP。 更多信息 TCP面向流的滑动窗口确认机