当前位置: 首页 > 知识库问答 >
问题:

如何确保Spring云流监听器在启动时等待处理消息,直到应用程序完全初始化?

郑嘉悦
2023-03-14

使用Spring Cloud Stream Kafka应用程序,我们如何确保流侦听器等待处理消息,直到一些依赖任务(例如引用数据填充)完成?下面的应用程序无法处理消息,因为消息传递得太早。我们如何保证Spring Boot App中的这种排序?

@Service
public class ApplicationStartupService implements ApplicationRunner {

  private final FooReferenceDataService fooReferenceDataService;

  @Override
  public void run(ApplicationArguments args) throws Exception {
      fooReferenceDataService.loadData();
  }
}

@EnableBinding(MyBinding.class)
public class MyFooStreamProcessor {

  @Autowired FooService fooService;

  @StreamListener("my-input")
  public void process(KStream<String, Foo> input) {
      input.foreach((k,v)-> {
          // !!! this fails to save
          // messages are delivered too early before foo reference data got loaded into database
          fooService.save(v);
      });         
  }
}
    < li >春-云-流:2.1.0.RELEASE < li >Spring启动:2.1.2 .释放

我发现截至 2018 年 5 月 15 日,Spring云流中不可用。

Kafka -延迟绑定,直到复杂的服务初始化完成

我们是否有一个计划/时间表来支持这一点?

共有1个答案

东方宜
2023-03-14

与此同时,我通过使用@Ordered和ApplicationRunner实现了我想要的目标。它很乱,但有效。基本上,流侦听器将等到其他工作完成。

@Service
@Order(1)
public class ApplicationStartupService implements ApplicationRunner {

  private final FooReferenceDataService fooReferenceDataService;

  @Override
  public void run(ApplicationArguments args) throws Exception {
      fooReferenceDataService.loadData();
  }
}

@EnableBinding(MyBinding.class)
@Order(2)
public class MyFooStreamProcessor implements ApplicationRunner {

  @Autowired FooService fooService;
  private final AtomicBoolean ready = new AtomicBoolean(false);

  @StreamListener("my-input")
  public void process(KStream<String, Foo> input) {
      input.foreach((k,v)-> {
          while (ready.get() == false) {
            try {
              log.info("sleeping for other dependent components to finish initialization");
              Thread.sleep(10000);
            } catch (InterruptedException e) {
              log.info("woke up");
            }
          }
          fooService.save(v);
      });         
  }

  @Override
  public void run(ApplicationArguments args) throws Exception {
    ready.set(true);
  }
}
 类似资料:
  • 我想创建一个基于JavaFx WebEngine的自定义FunctionPlotter组件。我的情节将在浏览器中显示。在执行plot命令之前,我必须等待浏览器初始化(它加载d3.js)。目前,我这样做的方法是将我的绘图表达式放入一个Runnable中,并将该Runnable传递给FunctionPlotter。(FunctionPlotter将runnable传递给浏览器的loading fini

  • 我正在将Spring Boot应用程序从版本1.5.6升级到2.1.1。当我启动应用程序时,它会卡在这一行: 当我点击这个网址:http://localhost:8888/actuator/health,我得到 同样,当我点击这个网址:http://localhost:8888/swagger-ui.html,我看到斯瓦格用户界面。 但是我的主应用程序没有启动。知道为什么它卡住了吗?

  • 在下面的示例中,我有两个正在处理来自kafka的消息的服务实例,但我希望确保只在之后处理。 显然,通过将一个实例配置为仅从特定分区消费,可以很容易地解决这种情况,该分区将存储带有公共标识符的消息: 现在顺序得到了保证,将永远不会在之前处理。 但是,我在想这个问题是否可以用另一种方式来解决,直接在代码中而不是依赖基础设施?这看起来可能是微服务架构中的一个标准问题,但我不确定哪种方法是解决它的首选方法

  • 问题内容: 暂时禁用消息侦听器的好方法是什么?我要解决的问题是: 消息侦听器接收到JMS消息 尝试处理该消息时出现错误。 我等待系统再次准备就绪,以便能够处理该消息。 在系统准备就绪之前,我不再需要任何消息,因此… …我想禁用消息监听器。 我的系统已准备好再次处理。 处理失败的消息,并确认JMS消息。 再次启用消息监听器。 现在,我正在使用Sun App Server。我通过在MessageCon

  • 版本 维特。x核心:3.5.0 vert. x redis客户端:3.5.0 上下文 2018-06-02 17:40:55.981错误4933---[ntlop-thread-2]io.vertx.redis.impl.重新连接:没有处理程序等待消息:14751915 2018-06-02 17:41:10.937错误4933---[ntloop-thread-2]io。维特斯。雷迪斯。impl

  • 我有以下兔子听者: 我需要将listener配置为在它处理一条消息后等待15分钟,然后再接收下一条消息。不需要在此方法中等待。我所需要的只是在处理完一条后不接收任何消息。可以通过来完成,但我不确定这是否是实现这一点的最佳方法。对于这种情况有没有rabbitmq的配置?