当前位置: 首页 > 知识库问答 >
问题:

使用事件时间和时间戳赋值器时,Flink窗口联接不起作用

戎桐
2023-03-14

我刚刚遇到了一个非常奇怪的问题,当使用带有时间戳和水印赋值器的EventTime时,我无法从流窗口联接中获得任何结果。

我使用Kafka作为我的数据流源,并尝试了AscendingTimestampExtractor和自定义赋值器,它们实现了Flink留档中提到的Assignerwith周期水印,正如我测试的那样,没有发出水印,也没有生成连接结果。如果我更改为使用ProcessingTime和TumblingProcessingTimeWindows而没有任何时间戳赋值器,那么我可以得到正确的结果。

我的自定义时间戳和水印分配器的代码是这样的:

FlinkKafkaConsumer09<String> myConsumer1 =
                new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

FlinkKafkaConsumer09<String> myConsumer2 =
                new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
...
public static class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<String> {
        private long currentMaxTimestamp;
        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            long timestamp = myFunctionToGetMillisFromString(element);
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - 1L);
        }
}
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
    .where(new KeySelector1())
    .equalTo(new KeySelector2())
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
    .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});

我的AscendingTimestampExtractor代码如下:

FlinkKafkaConsumer09<String> myConsumer1 =
                new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});

FlinkKafkaConsumer09<String> myConsumer2 =
                new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
    .where(new KeySelector1())
    .equalTo(new KeySelector2())
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
    .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});

感谢您的帮助!

共有2个答案

韩麒
2023-03-14

我的消费3 =我的消费1 .分配***我的消费4 =我的消费2 .分配***

并使用myConsumer3 / myConsumer4,这将没问题

高森
2023-03-14

我有同样的问题,这是一个相当愚蠢的错误,我在这里找到了解决方案:

当你写作时:

myConsumer1.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

它会创建一个新的数据流而不是修改该流,并且您没有将其存储在变量中。所以底线是:

将其存储在新的数据流中,并将联接应用于此数据流(将分配这些时间戳和水印)。

 类似资料:
  • null 此窗口具有一个允许延迟为一分钟的BoundedOutoFordernesTimeStampExtractor。 水印:据我的理解,Flink和Spark结构化流中的水印定义为(max-event-timestamp-seen-so-far-alloged-lateness)。事件时间戳小于或等于此水印的任何事件都将被丢弃并在结果计算中忽略。 在此场景中,几个事件到达Flink运算符时具有

  • 我有一个带有数据和时间戳的记录日志,我的Flink应用程序按时间戳升序接收记录。在某个键的第一个项到达窗口后,我想在X事件时间后关闭窗口,检查是否有足够的项到达某个条件,并为该键发出通过或失败消息。 对于Flink中的基本窗口功能,这是不可能的吗?例如,如果我希望我的窗口有30秒长,但是键的第一个项在15秒到达,最后一个项在40秒到达,似乎窗口将在30秒关闭,并且该键的记录轨迹将是分成两个窗口。在

  • 查看Flink的留档和书籍,我对时间戳有疑问:如果流设置为事件时间模式,这意味着时间戳在进入Flink之前具有源的时间(甚至在通过消息传递队列之前,可能是Kafka),为什么Flink将时间戳作为元数据附加到记录中?幻灯片3根据它们所占的内容具有不同类型的时间戳:https://www.slideshare.net/dataArtisans/apache-flink-training-time-a

  • 我有数据流就像 事件名,事件id,Start_time(时间戳)... 在这里,我想对最后一个带有时间戳的字段<;code>;Start_。 因此,我在flink window中看到的是,所以我猜它需要过去30分钟的事件,但不考虑 我想把数据放在start_ time在最后30分钟内的位置,然后我如何编写转换?我是否需要使用该列使用? 我是Flink的新手。 谢啦

  • 我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢

  • 我正在研究一个Flink流式处理器,它可以从Kafka读取事件。这些事件由其中一个字段键控,并且在减少和输出之前应该在一段时间内加窗。我的处理器使用事件时间作为时间特性,因此从它所消耗的事件中读取时间戳。以下是它目前的样子: 我所知道的事件如下: null null