当前位置: 首页 > 知识库问答 >
问题:

如何停止Spring Kafka Listner并调用一些函数在消费者停止后执行?

卢志行
2023-03-14

我有一个场景,我需要停止kafka并调用一些函数在消费者停止后执行。同样,flow是这样的:

  1. 消费来自kafka主题的消息
  2. 将每个消耗的消息添加到文件中
  3. 停止kafka监听器,如果它在过去10秒内没有收到任何消息
  4. 为Ex调用一些函数:UploadFileToS3()

我在我的消费者方法中使用了spring kafka的@KafkaListener注释。

我知道停止使用@KafkaListener注释的Kafka消费者,我可以使用KafkaListenerEndpoint注册表

我想要这样的东西:

if(not consumed messages for more than 10s) {
    kafkaListenerEndpointRegistry.getListenerContainer("my-listener-id").stop();
    UploadFileToS3()
}

我如何使用Spring Kafka做到这一点?

共有1个答案

裴令秋
2023-03-14

请参见ListenerContainerIdleEvent:

/**
 * An event that is emitted when a container is idle if the container
 * is configured to do so.
 *
 * @author Gary Russell
 *
 */
public class ListenerContainerIdleEvent extends KafkaEvent {

文件如下:https://docs.spring.io/spring-kafka/docs/current/reference/html/#idle-容器

 类似资料:
  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我正在使用带有KafkaListener注释的spring kafka v2.5.2。 在运行时,我希望能够向消费者发送停止消费的信号。 我看到了autoStartup参数,但它似乎只对初始化有效,之后无法更改。 我看到了KafkaListenerEndpointRegistry的methode close()。。。 你有什么建议吗? 提前谢谢。

  • 我在项目中使用solace作为JMS提供者。我使用spring CachingConnectionFactory检索连接。在这个连接上,我创建了一个新会话。我在那个会话中创建了一个消费者的线程。 我正在做一些故障转移测试。当我将服务器从网络连接上拔下时,它会失败。当我再次连接服务器时,仍会收到相同的异常: 更重要的是,CachingConnectionFactory默认将reConnectOnEx

  • 我正在搜索框上使用事件 列的值列表,在变量中提到。 问题 什么时候 如果我输入一个字符(比如),筛选器工作正常。但是当我连续输入字符时,浏览器挂起了。原因是筛选器一直在激发筛选器框上的每一个输入(因为am使用事件) 我想要的 我在angular-2和打字。但是这个问题并不只是与或或或相关,因为我想要的是一个想法,而不是解决方案。所以我会为这个问题添加这些标记。不要移除。谢谢

  • 我正尝试使用Camel以事务方式从JMS队列中消费一条消息。特别是在这样的流程中: 等待消息在JMS队列上发布 尝试消费和处理单个消息 如果处理失败(发生异常),回滚消耗 如果处理通过,确认并停止使用更多消息 在应用程序生命周期的后期,另一个进程触发消费从(1)重新开始 起初,我试图使用轮询消费者,使用ConsumerTemplate来做这件事,但是我不知道是否可以通过事务来做这件事——似乎事务是

  • 我想这个话题发生了什么...偏移坏了还是我不知道... 有人知道会发生什么吗?谢谢