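In Flink, watermarks serve three purposes. They mark how far event time has progressed in the data. They trigger window computations. Through a configurable delay, they also tolerate a certain amount of out-of-order data. For the detailed definition, see: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/sql/create/
So how exactly is a watermark computed, and how does it deal with out-of-order data? This article walks through the relevant code to answer both questions.
Note: the Flink source code referenced below is from release-1.15, and Flink SQL is used for the examples.
First, taking event time as an example, a watermark is defined as follows: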
CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
This table's watermark strategy is the maximum timestamp observed so far minus 5 seconds. The documentation describes it as follows:
Emits watermarks, which are the maximum observed timestamp minus the specified delay, e.g.,
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
is a 5 seconds delayed watermark strategy.
Next, let's look at how watermarks are generated, using BoundedOutOfOrderTimestamps as an example:
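@Override
public void nextTimestamp(long timestamp) {
    // keep the largest timestamp seen so far; smaller (out-of-order) ones are ignored
    if (timestamp > maxTimestamp) {
        maxTimestamp = timestamp;
    }
}

@Override
public Watermark getWatermark() {
    // the watermark trails the maximum observed timestamp by the configured delay
    return new Watermark(maxTimestamp - delay);
}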
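The two methods above can be tried outside Flink with a small standalone class. This is a minimal sketch and not Flink's class. The name BoundedOutOfOrderSketch is hypothetical. Initializing maxTimestamp to Long.MIN_VALUE + delay is our own choice, made so that the subtraction cannot underflow before the first event arrives.

```java
// Hypothetical standalone sketch (not Flink's BoundedOutOfOrderTimestamps):
// watermark = largest timestamp observed so far minus a fixed delay.
final class BoundedOutOfOrderSketch {
    private final long delay;
    // Assumption: start at MIN_VALUE + delay so that maxTimestamp - delay cannot underflow.
    private long maxTimestamp;

    BoundedOutOfOrderSketch(long delay) {
        this.delay = delay;
        this.maxTimestamp = Long.MIN_VALUE + delay;
    }

    // Called for every record; only a larger timestamp moves the maximum forward.
    void nextTimestamp(long timestamp) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
        }
    }

    // The watermark trails the maximum by the tolerated delay.
    long getWatermark() {
        return maxTimestamp - delay;
    }

    public static void main(String[] args) {
        BoundedOutOfOrderSketch gen = new BoundedOutOfOrderSketch(5_000L);
        long[] eventTimes = {1_000L, 8_000L, 3_000L, 12_000L, 9_000L};
        for (long t : eventTimes) {
            gen.nextTimestamp(t);
            System.out.println("event=" + t + " watermark=" + gen.getWatermark());
        }
    }
}
```

Running main prints the watermarks -4000, 3000, 3000, 7000 and 7000. The out-of-order events at 3000 and 9000 leave the watermark unchanged, so the watermark never moves backwards.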
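Once watermarks are available, window operations can be built on top of them.
Take the commonly used tumbling window as an example. When does the watermark trigger a window's computation? The decision is made by the following logic: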
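@Override
public boolean onElement(Object element, long timestamp, W window) throws Exception {
    // triggerTime(window): the time at which this window is due to fire, derived from its end
    if (triggerTime(window) <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return true;
    } else {
        // otherwise register an event-time timer that fires once the watermark reaches that time
        ctx.registerEventTimeTimer(triggerTime(window));
        return false;
    }
}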
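As you can see, a window fires once the watermark reaches the window's trigger time. The watermark is the maximum timestamp minus the configured delay (the out-of-order tolerance). A window is therefore computed only after the maximum event timestamp seen so far has moved past the window's end by at least that delay. With the 5-second delay above, a window ending at 12:00:10 fires only when the maximum observed timestamp reaches about 12:00:15. At that point the watermark reaches the window's last timestamp, 12:00:09.999.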
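The following is a minimal sketch of this firing condition. It combines the "max timestamp minus delay" watermark with tumbling windows aligned to epoch 0. The class and method names are hypothetical. As in the condition above, a window fires when the watermark reaches the window's last timestamp (end minus 1 ms). We assume no time-zone shift.

```java
// Hypothetical sketch: tumbling windows of `size` ms aligned to epoch 0,
// fired by a "max timestamp - delay" watermark.
final class TumblingTriggerSketch {
    // Start of the tumbling window containing `timestamp`; floorMod also handles negative timestamps.
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    // Last timestamp that still belongs to the window [start, start + size).
    static long windowMaxTimestamp(long start, long size) {
        return start + size - 1;
    }

    // Same shape as onElement above: fire once the watermark has reached the window's trigger time.
    static boolean shouldFire(long windowMaxTs, long watermark) {
        return windowMaxTs <= watermark;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        long delay = 5_000L;
        long start = windowStart(3_000L, size);       // window [0, 10000)
        long maxTs = windowMaxTimestamp(start, size); // 9999
        long[] maxSeen = {10_000L, 14_998L, 14_999L};
        for (long m : maxSeen) {
            long watermark = m - delay;
            System.out.println("max=" + m + " watermark=" + watermark
                    + " fire=" + shouldFire(maxTs, watermark));
        }
    }
}
```

The window [0, 10000) stays open while the maximum observed timestamp is 10000 or 14998. It fires only when the maximum reaches 14999, which pushes the watermark to 9999.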
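Events that arrive late (that is, events whose timestamp is smaller than one already seen) fall into one of two cases:
1. The watermark has not yet passed the end of the event's window. The window has not fired, so the event is assigned to it and included in the result, even though it arrived out of order.
2. The watermark has already passed the end of the event's window. That window has already fired, so the event is late. If no allowed lateness is configured (the default), the event is dropped.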
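The two cases can be sketched as a small classifier. It assumes tumbling windows and no allowed lateness, which is Flink's default. An out-of-order event whose window has not fired yet is still accepted. An event whose window has already fired is dropped. The class and enum names are hypothetical, and the sketch ignores allowed lateness and side outputs.

```java
// Hypothetical sketch: classify an out-of-order event against the current watermark,
// assuming tumbling windows and no allowed lateness.
final class LateEventSketch {
    enum Outcome { ASSIGNED_TO_OPEN_WINDOW, DROPPED_AS_LATE }

    static Outcome classify(long eventTime, long watermark, long size) {
        long start = eventTime - Math.floorMod(eventTime, size);
        long windowMaxTs = start + size - 1;
        // If the watermark has already reached the window's last timestamp, the window has fired;
        // without allowed lateness the event can no longer be added to it.
        return windowMaxTs <= watermark ? Outcome.DROPPED_AS_LATE : Outcome.ASSIGNED_TO_OPEN_WINDOW;
    }

    public static void main(String[] args) {
        long size = 10_000L;
        // max seen 12000, delay 5000 -> watermark 7000: window [0, 10000) is still open
        System.out.println(classify(8_000L, 12_000L - 5_000L, size));
        // max seen 16000, delay 5000 -> watermark 11000: window [0, 10000) has already fired
        System.out.println(classify(8_000L, 16_000L - 5_000L, size));
    }
}
```

The same event, timestamped 8000, is accepted in the first call and dropped in the second. Only the watermark differs between the two calls.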
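In a real Flink job, operators run in parallel, and each operator instance has multiple upstream inputs and downstream outputs. The watermarks arriving from different inputs are therefore merged by taking their minimum. The resulting watermark is sent to every downstream operator (i.e., it is broadcast).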
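The merge-and-broadcast behaviour can be sketched for one operator instance as follows. The instance keeps the latest watermark of each input, emits the minimum only when it advances, and sends that same value to every output. This is a hypothetical model, not Flink's classes. The outputs are plain lists that record what would be sent downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of one operator instance with several inputs and several outputs.
final class MinMergeSketch {
    private final long[] inputWatermarks;
    private long combined = Long.MIN_VALUE;
    // Each inner list records the watermarks "sent" to one downstream output.
    final List<List<Long>> outputs = new ArrayList<>();

    MinMergeSketch(int numInputs, int numOutputs) {
        inputWatermarks = new long[numInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
        for (int i = 0; i < numOutputs; i++) {
            outputs.add(new ArrayList<>());
        }
    }

    void onWatermark(int input, long watermark) {
        // a single input's watermark only moves forward
        inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        long min = Arrays.stream(inputWatermarks).min().getAsLong();
        if (min > combined) {
            combined = min;
            // broadcast: every downstream output receives the same combined watermark
            for (List<Long> out : outputs) {
                out.add(combined);
            }
        }
    }

    public static void main(String[] args) {
        MinMergeSketch op = new MinMergeSketch(2, 3);
        op.onWatermark(0, 7_000L); // input 1 has no watermark yet -> nothing emitted
        op.onWatermark(1, 4_000L); // min = 4000 -> broadcast 4000
        op.onWatermark(1, 9_000L); // min = 7000 -> broadcast 7000
        System.out.println(op.outputs);
    }
}
```

The slowest input decides the combined watermark. Input 1 jumps to 9000, but the operator can only advance to 7000 because input 0 is still there.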
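In Flink, the class that actually handles the relationship between records and windows is WindowOperator, which extends AbstractStreamOperator. AbstractStreamOperator uses IndexedCombinedWatermarkStatus to manage the watermarks produced by multiple inputs. Strictly speaking, this index-based merging covers the logical inputs of an operator, for example processWatermark1 and processWatermark2 of a two-input operator. Merging across the parallel upstream channels of a single input happens earlier, in StatusWatermarkValve, and follows the same minimum rule. The relevant code is: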
private void processWatermark(Watermark mark, int index) throws Exception {
    if (combinedWatermark.updateWatermark(index, mark.getTimestamp())) {
        processWatermark(new Watermark(combinedWatermark.getCombinedWatermark()));
    }
}

public boolean updateCombinedWatermark() {
    long minimumOverAllOutputs = Long.MAX_VALUE;
    // if we don't have any outputs minimumOverAllOutputs is not valid, it's still
    // at its initial Long.MAX_VALUE state and we must not emit that
    if (partialWatermarks.isEmpty()) {
        return false;
    }
    boolean allIdle = true;
    for (PartialWatermark partialWatermark : partialWatermarks) {
        if (!partialWatermark.isIdle()) {
            minimumOverAllOutputs =
                    Math.min(minimumOverAllOutputs, partialWatermark.getWatermark());
            allIdle = false;
        }
    }
    this.idle = allIdle;
    if (!allIdle && minimumOverAllOutputs > combinedWatermark) {
        combinedWatermark = minimumOverAllOutputs;
        return true;
    }
    return false;
}
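To experiment with updateCombinedWatermark outside Flink, its logic can be re-implemented in a few lines. This is a hypothetical sketch. The Partial class, its public fields and the scenario in main are our own, and only the combination rule follows the listing above. The combination rule has three parts. Idle inputs are skipped. The minimum over the remaining inputs is taken. The combined watermark only moves forward.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical re-implementation of the combination rule in updateCombinedWatermark.
final class IdleAwareCombinerSketch {
    static final class Partial {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    final List<Partial> partials = new ArrayList<>();
    long combinedWatermark = Long.MIN_VALUE;
    boolean idle = false;

    IdleAwareCombinerSketch(int inputs) {
        for (int i = 0; i < inputs; i++) {
            partials.add(new Partial());
        }
    }

    // Returns true when the combined watermark advanced and should be emitted downstream.
    boolean updateCombinedWatermark() {
        if (partials.isEmpty()) {
            return false;
        }
        long minimum = Long.MAX_VALUE;
        boolean allIdle = true;
        for (Partial p : partials) {
            if (!p.idle) { // idle inputs do not hold back the combined watermark
                minimum = Math.min(minimum, p.watermark);
                allIdle = false;
            }
        }
        this.idle = allIdle;
        if (!allIdle && minimum > combinedWatermark) { // never move backwards
            combinedWatermark = minimum;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        IdleAwareCombinerSketch c = new IdleAwareCombinerSketch(2);
        c.partials.get(0).watermark = 9_000L;
        // input 1 has not sent a watermark yet, so nothing can be emitted
        System.out.println(c.updateCombinedWatermark());
        // once input 1 is marked idle it is skipped and the watermark advances to 9000
        c.partials.get(1).idle = true;
        System.out.println(c.updateCombinedWatermark() + " " + c.combinedWatermark);
    }
}
```

Idleness matters in practice. Without it, a single input that stops producing data would hold every downstream window open indefinitely.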
From the analysis above, the following concepts become clear: