假设我有一个 inputStream,我对它执行了一些窗口操作。通过对事件执行某些窗口操作而创建的事件的时间戳是什么?
....
DataStream<Integer> inputStream = // ...
DataStream<Integer> countStream = inputStream.keyBy(0)
.timeWindow(time.Seconds(1))
.sum();
DataStream<Integer> maxStream = inputStream.keyBy(0)
.timeWindow(time.Seconds(1))
.max();
现在我想组合流countStream和maxStream,以找到最后一秒的countStream等于maxStream的所有时间戳。
注意:这并不是我试图解决的问题,但这是一个代表性的例子。解决这个问题将帮助我解决我需要解决的真正问题。
更新:
如果您希望来自同一上游窗口的元素最终位于同一下游窗口中。您可以使用连续窗口操作。
步骤:
并
集将您的 countStream 与 maxStream 合并。窗口全部
。流程
功能跟随窗口所有
。在时间窗口是事件时间窗口的情况下,它们发出的事件将被标记为在窗口结束时发生。在处理时间窗口的情况下,事件没有时间戳,CPU时间时钟将用作计时信息的来源。
更新:
Flink中的时间窗口与历元对齐——它们与第一个事件或类似事件无关。我们保证两个具有相同持续时间和偏移量的事件时间窗口(例如两个1秒长的翻滚窗口)将在完全相同的时间间隔内收集事件。
事件时间窗口发出的事件流本身就是一个带有事件时间戳的流,可以像任何其他带时间戳的事件流一样进一步加窗。请记住,一个窗口实例(即同一秒)生成的所有事件都具有相同的时间戳。因此,如果你在一个1秒的窗口中使用一个更短的窗口,例如100毫秒,那么10次中有9次更短的窗口不会看到任何事件。
我在创建用于聚合数据的SerDes时遇到了一些问题,需要通过“”发送到另一个主题。然而,我需要为窗口化数据创建一个SerDes,我不知道该怎么做。
我有一些历史数据,每条记录都有它们的时间戳。我想阅读它们并将它们输入到Kafka主题中,并使用Kafka流以时间窗口的方式处理它们。 现在的问题是,当我创建kafka流时间窗口聚合处理器时,我如何告诉kafka使用记录中的时间戳字段来创建时间窗口,而不是真正的实时时间?
为什么会这样?如果我在“assigntimestamps(timestampExtractor)”之前添加“keyby(keySelector)”,那么程序可以工作。有人能解释一下原因吗?
我有一些问题。 基于类中的时间戳,我想做一个逻辑,排除在1分钟内输入N次或更多次的数据。 UserData类有一个时间戳变量。 起初我试着用一个翻滚的窗户。 但是,滚动窗口的时间计算是基于固定时间的,因此无论UserData类的时间戳如何,它都不适合。 如何处理流上窗口UserData类的时间戳基? 谢谢。 附加信息 我使用这样的代码。 我试了一些测试。150个样本数据。每个数据的时间戳增加1秒。
我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问