当前位置: 首页 > 知识库问答 >
问题:

Spring-Kafka:在使用用户暂停/恢复时发生再平衡,这不应该根据文档

吴欣然
2023-03-14

Spring-Kafka:当按照文档使用Pause/Resume方法暂停/恢复消费者时,当使用自动分配时不应该发生再平衡,但它不工作,再平衡发生了。如何暂停/恢复消费者并在一段时间后保持轮询而不重新平衡?

用例:消费者应该暂停一段时间,并保持轮询给心跳,并在时间结束后恢复,但Kafka不应该在消费者暂停时重新平衡。

            System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] stopped consumption.");
            consumer.pause(Collections.singleton(topicPartition));                    
            try {
                    Thread.sleep(60000);
                    consumer.resume(Collections.singleton(topicPartition));
                    System.out.println("Consumer[" + Thread.currentThread().getName() + "] Partition [" + topicPartition + "] resumed consumption.");
            } catch (InterruptedException e) {              
                       e.printStackTrace();
            }

日志:2019-02-19 15:19:49.173信息82272---[rTaskExecutor-1]O.A.K.C.C.Internals.Abstract协调器:[消费者客户=Consumer-2,GroupID=Customer](重新)加入组2019-02-19 15:19:49.175信息82272---[rTaskExecutor-2]O.A.K.C.Internals.Abstract协调器:[rTaskExecutor-2]O.A.K.C.Internals.Abstract协调器:[消费者客户=Consumer-3,GroupID=Customer](重新)加入组2019-02-19 entid=consumer-4,groupid=customer](重新)加入组

2019-02-19 15:19:49.192信息82272---[rTaskExecutor-1]O.A.K.C.C.Internals.Abstract协调器:[消费者客户=Consumer-2,GroupID=Customer]成功加入第581代组201 9-02-19 15:19:49.192信息82272---[rTaskExecutor-2]O.A.K.C.Internals.Abstract协调器:[消费者客户=Consumer-3,GroupID=Customer]成功加入第581代组

共有1个答案

鄢英毅
2023-03-14

阅读Kafka文档。

暂停使用者只是意味着,在调用resume()之前,后续的poll()将不会返回任何记录,但是为了防止重新平衡,您仍然必须在max.poll.interval.ms中调用poll()

 类似资料:
  • 由于本机KafkaConsumer不是线程安全的,因此不鼓励从不同的线程而不是kafka使用者处理线程调用pause和resume方法。但正如spring kafka提供的另一层kafka信息容器,内部使用kafka consumer。所以我的问题是,我们可以使用KafkaListenerEndpointRegistry通过id获取侦听器容器,并从其他线程而不是消费者处理线程调用resume或pa

  • 我已经实现了Kafka消费者,现在我有了一个场景。 从Kafka流2.2.5中读取数据。通过Srpingboot发布 加载数据库表1 将数据从表1复制到表2 清理桌子1 要执行上述操作,我需要使用quartz的调度作业(已编写)暂停/恢复Kafka使用者,该作业将数据从表1复制到表2。但是在这个活动中,我希望我的Kafka听众暂停,一旦复制完成,它应该继续。 我的实施:

  • 我有一个Spring靴Kafka消费者 为了避免重新平衡,我尝试在KafkaContainer上调用pause()和resume(),但消费者总是在运行 我错过了什么吗?有人能指导我如何正确地达到要求的行为吗?

  • 当我们的kafka主题中有多个分区时,分区重新平衡是一件常见的事情吗? 这并不一定意味着我们的应用程序存在延迟或问题? 我一直看到分区被撤销和重新分配的日志。

  • 我们正在使用spring cloude stream 2.0 现在在Spring Cloud stream 2.0中,有一种使用执行器管理绑定器生命周期的方法:绑定可视化和控制 是否可以从代码控制绑定器的生命周期,这意味着在目标服务器关闭的情况下,绑定器,当它启动时,?

  • 问题内容: 我有一个基本的Swing UI,带有一个标记为“播放”的按钮。按下按钮后,标签变为“暂停”。现在,当按下按钮时,它变为“继续”。 在“播放”中,我将实例化并执行一个SwingWorker。我想要的是能够暂停该线程(不要取消该线程),并根据上述按钮按下来恢复它。但是,我不想在doInBackground()中求助于Thread.sleep()。这似乎有点骇人听闻。有什么方法可以阻止运行d