当前位置: 首页 > 知识库问答 >
问题:

Apache Flink是否支持具有相同时间戳的多个事件?

谭敏学
2023-03-14

在某些场景中,Apache Flink似乎无法很好地处理具有相同时间戳的两个事件。

根据文档,水印t表示任何新事件的时间戳都将严格大于t。除非您完全放弃两个事件具有相同时间戳的可能性,否则您将无法安全地发出t的水印。强制使用不同的时间戳还将系统每秒可处理的事件数限制为1000。

这真的是Apache Flink中的一个问题还是有解决方法?

对于那些希望使用具体示例的人,我的用例是为事件时间顺序流构建每小时聚合滚动字数。对于我复制到文件中的数据示例(请注意重复的9):

mario 0
luigi 1
mario 2
mario 3
vilma 4
fred 5
bob 6
bob 7
mario 8
dan 9
dylan 9
dylan 11
fred 12
mario 13
mario 14
carl 15
bambam 16
summer 17
anna 18
anna 19
edu 20
anna 21
anna 22
anna 23
anna 24
anna 25

以及代码:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            .setParallelism(1)
            .setMaxParallelism(1);

    env.setStreamTimeCharacteristic(EventTime);


    String fileLocation = "full file path here";
    DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);

    rawInput.flatMap(parse())
            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
                    return new Watermark(extractedTimestamp);
                }

                @Override
                public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
                    return element.getTimestamp();
                }
            })
            .keyBy(TimestampedWord::getWord)
            .process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
                private transient ValueState<Long> count;

                @Override
                public void open(Configuration parameters) throws Exception {
                    count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
                }

                @Override
                public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    if (count.value() == null) {
                        count.update(0L);
                        setTimer(ctx.timerService(), value.getTimestamp());
                    }

                    count.update(count.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    long currentWatermark = ctx.timerService().currentWatermark();
                    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
                    if (currentWatermark < Long.MAX_VALUE) {
                        setTimer(ctx.timerService(), currentWatermark);
                    }
                }

                private void setTimer(TimerService service, long t) {
                    service.registerEventTimeTimer(((t / 10) + 1) * 10);
                }
            })
            .addSink(new PrintlnSink());

    env.execute();
}

private static FlatMapFunction<String, TimestampedWord> parse() {
    return new FlatMapFunction<String, TimestampedWord>() {
        @Override
        public void flatMap(String value, Collector<TimestampedWord> out) {
            String[] wordsAndTimes = value.split(" ");
            out.collect(new TimestampedWord(wordsAndTimes[0], Long.parseLong(wordsAndTimes[1])));
        }
    };
}

private static class TimestampedWord {
    private final String word;
    private final long timestamp;

    private TimestampedWord(String word, long timestamp) {
        this.word = word;
        this.timestamp = timestamp;
    }

    public String getWord() {
        return word;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
    @Override
    public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
        long timestamp = value.getField(2);
        System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + (timestamp - 10) + "-" + (timestamp - 1));
    }
}

我明白了

    mario=4 at 1-10
    dylan=2 at 1-10
    luigi=1 at 1-10
    fred=1 at 1-10
    bob=2 at 1-10
    vilma=1 at 1-10
    dan=1 at 1-10
    vilma=1 at 10-19
    luigi=1 at 10-19
    mario=6 at 10-19
    carl=1 at 10-19
    bambam=1 at 10-19
    dylan=2 at 10-19
    summer=1 at 10-19
    anna=2 at 10-19
    bob=2 at 10-19
    fred=2 at 10-19
    dan=1 at 10-19
    fred=2 at 9223372036854775797-9223372036854775806
    dan=1 at 9223372036854775797-9223372036854775806
    carl=1 at 9223372036854775797-9223372036854775806
    mario=6 at 9223372036854775797-9223372036854775806
    vilma=1 at 9223372036854775797-9223372036854775806
    edu=1 at 9223372036854775797-9223372036854775806
    anna=7 at 9223372036854775797-9223372036854775806
    summer=1 at 9223372036854775797-9223372036854775806
    bambam=1 at 9223372036854775797-9223372036854775806
    luigi=1 at 9223372036854775797-9223372036854775806
    bob=2 at 9223372036854775797-9223372036854775806
    dylan=2 at 9223372036854775797-9223372036854775806

注意dylan=2在0-9的位置应该是1

共有1个答案

王君墨
2023-03-14

不,具有相同时间戳的流元素没有问题。但水印是一种断言,即所有后续事件的时间戳都将大于水印,因此这确实意味着您无法在时间t安全地为流元素发出水印t,除非流中的时间戳严格地单调递增——如果有多个事件具有相同的时间戳,则情况并非如此。这就是为什么AscendingTimestampExtractor产生等于currentTimestamp-1的水印,您也应该这样做。

请注意,您的应用程序实际上报告dylan=2在0-10,而不是0-9。这是因为dylan在时间11产生的水印触发了第一个计时器(计时器设置为时间10,但由于没有时间戳为10的元素,因此直到“dylan 11”的水印到达时,计时器才会触发)。您的PrintlnSink使用时间戳1来表示时间跨度的上限,因此是11-1或10,而不是9。

您的ProcessFunction的输出没有任何问题,如下所示:

(mario,4,11)
(dylan,2,11)
(luigi,1,11)
(fred,1,11)
(bob,2,11)
(vilma,1,11)
(dan,1,11)
(vilma,1,20)
(luigi,1,20)
(mario,6,20)
(carl,1,20)
(bambam,1,20)
(dylan,2,20)
...

的确,到11岁时,已经有了两个Dylan。但PrintlnSink的报告具有误导性。

要使示例按预期工作,需要更改两件事。首先,水印需要满足水印契约,目前情况并非如此,其次,加窗逻辑不太正确。ProcessFunction需要为“dylan 11”事件做好准备,以便在触发关闭0-9窗口的计时器之前到达。这是因为“dylan 11”流元素位于流中由其生成的水印之前。

更新:时间戳超出当前窗口的事件(如“dylan 11”)可以由

  1. 跟踪当前窗口何时结束
 类似资料:
  • 我将AWS S3触发器配置为带有PUT操作的lambda函数。 每2分钟上载100KB大小的.txt文件。 有时S3会以相同的事件和时间触发lambda两次。 触发器1: 触发器2: 我怎样才能避免这种情况呢?

  • 我一直在搜索Amazon real time transcription是否支持单词级别的时间戳,我发现最接近的是AWS Transcripbe Medical确实支持它。这里有人知道AWS是否支持这个吗?如果是这样,你能给我这个描述的链接和/或如何开始它?

  • 至少一次语义:如果生产者从Kafka代理接收到确认(ack),并且acks=all,则表示消息已准确写入Kafka主题一次。但是,如果生产者确认超时或收到错误,它可能会在假定消息未写入Kafka主题的情况下重试发送消息。如果代理在发送ack之前失败,但在消息成功写入Kafka主题之后失败,则此重试会导致消息被写入两次,并因此多次传递给最终使用者。 我知道时间戳是根据消息从生产者发送的时间设置的。如

  • 因此,我可以像为TCP套接字一样为每个客户端创建一个不同的UDP套接字吗? 注意:如果可能,请使用类来解释,而不是类。 代码说明:我以为建立TCP连接(Connect/Accept)后可以在两边(客户机/服务器)绑定一个UDP套接字。对于服务器上只有一个客户机是很好的。对于新客户机,server会引发异常,因为它无法将多个套接字与一个localEndPoint绑定在一起。和客户端可以超过所有可用端

  • 我有一个复杂的Kafka流应用程序,在同一个流中有两个完全有状态的流: null 主要目标是提供一个工作流系统。 详细的逻辑是: 执行是任务运行列表 查看所有的所有当前状态,并查找要执行的下一个 如果找到任何任务,则执行更改它们的并添加下一个任务并发布回Kafka,同时将要完成的任务()发送到另一个队列 在Kafka流之外继续并使用简单的Kafka使用者和生产者发布回另一个队列( ) 在当前中的更

  • 问题内容: 我们有n个变量,它们没有任何结构。 例如在python中,我可以这样做: 在Java中,我必须这样做: 您知道一种改进此语法的简单方法吗?(想象一下很长的变量名以及很多) 谢谢。 问题答案: 如果您有很多这样的变量,是否考虑过将它们放在集合中,而不是将它们作为单独的变量?此时有多种选择。 如果发现自己经常这样做,则可能要编写辅助方法,可能使用varargs语法。例如: glowcode