当前位置: 首页 > 知识库问答 >
问题:

Flink窗口:聚合并输出到接收器

慕容玉书
2023-03-14

我们有一个数据流,其中每个元素都属于这种类型:

id: String
type: Type
amount: Integer

我们希望聚合此流并每周输出一次amount的总和。

当前解决方案:

flink管道的示例如下所示:

stream.keyBy(type)
      .window(TumblingProcessingTimeWindows.of(Time.days(7)))
      .reduce(sumAmount())
      .addSink(someOutput())

输入

| id | type | amount |
| 1  | CAT  | 10     |
| 2  | DOG  | 20     |
| 3  | CAT  | 5      |
| 4  | DOG  | 15     |
| 5  | DOG  | 50     |

如果窗口在记录34之间结束,我们的输出将是:

| TYPE | sumAmount |
| CAT  | 15        | (id 1 and id 3 added together)
| DOG  | 20        | (only id 2 as been 'summed')

Id45仍将在flink管道中,并将在下周输出。

因此,下周我们的总产量将是:

| TYPE | sumAmount |
| CAT  | 15        | (of last week)
| DOG  | 20        | (of last week)
| DOG  | 65        | (id 4 and id 5 added together)

新规定:

我们现在还想知道每个记录在哪一周被处理。换句话说,我们的新产出应该是:

| TYPE | sumAmount | weekNumber |
| CAT  | 15        | 1          |
| DOG  | 20        | 1          |
| DOG  | 65        | 2          |

但我们还需要这样的额外输出:

| id | weekNumber |
| 1  | 1          |
| 2  | 1          |
| 3  | 1          |
| 4  | 2          |
| 5  | 2          |

这个怎么处理?

Flink有没有办法实现这一点?我想我们会有一个聚合函数,它可以求和金额,但也可以用当前星期的数字输出每个记录,但是我在文档中找不到这样做的方法

(注意:我们每周处理约1亿条记录,因此理想情况下,我们只希望在一周内将总量保持在flink的状态,而不是所有单个记录)

编辑:

我选择了安东下面描述的解决方案:

DataStream<Element> elements = 
  stream.keyBy(type)
        .process(myKeyedProcessFunction());

elements.addSink(outputElements());
elements.getSideOutput(outputTag)
        .addSink(outputAggregates())

KeyedProcessFunction看起来像:

class MyKeyedProcessFunction extends KeyedProcessFunction<Type, Element, Element>
    private ValueState<ZonedDateTime> state;
    private ValueState<Integer> sum;

    public void processElement(Element e, Context c, Collector<Element> out) {
        if (state.value() == null) {
            state.update(ZonedDateTime.now());
            sum.update(0);
            c.timerService().registerProcessingTimeTimer(nowPlus7Days);
        }
        element.addAggregationId(state.value());
        sum.update(sum.value() + element.getAmount());
    }

    public void onTimer(long timestamp, OnTimerContext c, Collector<Element> out) {
        state.update(null);
        c.output(outputTag, sum.value()); 
    }
} 

共有1个答案

史劲
2023-03-14

reduce方法的一个变体将ProcessWindowFunction作为第二个参数。您可以这样使用它:

stream.keyBy(type)
  .window(TumblingProcessingTimeWindows.of(Time.days(7)))
  .reduce(sumAmount(), new WrapWithWeek())
  .addSink(someOutput())

private static class WrapWithWeek
  extends ProcessWindowFunction<Event, Tuple3<Type, Long, Long>, Type, TimeWindow> {

      public void process(Type key,
                Context context,
                Iterable<Event> reducedEvents,
                Collector<Tuple3<Type, Long, Long>> out) {
          Long sum = reducedEvents.iterator().next();
          out.collect(new Tuple3<Type, Long, Long>(key, context.window.getStart(), sum));
      }
}

通常,ProcessWindowFunction会传递一个Iterable,其中包含窗口收集的所有事件,但如果您使用reduce或aggregate函数来预聚合窗口结果,则只会将该单个值传递给Iterable。这方面的文档在这里,但文档中的示例目前有一个小错误,我在这里的示例中修复了这个错误。

但是考虑到对第二个输出的新要求,我建议您放弃在Windows中这样做的想法,而是使用带键的ProcessFunction。您将需要两个每键ValueState:一个按周计数,另一个存储总和。您需要一个每周触发一次的计时器:当它触发时,它应该发出类型、总和和和周数,然后增加周数。同时,流程元素方法将简单地输出每个传入事件的ID以及周计数器的值。

 类似资料:
  • 在Flink中,是否可以计算键控窗口的聚合输出? 我们有一个数据流,我们调用byKey()指定一个由字符和数字组成的字段(例如A01、A02…A10、B01、B02…B10等),就像棋盘上的方块一样。在之后,我们调用,因此我们创建了一个每周窗口。在此之后,我们调用,结果我们得到了

  • 我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些

  • 我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate functi

  • 尝试合并多个 Kafka 流,聚合

  • 我们计划将Apache Flink与一个巨大的IOT设置一起使用。客户将向我们发送某种结构化的传感器数据(如sensor_id、sensor_type、sensor_value、timestamp)。我们没有控制每个客户何时发送这些数据,最有可能是实时的,但我们没有保证。我们将所有事件存储在RabbitMQ/Kafka中。更新:我们可以假设每个传感器的事件是按顺序来的。 在开始实施可能的流式管道之

  • 我有一个,它是由一个kafka主题创建的,并且指定了属性。 当我试图创建一个时,会话窗口化了一个查询,如下所示: 我总是得到错误: KSQL不支持对窗口表的持久查询 如何在KSQL中创建开始会话窗口的事件的?