当前位置: 首页 > 知识库问答 >
问题:

《Kafka流》中statestore和changelog主题的目的?

景鹏飞
2023-03-14

我有一个kafka流应用程序,它在其中使用stateStore(由RocksDB支持)。

stream thread所做的只是从kafka主题获取数据并将数据放入State Store。(还有其他线程从statestore读取数据并进行业务逻辑处理)。

我观察到它创造了一个新的Kafka主题“变化日志”,因为Statestore。

但我没有明白“变化”Kafka话题有什么用处?

  • 为什么需要它(更改日志)?
  • statestore与“Changelog”Kafka主题之间有什么关系?
  • 谁将数据放入此主题?(“ChangeLog”)

共有1个答案

鲁涵映
2023-03-14

这个问题的简单答案是实现容错。

详情:

changelog允许Kafka Streams应用程序中的状态存储容错。当应用程序将更多数据输入状态存储时,它会被推送到ChangeLog主题,这样,如果运行应用程序的节点关闭,就会使用ChangeLog主题来加载状态存储区中的最新状态。

每个应用程序线程或实例都有自己的changelog主题分区,这样每个实例都可以在应用程序失败后重新启动后重新创建它的状态。

当对状态存储进行更新时,数据会被Kafka流自动推送到主题中。

我建议阅读Kafka Definitive Guide的第11章--它包含了关于Kafka Streams体系结构和流处理模式的非常好的解释。

希望这有帮助。

 类似资料:
  • 我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码

  • 我正在尝试设置一个安全的Kafka集群,但在ACL方面遇到了一些困难。 Kafka流的汇流安全指南(https://docs.Confluent.io/current/Streams/developer-guide/security.html)只说明必须将集群创建ACL交给主体...但它没有说任何关于如何实际处理内部话题的内容。 通过研究和实验,我确定(对于Kafka版本1.0.0): 通配符不能

  • 打扰一下!我从我的团队那里得到了同样的问题,他们问我Kafka的主题、泉云溪的通道和泉云溪的目的地有什么不同点。我们试图在网上搜索,但没有得到任何线索。

  • 我正在尝试理解。 据我所知,在这种类型的流处理器中,它使用来维护某种状态。 我开始知道,实现的方法之一是使用。假设以下(并且只有一个处理器是) A- 假设sp只监听一个Kafka主题,比如带有10个分区的。 我观察到,当应用程序启动时(在不同的物理机器上有2个实例,并且=5),然后对于,它会创建目录结构,其内容如下: 0_0,0_1,0_2......0_9(每台机器有5个分区)。 我正在浏览一些

  • 只是关于Kafka的后续问题-未压缩主题与压缩主题 正如那里所说, 用数据库的说法,流可以被视为插入。每个不同的记录都保留在此日志视图中。流通常是从未压缩的主题构建的。 作为最佳实践,关于未压缩主题的语义,是否应禁用要在日志启用程序中取消压缩的主题,以便不会发生压缩(清理),其属性如下: 日志清洁工enable=false或log。清洁工启用=true(默认),清除策略为“delete”(默认)

  • 我在Kafka Streams拓扑工作,有时,在更改应用程序ID和/或clientId属性后,我在特定的kafka流上收到错误:“”。我已经在每个Kafka节点的server.properties中设置了属性,但似乎没有创建此流的主题。 这是我的Kafka Streams拓扑: