当前位置: 首页 > 知识库问答 >
问题:

如何部署处于暂停模式的kafka使用者,直到我发出开始使用消息的信号

伯和蔼
2023-03-14

我使用的是spring kafka 2.2。8,并试图了解是否有一个选项可以部署处于暂停模式的Kafka消费者,直到我发出信号开始使用消息。请建议。

我在下面的帖子中看到,我们可以暂停并启动消费者,但我需要消费者在部署时处于暂停模式。如何使用spring kafka暂停并恢复@KafkaListener

共有1个答案

顾炎彬
2023-03-14

@KafkaListener(id=“foo”,…,autoStartup=“false”)

然后在准备就绪时使用KafkaListenerEndpoint注册表启动它

registry.getListenerContainer("foo").start();

在暂停模式下启动它没有多大意义,但您可以这样做。。。

@SpringBootApplication
public class So62329274Application {

    public static void main(String[] args) {
        SpringApplication.run(So62329274Application.class, args);
    }


    @KafkaListener(id = "so62329274", topics = "so62329274", autoStartup = "false")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so62329274").partitions(1).replicas(1).build();
    }


    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so62329274", "foo");
            registry.getListenerContainer("so62329274").pause();
            registry.getListenerContainer("so62329274").start();
            System.in.read();
            registry.getListenerContainer("so62329274").resume();
        };
    }

}

分配分区时,您将看到如下日志消息

Kafka由于重新平衡而恢复消费;消费者再次暂停,因此初始poll()将永远不会返回任何记录

 类似资料:
  • 我使用的是spring引导版本1.3.2。我正在使用@JMSListener从activemq获取我使用JMSTemplate创建/生成的消息。代码如下: 现在当从smtp出现连接失败错误时,我想暂停@JMSListener一段时间,然后重新开始使用消息。对于这个用例,我还没有看到使用@JMSListener的更好的例子。由于我使用的是spring boot,我在应用程序属性中添加了activem

  • 我正在从数据库中提取数据,以检查我是否有可用的系统资源来处理来自KafkaListener的进一步消息。如果我的条件没有满足,那么我希望@KafkaListener暂停,当条件满足时,我希望@KafkaListener恢复。我如何在SpringKafka实现这一点? 另外,为特定分区暂停消费者有什么缺点吗?

  • 我想在特定时间停止对特定主题的轮询。 Spring防尘套2.X Springkafka 2.5.5 Kafka版本2.5.1 比如即使有消息进来测试题目分区,消息也是从00到01堆在分区里,没有消耗。 01点之后,我想再次使用有关TEST主题的消息。 如何暂停和恢复?

  • 我是Kafka新手,我正在使用Kafka1.0。 我使用拉取模式读取kafka消息,也就是说,我定期查看Kafka主题以获取新消息,但我没有将偏移量写回Kafka。 我会问Kafka如何知道我消耗了哪些偏移量,或者Kafka记住进度的机制是什么(Kafka偏移量)

  • 例如,我有一个消费者,最初在时间t1发送100条消息,然后我的消费者在t1+30秒启动并运行,那么我的消费者会使用t1+30秒之后发布的消息,还是会使用t1之后发布的消息?