当前位置: 首页 > 知识库问答 >
问题:

当消费者的处理时间超过max.poll.interval.ms时,消费者未停机

傅自明
2023-03-14

我试图做一个简单的poc与Spring启动与版本(2.3.7发布)的SpringKafka,以实现消费者批处理的工作原理,以及如何再平衡工作,如果消费者需要更多的流转时长,因为我是全新的这个消息系统

现在我看到kafka重新平衡单个消费者(不允许并发)的问题。

这些是我设置的max.poll.interval属性。ms=50000和factory.getContanerProperties。setIdealTimeBetweenPolls(120000)并使消费者批处理过程花费的时间超过此max.poll.interval.ms。根据我的理解,实际上这不应该发生。只有第一次再平衡才会发生。

我遗漏了什么吗?

共有1个答案

游皓
2023-03-14

是的,如果超过< code > max . poll . interval . ms ,就会发生重新平衡——代理认为您的消费者已经死了,所以收回分区(即使不再有消费者)。

当你下次投票时,重新平衡发生了,因为你只有一个消费者,他再次得到所有8个分区。

 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

  • 我正在尝试使用Apache Flume构建管道:Spooldir- 事件毫无问题地进入kafka主题,我可以使用kafkacat请求看到它们。但是kafka通道无法通过接收器将文件写入hdfs。错误是: 等待来自 Kafka 的数据时超时 完整日志: 2016-02-26 18:25:17,125 (SinkRunner-PollingRunner-DefaultSinkProcessor-Sen