当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams KTable store包含更改日志主题和日志压缩源主题

宗政霄
2023-03-14

我在一个输入主题上构建KTable,并且在两个Kafka Stream应用程序实例上加入KStream。

KTable的输入主题已经是一个日志压缩主题。因此,当我的一个应用程序实例关闭时,通过读取input log compacted主题,另一个实例状态存储似乎会用整个状态刷新。

所以不需要为我的KTable存储启用日志记录(更改日志)?

我的源输入日志压缩主题可能有数百万条记录,所以如果我在KTable状态存储上启用日志记录,它会在失败的情况下改善我的状态存储刷新时间吗?还是它不会有效果,因为源主题已经进行了日志压缩?谢谢!

共有1个答案

经伟
2023-03-14

所以不需要为我的KTable存储启用日志记录(更改日志)?

那是正确的。Kafka流不会创建额外的changelog主题,但会使用输入主题进行恢复(不需要重复数据)。

因此,如果启用在KTable状态存储上日志记录

如果要减少故障时间,应通过streamsconfig参数num.standby.replicas配置standbytasks(默认值为0,因此可以将其设置为1)。Cf https://docs.confluent.io/current/streams/developer-guide.html#state-restoration-during-workload-rebalance

 类似资料:
  • 一个与主题压缩有关的问题。在压缩主题中,当日志清理器在清理特定键的以前偏移量(3,4,5)时出现延迟(假设5是最新的偏移量),而作为使用者使用这些偏移量时,即使3和4还没有压缩,我会只看到该键的最新偏移量(5)吗?还是使用者将按照该顺序获得(3,4,5)?

  • 我需要压缩Weblogic日志。 我检查了Weblogic控制台的日志设置,发现我可以旋转日志,但没有看到可以为日志压缩(压缩到zip文件)设置的任何属性。 当前设置如图所示。 有没有自动压缩这些日志的方法?

  • 日志压缩可确保 Kafka 始终至少为单个 topic partition 的数据日志中的每个 message key 保留最新的已知值。 这样的设计解决了应用程序崩溃、系统故障后恢复或者应用在运行维护过程中重启后重新加载缓存的场景。 接下来让我们深入讨论这些在使用过程中的更多细节,阐述在这个过程中它是如何进行日志压缩的。 迄今为止,我们只介绍了简单的日志保留方法(当旧的数据保留时间超过指定时间、

  • 我有一个带有Kafka使用者的spring应用程序,它使用@KafKalisterner注释。正在使用的主题是日志压缩的,我们可能会遇到必须再次使用主题消息的情况。以编程方式实现这一目标的最佳方法是什么?我们不控制Kafka主题配置。

  • 我需要从其他项目模块中提取包含变更集作为maven依赖项的changelog文件。我需要在当前项目中包含其他项目的主变更日志文件(它引用了其中的所有变更集),我将在该文件中执行maven liquibase更新/回滚或spring boot liuqibase设置。有没有办法让它奏效? 但我在设置或运行LiquiBase时出错:LiquiBase.Exception.SetupException:

  • 我需要维护自己的全局表结构。基本上,全局存储与处理器相连,我使用一些计算来创建键,然后将其存储到键值存储。 由于全局状态存储没有更改日志主题,所以它使用原始主题作为更改日志。在状态恢复的情况下,它只是将主题数据加载到全局表(这将是错误的),因为我们构建了自己的密钥 我的要求是用我的自定义键(不是直接来自主题的键)创建一个全局存储。有什么解决方案吗? 下面的链接回答了我的问题Kafka stream