当前位置: 首页 > 知识库问答 >
问题:

Kafka流拓扑优化

羊光辉
2023-03-14

在准备拓扑优化时,我偶然发现了以下几点:

目前,Kafka Streams在启用时会执行两种优化:

1-源KTable将源主题重新用作变更日志主题。

2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。

这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗:

1-KTable使用内部变更日志主题吗?如果是,有人能给我指出一个文档吗?接下来,变更日志主题是什么?它实际上是更新操作的更新日志吗?

2-如果我最后的猜测是真的,我不明白由upsert组成的变更日志如何只能被源主题替换?

共有1个答案

程英资
2023-03-14

changelog主题是配置了日志压缩的Kafka主题。对KTable的每次更新都会写入变更日志主题。因为主题是压缩的,所以不会丢失任何数据,重新读取changelog主题可以重新创建本地存储。

此优化的假设是,源主题是一个压缩的主题。在这种情况下,源主题和相应的变更日志主题将包含完全相同的数据。因此,优化删除了变更日志主题,并在恢复期间使用源主题重新创建状态存储。

如果输入主题未压缩但应用了保留时间,则可能不希望启用优化,因为这可能导致数据丢失。

关于历史:最初,Kafka Streams对这种优化进行了硬编码(因此,如果潜在的数据丢失是不可接受的,则“强制”用户只将压缩的主题作为KTables读取)。然而,在版本1.0中引入了一个回归错误(通过https://issues.apache.org/jira/browse/KAFKA-3856:新的StreamsBuilder行为与旧的KStreamBuilder不同,StreamsBuilder总是会创建一个变更日志主题“删除”优化。在版本2.0中,问题已得到修复,优化再次可用。(参见https://issues.apache.org/jira/browse/KAFKA-6874)

注意:优化仅适用于源KTable。对于计算结果(如聚合或其他),优化不可用,并将创建更改日志主题(如果未显式禁用,则禁用相应存储的容错功能)。

 类似资料:
  • 我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于

  • 我有一个应用程序,是基于Spring启动,SpringKafka和Kafka流。当应用程序启动时,它会创建带有默认主题列表的kafka流拓扑。我需要做的是在运行时编辑/重新创建拓扑。例如,当应用程序已经运行时,有新的主题名称出现,我想将此主题添加到我的拓扑结构中。目前,我正在考虑以某种方式删除现有的拓扑,关闭并清理KafkaStreams,在创建拓扑但使用新主题名称的地方运行逻辑,并再次启动Kaf

  • 使用kafka processor API(不是DSL)读取源主题并写入目标主题,对于单个kafka集群设置(也就是说,如果源主题和目标主题都驻留在同一集群上)来说工作很好,但是当源主题和目标主题驻留在不同的kafka集群上时,我将获得目标处理器上下文的NullPointerException 我们如何使用kafka streams处理器API从一个集群中的一个主题写到另一个集群中的另一个主题?

  • 我正在运行一个Kafka Streams应用程序,它有三个子拓扑。活动的阶段大致如下: 主题A 主题A、B和C都是物化的,这意味着如果每个主题有40个分区,我的最大并行度是120。 起初,我运行5个流应用程序,每个线程8个。在这种设置下,我遇到了不一致的性能。似乎某些共享同一线程的子拓扑比其他子拓扑更渴望CPU,过了一会儿,我会得到这个错误:组[consumer_group]中的中删除。一切都会重

  • 我正在学习QuarkusKafka流教程,不太明白如何启动管道。 在本教程中,用于构建。构建拓扑的方法用<code>注释为products<code>。在本备忘单中,描述了这足以运行Kafka Streams管道。在本教程中,还公开了httpendpoint。这在我目前正在实现的服务中不是必需的。此外,在本示例中,从未显式调用提供程序方法。当我在没有endpoint的情况下启动应用程序时,管道不会