当前位置: 首页 > 知识库问答 >
问题:

如何在Flink中实现键控窗口超时?

温开畅
2023-03-14

我对流中的事件进行了键控,我希望通过键来累积,直到超时(例如,5分钟),然后处理累积到该点的事件(忽略该键之后的所有内容,但首先是第一件事)。

我是一个新的Flink,但从概念上来说,我认为我需要一些类似下面代码的东西。

    DataStream<Tuple2<String, String>> dataStream = see
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .keyBy(0)
            .window(GlobalWindows.create())
            .trigger(ProcessingTimeTrigger.create()) // how do I set the timeout value?
            .fold(new Tuple2<>("", ""), new FoldFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                public Tuple2<String, String> fold(Tuple2<String, String> agg, Tuple2<String, String> elem) {
                    if ( agg.f0.isEmpty()) {
                        agg.f0 = elem.f0;
                    }
                    if ( agg.f1.isEmpty()) {
                        agg.f1 = elem.f1;
                    } else {
                        agg.f1 = agg.f1 + "; " + elem.f1;
                    }
                    return agg;
                }
            });

如何在Flink中完成键控窗口超时?

共有1个答案

南宫胡媚
2023-03-14

如果您使用keyedprocessfunction来处理这个问题,将会轻松得多。

我建议在keyedProcessFunction的open()方法中建立一个keyedListState项。在processElement()方法中,如果列表为空,则设置一个处理时间计时器(每键计时器,相对于当前时间),以便在希望窗口结束时触发。然后将传入事件追加到列表中。

当计时器触发时,将调用onTimer()方法,您可以对列表进行迭代,生成一个结果,并清除列表。

文档描述了如何使用流程函数以及如何使用状态。你可以在Flink培训站点找到额外的例子,比如这个练习。

 类似资料:
  • 在Flink中,是否可以计算键控窗口的聚合输出? 我们有一个数据流,我们调用byKey()指定一个由字符和数字组成的字段(例如A01、A02…A10、B01、B02…B10等),就像棋盘上的方块一样。在之后,我们调用,因此我们创建了一个每周窗口。在此之后,我们调用,结果我们得到了

  • 我正在编写一个Flink应用程序,它使用kafka主题中的时间序列数据。时间序列数据包含度量名称、标记键值对、时间戳和值等组件。我已经创建了一个滚动窗口来根据度量键(度量名称、键值对和时间戳的组合)聚合数据。这里是主流看起来像 我还想检查是否有任何指标在上面的窗口外迟到。我想检查有多少指标延迟到达,并计算延迟指标与原始指标相比的百分比。我正在考虑使用flink的“允许延迟”功能将延迟指标发送到不同

  • 主要内容:1.窗口概述,2.窗口分类,3.细分,4.窗口Api,5.窗口分配器 Window Assigners,6.窗口函数,7.TopN 实例1.窗口概述 Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 在 Flink 中, 窗口就是用来处理无界流的核心。我们很容易把窗口想象成一个固定位置的“框”,数据源源不断地流过来,到某个时间点窗口

  • 我使用的是EventTime,它从Kafka记录中的一个字段中派生出时间戳。在apply运算符之后,事件不再有时间戳记录,而是有一个很长的WindowStartTime。

  • 我正在构建一个具有以下目标的 Flink 应用程序: 将事件收集到键控的非活动触发会话窗口中 尽早发出输入事件的副本,并通过会话引用进行增强 打开和关闭会话时发出会话更新以及收集的会话统计信息(在会话关闭时) 通过滚动窗口,我已经能够实现上述目标,但在会话窗口中我没有成功。 我的窗口处理代码如下 来生成我正在使用的输入 和 当我使用会话窗口执行时,会出现以下异常: 希望我错过了一个技巧,因为无法使