WaterMark是Flink用来处理时间乱序的一种机制。用来过滤掉由于网络或者其他原因,而迟来的脏数据。
WaterMark绝大部分时候是和eventTime配合使用,可能有的同学非要用Processing Time,那也是可以的,只要加上env.getConfig().setAutoWatermarkInterval(200); 这句话就可以了,是没有任何效果的,下文就讲到。
先上用法,本文都是按照时间进行处理的,暂时不介绍按照事件处理的,因为用到的极少极少。
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import javax.annotation.Nullable;
public class MyWaterMark implements AssignerWithPeriodicWatermarks<TestAA> {
Long currentMaxTimestamp = 0L;
Long maxOutOfOrderness = 10000L;//允许的最大乱序时间是10s
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp-maxOutOfOrderness);
}
/**
* 每来一条数据,就会调用这个方法
* 其中TestAA是我个人定义的一个pojo类,里面只有三个属性name,id,times
* @param element
* @param previousElementTimestamp
* @return
*/
@Override
public long extractTimestamp(TestAA element, long previousElementTimestamp) {
System.out.println("mark:"+element);
Long timestamp = element.getTimes();
currentMaxTimestamp = Math.max(timestamp,currentMaxTimestamp);
return timestamp;
}
}
先自定义一个 AssignerWithPeriodicWatermarks类,然后使用的话
DataStream<TestAA> marksStream = dataSource.map(new RichMapFunction<String, TestAA>() {
@Override
public TestAA map(String value) throws Exception {
String[] values = value.split(",");
TestAA aa = new TestAA();
aa.setName(values[0]);
aa.setId(values[1]);
aa.setTimes(Long.parseLong(values[2]));
return aa;
}
}).assignTimestampsAndWatermarks(new MyWaterMark());
这样用即可,将数据线转换成TestAA,这个pojo类,然后再注册上watermark即可,这样marksStream,你就可以当做一个普通的dataStram来使用。
1.先来说说,怎么以当前时间为基准来过滤数据,首先
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);或者是env.getConfig().setAutoWatermarkInterval(200);
如果使用的是EventTime,那么后面这句可以不用,如果使用的是Processing Time(默认就是Processing Time),那么后面那句一定要加上。这个后续说原因。接下来,只要将上面MyWaterMark 这个类中的getCurrentWatermark方法中的内容改为
return new Watermark(currentMaxTimestamp-System.currentTimeMillis());即可,这样用的就是当前处理时间了。
2.为什么Processing Time不能直接使用?我们来看看源码,下面是env.setStreamTimeCharacteristic()这个的方法。
/**
* Sets the time characteristic for all streams create from this environment, e.g., processing
* time, event time, or ingestion time.
*
* <p>If you set the characteristic to IngestionTime of EventTime this will set a default
* watermark update interval of 200 ms. If this is not applicable for your application
* you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
*
* @param characteristic The time characteristic.
*/
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
他会设置autoWatermarkInterval这个属性的值,这个是用来设置获取watermark的时间间隔。如果是ProcessingTime的话,那么这个时间间隔就是0,其他时间是200ms。
那么有同学会问了,如果是0的话,那么不是时时刻刻在获取吗?其实不是的。
我们继续看源码,追踪这个属性。因为我们在注册我们自己定义的markwarks类的使用,使用的是dataStream的assignTimestampsAndWatermarks,这个方法。下面是这个方法的源码
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
// match parallelism to input, otherwise dop=1 sources could lead to some strange
// behaviour: the watermark will creep along very slowly because the elements
// from the source go to each extraction operator round robin.
final int inputParallelism = getTransformation().getParallelism();
final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
TimestampsAndPeriodicWatermarksOperator<T> operator =
new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
.setParallelism(inputParallelism);
}
这里可以看到,他会初始化一个TimestampsAndPeriodicWatermarksOperator,这个类,而在这个类是一个Function的子类。有我们常见的open以及close方法。open方法,总所周知,是在程序初始化的时候,运行的,大家看open方法的源码
@Override
public void open() throws Exception {
super.open();
currentWatermark = Long.MIN_VALUE;
watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
if (watermarkInterval > 0) {
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
}
看到这里,相信大家都知道了吧。这里会初始化watermark,为Long的最小值,也就是-9223372036854775808。并且,如果时间间隔为0的话,就不会启动下面的定时任务。下面的定时任务,源码如下:
@Override
public void onProcessingTime(long timestamp) throws Exception {
// register next timer
Watermark newWatermark = userFunction.getCurrentWatermark();
if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
currentWatermark = newWatermark.getTimestamp();
// emit watermark
output.emitWatermark(newWatermark);
}
long now = getProcessingTimeService().getCurrentProcessingTime();
getProcessingTimeService().registerTimer(now + watermarkInterval, this);
}
他会去调用我们自定义类MyWaterMark的getCurrentWatermark方法,来设定当前watermark,如果原来的watermark小于现在的wakermark值,那么就不生效,还是用原来大的值。最后再次注册定时任务,来获取watermark。
这里面now,就是System.currentTimeMillis(); 所以如果时间间隔不为0,那么下一次调用的时间就是 当前时间 + 方法运行的时间 + 时间间隔,由于方法运行的时间约等于0ms,所以基本就是每个时间间隔(默认200ms),运行一次获取wakermark的方法。
所以如果是ProcessingTime,那么默认时间间隔是0,所以matermarks时间就是一直-9223372036854775808,所以就一直不会过滤时间。
题外话,flink使用的定时任务使用的是java的ScheduledFuture。