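[FLIP-126] Improved WatermarkAssigner interface for Sources
The new WatermarkAssigner interface merges the two earlier kinds of watermark interfaces, AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks, which simplifies implementing Sources that need to insert watermarks.
Code that creates watermarks generally looks like the snippet below. In Flink 1.11 there are three main ways to create watermarks (a custom WatermarkGenerator, forBoundedOutOfOrderness, and forMonotonousTimestamps):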
stream.map()
.assignTimestampsAndWatermarks()
....
Note that every record must be assigned a timestamp, otherwise watermarks cannot work. The full code is as follows:
public class UseWatermarkGenerator {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Emit a watermark once per second
env.getConfig().setAutoWatermarkInterval(1000L);
DataStreamSource<String> source = env.socketTextStream("192.168.177.211", 7777);
SingleOutputStreamOperator<SensorReading> stream = source.map(data -> new SensorReading(
data.split(",")[0].trim(),
Long.parseLong(data.split(",")[1].trim()),
Double.parseDouble(data.split(",")[2].trim())
)
).returns(SensorReading.class);
stream.assignTimestampsAndWatermarks(
new WatermarkStrategy<SensorReading>() {
@Override
public WatermarkGenerator<SensorReading> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<SensorReading>() {
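// Start at MIN_VALUE + 3000 (the out-of-orderness bound used below) so a periodic emit before the first record cannot underflow
private long maxTimesStamp = Long.MIN_VALUE + 3000L;
// For every record, compare its timestamp with maxTimesStamp to decide whether the maximum must advance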
@Override
public void onEvent(SensorReading event, long eventTimestamp, WatermarkOutput output) {
maxTimesStamp = Math.max(event.getTimeStamp(), maxTimesStamp);
}
// Emit the watermark periodically
@Override
public void onPeriodicEmit(WatermarkOutput output) {
// Tolerate at most 3 s of out-of-orderness
long maxOutOfOrderness = 3000L;
output.emitWatermark(new Watermark(maxTimesStamp - maxOutOfOrderness));
}
};
}
// A timestamp assigner must be specified, otherwise the job fails
}.withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp()))
.keyBy(SensorReading::getId)
// Create a 5 s event-time window
.timeWindow(Time.seconds(5))
.apply(new WindowFunction<SensorReading, Object, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow window, Iterable<SensorReading> input, Collector<Object> out) throws Exception {
System.out.println("window : [" + window.getStart() + ", " + window.getEnd() + "]");
ArrayList<SensorReading> list = new ArrayList<>((Collection<? extends SensorReading>) input);
list.forEach(out::collect);
}
}).print();
env.execute();
}
}
Enter the following data
sensor1,1000000000000,2 // watermark = 999999997000; record falls in window [1000000000000, 1000000005000)
sensor1,1000000005000,9 // watermark = 1000000002000; record falls in window [1000000005000, 1000000010000)
sensor1,1000000010000,3 // watermark = 1000000007000 >= 1000000004999, so window [1000000000000, 1000000005000) fires; record falls in [1000000010000, 1000000015000)
sensor1,1000000002000,10 // watermark stays 1000000007000; the record's window [1000000000000, 1000000005000) has already fired, so the record is late and dropped
sensor1,1000000015000,11 // watermark = 1000000012000, so window [1000000005000, 1000000010000) fires; record falls in [1000000015000, 1000000020000)
sensor1,1000000020000,7 // watermark = 1000000017000, so window [1000000010000, 1000000015000) fires; window [1000000015000, 1000000020000) is not complete yet and stays open
The output is
window : [1000000000000, 1000000005000]
SensorReading{id='sensor1', timeStamp=1000000000000, temperature=2.0}
window : [1000000005000, 1000000010000]
SensorReading{id='sensor1', timeStamp=1000000005000, temperature=9.0}
window : [1000000010000, 1000000015000]
SensorReading{id='sensor1', timeStamp=1000000010000, temperature=3.0}
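The watermark values in the annotated input above follow directly from the generator's arithmetic. As a minimal plain-Java sketch (no Flink dependency; the class name `PeriodicWatermarkTrace` and the method `trace` are hypothetical, and it assumes a watermark is emitted after every record, which the real job only approximates through the auto-watermark interval), the trace can be reproduced like this:

```java
import java.util.ArrayList;
import java.util.List;

final class PeriodicWatermarkTrace {
    /**
     * Returns the watermark that would be emitted after each record:
     * the largest timestamp seen so far minus the allowed out-of-orderness.
     */
    static List<Long> trace(long[] timestamps, long maxOutOfOrderness) {
        List<Long> watermarks = new ArrayList<>();
        long maxTimestamp = Long.MIN_VALUE;
        for (long ts : timestamps) {
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermarks.add(maxTimestamp - maxOutOfOrderness);
        }
        return watermarks;
    }

    public static void main(String[] args) {
        long[] input = {
            1000000000000L, 1000000005000L, 1000000010000L,
            1000000002000L, 1000000015000L, 1000000020000L
        };
        // One watermark per input record, in arrival order
        System.out.println(trace(input, 3000L));
    }
}
```

With the 3 s bound used in the example, the late record 1000000002000 leaves the watermark unchanged at 1000000007000, and the record 1000000015000 advances it to 1000000012000.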
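Generating watermarks with a fixed delay means calling the static method forBoundedOutOfOrderness on WatermarkStrategy. The source shows that forBoundedOutOfOrderness works the same way underneath as generating watermarks through createWatermarkGenerator: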
public interface WatermarkStrategy<T> extends
TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
......
static <T> WatermarkStrategy<T> forBoundedOutOfOrderness(Duration maxOutOfOrderness) {
return (ctx) -> new BoundedOutOfOrdernessWatermarks<>(maxOutOfOrderness);
}
......
}
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
/** The maximum timestamp encountered so far. */
private long maxTimestamp;
/** The maximum out-of-orderness that this watermark generator assumes. */
private final long outOfOrdernessMillis;
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
// ------------------------------------------------------------------------
// Called once for every incoming record
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
// Emits the watermark periodically
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
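The initial value chosen in the constructor is easy to overlook, so here is a small plain-Java check of the arithmetic from the excerpt above. The class `InitialWatermarkCheck` and both method names are hypothetical; the sketch only mirrors the two expressions shown in the source and compares them with a naive start value of `Long.MIN_VALUE`.

```java
import java.time.Duration;

final class InitialWatermarkCheck {
    /** Mirrors the excerpt: the watermark emitted before any event has been seen. */
    static long initialWatermark(Duration maxOutOfOrderness) {
        long outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** What a naive start value of Long.MIN_VALUE would yield: the subtraction wraps around. */
    static long naiveInitialWatermark(Duration maxOutOfOrderness) {
        return Long.MIN_VALUE - maxOutOfOrderness.toMillis() - 1;
    }

    public static void main(String[] args) {
        Duration bound = Duration.ofSeconds(5);
        System.out.println(initialWatermark(bound) == Long.MIN_VALUE); // true
        System.out.println(naiveInitialWatermark(bound) > 0);          // true: wrapped to a huge positive value
    }
}
```

The same concern applies to any hand-written generator that starts from `Long.MIN_VALUE` and subtracts a delay in `onPeriodicEmit`: an emit before the first record would wrap around to a very large watermark.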
The test code is as follows
public class UseBoundedOutOfOrdernessWatermark {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Emit watermarks every 1 ms (the default interval is 200 ms)
env.getConfig().setAutoWatermarkInterval(1L);
DataStreamSource<String> source = env.socketTextStream("192.168.177.211", 7777);
SingleOutputStreamOperator<SensorReading> stream = source.map(data -> new SensorReading(
data.split(",")[0].trim(),
Long.parseLong(data.split(",")[1].trim()),
Double.parseDouble(data.split(",")[2].trim())
)
).returns(SensorReading.class);
// Generate watermarks periodically; tolerate at most 5 s of out-of-orderness
SingleOutputStreamOperator<SensorReading> result = stream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<SensorReading>() {
@Override
public long extractTimestamp(SensorReading element, long recordTimestamp) {
return element.getTimeStamp();
}
})).keyBy(SensorReading::getId)
.timeWindow(Time.seconds(5))
.apply(new WindowFunction<SensorReading, SensorReading, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow window, Iterable<SensorReading> input, Collector<SensorReading> out) throws Exception {
System.out.println("window : [" + window.getStart() + ", " + window.getEnd() + "]");
ArrayList<SensorReading> list = new ArrayList<>((Collection<? extends SensorReading>) input);
list.forEach(out::collect);
}
});
result.print();
env.execute();
}
}
The input data is as follows
sensor1,1000000000000,2
sensor1,1000000005000,9
sensor1,1000000010000,3
sensor1,1000000002000,10
sensor1,1000000015000,11
sensor1,1000000020000,7
The output is as follows; the principle is the same as generating watermarks with createWatermarkGenerator
window : [1000000000000, 1000000005000]
SensorReading{id='sensor1', timeStamp=1000000000000, temperature=2.0}
window : [1000000005000, 1000000010000]
SensorReading{id='sensor1', timeStamp=1000000005000, temperature=9.0}
window : [1000000010000, 1000000015000]
SensorReading{id='sensor1', timeStamp=1000000010000, temperature=3.0}
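To see why each record lands in the window printed above, the window boundaries can be computed by hand. The sketch below is plain Java and assumes tumbling windows aligned to the epoch with zero offset and non-negative timestamps; `TumblingWindowSketch` and its methods are hypothetical names, not Flink API.

```java
final class TumblingWindowSketch {
    /** Start of the tumbling window that contains the timestamp (zero offset, non-negative timestamps). */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /** An event-time window [start, start + size) fires once the watermark reaches its last timestamp, end - 1. */
    static boolean fires(long windowStart, long windowSize, long watermark) {
        long maxTimestamp = windowStart + windowSize - 1;
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 5000L;
        long start = windowStart(1000000002000L, size);
        System.out.println("[" + start + ", " + (start + size) + ")");
        // Watermark after record 1000000010000 with a 5 s bound: 1000000010000 - 5000 - 1
        System.out.println(fires(start, size, 1000000010000L - 5000L - 1L));
    }
}
```

Under these assumptions the record 1000000002000 maps to [1000000000000, 1000000005000), and that window has already fired by the time the record arrives, which is why it does not appear in the output.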
Reading the source shows that generating watermarks in monotonously increasing mode tolerates no out-of-order data, for the following reason:
static <T> WatermarkStrategy<T> forMonotonousTimestamps() {
return (ctx) -> new AscendingTimestampsWatermarks<>();
}
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {
/**
* Creates a new watermark generator with for ascending timestamps.
*/
public AscendingTimestampsWatermarks() {
// Calls the BoundedOutOfOrdernessWatermarks constructor with an allowed out-of-orderness of 0
super(Duration.ofMillis(0));
}
}
Write the test code
public class UseforMonotonous {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Emit watermarks every 1 ms (the default interval is 200 ms)
env.getConfig().setAutoWatermarkInterval(1L);
DataStreamSource<String> source = env.socketTextStream("192.168.177.211", 7777);
SingleOutputStreamOperator<SensorReading> stream = source.map(data -> new SensorReading(
data.split(",")[0].trim(),
Long.parseLong(data.split(",")[1].trim()),
Double.parseDouble(data.split(",")[2].trim())
)
).returns(SensorReading.class);
// Timestamps are assumed to increase monotonously; no out-of-orderness is tolerated
SingleOutputStreamOperator<SensorReading> result = stream
.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.getTimeStamp()))
.keyBy(SensorReading::getId)
.timeWindow(Time.seconds(5))
.apply(new WindowFunction<SensorReading, SensorReading, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow window, Iterable<SensorReading> input, Collector<SensorReading> out) throws Exception {
System.out.println("window : [" + window.getStart() + ", " + window.getEnd() + "]");
ArrayList<SensorReading> list = new ArrayList<>((Collection<? extends SensorReading>) input);
list.forEach(out::collect);
}
});
result.print();
env.execute();
}
}
The input data is
sensor1,1000000000000,2
sensor1,1000000005000,9
sensor1,1000000010000,3
sensor1,1000000002000,10
sensor1,1000000015000,11
sensor1,1000000020000,7
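The output is shown below. For the same input, the monotonously increasing strategy produces one more window than the other two approaches. Because the out-of-orderness bound is 0, the watermark follows the largest timestamp seen so far (minus 1 ms, as the generator source shows). When the last record arrives, the watermark advances to 1000000019999, which reaches the last timestamp of window [1000000015000, 1000000020000), so that window fires and its result is emitted.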
window : [1000000000000, 1000000005000]
SensorReading{id='sensor1', timeStamp=1000000000000, temperature=2.0}
window : [1000000005000, 1000000010000]
SensorReading{id='sensor1', timeStamp=1000000005000, temperature=9.0}
window : [1000000010000, 1000000015000]
SensorReading{id='sensor1', timeStamp=1000000010000, temperature=3.0}
window : [1000000015000, 1000000020000]
SensorReading{id='sensor1', timeStamp=1000000015000, temperature=11.0}
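The difference in the number of fired windows can be reproduced without Flink. The following is a simplified plain-Java simulation, not the real operator: it assumes a watermark is emitted after every record, epoch-aligned tumbling windows, and no allowed lateness. `FiredWindowsSketch` and `firedWindowStarts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class FiredWindowsSketch {
    /** Returns the start of every tumbling window that has fired once all timestamps were processed. */
    static List<Long> firedWindowStarts(long[] timestamps, long outOfOrdernessMillis, long windowSize) {
        TreeSet<Long> open = new TreeSet<>();
        List<Long> fired = new ArrayList<>();
        // Same start value as BoundedOutOfOrdernessWatermarks, so the first watermark cannot underflow
        long maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long start = ts - (ts % windowSize);
            // A record is late (and skipped here) if the watermark already covers its window
            if (start + windowSize - 1 > watermark) {
                open.add(start);
            }
            maxTimestamp = Math.max(maxTimestamp, ts);
            watermark = maxTimestamp - outOfOrdernessMillis - 1;
            // Fire every open window whose last timestamp is covered by the watermark
            while (!open.isEmpty() && open.first() + windowSize - 1 <= watermark) {
                fired.add(open.pollFirst());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] input = {
            1000000000000L, 1000000005000L, 1000000010000L,
            1000000002000L, 1000000015000L, 1000000020000L
        };
        System.out.println("0 ms bound: " + firedWindowStarts(input, 0L, 5000L));
        System.out.println("5 s bound:  " + firedWindowStarts(input, 5000L, 5000L));
    }
}
```

With a bound of 0 the simulation fires four windows, the last one starting at 1000000015000; with a 5 s bound it fires only the first three.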
Solution idea: sort the records inside each window. The code is as follows
public class SolveOutOfOrder {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Emit watermarks every 1 ms (the default interval is 200 ms)
env.getConfig().setAutoWatermarkInterval(1L);
DataStreamSource<String> source = env.socketTextStream("192.168.177.211", 7777);
SingleOutputStreamOperator<SensorReading> stream = source.map(data -> new SensorReading(
data.split(",")[0].trim(),
Long.parseLong(data.split(",")[1].trim()),
Double.parseDouble(data.split(",")[2].trim())
)
).returns(SensorReading.class);
SingleOutputStreamOperator<SensorReading> result = stream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(10))
.withTimestampAssigner(new SerializableTimestampAssigner<SensorReading>() {
@Override
public long extractTimestamp(SensorReading element, long recordTimestamp) {
return element.getTimeStamp();
}
})).keyBy(SensorReading::getId)
.timeWindow(Time.seconds(10))
.process(new ProcessWindowFunction<SensorReading, SensorReading, String, TimeWindow>() {
@Override
public void process(String s, Context context, Iterable<SensorReading> elements, Collector<SensorReading> out) throws Exception {
System.out.println("window : [" + context.window().getStart() + ", " + context.window().getEnd() + ")");
ArrayList<SensorReading> list = new ArrayList<SensorReading>((Collection<? extends SensorReading>) elements);
// Long.compare avoids the overflow that casting a long difference to int can cause
list.sort((o1, o2) -> Long.compare(o1.getTimeStamp(), o2.getTimeStamp()));
list.forEach(out::collect);
}
});
result.print();
env.execute();
}
}
The input data is as follows
sensor1,1000000010000,3 watermark = 999999999999
sensor1,1000000004000,9 watermark = 999999999999
sensor1,1000000005000,8 watermark = 999999999999
sensor1,1000000003000,7 watermark = 999999999999
sensor1,1000000020000,8 watermark = 1000000009999, window [1000000000000, 1000000010000) fires and its records are sorted
The output is as follows, with the final records sorted by timeStamp
window : [1000000000000, 1000000010000)
SensorReading{id='sensor1', timeStamp=1000000003000, temperature=7.0}
SensorReading{id='sensor1', timeStamp=1000000004000, temperature=9.0}
SensorReading{id='sensor1', timeStamp=1000000005000, temperature=8.0}
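When sorting by a long timestamp, a comparator built from a subtraction and an int cast is only safe while every difference fits in an int (roughly 24.9 days in milliseconds). The plain-Java sketch below, with the hypothetical class `TimestampSortSketch`, contrasts that approach with `Long.compare` on bare timestamps rather than on SensorReading objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TimestampSortSketch {
    /** Returns a sorted copy, using an overflow-safe comparator. */
    static List<Long> sortSafely(List<Long> timestamps) {
        List<Long> copy = new ArrayList<>(timestamps);
        copy.sort(Long::compare);
        return copy;
    }

    /** Subtraction-and-cast comparison; its sign is wrong once the difference no longer fits in an int. */
    static int castCompare(long a, long b) {
        return (int) (a - b);
    }

    public static void main(String[] args) {
        System.out.println(sortSafely(Arrays.asList(1000000004000L, 1000000005000L, 1000000003000L)));
        long gap = 1L << 31; // about 24.9 days in milliseconds
        System.out.println(castCompare(gap, 0L));  // negative, although gap > 0
        System.out.println(Long.compare(gap, 0L)); // positive
    }
}
```

For the sample data the differences are a few thousand milliseconds, so both approaches agree; the safe comparator matters once records within one window can be weeks apart.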
In production, watermarks solve the out-of-order problem but not the late-data problem. To handle late records, use the allowedLateness() method:
/** Note: while currentWatermark - lateness < window end, a late element re-triggers its window.
* Sets the time by which elements are allowed to be late. Elements that
* arrive behind the watermark by more than the specified time will be dropped.
* By default, the allowed lateness is {@code 0L}.
* Note: a record is dropped once the watermark has passed its window's end time plus the allowed lateness; without allowedLateness it is dropped once the watermark has passed the window's end time.
* <p>Setting an allowed lateness is only valid for event-time windows.
*/
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
final long millis = lateness.toMilliseconds();
checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
this.allowedLateness = millis;
return this;
}
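However, we cannot wait for late data forever, yet we do not want to discard it either. That is where side outputs come in.
The test code is as follows; it sets the allowed lateness to 1 s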
public class SolveLateness {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// Emit watermarks every 1 ms (the default interval is 200 ms)
env.getConfig().setAutoWatermarkInterval(1L);
DataStreamSource<String> source = env.socketTextStream("192.168.177.211", 7777);
SingleOutputStreamOperator<SensorReading> stream = source.map(data -> new SensorReading(
data.split(",")[0].trim(),
Long.parseLong(data.split(",")[1].trim()),
Double.parseDouble(data.split(",")[2].trim())
)
).returns(SensorReading.class);
OutputTag<SensorReading> laterTag = new OutputTag<SensorReading>("laterData"){};
// Generate watermarks periodically; tolerate at most 5 s of out-of-orderness
SingleOutputStreamOperator<SensorReading> result = stream.assignTimestampsAndWatermarks(WatermarkStrategy.<SensorReading>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner(new SerializableTimestampAssigner<SensorReading>() {
@Override
public long extractTimestamp(SensorReading element, long recordTimestamp) {
return element.getTimeStamp();
}
})).keyBy(SensorReading::getId)
.timeWindow(Time.seconds(5))
// Maximum allowed lateness
.allowedLateness(Time.seconds(1))
// Route records that are too late to a side output
.sideOutputLateData(laterTag)
.apply(new WindowFunction<SensorReading, SensorReading, String, TimeWindow>() {
@Override
public void apply(String s, TimeWindow window, Iterable<SensorReading> input, Collector<SensorReading> out) throws Exception {
System.out.println("window : [" + window.getStart() + ", " + window.getEnd() + "]");
ArrayList<SensorReading> list = new ArrayList<>((Collection<? extends SensorReading>) input);
list.forEach(out::collect);
}
});
result.print();
result.getSideOutput(laterTag).print("later data");
env.execute();
}
}
The input data is as follows
sensor1,1000000000000,2 watermark = 999999994999, record falls in window [1000000000000, 1000000005000)
sensor1,1000000005000,9 watermark = 999999999999, record falls in window [1000000005000, 1000000010000)
sensor1,1000000010000,3 watermark = 1000000004999, window [1000000000000, 1000000005000) fires; window [1000000005000, 1000000010000) is still open
sensor1,1000000002000,10 watermark = 1000000004999, record belongs to [1000000000000, 1000000005000); 1000000005000 + 1000 > watermark, so that window is computed again
sensor1,1000000004000,9 watermark = 1000000004999, record belongs to [1000000000000, 1000000005000); 1000000005000 + 1000 > watermark, so that window is computed again
sensor1,1000000005000,8 watermark = 1000000004999, record falls in window [1000000005000, 1000000010000)
sensor1,1000000015000,11 watermark = 1000000009999, window [1000000005000, 1000000010000) fires
sensor1,1000000004000,11 watermark = 1000000009999, record belongs to [1000000000000, 1000000005000); 1000000005000 + 1000 < watermark, so it goes to the side output
The output data is as follows
window : [1000000000000, 1000000005000]
SensorReading{id='sensor1', timeStamp=1000000000000, temperature=2.0}
window : [1000000000000, 1000000005000]
SensorReading{id='sensor1', timeStamp=1000000000000, temperature=2.0}
SensorReading{id='sensor1', timeStamp=1000000002000, temperature=10.0}
window : [1000000000000, 1000000005000]
SensorReading{id='sensor1', timeStamp=1000000000000, temperature=2.0}
SensorReading{id='sensor1', timeStamp=1000000002000, temperature=10.0}
SensorReading{id='sensor1', timeStamp=1000000004000, temperature=9.0}
window : [1000000005000, 1000000010000]
SensorReading{id='sensor1', timeStamp=1000000005000, temperature=9.0}
SensorReading{id='sensor1', timeStamp=1000000005000, temperature=8.0}
later data> SensorReading{id='sensor1', timeStamp=1000000004000, temperature=11.0}
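Summary:
After allowedLateness() has been called on a windowed stream, a late record is handled as follows:
If the current watermark > end time of the record's window + lateness, the record is dropped and takes part in no computation (or goes to the side output when sideOutputLateData() is configured).
If the current watermark < end time of the record's window + lateness, the window the record belongs to is triggered and computed again.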
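The decision described in the summary can be written down as a small function. This is a plain-Java sketch under stated assumptions, not Flink code: it uses epoch-aligned tumbling windows, compares against the window's last timestamp (end - 1) as event-time windows do, and the names `LatenessSketch`, `Outcome` and `classify` are hypothetical.

```java
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_REFIRE, DROPPED_OR_SIDE_OUTPUT }

    /** Classifies a record given the current watermark, the window size and the allowed lateness (all in ms). */
    static Outcome classify(long timestamp, long watermark, long windowSize, long allowedLateness) {
        long windowEnd = timestamp - (timestamp % windowSize) + windowSize;
        long maxTimestamp = windowEnd - 1;
        if (maxTimestamp + allowedLateness <= watermark) {
            // The window state has been cleaned up: drop, or emit to the side output if one is set
            return Outcome.DROPPED_OR_SIDE_OUTPUT;
        }
        if (maxTimestamp <= watermark) {
            // The window has fired already but is still kept: adding the record fires it again
            return Outcome.LATE_REFIRE;
        }
        return Outcome.ON_TIME;
    }

    public static void main(String[] args) {
        // 5 s windows, 1 s allowed lateness, as in the example above
        System.out.println(classify(1000000002000L, 1000000004999L, 5000L, 1000L));
        System.out.println(classify(1000000004000L, 1000000009999L, 5000L, 1000L));
        System.out.println(classify(1000000005000L, 1000000004999L, 5000L, 1000L));
    }
}
```

Applied to the sample input, the record 1000000002000 at watermark 1000000004999 re-fires its window, while 1000000004000 at watermark 1000000009999 ends up in the side output.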