当前位置: 首页 > 知识库问答 >
问题:

Flink SQL: Casting time中跳跃窗口上的指数衰减移动平均值

酆阳煦
2023-03-14

现在我们在 Flink 中拥有了带有花哨窗口的 SQL,我正在尝试从他们的 SQL 路线图/预览版 2017-03 帖子中引用衰减的移动平均线“,以”未来 Flink 版本对 Table API 和 SQL 的发布将会发生什么“:

table
  .window(Slide over 1.hour every 1.second as 'w)
  .groupBy('productId, 'w)
  .select(
    'w.end,
    'productId,
    ('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)

这是我的尝试(也受到方解石腐烂的例子的启发):

SELECT                                                                              
  lb_index one_key,                                                           
  HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,  
  SUM(Y * 
      EXP(
        proctime - 
        HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      ))                                                             
FROM write_position                                                                
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

时间是处理时间,通过从AppendStream表创建write_position,我们得到处理时间,如下所示:

tEnv.registerTable(
    "write_position", 
    tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))

我收到此错误:

Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'. 
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'

我已经尝试过将程序时间转换为我所知道的所有其他类型(试图到达NUMERIC应许之地),但我就是找不到如何使它工作。

我错过了什么吗?时间是否是您无法转换的非常特殊的“系统更改数字”时间?如果是这样,仍然必须有某种方法将其与HOP_START(proctime,...)值进行比较。

共有1个答案

富凯旋
2023-03-14

您可以使用timestampDiff减去两个时间点(参见文档)。你这样使用它

TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)

其中时间点单位可以是秒、分钟、小时、天、月或年。

我还没有尝试过处理时间,但它确实适用于事件时间字段,所以希望它会。

 类似资料:
  • 问题内容: 我的数据集如下: 仅使用MySQL窗口函数可以做到这一点吗? 环境详细信息: 服务器版本:8.0.12 MySQL Community Server-GPL 问题答案: 您可以将Window Functions与Frames一起使用 : DB小提琴演示 细节: 表示当前行上方的两行(当前行除外)。我们在上明确定义了升序。因此,这意味着两个最接近的日期,低于当前行的日期 表示当前行。 使

  • 我也看过Pyspark中的加权移动平均线,但我需要一个Spark/Scala的方法,以及10天或30天的均线。 有什么想法吗?

  • 问题内容: 我基本上有一个像这样的值数组: 上面的数组过于简化,我在实际代码中每毫秒收集1个值,我需要使用编写的算法处理输出,以找到某个时间点之前最接近的峰值。我的逻辑失败了,因为在上面的示例中,它是真正的峰值,但是我的算法会向后看,并看到最后一个数字是峰值,因为之前的数值减少了。 目标是获取这些值,并对它们应用一种算法,该算法将使它们“平滑”一些,以便获得更多的线性值。(即:我希望自己的成绩是弯

  • 问题内容: 我有一个日期范围,并且每个日期都有一个度量值。我想计算每个日期的指数移动平均值。有人知道怎么做这个吗? 我是python的新手。似乎没有将平均值内置到标准python库中,这让我感到有些奇怪。也许我找的地方不对。 因此,给定以下代码,如何计算日历日期的IQ点的移动加权平均值? (可能是一种更好的数据结构方式,任何建议将不胜感激) 问题答案: 编辑:看来SciKits(补充SciPy的附

  • 我正在拼命寻找解决熊猫问题的办法。也许你能帮我。 我正在寻找一个滚动平均值,考虑到之前的平均值。 df看起来像这样: 现在,使用函数我会得到如下结果: 我想从第一个计算中考虑平均值,如下所示: 哪里 提前谢谢你!

  • 公式链接:https://sciencing.com/calculate-exponential-moving-averages-8221813.html