当前位置: 首页 > 知识库问答 >
问题:

《Kafka流》中Kafka话题的保留及其对国家存储的影响

林念
2023-03-14

我在Kafka streams word-count应用程序中有一个状态存储(使用materialized.as())。
根据我的理解,状态存储在Kafka的内部主题中是保持的。

  1. 状态存储是否可以有无限制的键值对,或者由基于log.retention策略或log.segment.bytes的kafka主题规则来管理?
  2. 我设置了log.retention.ms=60000,并希望状态存储值在一分钟后重置为0。但我发现它没有发生,我仍然可以看到从state Store的值。kafka是完全清除日志还是保留case log-compaction主题中的快照?
  3. “段提交”是什么意思?

如果有的话,请附上解决方案的来源。

共有1个答案

杜炫明
2023-03-14

状态存储是否可以有无限的键值对,或者它们是由基于log.retention策略或log.segment.bytes的kafka主题规则管理的?

是的,状态存储可以有无限的键值对=events(或'messages')。当然,本地应用程序存储空间和Kafka中的远程存储空间是允许的(后者用于在您的状态存储中持久地存储数据)。

应用程序的状态存储在压缩的内部Kafka主题中远程持久化。压缩意味着Kafka定期从存储中清除相同事件键(例如,Bob的旧帐户余额)的旧事件。但压缩主题不会删除每个事件键的最新事件(例如,Bob的当前帐户余额)。在压缩主题中存储多少个这样的“唯一”键值对没有上限。

我设置了log.retention.ms=60000,并希望状态存储值在一分钟后重置为0。但我发现它没有发生,我仍然可以看到从state Store的值。

将主题配置为压缩(log.cleanup.policy=compact)时,不使用log.retention.ms。有关详细信息,请参阅现有的SO question Log compaction以保持每个键仅一个消息,包括为什么压缩不立即发生(简而言之,这是因为压缩操作在分区段文件上,它不会触及最新的段文件,并且该文件中的每个事件键可能有多个事件)。

注意:您现在可以将配置log.cleanup.policy设置为压缩和基于时间/卷的保留的组合,使用log.cleanup.policy=compact,delete(有关详细信息,请参阅KIP-71)。但是通常情况下,除非你真的知道你在做什么,否则你不应该乱弄这个设置--默认值是你99%的时间所需要的。

Kafaka是完全清除日志还是保留快照,以防日志压缩主题?“段被提交”是什么意思?

我不明白这个问题,很不幸。:-)也许我之前的回答和参考链接已经满足了您的需求。我可以说的是,不,Kafka并没有完全清除日志。压缩操作是在主题分区的段文件上操作的。您可能需要阅读压缩是如何工作的,对此我建议写一篇类似https://medium.com/@sunny_81705/kafka-log-retention-and-cleanup-policies-c8d9cb7e09f8的文章,以防Apache Kafka文档还不够清楚。

 类似资料:
  • 我正在实验Kafka流,我有以下设置: null 有什么方法可以让我的KTable从我的主题中“继承”保留策略吗?这样当记录从主主题过期时,它们在KTable中就不再可用了? 我担心将所有记录转储到KTable中,并使StateStore无限增长。 我能想到的一个解决方案是转换成一个窗口流,其跳跃窗口等于记录的TimeToLive,但我想知道是否有更好的解决方案,以更原生的方式。 多谢了。

  • 我需要获取存储中的行数,存储在低级处理器API中维护。我看到,方法“近似数字条目()”可以在此存储中提供键值映射的近似计数。你能澄清一下准确度的%吗,这意味着如果商店里有100行,我们会得到95行作为近似计数吗?或者它有时会低于50行吗?只是想了解影响计数准确性的因素。 注意:假设流应用程序使用单个主题并在单个实例上运行。存储是通过低级处理器API访问的,不确定默认情况下是否应用了任何缓存。提交频

  • 我正在用Kafka和Kafka溪流作为Spring-Cloud-Stream流的一部分。在我的Kafka Streams应用程序中流动的数据在特定的时间窗口内被聚合和物化: 按照设计,正在具体化的信息也由changelog主题支持。 用解决方案更新Kafka Streams 2.0.1版不包含Materialized.WithRetention方法。对于这个特定的版本,我可以使用以下代码设置状态存

  • 我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。 有没有更好的方法使用DSL来编写这个逻辑? 上面代码中关于聚合创建的状态存储的两个问题。 null 提前道谢!

  • 我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码