当前位置: 首页 > 知识库问答 >
问题:

控制Kafka听众消费

胡鸿禧
2023-03-14

所以首先,为了能够暂停/停止消费者,我必须访问MessageListenerContainer。这意味着,在配置中,我将创建:ConcurrentKafkaListenerContainerFactory并(从2.2开始)使用它创建ConcurrentMessageListenerContainer的托管bean。然后可以使用这个bean来启动/停止消费者。管用。一旦它是并发的...我假设,我传递给setupMessageListener的必须是无状态类的实例,以便可以从多个线程/使用者对其进行操作。因此,如果我想注入spring依赖项,就像我以前在bean上使用@kafkalistener注释方法一样,我可以在这里传递无状态单例bean实例。

现在关于重新定位:这似乎很容易。只需在messageListener类中实现ConsumerSeekAware,该类是通过setupMessageListener注册的,并存储回调。然后,您可以只需autowire ConsumerSeekAware messageListener singleton,并执行搜索。相关的MessageListenerContainer,不管分区/并发设置如何,都应该执行查找。因为当ConcurrentMessageListenerContainer以大于1的并发开始时,它会在所有分区上启动多个KafkaMessageListenerContainer(?),但是由于所有分区都共享相同的groupId,所以只有一个使用者会使用消息,或者b)每个KafkaMessageListenerContainer将有一些分区子集。但是在seek中,我们必须指定Topic+Partition+offset,所以无论在哪种情况下,seek都应该由适当的KafkaMessageListenerContainer来拾取。

对吧?

我知道这是令人生畏的短信/问题,但也许这对其他人也有用。

共有1个答案

濮阳旭东
2023-03-14

您不需要创建自己的容器

@KafkaListener(id = "foo", topics = " ... "

然后,获取对KafkalistenerEndpointRegistry(自动连线等)的引用。

然后

registry.getListenerContainer("foo").stop();
registery.stop();
 类似资料:
  • 大家好。我有一个Kafka项目,使用SpringKafka来听一个明确的主题。我需要一天听一次所有的信息,把它们放到一个集合中,然后在那里找到特定的信息。我无法理解如何用一个@KafkaListener方法读取所有消息。我的班级是: 我的事件集合的大小始终为1;我尝试使用不同的循环,但后来,我的收藏被归档了530000次。 更新:我已经找到了一种方法来做它与factory.setBatchList

  • 有什么不同吗?术语KafkaConsumer和KafkaListener可以互换使用吗?

  • 有人知道一个听众是否可以听下面这样的多个话题吗?我知道“主题1”很管用,如果我想添加其他主题呢?你能给我举个例子吗?谢谢你的帮助! 或者

  • 控制台-观众 控制台-观众-创建 控制台-观众-删除 控制台-观众-更新 控制台-观众-详情

  • 我们有一个服务器,负责处理消息的生成和消费。我们有4台笔记本电脑,所有带有confluent的Mac都运行相同的命令行。。。 /kafka avro控制台使用者--从一开始--引导服务器0.0.0.0:9092,0.0.0.0:9092--主题主题名称--属性schema.registry.url=http://0.0.0.0:8081 4台笔记本电脑中有3台没有问题使用这些消息,但是第四台不会。

  • 我第一次在kafka中使用Node,使用Kafka-Node。使用消息需要调用外部API,这甚至可能需要一秒钟的时间来响应。我希望克服我的消费者的突然失败,这样,如果一个消费者失败了,另一个将替换它的消费者将收到相同的消息,即它的工作没有完成。 我正在使用Kafka0.10并尝试使用ConsumerGroup。 我想到了在options中设置,并且只在消息的工作完成后提交消息(就像我以前对一些Ja