当前位置: 首页 > 知识库问答 >
问题:

Flink如何计算键控窗口的聚合输出

赖渊
2023-03-14

在Flink中,是否可以计算键控窗口的聚合输出?

我们有一个数据流,我们调用byKey()指定一个由字符和数字组成的字段(例如A01、A02…A10、B01、B02…B10等),就像棋盘上的方块一样。在byKey()之后,我们调用窗口(TumblingEventTimeWindow.of(Time.days(7)),因此我们创建了一个每周窗口。在此之后,我们调用reduce(),结果我们得到了SingleOutputStreamOperator

现在,我们要对SingleOutputStreamOP进行分组

这是我的代码,您可以看到:

>

  • 我们使用基于Tuple2的keyBy()

    代码窗口(timeIntervalConstructor.newInstance())基本上创建了一个每周窗口。

    我们调用duce(),因此对于每个键,我们都有一个聚合值。

    现在我们使用另一个keyBy(),这次键基本上是根据代码A01,..., A10的数量计算的:如果它大于5,我们有一个海类型,如果它小于或等于我们有另一个。

    同样,窗口(timeIntervalConstructor.newInstance())为第二个每周窗口。

    最后,在aggregate()中,我们计算每组的top3。

            .keyBy(new KeySelector<Query2IntermediateOutcome, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> getKey(Query2IntermediateOutcome intermediateOutcome) throws Exception {
                    return new Tuple2<String, Integer>(intermediateOutcome.getCellId(), intermediateOutcome.getHourInDate());
                }
            })
            .window(timeIntervalConstructor.newInstance())
            .reduce(new ReduceFunction<Query2IntermediateOutcome>() {
                @Override
                public Query2IntermediateOutcome reduce(Query2IntermediateOutcome t1, Query2IntermediateOutcome t2) throws Exception {
                    t1.setAttendance(t1.getAttendance()+t2.getAttendance());
                    return t1;
                }
            })
            .keyBy(new KeySelector<Query2IntermediateOutcome, String>() {
                @Override
                public String getKey(Query2IntermediateOutcome query2IntermediateOutcome) throws Exception {
                    return query2IntermediateOutcome.getSeaType().toString();
                }
            })
            .window(timeIntervalConstructor.newInstance())
            .aggregate(new Query2FinalAggregator(), new Query2Window())
    

    这个解决方案可以工作,但我真的不喜欢它,因为第二个窗口在前一次触发时接收所有数据,但它每周都会发生,所以第二个窗口一起接收所有数据,并且必须立即运行聚合()


  • 共有1个答案

    傅志用
    2023-03-14

    我认为将所有这些业务逻辑分解为一个KeyedProcessFunction是相当简单的。然后您可以避免在周末爆发的活动。

    以Flink文档中的本教程为例,了解如何用KeyedProcessFunction替换带关键帧的窗口。

     类似资料:
    • 我们有一个数据流,其中每个元素都属于这种类型: 我们希望聚合此流并每周输出一次的总和。 当前解决方案: flink管道的示例如下所示: 输入 如果窗口在记录和之间结束,我们的输出将是: Id和仍将在flink管道中,并将在下周输出。 因此,下周我们的总产量将是: 新规定: 我们现在还想知道每个记录在哪一周被处理。换句话说,我们的新产出应该是: 但我们还需要这样的额外输出: 这个怎么处理? Flin

    • 我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi

    • 我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些

    • 在Apache Flink中使用滑动时间窗口时,当窗口滑动时会重新计算窗口中的许多元组/元素。例如,假设一个窗口大小为5秒,滑动时间为1秒,则窗口内容的80%与上一个窗口的内容相同。 考虑一个数据流S,其元组由时间戳和整数值组成: , , , , , , ,... 假设t1、t2、t3、...表示连续时间戳,其中t2-t1=1秒。给定S,窗口大小为5秒、滑动1秒的Flink窗口化ProcessWi

    • 我对流中的事件进行了键控,我希望通过键来累积,直到超时(例如,5分钟),然后处理累积到该点的事件(忽略该键之后的所有内容,但首先是第一件事)。 我是一个新的Flink,但从概念上来说,我认为我需要一些类似下面代码的东西。 如何在Flink中完成键控窗口超时?

    • 尝试合并多个 Kafka 流,聚合