当前位置: 首页 > 知识库问答 >
问题:

kafka streams会话窗口保留时间

桑成荫
2023-03-14

我们正在使用Kafka流的会话窗口来聚合相关事件的到达。除了聚合之外,我们还使用till()API指定窗口的保留时间。流信息:
会话窗口(非活动时间)为1分钟,传递到until()的保留时间为2分钟。我们使用定制的TimestampExtractor来映射事件的时间。

示例:
事件:e1;事件时间:上午10:00:00;到达时间:下午2点(同一天)
事件:e2;事件时间:上午10:00:30;到达时间:下午2:10(同一天)
第二个事件的到达时间是e1到达后10分钟,超过了保留时间非活动时间。但是旧事件e1仍然是聚合的一部分,尽管保留时间是2分钟。

问题:
1)kafka streams如何使用until()API清理状态存储?因为指定为参数的保留值是“窗口将保持多长时间的下限”什么时候清除窗口?

2)是否有一个后台线程定期清理状态存储?如果是,那么有没有一种方法来识别清除窗口的实际时间。

3) 在保留时间后清除窗口数据的任何流配置。

共有1个答案

柳钟展
2023-03-14

在我回答你的具体问题之前:请注意,保留时间不是基于系统时间,而是基于“流时间”。“流时间”是一个内部跟踪的时间进度,基于时间戳提取器返回的任何内容。没有太多细节:对于您有2条记录的示例,当第二条记录到达时,“流时间”将提前30秒,因此保留时间还没有过去。

还要注意,如果没有新数据到达(至少对于一个分区),流时间不会提前。这适用于Kafka 0.11.0及更旧版本,但在未来的版本中可能会更改。

更新:Kafka 2.1更改了流时间的计算,即使一个分区不提供数据,流时间也可能提前。有关详细信息,请参阅KIP-353:改进Kafka流时间戳同步

对于您的问题:

(1) Kafka Streams将所有存储更新写入changelog主题和本地RocksDB存储。两者都被划分为具有一定大小的所谓部分。如果新数据到达(即“流时间”进展),将创建新的段。如果发生这种情况,如果旧段中的所有记录都早于保留时间(即,记录时间戳小于“流时间”减去保留时间),则删除旧段。

(2) 因此,没有后台线程,但清理是常规处理的一部分,

(3)没有强制清除旧记录/窗口的配置。

如果所有记录都已过期,则整个段将被删除,因此段内较旧的记录(最有可能具有较小/较旧的时间戳)的保留时间将长于保留时间。这种设计背后的动机是性能:按记录过期成本太高。

 类似资料:
  • 有人能帮我理解一下在flink中的窗口(会话)是什么时候和如何发生的吗?或者样品是如何加工的? 例如:假设定义的时间窗口是30秒,如果一个事件在t时间到达,另一个事件在t+30,那么这两个事件都将被处理,但是在t+31到达的事件将被忽略。 如果我说的不对,请纠正。 上面的问题是:如果一个事件在t时间到达,而另一个事件在t+3时间到达,是否还会等待整个30秒来汇总并最终确定结果? DTO: ====

  • 你好,我正在Kafka会话窗口上工作,非活动时间为5分钟。我想要一些反馈,当达到非活动时间时,会话会因按键而降低。假设我有 (A,1) 记录“A”是键的位置。现在,如果我在5分钟内没有获得任何“A”键记录,则会话将被删除。 我想在会话结束时做一些操作,比如说(值)*2。有没有什么方法可以通过Kafka Stream API实现这一点

  • 我们正在使用kafka streams的windows join连接2个流,我们想知道: 为什么KS要在内部主题上增加24小时?例如,我们有一个1小时的窗口,但内部主题保留25小时。我们可以将其配置为不添加这些24小时吗 [更新] 例如,我们创建JoinWindow如下: 虽然我可以看到内部主题(JOINTHIS和OUTEROTHER)是用 这是刚刚在我的机器上的一个空代理(使用confluent

  • 问题内容: 我当前正在使用一个Web应用程序,每次都调用此Web应用程序,请删除所有当前的Express会话,因此我需要一种方法来保留所有这些会话。我试图用connect- mongodb和connect- redis保留所有这些会话,但是都无效,nodemon总是说req.session是未定义的。我不知道该怎么做才能保留所有会话。 我需要一种方法来保留所有会话,执行时不要丢失它们,并为其提供教

  • 我有一个聚合函数,它计算WindowedStream中一系列事件的平均值。 这里的警告是,平均值需要在可能无序(或根本没有)到达的事件对上计算。 换句话说,我需要在计算之前对数据进行排序,因为序列很重要。 我可以用getResult API来实现这一点,但是这个函数在窗口中的每个事件上都被调用,这在性能方面没有意义。我也可以用flink cep来做这件事,但出于同样的原因,我想避免使用它。 理想情

  • 我想知道是否可以创建类似于以下内容的WindowAssigner: 但我不希望窗口在每个元素的事件时间中保持增长。我希望在接收到的第一个元素(对于该键)处定义窗口的开头,并在1秒后精确结束,无论有多少元素到达该秒。 所以它可能看起来像这样的假设: 谢谢