我想使用会话
窗口聚合,然后在Table API/FlinkSQL
中生成的结果之上运行Tumble
窗口聚合。
是否可以在第一次会话
聚合后修改rowtime
属性以使其等于会话中最后观察到的事件的. rowtime
?
我正在尝试这样做:
table
.window(Session withGap 2.minutes on 'rowtime as 'w)
.groupBy('w, 'userId)
.select(
'userId,
('w.end.cast(Types.LONG) - 'w.start.cast(Types.LONG)) as 'sessionDuration,
('w.rowtime - 2.minutes) as 'rowtime
)
.window(Tumble over 5.minutes on 'rowtime as 'w)
.groupBy('w)
.select(
'w.start,
'w.end,
'sessionDuration.avg as 'avgSession,
'sessionDuration.count as 'numberOfSession
)
关键部分是:
('w.rowtime - 2.minutes) as 'rowtime
因此,我想将会话中最新事件的 .rowtime
重新分配给记录,而没有会话间隔(在本例中为 2.分钟
)。
这在BatchTable中运行良好,但在StreamTable中不起作用:
Exception in thread "main" org.apache.flink.table.api.ValidationException: TumblingGroupWindow('w, 'rowtime, 300000.millis) is invalid: Tumbling window expects a time attribute for grouping in a stream environment.
是的,我知道,感觉上我不想发明时间机器来改变时间顺序。但实际上有可能以某种方式实现所描述的行为吗?
不可以,很遗憾,您无法使用当前版本 (1.6.0) 中的 SQL 或表 API 执行此操作。修改时间属性(rowtime 或 proctime)后,它就会立即成为常规 TIMESTAMP
属性并失去其特殊时间特征。
对于行时间属性,原因是我们不能保证时间戳仍然与水印对齐。原则上,我们可以将水印延迟减去的时间间隔,但这还不支持。
我已经能够创建一个“会话开始信号”流,如本答案所述。 是否可以在每次窗口聚合结束时创建一个“会话结束信号”流?
我们计划将Apache Flink与一个巨大的IOT设置一起使用。客户将向我们发送某种结构化的传感器数据(如sensor_id、sensor_type、sensor_value、timestamp)。我们没有控制每个客户何时发送这些数据,最有可能是实时的,但我们没有保证。我们将所有事件存储在RabbitMQ/Kafka中。更新:我们可以假设每个传感器的事件是按顺序来的。 在开始实施可能的流式管道之
我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些
我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi
我有两个流,希望将第二个流连接到窗口内的第一个流,因为我需要对与会话相关的两个流的连接进行一些计算(流的连接控制会话)。 实际上,当从留档读取时,(会话)窗口只允许在单个流上进行计算,而不允许在连接中进行计算。 我曾尝试使用会话窗口和协处理器函数的组合,但结果并不完全符合我的预期。 有没有办法合并Flink中与会话窗口相关的两个流?
我有一个聚合函数,它计算WindowedStream中一系列事件的平均值。 这里的警告是,平均值需要在可能无序(或根本没有)到达的事件对上计算。 换句话说,我需要在计算之前对数据进行排序,因为序列很重要。 我可以用getResult API来实现这一点,但是这个函数在窗口中的每个事件上都被调用,这在性能方面没有意义。我也可以用flink cep来做这件事,但出于同样的原因,我想避免使用它。 理想情