当前位置: 首页 > 知识库问答 >
问题:

重新尝试处理kafka消息是否会永远导致消费者重新平衡?

余靖
2023-03-14

我有一个带注释的kafka消费者方法@kafkalistener。我已经在容器上设置了重试模板,并且重试配置是这样的,如果在处理消息时发生了一些异常,它将始终重试。我已将最大轮询记录设置为1。如果这种情况实时发生,并且消费者一直在重试消息,经纪人会认为该消费者已经死亡并触发重新平衡吗?或者,在重试时,消费者是否会对未能处理的相同消息进行投票?如果这是真的,因为民意调查正在进行,我的假设是不会有任何再平衡。此外,我正在手动提交偏移量,因此我的enable.auto。commit属性设置为false,ack模式为手动。谁能澄清一下吗?提前谢谢。

共有1个答案

王长卿
2023-03-14

是的,在侦听器适配器级别使用无状态重试(默认值)时,当超过 max.poll.interval.ms 时,将导致重新平衡。

您应该改用有状态重试。

在这种情况下,异常被抛出到容器,并且< code > seektocurrenterrangendler 重新查找未处理的分区(包括失败的记录)。您仍然需要确保最大回退时间小于轮询间隔。不需要将< code>max.poll.records设置为1,因为寻道是在所有未处理的分区上进行的。

从2.3版开始,您可以消除侦听器级别的重试,只需使用SeekToCurrentErrorHandler

 类似资料:
  • 我用的是Kafka:2.11-1.0.1。应用程序包含主题“X”的并发性为5的使用者,分区为5。 重新启动应用程序并在分区分配之前在主题“X”上发布消息时,主题“X”的5个使用者会找到组协调器,并将加入组请求发送给组协调器。预计会收到小组协调员的回复,但未收到回复。 我检查了Kafka服务器日志,但在调试日志级别找不到相关日志。 当我运行描述消费者组的命令时,作出如下观察: 消费群体正在重新平衡

  • 我看到一个问题,我的主题中的所有消息都被我的消费者重新阅读。我只有1个消费者,我在开发/测试时打开/关闭它。我注意到,有时在几天没有运行消费者之后,当我再次打开它时,它会突然重新阅读我的所有消息。 客户端 ID 和组 ID 始终保持不变。我显式调用提交同步,因为我的启用.我确实设置了 auto.offset.reset=最早,但据我所知,只有在服务器上删除了偏移量时,才应该启动。我正在使用 IBM

  • 生产者发送消息到一个有四个分区的主题。我们有一个消费者在消费来自这个主题的消息。应用程序在工作日一直运行周末例外:它不会在周末期间调用poll方法。 使用者配置:自动提交,自动提交时间为5s(默认)。 应用程序一直运行良好,直到一个星期天,当它重新开始调用poll方法。我们看到有数百万条消息从这个话题中被轮询出来。消费者基本上是轮询来自主题的所有消息。将新的偏移量与它在周末停止之前的偏移量进行比较

  • 我们有一个制作人 在开发过程中,我重新部署了producer应用程序,并做了一些更改。但在此之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有成功。问题可能是什么和/或如何解决? 消费者配置: 生产者配置: 编辑2: 5分钟后,消费者应用程序死亡,但以下情况除外:

  • 我有一个Kafka集群正在运行,当重新启动应用程序(消费者)时,它会跳过一些在应用程序关闭时推送到主题的消息。 当应用程序启动时,我可以看到它读取带有偏移量的消息,然后将偏移量推送到。然后当应用程序关闭时,带有偏移量的消息被推送到主题。重启应用程序后,它读取并将其偏移量设置为,因此跳过。 这是我的配置:

  • 我们有一个kafka streams应用程序(2.0),它正在与kafka代理(1.1.0)通信。streams应用程序一直在毫无原因地重新处理整个日志-应用程序没有重新启动,没有重新平衡,只是闲坐着-在某些情况下,它正在处理消息,在另一些情况下,它正在等待接收消息(处理消息的时间不到6个小时)。我们已经做了大量的研究,通过将