当前位置: 首页 > 知识库问答 >
问题:

Flink窗口边界、水印、事件时间戳和处理时间

武友樵
2023-03-14
    null
  • 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。
  • 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。

在此场景中,几个事件到达Flink运算符时具有跨越12:01-12:09的不同事件时间戳。此外,事件时间戳与我们的处理时间相对一致(如下面的X轴所示)。由于我们处理的是EVENT_TIME特性,因此偶数是否属于特定事件应该通过其事件时间戳来确定。

在此输入图像说明

    null

前一个问题的答案也将涉及这一点,但我认为在此明确提及会有所帮助。假设我有一个大小为5分钟的TumblingEventTimeWindow。然后在12:00,我启动一个回填作业,该作业在许多事件中冲到Flink运算符,其时间戳覆盖10:02-10:59的范围;但由于这是一个回填作业,整个执行大约需要3分钟才能完成。

作业是否会分配12个单独的窗口并根据事件的事件时间戳正确地填充它们?那12扇窗户的界限是什么?最后会不会有12个输出事件,每个输出事件都有每个分配窗口的求和值?

对于这种逻辑和运算符的自动化测试,我也有一些顾虑。操作处理时间的最佳方法,以这样一种方式触发某些行为,以形成所需窗口的边界以用于测试目的。特别是,我到目前为止读到的关于利用测试工具的东西似乎有点混乱,可能会导致一些不太容易阅读的杂乱代码:

    null
  • 时间映射提取器和水印发射器
  • 事件时间处理和水印
  • 处理后期数据和Spark中的水印
    • 星火博士那部分的图片是非常有帮助和教育意义的。但同时,Windows的边界与处理时间而不是事件时间戳对齐的方式给我带来了一些困惑。
    • 同样,在可视化中,水印似乎每5分钟计算一次,因为这是窗口的滑动规格。这是计算水印频率的决定因素吗?对于不同的窗口(例如翻滚滑动会话等等),这在Flink中是如何工作的?!

    非常感谢您的帮助,如果您知道任何更好的参考有关这些概念和他们的内部工作,请让我知道。

    如果运行带有事件时间语义的作业,则窗口运算符处的处理时间完全不相关

    窗口的边界是如何计算的?!

    此外,非常感谢您澄清了out-of-ordernesslateness之间的区别。我正在处理的代码由于名称错误(从BoundedOutoFordernesTimeStampExtractor继承的类的构造函数参数被命名为MaxLatency):/

    考虑到这一点,让我看看我是否能够正确地理解水印是如何计算的,以及什么时候事件将被丢弃(或旁输出):

      null
      null
    • current-watermark=max-event-time-seen-so-far

    并且在这些情况中,任何事件的时间戳小于或等于current-watermark的事件都将被丢弃(侧输出),对吗?!

    这就提出了一个新的问题。您什么时候想使用out of orderness而不是lateness?因为当前的水印计算(数学上)在这些情况下可以是相同的。当你同时使用这两种方法时会发生什么(这有意义吗)?!

      null
    • 窗口应用于具有BoundedOutoFordernesTimeStampExtractor数据流,该数据流具有2分钟MaxOutoForderness

    注意:如果不能同时具有out of ordernesslateness或没有意义,请只考虑上面示例中的out of orderness

    最后,你可以请布局窗口,将有一些事件分配给他们,请指定那些窗口的边界(窗口的开始和结束时间戳)。我假设边界也是由事件的时间戳决定的,但是在具体的例子中要想弄清楚它们就有点棘手了,比如这个例子。

    再次感谢您的帮助:)

共有1个答案

纪畅
2023-03-14

水印:据我的理解,在Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件将被丢弃并在结果计算中被忽略。

这是不正确的,可能是混乱的根源。在Flink中,无序和迟到是不同的概念。使用boundedoutofordernestimestampextractor时,水印为max-event-timestamp-seen-so-far-max-out-of-orderness。有关允许迟到的更多信息,请参阅Flink文档[1]。

如果运行带有事件时间语义的作业,则窗口运算符处的处理时间完全不相关:

  • 事件将根据其事件时间戳分配给其窗口
  • 一旦水印达到其最大时间戳(窗口结束时间-1)时,将触发时间窗口。
  • 时间戳早于当前允许的水印延迟的事件将被丢弃或发送到延迟数据端输出[1]

这意味着,如果您在12:00PM(处理时间)开始一个作业,并且开始摄取过去的数据,那么水印也将是(甚至更远)过去的。因此,配置的allowedlateness是不相关的,因为数据相对于偶数时间而言并不晚。

另一方面,如果您首先从12:00pm摄取一些数据,然后从10:00pm摄取数据,那么在您摄取旧数据之前,水印将已经提前到~12:00pm。在这种情况下,从10:00PM开始的数据将“迟到”。如果它晚于已配置的allowedlateness(default=0),它将被丢弃(默认值)或发送到侧输出(如果已配置)[1]。

事件时间窗口的时间线如下:

  1. 窗口中具有时间戳的第一个元素到达->创建此窗口的状态(&key)
  2. watermark>=window_endtime-1到达->窗口被激发(发出结果),但状态不被丢弃
  3. 水印>=window_endtime+allowed_latenes到达->状态被丢弃

介于2。和3。此窗口的事件延迟,但在允许的延迟范围内。事件被添加到现有状态中,并且默认情况下,窗口会在每个记录上激发,发出精炼的结果。

  • (1,8:29)添加到窗口(8:25-8:29:59:999)
  • (5,8:26)添加到窗口(8:25-8:29:59:999)
  • (3,9:48)添加到窗口(9:45-9:49:59:999)
  • (8:25-8:29:59:999)被激发,因为水印已提前到9:48-0:02=9:46,大于窗口的最后时间戳。窗口状态也被丢弃,因为水印已经提前到9:46,也大于窗口结束时间+允许的迟到时间(1分钟)
  • (7,9:46)被添加到窗口被添加到窗口(9:45-9:49:59:999)

希望这有帮助。

康斯坦丁

 类似资料:
  • 我试图使用process函数对一组事件进行处理。我正在使用事件时间和键控流。我面临的问题是,水印值始终为9223372036854725808。我已经对print语句进行了调试,它显示如下: 时间戳------1583128014000提取时间戳1583128014000当前水印------9223372036854775808 时间戳------1583128048000提取时间戳1583128

  • 我正在阅读《Stream Processing with Apache Flink》一书,书中说:“从版本0.10.0开始,Kafka支持消息时间戳。当从Kafka版本0.10或更高版本读取时,如果应用程序以事件时间模式运行,使用者将自动提取消息时间戳作为事件时间戳*“因此在函数中,调用将默认返回Kafka消息时间戳?请提供一个简单的示例,说明如何实现AssignerWithPeriodicalW

  • 窗口正在进行无限循环或其他操作,我正在处理后打印数据流,但看起来根本没有达到那个点。 下面是我的伪代码。 我在reduce函数中添加了要打印的日志。正在从reduce函数打印日志。但是这条流没有被打印出来。 并且流源数据是历史数据,即。。超过2个月的旧数据。如果是历史数据流,还需要专门设置其他内容吗? 任何输入都会大有裨益。。

  • 我有一个带有数据和时间戳的记录日志,我的Flink应用程序按时间戳升序接收记录。在某个键的第一个项到达窗口后,我想在X事件时间后关闭窗口,检查是否有足够的项到达某个条件,并为该键发出通过或失败消息。 对于Flink中的基本窗口功能,这是不可能的吗?例如,如果我希望我的窗口有30秒长,但是键的第一个项在15秒到达,最后一个项在40秒到达,似乎窗口将在30秒关闭,并且该键的记录轨迹将是分成两个窗口。在

  • 我正在研究一个Flink流式处理器,它可以从Kafka读取事件。这些事件由其中一个字段键控,并且在减少和输出之前应该在一段时间内加窗。我的处理器使用事件时间作为时间特性,因此从它所消耗的事件中读取时间戳。以下是它目前的样子: 我所知道的事件如下: null null

  • 我正在尝试加入Flink中的两种类型(比如A和B)。我想确认我的理解是否正确。事件的某些属性- 事件A立即流入flink,延迟几分钟(5-10分钟) 事件B以15-30分钟的轻微延迟流动 事件a和事件B之间存在1:1连接 我已将事件A的数据流配置为10分钟的BoundedAutofordernessTimestampExtractor,将事件B的数据流配置为30分钟。稍后,我使用表API进行时间窗