当前位置: 首页 > 知识库问答 >
问题:

Apache Flink中带有TumblingWindow的水印

鲜于雨石
2023-03-14

我试图了解Apache FLink中Windows和Watermark生成之间的依赖关系,我在下面的示例中出现错误:

   public static void main(String[] args) throws Exception {

         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.getConfig().setAutoWatermarkInterval(10000);

        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("watermarkFlink", new SimpleStringSchema(), props);
        DataStream<String> orderStream = env.addSource(kafkaSource);
        DataStream<Order> dataStream = orderStream.map(str -> {
                    String[] order = str.split(",");
                    return new Order(order[0], Long.parseLong(order[1]), null);
                });

        WatermarkStrategy<Order> orderWatermarkStrategy = CustomWatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner((element, timestamp) ->
                        element.getTimestamp()
                );
        dataStream
            .assignTimestampsAndWatermarks(orderWatermarkStrategy)
            .map(new OrderKeyValue())
            .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> src) throws Exception {
                    return src.f0;
                }
            })
            .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(5)))
            .sum(1)
            .print("Windows");

            dataStream.print("Data");

            env.execute();
        }


    public static class OrderKeyValue implements MapFunction<Order, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(Order order) {
            return new Tuple2<>(order.getCategory(), 1);
        }
    }

这里的时间戳是一个长的,我们可以从Kafka源中检索到,应该是:a,4 C,8,其中C是类别,5是时间戳。

每当我发送事件时,数据流都会打印,但不会使用窗口打印这些事件(打印(“Windows”)。此外,如果我收到一个事件A,12,然后生成了一个水印(在10秒内),那么我有C,2,它出现在第一个窗口关闭之后,它会在窗口中处理还是被忽略?

共有1个答案

洪富
2023-03-14

Flink文档中有一个教程可以帮助澄清这些概念:https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/streaming_analytics/

但总结一下情况:

>

  • 如果您有一个类似(A,4)(C,8)(A,12)的事件流,那么这些整数将被解释为毫秒。

    您的第一个窗口将等待20000水印被触发。

    要生成如此大的水印,您需要一个时间戳至少为21000的事件(因为绑定无序度设置为1秒)。

    而且由于您已将自动水印间隔配置为10秒,因此您的应用程序必须运行很长时间才能生成第一个水印。(我想不出设置这么大的水印间隔有什么帮助。)

    如果事件在其窗口关闭后到达,则它将被忽略(默认情况下)。您可以配置允许的延迟以安排延迟事件来触发其他窗口触发。

  •  类似资料:
    • 我在Flink中做实时流,其中Kafka是消息队列。我正在应用120秒的EventTimeSlidingWindow。和1秒的幻灯片。我还在事件时间的每秒插入水印。 我担心的是,如果元素在水印之后延迟出现,会发生什么?现在,我的情况是,Flink简单地丢弃了相应水印之后的消息。filnk是否提供了任何机制来处理此类延迟消息,例如维护单独的窗口?我也看过了文档,但我没有弄清楚。

    • 问题内容: 我正在尝试在中间添加一些文本的水平尺。例如: -----------------------------------我的名字在这里------------ ----------------- 有没有办法在CSS中做到这一点?没有所有“-”的破折号很明显。 问题答案: 这大致就是我要做的:通过在containing上设置a ,然后再给a small 来创建该行。然后将文本放在具有非透明背

    • 我想做的从右到左滚动与我猜将是一个recylcer视图,但不确定是否是这个。我只是需要有人给我指明正确的方向。我想我需要一个内部布局的回收视图。 如果有人能指点我在哪里找到答案,我会变得很伟大!

    • 问题内容: 如标题中所述,我一直在尝试通过水平滚动设置某种垂直流布局。布局中的组件将是JLabels。让我画一幅画: 同一窗口,垂直扩展 因此,标签将填充可用的垂直空间,然后创建一个新列。一旦可用的水平空间用尽,就会出现一个 水平 滚动条。 垂直滚动条通常不应出现;但是,如果窗口的垂直高度异常小,最好有一个垂直滚动条。 任何帮助是极大的赞赏。我是Java的新手,所以任何其他解释都将是很棒的。谢谢!

    • 安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?

    • 我正在尝试使用docker+bitbucket管道进行自动发布;不幸的是,我有个问题。我阅读了Docker Hub上的管道部署说明,并创建了以下模板: 我已经完成了数据,但是在执行推送之后,当构建开始时,我得到了以下错误: