当前位置: 首页 > 知识库问答 >
问题:

Spring集成Kafka和管理偏移

杜晨朗
2023-03-14

在我的例子中,我们有auto.commit.enable=false,以便在处理消息后将offset提交给Zookeeper。如果处理失败,那么offset将不会被提交,我们应该尝试在某个配置的时间内再次处理相同的消息,从html" target="_blank">zookeeper的offset开始。但它不起作用,因为,我假设,Apache Kafka客户端在内存中保留了偏移量。

我发现kafka.consumer.consumeriterator处理偏移量,如果它的consumedOffset大于Zookeeper中的consumedOffset,那么它将使用consumedOffset读取消息。

所以,我想知道有没有什么方法可以重置Kafka客户端的偏移量,以开始从Zookeeper中的偏移量读取?

共有1个答案

巢皓君
2023-03-14

您不介意分享您的 配置并指出您的问题在哪里吗?

也许做这个MessageLeftOverTracker.ClearMessageLeftOver()就够了?

我对Kafka不太好,但知道Spring的整合是做什么的。

 类似资料:
  • 我试图在Spring Integration中定义一个简单的消息流,它从一个通道读取消息,然后将消息转储到Kafka队列中。为此,我使用了spring集成kafka。问题是我得到了一个无法解读的错误。 以下是我的XML配置: 当我通过Spring启动运行我的应用程序时,我得到这个异常: 创建名称为org.springframework.integration.kafka.outbound.Kafk

  • 我的pom.xml是: 我错过了什么?

  • 我已经用Storm构建了一个示例拓扑,使用Kafka作为源。这是一个我需要解决的问题。 每次我杀死一个拓扑并重新启动它时,该拓扑都从一开始就开始处理。 假设Topology处理了主题X中的消息A,然后我终止了该拓扑。 现在,当我再次提交拓扑时,消息A仍然存在,主题X再次被处理。 有没有一个解决方案,也许某种偏移管理来处理这种情况。

  • 我正在研究为Spark结构化流在kafka中存储kafka偏移量,就像它为DStreams工作一样,除了结构化流,我也在研究同样的情况。是否支持结构化流?如果是,我如何实现? 我知道使用进行hdfs检查点,但我对内置的偏移量管理感兴趣。 我期待Kafka存储偏移量只在内部没有火花hdfs检查点。

  • 假设我有 3 台 Kafka 服务器。服务器 1 zoopkeeper1 服务器 2 zoopkeeper2 服务器 3 zoopkeeper3 在集群配置中,zoopkeepers 会发生什么?它们是为每个服务器单独维护的,还是会在群集配置中同步其数据?

  • 我目前正在尝试编写一个适配器,它将使用来自ActiveMQ的消息并将其发布到Kafka。 我正在考虑使用Spring集成来集成这两个消息传递系统。 我的问题是,我的应用程序不会维护模型的注册表,许多应用程序将使用该注册表将记录发布到activeMQ。我想接收这些javax-jms消息,并想执行一些转换,比如将jmscorrelationId添加到kafka消息中。 另外,另一个要求是仅当kafka