当前位置: 首页 > 知识库问答 >
问题:

kafka源流上的事件时间窗口

那博瀚
2023-03-14
    DataStream<Tuple2<String, Long>> advertisement = env
            .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;

                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).assignTimestamps(timestampExtractor);

    advertisement
            .keyBy(keySelector)
            .window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
            .apply(new WindowFunction<Tuple2<String,Long>, Integer, String, TimeWindow>() {
                private static final long serialVersionUID = 5151607280638477891L;
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<Integer> out) throws Exception {
                    out.collect(Iterables.size(values));
                }
            }).print();

为什么会这样?如果我在“assigntimestamps(timestampExtractor)”之前添加“keyby(keySelector)”,那么程序可以工作。有人能解释一下原因吗?

共有1个答案

赫连黎昕
2023-03-14

您受到Flink:Flink-3121中的一个已知bug的影响:水印转发对不产生任何数据的源不起作用。

问题是,有更多的FlinkKafkaConsumer正在运行(最有可能是CPU核的数量(比如4)),而您有更多的分区(1)。只有一个Kafaka消费者在发射水印,其他消费者都在空转。

窗口操作员没有意识到这一点,等待所有使用者的水印到达。这就是为什么窗户永远不会触发的原因。

 类似资料:
  • 我有一个关于Kafka流的时间窗的问题,有些概念真的让我困惑。 我们有一个主题每天获得1000万个事件,我们有6天的日志保留,所以总的主题包含6000万个事件。 现在,我创建了一个KTable,我正在执行load all操作并迭代事件。正如我之前提到的,实际上我们只是当前的事件,而不是6千万事件,所以我在KTable定义中窗口化了这些数据。 现在,当我用以下语句加载所有事件时,一切都运行良好。 问

  • 我正在运行一个简单的示例来测试基于EventTime的Windows。我能够生成带有处理时间的输出,但当我使用EventTime时,没有输出。请帮助我明白我做错了什么。

  • 我试图在Kafka流之上实现一个简单的CQRS/Event sourcing概念验证(如https://www.confluent.io/blog/event-sourcing-using-apache-kafka/所述) 我有4个基本部分: 命令处理器-命令流,左与聚合状态KTABLE连接。对于结果流中的每个条目,使用函数生成结果事件,并将它们发布到主题 问题是--有没有办法确保我在州存储中有聚

  • 我们使用流,并将每个消息发布到另一个主题,该主题按用户id对记录进行分区(按用户id重新分区原始流)。 然后我们消耗这个重新分区的流,我们将消耗的记录存储在加窗10分钟的本地状态存储中。一个特定用户的所有点击总是在同一个分区中,但顺序并不保证,因为最初的主题有10个分区。 我理解Kafka流的窗口模型,当新记录进入时,时间会提前,但我需要这个窗口使用处理时间,而不是事件时间,然后当窗口过期时,我需