我们正在接收来自多个独立数据源的事件,因此,到达我们Flink拓扑(通过Kafka)的数据将是无序的。
我们正在Flink拓扑中创建1分钟的事件时间窗口,并在源操作符处生成事件时间水印(当前事件时间-某些阈值(30秒))。
如果一些事件在设置的阈值之后到达,那么这些事件将被忽略(在我们的例子中这是可以的,因为属于该分钟的大多数事件都已经到达并在相应的窗口中得到处理)。
现在的问题是,如果程序崩溃(无论出于何种原因),然后从最后一个成功的检查点再次恢复,无序到达的事件将触发过去(已处理)窗口的执行(该窗口中只有一小部分事件)覆盖上一个检查点的结果。该窗口的计算。
如果Flink检查了事件时间水印,则不会发生此问题。
所以,我想知道是否有一种方法可以在Flink中强制执行事件时间水印的检查点...
我认为最简单的解决方案是在窗口操作符之后插入ProcessFunction
。
ProcessFunction
可通过其上下文
对象访问当前水印,并可将其存储在联合运算符状态。如果出现故障,ProcessFunction
将水印从其状态恢复,并过滤时间戳小于水印的所有记录(时间戳也可通过上下文
obejct访问)。
虽然这是一个老问题,但我也有同样的问题。应用程序正在重新启动,具有事件时间窗口的连接函数不再触发,因为来自其中一个流的事件在崩溃前完成。连接可以恢复状态,但是由于其中一个流不再有水印,事件在重启后永远不会连接。
我找到的解决方案是在源操作符之后为最新的水印创建一个检查点。由于没有UDF来保存水印的快照,我必须创建自己的操作符,该操作符不会更改事件(标识函数),并将最新的水印保存为其状态。当Flink从崩溃中恢复时,WatermarkStreamOperator。InitializeEstate()
发出列表状态上的最后一个水印检查点
public class WatermarkStreamOperator<IN> extends AbstractUdfStreamOperator<IN, WatermarkFunction<IN>>
implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
private ListState<Long> latestWatermark;
public WatermarkStreamOperator(WatermarkFunction<IN> mapper) {
super(mapper);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@Override
public void initializeState(StateInitializationContext context) throws Exception { System.out.println("WatermarkStreamOperator.initializeState");
super.initializeState(context);
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("latest-watermark", Long.class);
latestWatermark = context.getOperatorStateStore().getListState(descriptor);
List<Long> watermarkList = new ArrayList<>();
latestWatermark.get().forEach(watermarkList::add);
Long maxWatermark = watermarkList.stream().max(Long::compare).orElse(0L);
if (!maxWatermark.equals(Long.valueOf(0l))) {
System.out.println("watermarkList recovered max: " + maxWatermark);
processWatermark(new Watermark(maxWatermark));
}
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
output.collect(element);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
System.out.println("processing watermark: " + mark.getTimestamp()); latestWatermark.update(Arrays.asList(mark.getTimestamp()));
super.processWatermark(mark);
}
}
以及操作员的标识UDF:
public interface WatermarkFunction<T> extends Function, Serializable {
T process(T value) throws Exception;
}
最后,我使用
。转换()
调用我的Watermark Stream算子
与MyTupleWatermark Func
。
DataStream<Tuple2<String, Integer>> dataStream = env
.addSource(new MySource(sentence))
.transform("myStatefulWatermarkOperator",
TypeInformation.of(String.class),
new WatermarkStreamOperator<>(new MyTupleWatermarkFunc()))
...
...
public class MyTupleWatermarkFunc implements WatermarkFunction<String> {
private static final long serialVersionUID = 1L;
@Override
public String process(String value) throws Exception {
return value;
}
}
下面是我为这个https://github.com/felipegutierrez/explore-flink/blob/master/docker/ops-playground-image/java/explore-flink/src/test/java/org/sense/flink/examples/stream/operator/watermark/WatermarkStreamOperatorTest.java创建的单元和集成测试
我试图使用process函数对一组事件进行处理。我正在使用事件时间和键控流。我面临的问题是,水印值始终为9223372036854725808。我已经对print语句进行了调试,它显示如下: 时间戳------1583128014000提取时间戳1583128014000当前水印------9223372036854775808 时间戳------1583128048000提取时间戳1583128
我想在flink中测试一次端到端的处理。我的工作是: Kafka资料来源- 我在mapper1中放了一个< code > thread . sleep(100000),然后运行了这个作业。我在停止作业时获取了保存点,然后从mapper1中删除了< code > thread . sleep(100000),我希望该事件应该会被重放,因为它没有下沉。但这并没有发生,乔布斯正在等待新的事件。 我的Ka
查看Flink的留档和书籍,我对时间戳有疑问:如果流设置为事件时间模式,这意味着时间戳在进入Flink之前具有源的时间(甚至在通过消息传递队列之前,可能是Kafka),为什么Flink将时间戳作为元数据附加到记录中?幻灯片3根据它们所占的内容具有不同类型的时间戳:https://www.slideshare.net/dataArtisans/apache-flink-training-time-a
null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有
我有一份流媒体工作: 读Kafka-- 启动时,一切都很好。问题是,过了一段时间,磁盘空间被我认为是链接检查点的东西填满了。 我的问题是,在链接作业运行时是否应该清除/删除检查点?找不到此上的任何资源。 我使用的是写入/tmp的文件系统后端(无hdfs设置)
请检查上面的代码,并告诉我是否做得正确。在事件时间和水印分配之后,我想在process函数中处理流,其中我将为不同的密钥收集10分钟的流数据。