当前位置: 首页 > 知识库问答 >
问题:

带状态的Flink窗口聚合

白迪
2023-03-14

我想使用早期触发逻辑进行窗口聚合(您可以认为聚合是由窗口关闭或特定事件触发的),我阅读了文档:https://ci . Apache . org/projects/flink/flink-docs-release-1.12/dev/stream/operators/windows . html # incremental-window-aggregate-with-aggregate function

文档提到< code >注意到使用ProcessWindowFunction进行简单的聚合(如计数)是非常低效的。因此,建议与增量窗口聚合配合使用。

我的问题是,在文档中,状态不会保存在任何地方,所以如果应用程序崩溃,averageAggregate会失去所有的中间值,对吧?

那么,如果是这种情况,有没有办法进行窗口聚合,仍然支持增量聚合,并且有一个状态后端可以从崩溃中恢复?

共有1个答案

李星波
2023-03-14

< code>AggregateFunction实际上只是描述了将输入事件组合成某个结果的机制,该特定类不存储任何数据。

当我们写下这样的东西时,Flink在幕后为我们保留了这种状态:

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

的代码。

在崩溃或重启的情况下,不会丢失任何数据(假设状态后端配置正确):与Flink state的其他部分一样,此处的状态将从状态后端检索,或从上游数据的第一原理重新计算。

 类似资料:
  • 在阅读了Flink的文档并四处搜索后,我无法完全理解Flink的句柄在其窗口中的状态。假设我有一个每小时滚动的窗口,其中包含一个聚合函数,该函数将消息累积到某个java pojo或scala case类中。该窗口的大小将与一小时内进入该窗口的事件数量相关联,还是仅仅与POJO/Case类相关联,因为我将事件累加到该对象中。(例如,如果将10000个味精数成一个整数,大小会接近10000*味精大小还

  • 我想将一个交易流聚合成相同交易量的窗口,这是区间内所有交易的交易规模之和。 我能够编写一个自定义触发器,将数据分区到Windows中。代码如下: 上面的代码可以将其划分为大致相同大小的窗口: 现在我喜欢对数据进行分区,以便卷与触发器值完全匹配。为此,我需要稍微修改一下数据,方法是将区间结束时的交易分成两部分,一部分属于正在触发的实际窗口,剩余的超过触发器值的数量必须分配给下一个窗口。 那可以用一些

  • 这是我的代码。我的问题如下 > 以这种方式清除状态是否正确? 这是使用keyBy的正确方法吗? //有100万个storeId

  • 我有两条流: 测量 WhoMeasured(关于谁进行了测量的元数据) 这些是它们的案例类: 流包含大量数据。流几乎没有任何可用性。事实上,对于<code>who_measured_id</code>流中的每个<code>who_。这本质上是一个哈希表,由流填充。 在我的自定义窗口函数中 这是我的工作。现在你可能会看到,有一些东西不见了:两个流的结合。 因此,从本质上讲,这是一种查找表,当流中的新

  • 我正在编写一个Flink应用程序,它使用kafka主题中的时间序列数据。时间序列数据包含度量名称、标记键值对、时间戳和值等组件。我已经创建了一个滚动窗口来根据度量键(度量名称、键值对和时间戳的组合)聚合数据。这里是主流看起来像 我还想检查是否有任何指标在上面的窗口外迟到。我想检查有多少指标延迟到达,并计算延迟指标与原始指标相比的百分比。我正在考虑使用flink的“允许延迟”功能将延迟指标发送到不同

  • 我正在构建一个具有以下目标的 Flink 应用程序: 将事件收集到键控的非活动触发会话窗口中 尽早发出输入事件的副本,并通过会话引用进行增强 打开和关闭会话时发出会话更新以及收集的会话统计信息(在会话关闭时) 通过滚动窗口,我已经能够实现上述目标,但在会话窗口中我没有成功。 我的窗口处理代码如下 来生成我正在使用的输入 和 当我使用会话窗口执行时,会出现以下异常: 希望我错过了一个技巧,因为无法使