当前位置: 首页 > 知识库问答 >
问题:

Flink事件时间处理水印总是-9223372036854725808

通沛
2023-03-14

我试图使用process函数对一组事件进行处理。我正在使用事件时间和键控流。我面临的问题是,水印值始终为9223372036854725808。我已经对print语句进行了调试,它显示如下:

时间戳------1583128014000提取时间戳1583128014000当前水印------9223372036854775808

时间戳------1583128048000提取时间戳1583128048000当前水印------9223372036854775808

时间戳------158312808900提取的时间戳158312808900当前水印------9223372036854775808

所以时间戳和提取的时间戳会改变,但水印不会更新。所以没有记录排队,因为context.timestamp永远不会少于水印。

DataStream<GenericRecord> dataStream = env.addSource(searchConsumer).name("search_list_keyless");
        DataStream<GenericRecord> dataStreamWithWaterMark =  dataStream.assignTimestampsAndWatermarks(new SessionAssigner());

       try {
            dataStreamWithWaterMark.keyBy((KeySelector<GenericRecord, String>) record -> {
                StringBuilder builder = new StringBuilder();
                builder.append(record.get("session_id"));
                builder.append(record.get("user_id"));
                return builder.toString();
            }).process(new MatchFunction()).print();
        }
        catch (Exception e){
            e.printStackTrace();
        }
        env.execute("start session process");

    }

    public static class SessionAssigner implements AssignerWithPunctuatedWatermarks<GenericRecord>  {
        @Override
        public long extractTimestamp(GenericRecord record, long previousElementTimestamp) {
            long timestamp = (long) record.get("event_ts");
            System.out.println("timestamp------"+ timestamp);
            return timestamp;
        }

        @Override
        public Watermark checkAndGetNextWatermark(GenericRecord record, long extractedTimestamp) {
            // simply emit a watermark with every event
            System.out.println("extractedTimestamp "+extractedTimestamp);
            return new Watermark(extractedTimestamp - 30000);
        }
 }

这是processFunction的代码。。。。

public class MatchFunction extends KeyedProcessFunction<String, GenericRecord, Object> {

    private ValueState<Tuple2<Long, PriorityQueue<GenericRecord>>> queueState = null;

    @Override
    public void open(Configuration config) throws Exception {
        System.out.println("open");
        ValueStateDescriptor<Tuple2<Long, PriorityQueue<GenericRecord>>> descriptor = new ValueStateDescriptor<>(
                "sorted-events", TypeInformation.of(new TypeHint<Tuple2<Long, PriorityQueue<GenericRecord>>>() {
        })
        );
        queueState = getRuntimeContext().getState(descriptor);
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception {
        Tuple2<Long, PriorityQueue<GenericRecord>> tuple = queueState.value();

        PriorityQueue<GenericRecord> records = tuple.f1;

    }

    @Override
    public void processElement(GenericRecord record, Context context, Collector<Object> collector) throws Exception {

        TimerService timerService = context.timerService();
        System.out.println("currentwatermark----"+ timerService.currentWatermark());
        if (context.timestamp() > timerService.currentWatermark()) {

            Tuple2<Long, PriorityQueue<GenericRecord>> queueval = queueState.value();
            PriorityQueue<GenericRecord> queue = queueval.f1;
            long startTime = queueval.f0;
            System.out.println("starttime----"+ startTime);

            if (queue == null) {
                queue = new PriorityQueue<>(10, new TimeStampComprator());
                startTime = (long) record.get("event_ts");
            }
            queueState.update(new Tuple2<>(startTime, queue));
            timerService.registerEventTimeTimer(startTime + 5 * 60 * 1000);
        }
    }

}

共有1个答案

司寇光华
2023-03-14

以下是你分享的一个可能的解释:

时间戳和标点水印操作符在调用给定记录的checkAndGetNextWatermark之前调用提取时间戳。这意味着在每个任务(并行实例)中第一次调用匹配函数中的processElement时,当前水印将很长。最小值(即-9223372036854775808)。

如果你的平行度足够大,那就可以解释看到

currentwatermark-----9223372036854775808

好几次。

 类似资料:
  • null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有

  • 我们正在接收来自多个独立数据源的事件,因此,到达我们Flink拓扑(通过Kafka)的数据将是无序的。 我们正在Flink拓扑中创建1分钟的事件时间窗口,并在源操作符处生成事件时间水印(当前事件时间-某些阈值(30秒))。 如果一些事件在设置的阈值之后到达,那么这些事件将被忽略(在我们的例子中这是可以的,因为属于该分钟的大多数事件都已经到达并在相应的窗口中得到处理)。 现在的问题是,如果程序崩溃(

  • Flink中关于事件时间处理的水印是什么?为什么需要它。?为什么在所有使用事件时间的情况下都需要它。在所有情况下,我的意思是,如果我不做窗户操作,那么为什么我们还需要一个水位线。我来自spark的背景。在spark中,只有在对传入事件使用windows时才需要水印。 我读过一些文章,在我看来,水印和窗口似乎是一样的。如果有差异,请解释并指出来 把你的回复贴出来,我读了一些。下面是一个更具体的查询<

  • 有人能正确解释事件时间戳和水印吗。我从文件中理解了,但不太清楚。一个真实的例子或外行的定义会有所帮助。此外,如果可能,请给出一个示例(以及一些可以解释它的代码片段)。提前感谢

  • 我想使用Flink的事件时间戳,并计划实现一个简单的emitWatermark,即系统。currentTimeInMillis-10秒。我的理解是,翻滚窗口将触发start\u time window\u间隔10秒。因此,如果事件晚于水印到达,则会删除这些事件。 有没有办法将Flink丢弃的所有事件写入S3这样的接收器?

  • 总之,我希望flatMap1()和flatMap2()按照我在事件中设置的时间戳的顺序被调用。但那不是真的。