Flink watermark与乱序消息处理机制

郑茂材
2023-12-01

在flink中,watermark用于标识数据当前的进度、触发窗口计算、通过延迟设置容忍部分数据的乱序,详细定义可见:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/

那么,watermark具体如何计算以及怎样对乱序数据起作用?特此通过代码加以解析。

注:下文中所涉及的flink源码版本为 release-1.15,使用flink sql作为例子。

基于timestamp计算watermark

首先,以event time为例,定义watermark的方式为:

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );

该table的watermark策略为当前最大的时间戳 - 5 second,如下:

Emits watermarks, which are the maximum observed timestamp minus the specified delay, e.g., WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND is a 5 seconds delayed watermark strategy.

首先,我们来看watermark的生成机制,以BoundedOutOfOrderTimestamps为例:

  1. 从当前与历史输入中获取最大的timestamp
        @Override
        public void nextTimestamp(long timestamp) {
            if (timestamp > maxTimestamp) {
                maxTimestamp = timestamp;
            }
        }
  2.  依据step1中计算得到的timestamp与乱序容忍时间获取watermark
        @Override
        public Watermark getWatermark() {
            return new Watermark(maxTimestamp - delay);
        }

乱序与window trigger

有了watermark之后,就可以 进行窗口相关的操作了。

以常用的tumble滚动窗口为例,什么时候会触发水位的计算呢,我们可以找到如下判断逻辑

        @Override
        public boolean onElement(Object element, long timestamp, W window) throws Exception {
            if (triggerTime(window) <= ctx.getCurrentWatermark()) {
                // if the watermark is already past the window fire immediately
                return true;
            } else {
                ctx.registerEventTimeTimer(triggerTime(window));
                return false;
            }
        }

可以看到,当watermark超过窗口时,会触发计算。由于我们已经设置了乱序容忍即delay时间,因此,当所有event的最大timestamp超过窗口delay时间后,才会触发计算。

对于那些延迟到达(即timestamp产生了回退的event)有两种可能:

  1. 在窗口计算被触发前到达,仍然会被计算,达到了容忍延迟的效果。
  2. 在窗口计算被触发后到达,直接被丢弃,不参与计算,因为已经超过了delay时间。

watermark广播机制

在实际的flink job中,我们会并行运行多个operator,同时每个operator有多个上下游input/output,此时,针对多个不同的输入watermark会进行合并处理,同时将计算得到的watermark下发至所有的下游operator(即所谓的广播)。

flink中实际处理record与window关系的类是WindowOperator,其继承了AbstractStreamOperator,在AbstractStreamOperator中,通IndexedCombinedWatermarkStatus管理多个input产生的watermark:

    private void processWatermark(Watermark mark, int index) throws Exception {
        if (combinedWatermark.updateWatermark(index, mark.getTimestamp())) {
            processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
        }
    }
  1. 依据record所属的index,更新对应的partial的watermark
  2. 更新combined的watermark,轮询所有非idle的partial,取最小值为总体的watermark
    public boolean updateCombinedWatermark() {
        long minimumOverAllOutputs = Long.MAX_VALUE;

        // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
        // at its initial Long.MAX_VALUE state and we must not emit that
        if (partialWatermarks.isEmpty()) {
            return false;
        }

        boolean allIdle = true;
        for (PartialWatermark partialWatermark : partialWatermarks) {
            if (!partialWatermark.isIdle()) {
                minimumOverAllOutputs =
                        Math.min(minimumOverAllOutputs, partialWatermark.getWatermark());
                allIdle = false;
            }
        }

        this.idle = allIdle;

        if (!allIdle && minimumOverAllOutputs > combinedWatermark) {
            combinedWatermark = minimumOverAllOutputs;
            return true;
        }

        return false;
    }

从上述分析我们可以明晰以下概念:

  1. event的eventime或者processtime与delay一起决定了当前的watermark
  2. watermark用于触发窗口计算且永不回退
  3. 在窗口计算被触发前到达的乱序数据仍然参与计算,否则被丢弃
  4. 窗口一经触发即被丢弃,不会重复计算
  5. 多个非idle的input的watermark最小值确定了整体的watermark
 类似资料: