当前位置: 首页 > 知识库问答 >
问题:

如何使用带有时间戳和水印分配器的Flink流式时间窗口?

商棋
2023-03-14

我正在研究一个Flink流式处理器,它可以从Kafka读取事件。这些事件由其中一个字段键控,并且在减少和输出之前应该在一段时间内加窗。我的处理器使用事件时间作为时间特性,因此从它所消耗的事件中读取时间戳。以下是它目前的样子:

source
    .map(new MapEvent())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
        @Override public long extractTimestamp(Event event) {
                return event.getTimestamp();
            }
        })
    .keyBy(new KeySelector())
    .timeWindow(Time.minutes(1))
    .reduce(new EventReducer())
    .map(new MapToResult());

我所知道的事件如下:

    null
    null

共有1个答案

柴星津
2023-03-14

我觉得应该从水印概念说起。简单地说,watermark表示具有较早时间戳的大多数事件已经到达。基于该假设,当水印经过窗口结束时,时间窗口可以发出窗口。当然还有可能会出现迟到的情况,这可能是一个人想要处理的。这里有allowedlatency的概念,它指定在发出窗口后多长时间,我们应该跟踪存在的元素,以便例如可以使用那些迟来的事件更新接收器(但必须记住,窗口已经发出,而没有该元素)。希望这能回答你的第二个问题。

回到您的第一个问题,如果您有许多事件可以延迟20秒,我认为BoundedoutoFordernesTimeStampExtractor是最好的选择。这样,虽然发射,但每个窗口都将延迟20秒。如果晚到是相当零星的,你可以处理重复的,那么你可以考虑另一个。

您提到的AssignerWithPunctuatedWatermarks(正如文档所说的,在流中的某些特定事件已经充当水印的情况下应该使用。所以不要认为它适合您的用例。

关于水印的更多信息,你可以阅读这个文件或这个和那个

 类似资料:
  • null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有

  • 我正在阅读《Stream Processing with Apache Flink》一书,书中说:“从版本0.10.0开始,Kafka支持消息时间戳。当从Kafka版本0.10或更高版本读取时,如果应用程序以事件时间模式运行,使用者将自动提取消息时间戳作为事件时间戳*“因此在函数中,调用将默认返回Kafka消息时间戳?请提供一个简单的示例,说明如何实现AssignerWithPeriodicalW

  • 我刚刚遇到了一个非常奇怪的问题,当使用带有时间戳和水印赋值器的EventTime时,我无法从流窗口联接中获得任何结果。 我使用Kafka作为我的数据流源,并尝试了AscendingTimestampExtractor和自定义赋值器,它们实现了Flink留档中提到的Assignerwith周期水印,正如我测试的那样,没有发出水印,也没有生成连接结果。如果我更改为使用ProcessingTime和Tu

  • 我正在尝试加入Flink中的两种类型(比如A和B)。我想确认我的理解是否正确。事件的某些属性- 事件A立即流入flink,延迟几分钟(5-10分钟) 事件B以15-30分钟的轻微延迟流动 事件a和事件B之间存在1:1连接 我已将事件A的数据流配置为10分钟的BoundedAutofordernessTimestampExtractor,将事件B的数据流配置为30分钟。稍后,我使用表API进行时间窗

  • 目前是否有一种方法可以将流量模式合并到OptaPlanner中并解决包装和交付VRP问题? 比如说,我需要在30辆车中优化500辆今天和明天的提货和交付,其中每辆提货都有1-4小时的时间窗口。如果可能的话,我想在高峰时间避开城市的繁忙地区。 还可以添加(或同时取消)新的皮卡。 我相信这是一个常见的问题。OptaPlanner中是否有合适的解决方案? 谢啦!