当前位置: 首页 > 知识库问答 >
问题:

在Flink的聚合原语中具有与HOP_START等价的

糜淇
2023-03-14

我正试图用Flink SQL在一个跳跃窗口上做一个指数衰减移动平均。我需要访问窗口的一个边框,在下面的HOP_START:

    SELECT                                                                              
      lb_index one_key,
    -- I have access to this one:
      HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
    -- Aggregation primitive:
      SUM(
        Y * EXP(TIMESTAMPDIFF(
          SECOND, 
          proctime, 
    -- This one throws:
          HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      )))
    FROM write_position                                                                
    GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

我得到了以下堆栈跟踪:

11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Aggregate(groupBy: (lb_index), window: (SlidingGroupWindow('w$, 'proctime, 5000.millis, 50.millis)), select: (lb_index, SUM($f2) AS Y, start('w$) AS w$start, end('w$) AS w$end, proctime('w$) AS w$proctime))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using Calc(select: (lb_index, proctime, *(payload.Y, EXP(/(CAST(/INT(Reinterpret(-(HOP_START(PROCTIME(proctime), 50, 5000), PROCTIME(proctime))), 1000)), 1000))) AS $f2))
11:55:37.011 [main] DEBUG o.a.c.p.RelOptPlanner - For final plan, using rel#459:DataStreamScan.DATASTREAM.true.Acc(table=[_DataStreamTable_0])
Exception in thread "main" org.apache.flink.table.codegen.CodeGenException: Unsupported call: HOP_START 
If you think this function should be supported, you can create an issue and start a discussion for it.
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
    at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)

它确实说它在聚合和之外工作时未实现。这就是为什么我认为这是一个范围问题。

现在,问题是:我可以转换这个表达式并在聚合之外进行最后的处理,如exp(x,y)=exp(x)*exp(y);但我坚持使用TIMESTAMPDIFF(这在我上一期中创造了奇迹)。我还没有找到将时间属性转换为数字类型的方法;此外,我不喜欢对UNIX时间戳进行幂运算,即使我缩小了它们。

无论如何,这项工作会有点笨拙,可能会有另一种方式。我不知道如何在不抛出的情况下,将此SQL片段中的作用域推送到窗口作用域中并具有开始时间。

共有1个答案

魏康安
2023-03-14

我建议你尝试使用HOP_PROCTIME()而不是HOP_START()。这里解释了这些差异,但效果将是您将拥有一个proctime属性而不是时间戳,我希望这将使TIMESTAMPDIFF满意。

 类似资料:
  • 我使用复合和术语聚合来获得基于给定字段的分组结果。我还使用基数聚合来获取聚合桶的总计数。 下面是我发送的请求查询,以获得相应的响应: 请求: 答复: 我使用Kibana检查查询,它对我来说很好。 但是,我不确定如何在我的NEST对象语法中使用这个基数聚合器。 这是我的代码: 我将非常感谢任何帮助。

  • 我正在尝试按照此处的步骤创建一个基本的 Flink 聚合 UDF。我已经添加了依赖项()并实现了 我已经实现了强制方法和其他一些方法:< code>accumulate,merge等。所有这些构建都没有错误。根据文件,我应该可以注册为 但是,似乎只需要作为输入。我收到一个不兼容的类型错误: 任何帮助都会很好。

  • 我在读一篇关于OOP中的关系、关联、组合、聚合等的文章。有些事情令人困惑 因此,在PHP中,我们调用以下代码组合 在阅读了几篇关于作文的文章后 以下是组成示例: 因此,根据我的理解,聚合意味着A类的对象可以存在于B类之外,而组合意味着A类生命周期的对象取决于B类。 我正确理解了吗?

  • 抱歉,如果已经问过了,但一直潜伏在SO周围,找不到任何适合我需要的东西。 基本上,我在使用ES的第一次快速尝试中试图实现的是在术语聚合中添加更多计数器。 快速尝试一下,我将以下请求发送给ES。 我现在得到的只是样本在文档中显示的内容。 但是,我真的不知道如何在桶中包含更多的内部聚合。会导致这样的文档的东西。 我应该如何构造聚合,以便按桶包含这些聚合?

  • 我有一个窗口化的每小时聚合的数据流。 Datastreamds=.....

  • 问题内容: 我试图理解这些术语的含义。我举了一些例子,例如: 汇总:Facebook 有一个 用户 组成:facebook 中的 每个用户 都有一个 会话。 协会:人们 使用 浏览器 但是我对 具有 和 使用我的 示例感到困惑。为什么不能是用户 使用 Facebook帐户或Facebook 使用 会话来认证用户? 就OOP而言,这是错误的吗?我在哪里想念这个概念? 问题答案: 该 使用 关系意味着