aggregatedTuple
.keyBy( 0).timeWindow(Time.milliseconds(8))
.reduce(new ReduceFunction<Tuple2<Long, JSONObject>>()
Flink有两个不同的、相关的抽象,它们处理在具有事件时间戳的流上计算窗口分析的不同方面:水印和允许迟到。
首先是水印,每当处理事件时数据(无论是否使用windows)时,水印就会发挥作用。水印提供了关于事件时间进度的信息,并为应用程序编写者提供了一种处理无序数据的方法。水印随数据流流动,每个水印标记流中的一个位置并携带一个时间戳。水印作为一种断言,即在流中的那个点,流现在(可能)完成到那个时间戳--或者换句话说,水印之后的事件不太可能来自水印所指示的时间之前。最常见的水印策略是使用BoundedEutoFordernesTimeStampExtractor,它假设事件在某个固定的、有界的延迟内到达。
这现在提供了一个迟到的定义--在水印之后的事件的时间戳小于水印的时间戳被认为是迟到。
How can I filter data stream as it wants to enter the window and check
if the data created at the right timestamp for the window?
Flink的窗口分配器负责将事件分配给适当的窗口--正确的事情将自动发生。将根据需要创建新的窗口实例。
How can I gather such late data in a variable to do some processing on them?
您可以在您的水印中充分慷慨,以避免有任何迟到的数据,和/或配置允许的迟到足够长,以适应迟到的事件。但请注意,Flink将被迫保持所有仍在接受后期事件的窗口打开,这将延迟垃圾收集旧窗口,并可能消耗相当大的内存。
注意,本讨论假设您希望使用时间窗口--例如,您正在使用的8msec长窗口。Flink还支持计数窗口(例如,将事件分组为100个批次)、会话窗口和自定义窗口逻辑。例如,如果您使用计数窗口,水印和迟到不会起任何作用。
如果您希望为您的分析提供每个键的结果,那么在应用窗口化之前,使用keyBy按键(例如,按用户ID)对流进行分区。例如:
stream
.keyBy(e -> e.userId)
.timeWindow(Time.seconds(10))
.reduce(...)
更新:注意,在最近版本的Flink中,windows现在可以将后期事件收集到边输出。
一些相关文件:
事件时间和水印
允许的延迟
我有一个流(KafkaMSG流到一个主题上),有一个flinkKafka消费者,我注意到一个有趣的行为,我想解决这个问题。 当数据正在流入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。 流程示例: 我正在使用的Flink Kafka消费者010与env时间特性设置为事件时间。和consumer.assign时间戳和水印(新周期
我们计划将Apache Flink与一个巨大的IOT设置一起使用。客户将向我们发送某种结构化的传感器数据(如sensor_id、sensor_type、sensor_value、timestamp)。我们没有控制每个客户何时发送这些数据,最有可能是实时的,但我们没有保证。我们将所有事件存储在RabbitMQ/Kafka中。更新:我们可以假设每个传感器的事件是按顺序来的。 在开始实施可能的流式管道之
具有Kafka Streams应用,其通过例如1天的流连接来执行开窗(使用原始事件时间,而不是挂钟时间)。 如果启动此拓扑,并从头开始重新处理数据(如在 lambda 样式的体系结构中),此窗口是否会将旧数据保留在那里?da 例如:如果今天是2022-01-09,而我收到来自2021-03-01的数据,那么这个旧数据会进入表格,还是会从一开始就被拒绝? 在这种情况下,可以采取什么策略来重新处理这些
我有一些问题。 基于类中的时间戳,我想做一个逻辑,排除在1分钟内输入N次或更多次的数据。 UserData类有一个时间戳变量。 起初我试着用一个翻滚的窗户。 但是,滚动窗口的时间计算是基于固定时间的,因此无论UserData类的时间戳如何,它都不适合。 如何处理流上窗口UserData类的时间戳基? 谢谢。 附加信息 我使用这样的代码。 我试了一些测试。150个样本数据。每个数据的时间戳增加1秒。
我有一个用例,需要以不同的方式处理延迟事件和正常事件:如果事件在其窗口关闭后到达,则应将其发送到另一个路径。 我想是这样的。sideOutputLateData(..) 可以帮我解决这个问题。在正常情况下(即使用真实世界的数据)也是如此。但如果我想用伪造的数据来测试它,它就会停止工作。 我希望类似于: 将导致: 相反,我得到了: 如果我使用socketTextStream作为具有相同数据的源,它将
下面是我的流处理的伪代码。 上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。