当前位置: 首页 > 知识库问答 >
问题:

具有动态滞后的窗函数

西门振
2023-03-14

我正在查看Spark SQL中的Spark DataFrame的窗口幻灯片函数。

我有一个包含列IDmontevolume的数据表。

id       month   volume new_col
1        201601  100     0
1        201602  120   100
1        201603  450   220
1        201604  200   670
1        201605  121   870
window = Window.partitionBy(F.col('id')).orderBy(F.col('month').asc())
df.withColumn('new_col', F.lag(col('volume'), 1).over(window) + F.lag(col('new_col'), 1).over(window))

共有1个答案

仲浩旷
2023-03-14

您可以在窗口上使用lagsum来实现这一点。sum将自动计算在窗口上使用的cumsum。下面的代码将首先延迟volume列,然后获取其cumsum,但是也可以按照相反的顺序执行操作。

window = Window.partitionBy(F.col('id')).orderBy(F.col('month').asc())
df.withColumn('new_col', F.sum(F.lag(col('volume'), 1, 0).over(window)).over(window))
 类似资料:
  • 我在Scala中查看幻灯片函数中的Spark。

  • 在设置模拟器并创建android虚拟设备之后。Emulator启动设备,但屏幕保持静止,不显示任何图标,在虚拟设备上启动应用程序失败,出现以下错误: 执行时出现意外错误:am start-n“com.[应用程序启动活动]”-Android系统。意图行动MAIN-cAndroid。意图类别启动活动时发生启动器错误 虚拟设备的屏幕 该设备的详细信息如下: 名称:Galaxy_Nexus_API_24_

  • 我注意到,当我在DataFrame上使用窗口函数后,如果我用函数调用map()时,Spark会返回一个“Task not serializable”异常这是我的代码: 这是堆栈跟踪: 异常:任务不可序列化在org.apache.spark.util.ClosureCleaner$.EnsureClealizable(ClosureCleaner.scala:304)在org.apache.spar

  • 我正在用Apache Storm 1.1.2和Kafka0.11在Java9中构建一个Spring应用程序 我注意到,在高负载(每秒2500条消息)下,Kafka喷口有一个非常高的滞后。Kafka喷口有一个平行性提示3。滞后几乎等于喷口提交的偏移。 这个滞后设置了拓扑每秒可以摄取的最大消息量的上限,这并不是很大。有人知道解决这个问题的办法吗? 更新:我还注意到,即使有10个工作者和4个并行性提示,

  • 问题内容: 我有一个特定的问题。我有一个包含无效值的表。我需要用大于的先前值替换无效值(此处)。 困难在于,使用Update或insert(游标和update可以做到)对我来说是不合适的。我唯一的方法是使用Select语句。 当我将-函数与when一起使用时,我只会得到一列具有正确值的列。 内容: 预期查询结果: 问题答案: 结果 :

  • 问题内容: 我需要创建一个没有构造函数参数的不完整对象。像这样 我希望这个Bean是Spring管理的,以便以后可以使用Spring AOP。 但是我的bean需要将超时作为动态值传递-是否有一种方法可以创建在构造函数中注入了动态值的spring托管bean? 问题答案: 有一个方法,根据javadoc,它允许您指定构造函数参数,该参数用于覆盖bean定义自己的参数。因此,您可以在bean文件中放