当前位置: 首页 > 知识库问答 >
问题:

Flink:窗口不处理流末尾的数据

景阳平
2023-03-14

我有一个流(KafkaMSG流到一个主题上),有一个flinkKafka消费者,我注意到一个有趣的行为,我想解决这个问题。

当数据正在流入时,如果它在窗口“完成”之前停止,或者如果数据结束(在几个窗口之后)并且没有到达窗口的末尾,则管道的其余部分不会触发。

流程示例:

    env.addSource(kafkaConsumer)
       .flatMap(new TokenMapper())
       .keyBy("word")
       .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
       .reduce(new CountTokens())
       .flatMap(new ConvertToString())
       .addSink(producer);

我正在使用的Flink Kafka消费者010与env时间特性设置为事件时间。和consumer.assign时间戳和水印(新周期水印())

private static class PeriodicWatermarks implements   AssignerWithPeriodicWatermarks<String>{

    private long currentMaxTimestamp;
    private final long maxOutOfOrderness;

    public PeriodicWatermarksAuto(long maxOutOfOrderness){
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public Watermark getCurrentWatermark() {
         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(String t, long l) {
        // this should be the event timestamp
        currentMaxTimestamp = l;
        logger.info("TIMESTAMP: " + l);
        return l;
    }
}

如果我的窗口是10秒,而我的数据流只包含8秒的数据(然后在一段时间内停止流),那么flatMap-

数据流处理问题示例:(每个x是每秒一段数据)

      xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
      ^(not processed)           (until I get here)^

类似地,例如,如果我有35秒的流式数据(我的窗口是10秒),则只有3个窗口的数据触发,其余5秒的数据从不处理。

     ...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
         (processed)        ^(not processed)          (until I get here)^

最后,如果我的窗口是10秒,而我只有5秒的流数据平面映射-

我的问题是,如果我们在一段时间后看不到数据,有没有办法触发窗口数据进行处理?

如果我的数据是实时传输的,我可以看到没有数据的延伸,并且不希望最后一个窗口(比如说只有5秒的数据)必须等待一些不确定的时间,直到新数据进来,我希望在窗口时间过去后,最后一个窗口的结果。

大声想想,这似乎是因为使用EventTime而不是ProcessingTime,或者,我的水印没有正确生成,最后一个窗口无法实际触发。。。不确定也许两者兼而有之?我认为这对任何人来说都是一个问题,如果流结束时最后一位没有触发。我想说,我可能会发送一个流结束消息,但如果流结束,这并没有帮助,因为源中断流。

编辑:所以我改为流转时长,它确实正确处理了最后一个窗口中的数据,所以我想EventTime毕竟是罪魁祸首,我认为自定义触发器或适当的窗口水印可能是答案...

谢谢你的帮助!

共有1个答案

岳曦
2023-03-14

我会把这个留给后代,因为这个问题和我想的一样,与水印有关。时间戳和水印制作器(来自任务时间戳和水印)调用'getMONtWatermark()',因为我将基于传入实体的水印设置为固定数量(它们的时间戳-最大偏移量),所以它不会更新,直到它看到一个新的实体。

我的解决方案是某种计时器,如果在可配置的时间内没有看到数据,则最终将水印提前到下一个窗口。我将无法处理非常潜在的数据,但我不认为这应该是一个问题。这是EventTime处理的预期行为。

 类似资料:
  • 我有一些问题。 基于类中的时间戳,我想做一个逻辑,排除在1分钟内输入N次或更多次的数据。 UserData类有一个时间戳变量。 起初我试着用一个翻滚的窗户。 但是,滚动窗口的时间计算是基于固定时间的,因此无论UserData类的时间戳如何,它都不适合。 如何处理流上窗口UserData类的时间戳基? 谢谢。 附加信息 我使用这样的代码。 我试了一些测试。150个样本数据。每个数据的时间戳增加1秒。

  • 假设存在一个有限的 DataStream(例如,来自数据库源)和事件 < li> 。 如何将另一个事件< code>b追加到此流以获取 (即在所有原始事件之后输出添加的事件,保持原始顺序)? 我知道所有有限流在所有事件之后都会发出< code>MAX_WATERMARK。那么,有没有办法“抓住”这个水印,输出它之后的附加事件呢? (不幸的是,<代码>。union()将原始数据流与由单个事件组成的另

  • 我有flink stream,我在某个时间窗口上计算一些事情,比如说30秒。 这里发生的事情是,它给我的结果,我以前的窗口聚合以及。 假设前30秒我得到结果10。 接下来的三十秒我想要新的结果,而不是我得到最后一个窗口的结果新的等等。 因此,我的问题是如何为每个窗口获得新的结果。

  • null 其中lambda1、2等是条件检查函数,例如 但不知什么原因对我不起作用,也许还有其他方法?正如我从文档(https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html)中了解到的,OutputTag用于创建标记为tag的附加消息。还是我错了?

  • 下面是我的流处理的伪代码。 上面的代码流程正在创建多个文件,我猜每个文件都有不同窗口的记录。例如,每个文件中的记录都有时间戳,范围在30-40秒之间,而窗口时间只有10秒。我预期的输出模式是将每个窗口数据写入单独的文件。对此的任何引用或输入都会有很大帮助。