我使用的是spring kafka 2.2。8,并试图了解是否有一个选项可以部署处于暂停模式的Kafka消费者,直到我发出信号开始使用消息。请建议。
我在下面的帖子中看到,我们可以暂停并启动消费者,但我需要消费者在部署时处于暂停模式。如何使用spring kafka暂停并恢复@KafkaListener
@KafkaListener(id=“foo”,…,autoStartup=“false”)
然后在准备就绪时使用KafkaListenerEndpoint注册表
启动它
registry.getListenerContainer("foo").start();
在暂停模式下启动它没有多大意义,但您可以这样做。。。
@SpringBootApplication
public class So62329274Application {
public static void main(String[] args) {
SpringApplication.run(So62329274Application.class, args);
}
@KafkaListener(id = "so62329274", topics = "so62329274", autoStartup = "false")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so62329274").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry, KafkaTemplate<String, String> template) {
return args -> {
template.send("so62329274", "foo");
registry.getListenerContainer("so62329274").pause();
registry.getListenerContainer("so62329274").start();
System.in.read();
registry.getListenerContainer("so62329274").resume();
};
}
}
分配分区时,您将看到如下日志消息:
Kafka由于重新平衡而恢复消费;消费者再次暂停,因此初始poll()将永远不会返回任何记录
我使用的是spring引导版本1.3.2。我正在使用@JMSListener从activemq获取我使用JMSTemplate创建/生成的消息。代码如下: 现在当从smtp出现连接失败错误时,我想暂停@JMSListener一段时间,然后重新开始使用消息。对于这个用例,我还没有看到使用@JMSListener的更好的例子。由于我使用的是spring boot,我在应用程序属性中添加了activem
我正在从数据库中提取数据,以检查我是否有可用的系统资源来处理来自KafkaListener的进一步消息。如果我的条件没有满足,那么我希望@KafkaListener暂停,当条件满足时,我希望@KafkaListener恢复。我如何在SpringKafka实现这一点? 另外,为特定分区暂停消费者有什么缺点吗?
我想在特定时间停止对特定主题的轮询。 Spring防尘套2.X Springkafka 2.5.5 Kafka版本2.5.1 比如即使有消息进来测试题目分区,消息也是从00到01堆在分区里,没有消耗。 01点之后,我想再次使用有关TEST主题的消息。 如何暂停和恢复?
我是Kafka新手,我正在使用Kafka1.0。 我使用拉取模式读取kafka消息,也就是说,我定期查看Kafka主题以获取新消息,但我没有将偏移量写回Kafka。 我会问Kafka如何知道我消耗了哪些偏移量,或者Kafka记住进度的机制是什么(Kafka偏移量)
例如,我有一个消费者,最初在时间t1发送100条消息,然后我的消费者在t1+30秒启动并运行,那么我的消费者会使用t1+30秒之后发布的消息,还是会使用t1之后发布的消息?