当前位置: 首页 > 知识库问答 >
问题:

Kafka流-定义变更日志的保留策略

石正奇
2023-03-14

我使用Kafka流对时间窗口进行一些聚合。我只对每个窗口的最终结果感兴趣,因此我使用了.suppress()特性,该特性为其状态创建了一个changelog主题。

这个changelog主题的保留策略配置被定义为“紧凑”,我的理解是,它将至少保留每个键过去的最后一个事件。

我的应用程序中的问题是密钥经常变化。这意味着主题将无限增长(每个窗口都会带来永远不会被删除的新键)。

因为聚合是每个窗口的,所以在聚合完成之后,我就不需要“旧的”键了。

有没有办法告诉Kafka流从以前的窗口中移除键?

对于这个问题,我认为将changelog主题保留策略配置为“compact,delete”就可以完成这项工作(在kafka中可以根据这个:KIP-71,kafka-4015使用。

但是是否可以使用Kafka Streams API更改保留策略呢?

共有1个答案

程鸿波
2023-03-14

suppress()运算符在将记录从其缓冲区中逐出并向下游发送时向changelog主题发送tombstone消息。因此,您不需要担心主题的无限增长。更改压缩策略实际上可能会破坏操作员提供的保证,并且可能会导致数据松散。

 类似资料:
  • 假设我有一个多代理(运行在同一主机上)的Kafka设置,其中有3个代理和50个主题,每个主题配置为有7个分区和3个复制因子。 我有50GB的内存要用于kafka,并确保kafka日志永远不会超过这个内存数量,因此我想配置我的保留策略以防止这种情况。 我已设置删除清理策略: 我应该如何配置上述参数,以便每7天删除一次数据,并确保如果需要,可以在较短的窗口中删除数据,这样我就不会耗尽内存?

  • 我正在实验Kafka流,我有以下设置: null 有什么方法可以让我的KTable从我的主题中“继承”保留策略吗?这样当记录从主主题过期时,它们在KTable中就不再可用了? 我担心将所有记录转储到KTable中,并使StateStore无限增长。 我能想到的一个解决方案是转换成一个窗口流,其跳跃窗口等于记录的TimeToLive,但我想知道是否有更好的解决方案,以更原生的方式。 多谢了。

  • 假设我将一个KStream聚合到一个KTable,将一个KStream聚合到一个KTable。和都不传递空值(删除事件被聚合为快照的状态属性)。此时,我们可以假设对于和聚合都有一个持久化的kafka changelog主题和一个rocksDB本地存储。然后,我的拓扑将与连接起来,生成一个连接的。也就是说,我的问题是和物化生命周期(包括changelog主题和本地rocksdb存储)。假设主题和主题

  • 我有一个关于Kafka主题清理策略和它们的日志.保留... 例如,如果我将cleanup.policy设置为compact,则只有在主题的保留时间或保留时间对压缩没有影响之后才会开始压缩? 答案Thx...

  • 我正在使用Apache Kafka streaming对从Kafka主题中消耗的数据进行聚合。然后,聚合被序列化到另一个主题,它本身被使用,结果存储在一个DB中。我想是很经典的用例吧。 聚合调用的结果是创建一个由Kafka变更日志“主题”备份的KTable。 这实际上是很好的/必要的,因为这避免了当将来的事件带有相同的键时丢失我的聚合状态。 然而,从长远来看,这意味着这个变更日志将永远增长(随着更

  • 本文向大家介绍kafka 有几种数据保留的策略?相关面试题,主要包含被问及kafka 有几种数据保留的策略?时的应答技巧和注意事项,需要的朋友参考一下 kafka 有两种数据保存策略:按照过期时间保留和按照存储的消息大小保留。