当前位置: 首页 > 知识库问答 >
问题:

Kafka消费者在重新平衡时获得不同的补偿

太叔岳
2023-03-14

我有一个微服务应用程序,它是使用camel kafka开发的,用于消费来自kafka的消息。我用4个并发实例运行这个服务,其中每个实例都有1个消费者,他们消费的主题有20个分区。

在维护期间(每天凌晨1-2点),应用程序将通过停止消费者来暂停消费。

当应用程序暂停消费时,kafka重新平衡发生,一些实例将在停止消费之前分配更多分区。

e、 由instanceA(partition1-5)使用的g分区将被重新分配给instanceB,然后再重新分配给instanceC,直到所有实例都被消费者关闭。

问题出现在应用程序启动消费者之后,一些实例正在重新处理来自kafka的消息,即使没有发布到主题的新消息。

检查日志后,请注意,当应用程序停止重新平衡时,偏移量=12399,但当应用程序开始使用消息时,偏移量=12336。

November 17th 2020, 01:00:02.951 | test-679656954c-5pssk | [Consumer clientId=consumer-group1-17, groupId=group1] Setting offset for partition TestTopic-14 to the committed offset FetchPosition{offset=12399, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[testbroker1.local:9093 (id: 1 rack: null)], epoch=6}}
November 17th 2020, 02:00:00.559 | test-679656954c-j4xbl | [Consumer clientId=consumer-group1-17, groupId=group1] Setting offset for partition TestTopic-14 to the committed offset FetchPosition{offset=12336, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[testbroker1.local:9093 (id: 1 rack: null)], epoch=6}}

这是为了检查偏移量是如何改变的吗?

共有1个答案

邓令
2023-03-14

我猜在关闭阶段,4个并发实例会一个接一个地关闭,这就是为什么会看到分区重新平衡,将第一个关闭实例中的实例分配给其他仍在运行的实例,依此类推。关于启动时的补偿,这取决于消费者如何提交补偿。如果启用了自动提交,那么它会定期发生,因此,当关闭发生时,提交可能没有完成(因为是周期性的),最新的偏移量也没有提交。如果您使用的是手动提交,在关闭之前,您应该从应用程序内部确保所有使用者都已提交已消费消息的偏移量。

 类似资料:
  • 根据Kafka的文件: kafka保证主题分区只分配给组中的一个消费者。 但我在服务中观察到了不同的行为。以下是一些细节: 我用的是Kafka2.8和SpringKafka2.2.13。 最初我有一个Kafka主题包含5个分区,这个主题在我的服务中使用了Spring和ConcurrentKafkAlisterContainerFactory中的注释,并发性=5。这个配置对我来说很好。 后来,我开始

  • 我有一个关于2个代理上的3个分区的主题。(Kafka版本:0.8.1) 使用不同的用户guid(如:FC42B34DD7658503E040970A2C437358)作为分区密钥批量添加消息。(约10K条消息) 在加载消息时,我有一个正在运行的消费者(consumer1),它开始很好地处理消息。 然后我用相同的消费者组ID启动了另一个消费者(consumer2)。 我希望两个消费者都应该分配负载。

  • 我有一个Kafka流应用程序,它从几个主题中获取数据,并将数据加入另一个主题。 Kafka配置: 注意:我在运行Kafka Brokers的机器上运行Kafka Streams应用程序。 每小时消耗/产生数百万条记录。每当我让Kafka经纪人倒下时,都会进入再平衡阶段,再平衡大约需要30分钟,有时甚至更长时间。 有人知道如何解决Kafka消费者的再平衡问题吗?而且,很多时候,它在重新平衡时抛出异常

  • 有人能告诉我Kafka消费者的再平衡算法是什么吗?我想了解分区计数和消费者线程是如何影响这一点的。 非常感谢。

  • 我们正在运行一个3 broker Kafka 0.10.0.1集群。我们有一个java应用程序,它产生了许多消费线程,从不同的主题消费。对于每一个主题,我们都指定了不同的消费者群体。 很多时候,我看到每当这个应用程序重新启动时,一个或多个CG需要超过5分钟来接收分区分配。在此之前,这个话题的消费者不会消费任何东西。如果我去Kafka broker并运行Consumer-Groups.sh并描述特定

  • 我正在编写一个概念验证应用程序来使用Apache Kafka0.9.0.0中的消息,看看是否可以使用它而不是通用的JMS消息代理,因为Kafka提供了好处。这是我的基本代码,使用新的消费者API: 我使用默认设置启动了一个kafka服务器,并使用shell工具启动了一个kafka生产者,以便将消息写入我的主题。然后,我使用这段代码与两个使用者连接,发送正确的服务器来连接,发送主题来订阅,其他一切都