当前位置: 首页 > 知识库问答 >
问题:

Flink:在时间窗口中合并所有键的结果

狄钧
2023-03-14
stream
    .keyBy("key")
    .window(<tumbling window of 5 mins>)
    .aggregate(<aggFunc>, <function adding window key and start wd time>)
    ...
    .addSink(sink)
1. (<aggregated val>, key1, <window1-start-time)
2. (<aggregated val>, key2, <window1-start-time)
3. (<aggregated val>, key3, <window1-start-time)
4. (<aggregated val>, key1, <window1-start-time)
5. (<aggregated val>, key3, <window2-start-time)
6. (<aggregated val>, key4, <window2-start-time)
7. (<aggregated val>, key5, <window2-start-time)
[json-string(1), json-string(2), json-string(3)] 
-> send to sink after window 1 fires
[json-string(4), json-string(5), json-string(6), json-string(7)] 
-> send to sink after window 2 fires

我使用的是EventTime,它从Kafka记录中的一个字段中派生出时间戳。在apply运算符之后,事件不再有时间戳记录,而是有一个很长的WindowStartTime。

共有1个答案

濮阳俊明
2023-03-14

您可以使用非键控TimeWindow跟随键控TimeWindow,该非键控TimeWindow将第一个窗口的所有结果合并在一起:

stream
    .keyBy("key")
    .window(<tumbling window of 5 mins>)
    .aggregate(<aggFunc>, <function adding window key and start wd time>)
    .windowAll(<tumbling window of 5 mins>)
    .process(<function iterating over batch of keys for each window>)
    .addSink(sink)

timewindow发出的记录会以允许应用另一层窗口的方式自动打上时间戳。这可以用于聚合所有键的结果(如图所示),或者在不同的时间尺度上生成键化的结果(例如,将5分钟的窗口合并到60分钟的窗口)。

Flink训练包括一个演示此模式的练习。

 类似资料:
  • 我正在尝试使用主题列表中的单个kafka使用者组合两个kafka主题,进一步将流中的json字符串转换为POJO。然后,通过keyBy(On事件时间字段)将它们加入,并将它们合并为单个胖json,我计划使用窗口流并在窗口流上应用窗口函数。假设主题A 我有几个问题。 这种方法适合合并主题并创建单个JSON吗 所有窗口流上的窗口函数似乎工作不正常;任何指点都将不胜感激 代码片段: 我得到了- AllW

  • 我对流中的事件进行了键控,我希望通过键来累积,直到超时(例如,5分钟),然后处理累积到该点的事件(忽略该键之后的所有内容,但首先是第一件事)。 我是一个新的Flink,但从概念上来说,我认为我需要一些类似下面代码的东西。 如何在Flink中完成键控窗口超时?

  • 的结果是一个元素流-因此,我希望从这个流中获得一个“具有最高计数的key”的更新流。 然后我通过一个常量(-因为这是一个全局操作)进行键控,并使用-这几乎可以实现:我得到一个最高计数流,但当前的最高计数是针对每个元素发出的。 我想我要找的是某种带有前一个值的过滤器,它只会在新值与前一个值不同时才会发出元素。 目前在Flink有可能吗?

  • 在Flink有可能吗?如果是的话,那该怎么做呢?

  • 假设我有一个股票市场交易事件流,如下所示: 使得technicalN(其中N是一些数字)代表给定公司的日终股票市场交易数据的第N个技术交易条目[开盘(浮动)、高位(浮动)、低位(浮动)、收盘(浮动)、成交量(int)]。(即ticker GOOG的技术1不同于ticker MSFT的技术1。)如: (请注意,这些交易价格/交易量完全是虚构的。 假设我想创建一个大小为2、时间间隔为1天的窗口,这样我

  • 我有一个聚合函数,它计算WindowedStream中一系列事件的平均值。 这里的警告是,平均值需要在可能无序(或根本没有)到达的事件对上计算。 换句话说,我需要在计算之前对数据进行排序,因为序列很重要。 我可以用getResult API来实现这一点,但是这个函数在窗口中的每个事件上都被调用,这在性能方面没有意义。我也可以用flink cep来做这件事,但出于同样的原因,我想避免使用它。 理想情