当前位置: 首页 > 知识库问答 >
问题:

停止Spring Cloud Stream@StreamListener侦听,直到收到某个Spring事件

江恩
2023-03-14

我正在开发Camunda BPM Spring Boot应用程序。该应用程序使用Spring Cloud Stream从Rabbitmq队列中读取消息。一旦收到消息,应用程序就会调用Camunda中的进程实例。

如果在应用程序启动期间rabbitmq队列中已经存在消息,则云流侦听器甚至在初始化Camunda之前就开始读取消息。

是否可以停止云流侦听器侦听队列,直到触发某个事件为止?在本例中为PostDeployEvent。

我已经创建了一个示例应用程序供参考https://github.com/kpkurian/spring-cloud-stream-camunda

谢谢

共有2个答案

方寒
2023-03-14

Spring cloud stream(Kafka活页夹)添加了暂停和恢复消费者的方法

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
        System.out.println(in);
        consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
    }

    @Bean
    public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
        return event -> {
            System.out.println(event);
            if (event.getConsumer().paused().size() > 0) {
                event.getConsumer().resume(event.getConsumer().paused());
            }
        };
    }
}

请检查文档https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples

但是我认为暂停方法有一些问题:https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479

PS/您可以在示例侦听器中获取partion id和主题名称:

  @StreamListener(Sink.INPUT)
  public void in(String in,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
    System.out.println(in);
    TopicPartition p = new TopicPartition(topic, partition);
    consumer.pause(Collections.singleton(p));
  }

或在errorChannel全局侦听器中

   @StreamListener("errorChannel")
  public void errorGlobal(Message<?> message) {
    Message<?> failedMessage = ((ErrorMessage)message).getOriginalMessage();
    Consumer consumer = (Consumer)failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
    int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
    String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
    TopicPartition p = new TopicPartition(topic, partition);
    // ?
    consumer.pause(Collections.singleton(p));
}
夔高寒
2023-03-14

根据@OlegZhurakousky的建议

问题

RuntimeService是自动装配的,在启动应用程序时,假设所有服务、bean等都已完全初始化。如果它仍在经历初始化和启动过程,那么从Spring习惯用法的角度来看,它没有正确实现。

解决方案

使用自定义生命周期实现包装RuntimeService,直到执行其start()方法确保RuntmeService准备就绪后才会返回。

我已经在示例github应用程序中实现了这一点

 类似资料:
  • 有没有办法防止事件链中的一些听众为事件开火,但允许链上的其他人开火? 比如我有这个结构 假设我已经将单击事件监听器连接到body、div#1和div#2。是否有可能在div#2事件监听器上阻止事件进入div#1或介于两者之间的任何其他监听器,并允许事件在body元素上触发? 我这样说是因为我使用谷歌地图和emberjs构建了一系列可以在地图上显示的交互式信息框。问题是ember将事件侦听器附加到b

  • 我创建了一个垂直,使用并交换X和Y坐标。(我使用这种方法) 现在,我想让它做的是在某一点停止滚动(我想让最后一个视图只占屏幕高度的65%,但全宽)。通常,在这种情况下,我会覆盖< code>getPageWidth(),但是由于我的宽度和高度现在有点混淆,当我这样做时,我的视图占据了屏幕高度和宽度的65%。 那么我应该如何解决这个问题呢? 谢谢! MyViewPager.java MyPagerA

  • 我有一个应用程序,从Kafka获取消息,并调用目标系统来更新遗留的Oracle DB。 我想启用一个场景,如果目标系统关闭,将消息留在Kafka总线上,并且在给定的时间段内不处理它们。我在考虑一些基于Hystrix的断路器解决方案,但我找不到任何机制来告诉Spring Cloud Stream“停止”事件监听。我能想到的另一种选择是,如果断路器断开,将这些信息传输到错误/重新处理主题,但这听起来像

  • 问题内容: 当电话状态恢复为IDLE时,我正在拨打电话并使用电话监听器重新开始活动。但是当我完成活动后,电话监听器仍在运行,因此,当用户进行自己的通话并挂断时,我的活动又重新启动! 问题答案: 使用作为参数传递给方法停止监听更新。

  • 我有一个很奇怪的问题快把我逼疯了。 我有一个Ruby服务器和一个Flash客户端(动作脚本3)。这是一个多人游戏。 问题是,一切都在完美地工作,然后,突然,一个随机的球员停止接收数据。当服务器因为不活动而关闭连接时,大约20-60秒后,客户端接收到所有缓冲的数据。 null 我不认为这是一个操作系统/网络问题,因为我已经从一个位于西班牙的VPS换到了位于爱尔兰的亚马逊EC2,问题仍然存在。 我觉得

  • ap.onBackgroundAudioStop(CALLBACK) 监听音乐停止事件。 代码示例 <script src="https://gw.alipayobjects.com/as/g/h5-lib/alipayjsapi/3.1.1/alipayjsapi.inc.min.js"></script> <style>.output{ display:block; max-width: 1