当前位置: 首页 > 知识库问答 >
问题:

状态存储changelog主题的使用者是什么

酆鸿彩
2023-03-14

我有一个使用处理器api更新状态存储的拓扑,配置为复制因子3,ACKS=ALL

Topologies:
   Sub-topology: 0
    Source: products-source (topics: [products])
      --> products-processor
    Processor: products-processor (stores: [products-store])
      --> enriched-products-sink
      <-- products-source
    Sink: enriched-products-sink (topic: enriched.products)
      <-- products-processor

我试图找出这个changelog主题滞后的根本原因,因为我没有在这个处理器中发出任何外部请求。有对rocksdb状态存储的调用,但这些数据存储都是本地的,检索速度应该很快。

我的问题是这个变更日志主题的使用者到底是什么?

共有1个答案

诸葛砚
2023-03-14

changelog主题的使用者是还原使用者。restore消费者是构建在Kafka流中的Kafka消费者。与从源主题读取记录的主使用者不同,还原使用者负责在本地状态不存在或过期的情况下从changelog主题还原本地状态存储。基本上,它确保本地状态存储在故障后恢复。还原使用者的第二个目的是保持备用任务是最新的。

Kafka Streams客户端中的每个流线程都有一个还原使用者。还原使用者不是使用者组的成员,Kafka Streams将changelog主题手动分配给还原使用者。还原使用者的偏移量不作为主使用者的偏移量在使用者偏移量主题__consumer_offsets中管理,而是在Kafka Streams客户端的状态存储目录中的一个文件中管理。

 类似资料:
  • 我正在用Kafka和Kafka溪流作为Spring-Cloud-Stream流的一部分。在我的Kafka Streams应用程序中流动的数据在特定的时间窗口内被聚合和物化: 按照设计,正在具体化的信息也由changelog主题支持。 用解决方案更新Kafka Streams 2.0.1版不包含Materialized.WithRetention方法。对于这个特定的版本,我可以使用以下代码设置状态存

  • 我有多个冗余的应用程序实例,希望消费一个主题的所有事件,并存储它们独立的磁盘查找(通过一个rocksdb)。 为了便于讨论,让我们假设这些冗余消费者正在服务无状态http请求;因此,不使用kafka共享负载,而是使用kafka将数据从生产者复制到每个实例LocalStore中。 在查看生成的主题时,每个消费应用程序创建了3个额外的主题: null null 下面是创建存储区的代码

  • 问题内容: 广泛的讨论问题。是否已经有任何库可以让我在Java中存储应用程序的执行状态? 例如,我有一个处理文件的应用程序,现在该应用程序可能在某个时刻被迫关闭。我想存储所有已处理文件和未处理文件的信息,以及处理正在进行的阶段正在进行的流程。 是否已经有抽象此功能的库,或者我将不得不从头开始实现它? 问题答案: 似乎您正在寻找的是可以使用Java Serialization API 执行的序列化。

  • 我创建自定义密钥并将数据存储在全局状态存储中,但在重新启动后,它将消失,因为全局存储在恢复时将直接从源主题中获取数据,并绕过处理器。 我的输入主题有上面的数据。 我维护以下两个状态存储: null 全局状态存储应该具有: 如何在全局状态存储中维护上述场景的还原案例

  • Java:OpenJdk 11Kafka:2.2.0Kafka流库:2.3.0 我试图在docker容器中部署我的Kafka streams应用程序,但在尝试创建带有TopicAuthorizationException的内部状态存储时失败。它在本地运行良好。本地和服务器上的主要区别在于,它连接到部署了Kafka的服务器,并使用常见的Kerberos身份验证进行身份验证。我无法理解身份验证与本地商

  • 我目前正在考虑将opengl状态存储为某种适当类型的全局thread_local变量。那个设计有多糟糕?有什么陷阱吗?