我遇到了与Flink EventTime处理水印类似的问题-9223372036854725808。然而,建议的解决方案(设置并行度和禁用检查点)没有任何效果。在本例中,我只是将1000个事件以1秒的间隔流式传输,然后将事件时间戳与ctx进行比较。timerService()。currentWatermark()
>>> v=(61538659200000,0), watermark=-9223372036854775808
>>> v=(61538659201000,1), watermark=-9223372036854775808
>>> v=(61538660198000,998), watermark=-9223372036854775808
>>> v=(61538660199000,999), watermark=-9223372036854775808
public void watermarks()
throws Exception
{
final var env = StreamExecutionEnvironment.createLocalEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setMaxParallelism(1);
final long startMs = new Date(2020, 1, 1).getTime();
final var events = new ArrayList<Tuple2<Long, Integer>>();
for (var ii = 0; ii < 1000; ++ii ) {
events.add(new Tuple2<Long, Integer>(startMs + ii * 1000, ii));
}
env.fromCollection(events)
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<Long, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((event, ts) -> event.f0))
.setParallelism(1)
.keyBy(row -> row.f1 % 2)
.process(new ProcessFunction<Tuple2<Long, Integer>, String>()
{
@Override
public void processElement(
final Tuple2<Long, Integer> value,
final Context ctx,
final Collector<String> out)
throws Exception
{
out.collect("v=" + value + ", watermark=" + ctx.timerService().currentWatermark());
}
})
.setParallelism(1)
.print()
.setParallelism(1);
final var result = env.execute();
System.out.println(result);
}
forMonotonousTimestamps
是一个周期性水印生成器,仅在由计时器触发时生成水印。默认情况下,此计时器每200毫秒触发一次(这是autoWatermark Interval
)。您的作业运行时间不足以让此计时器触发。
有界源确实会生成一个水印,其时间戳设置为MAX_WATERMARK当它们到达输入末尾时——就在关闭作业之前。您在作业的输出中看不到这个水印,因为它后面没有事件。
如果您想为每个事件生成水印,您可以实现自定义水印策略,在Watermark Generator
(docs)的onEvent
方法中发出水印。这在生产中通常是个坏主意,因为您会在这些额外的水印上浪费CPU周期和网络带宽,但有时对于测试来说这很有帮助。
在Flink中,我发现了2种设置水印的方法, 第一个是 第二个是 我想知道哪个最终会生效。
我们正在构建一个流处理管道,以使用Flink v1.11和事件时间特性来处理Kinesis消息。在定义源水印策略时,在官方留档中,我遇到了两个开箱即用的水印策略;forBoundedOutOfOrthy和forMonotonousTimestamps。但根据我对上述内容的理解,我认为这些不适合我的用法。以下是我的用法细节: 来自输入流的数据:(包含每分钟带有时间戳的数据) 现在,我想处理11:00
我试图了解Apache FLink中Windows和Watermark生成之间的依赖关系,我在下面的示例中出现错误: 这里的时间戳是一个长的,我们可以从Kafka源中检索到,应该是:a,4 C,8,其中C是类别,5是时间戳。 每当我发送事件时,数据流都会打印,但不会使用窗口打印这些事件(打印(“Windows”)。此外,如果我收到一个事件A,12,然后生成了一个水印(在10秒内),那么我有C,2,
我们正在构建一个流处理管道来处理/摄取Kafka消息。我们正在使用Flink v1.12.2。在定义源水印策略时,在官方留档中,我遇到了两种开箱即用的水印策略;forBoundedOutOfOrness和forMonotonousTimestamps。我确实浏览了javadoc,但并不完全理解何时以及为什么你应该使用一种策略而不是另一种策略。时间戳基于事件时间。谢谢。
有1个高通量Kafka流定义如下 上述窗口操作符的水印正确转发。 上述窗口操作符中的需要使用一些保存在某些S3文件中的信息来丰富。S3文件很少更新。 S3文件作为流读取,然后广播以丰富中的元素。 然后连接这两个流,用类型的元素来丰富类型的所有元素。 有2个输入。其中之一是不断转发水印,但广播流没有任何时间信息或水印。这导致EnrichedAProcess的水印根本无法转发,因为它的一个输入没有传入
我在Flink中做实时流,其中Kafka是消息队列。我正在应用120秒的EventTimeSlidingWindow。和1秒的幻灯片。我还在事件时间的每秒插入水印。 我担心的是,如果元素在水印之后延迟出现,会发生什么?现在,我的情况是,Flink简单地丢弃了相应水印之后的消息。filnk是否提供了任何机制来处理此类延迟消息,例如维护单独的窗口?我也看过了文档,但我没有弄清楚。