使用Spring Cloud Stream Kafka应用程序,我们如何确保流侦听器等待处理消息,直到一些依赖任务(例如引用数据填充)完成?下面的应用程序无法处理消息,因为消息传递得太早。我们如何保证Spring Boot App中的这种排序?
@Service
public class ApplicationStartupService implements ApplicationRunner {
private final FooReferenceDataService fooReferenceDataService;
@Override
public void run(ApplicationArguments args) throws Exception {
fooReferenceDataService.loadData();
}
}
@EnableBinding(MyBinding.class)
public class MyFooStreamProcessor {
@Autowired FooService fooService;
@StreamListener("my-input")
public void process(KStream<String, Foo> input) {
input.foreach((k,v)-> {
// !!! this fails to save
// messages are delivered too early before foo reference data got loaded into database
fooService.save(v);
});
}
}
我发现截至 2018 年 5 月 15 日,Spring云流中不可用。
Kafka -延迟绑定,直到复杂的服务初始化完成
我们是否有一个计划/时间表来支持这一点?
与此同时,我通过使用@Ordered和ApplicationRunner实现了我想要的目标。它很乱,但有效。基本上,流侦听器将等到其他工作完成。
@Service
@Order(1)
public class ApplicationStartupService implements ApplicationRunner {
private final FooReferenceDataService fooReferenceDataService;
@Override
public void run(ApplicationArguments args) throws Exception {
fooReferenceDataService.loadData();
}
}
@EnableBinding(MyBinding.class)
@Order(2)
public class MyFooStreamProcessor implements ApplicationRunner {
@Autowired FooService fooService;
private final AtomicBoolean ready = new AtomicBoolean(false);
@StreamListener("my-input")
public void process(KStream<String, Foo> input) {
input.foreach((k,v)-> {
while (ready.get() == false) {
try {
log.info("sleeping for other dependent components to finish initialization");
Thread.sleep(10000);
} catch (InterruptedException e) {
log.info("woke up");
}
}
fooService.save(v);
});
}
@Override
public void run(ApplicationArguments args) throws Exception {
ready.set(true);
}
}
我想创建一个基于JavaFx WebEngine的自定义FunctionPlotter组件。我的情节将在浏览器中显示。在执行plot命令之前,我必须等待浏览器初始化(它加载d3.js)。目前,我这样做的方法是将我的绘图表达式放入一个Runnable中,并将该Runnable传递给FunctionPlotter。(FunctionPlotter将runnable传递给浏览器的loading fini
我正在将Spring Boot应用程序从版本1.5.6升级到2.1.1。当我启动应用程序时,它会卡在这一行: 当我点击这个网址:http://localhost:8888/actuator/health,我得到 同样,当我点击这个网址:http://localhost:8888/swagger-ui.html,我看到斯瓦格用户界面。 但是我的主应用程序没有启动。知道为什么它卡住了吗?
在下面的示例中,我有两个正在处理来自kafka的消息的服务实例,但我希望确保只在之后处理。 显然,通过将一个实例配置为仅从特定分区消费,可以很容易地解决这种情况,该分区将存储带有公共标识符的消息: 现在顺序得到了保证,将永远不会在之前处理。 但是,我在想这个问题是否可以用另一种方式来解决,直接在代码中而不是依赖基础设施?这看起来可能是微服务架构中的一个标准问题,但我不确定哪种方法是解决它的首选方法
问题内容: 暂时禁用消息侦听器的好方法是什么?我要解决的问题是: 消息侦听器接收到JMS消息 尝试处理该消息时出现错误。 我等待系统再次准备就绪,以便能够处理该消息。 在系统准备就绪之前,我不再需要任何消息,因此… …我想禁用消息监听器。 我的系统已准备好再次处理。 处理失败的消息,并确认JMS消息。 再次启用消息监听器。 现在,我正在使用Sun App Server。我通过在MessageCon
版本 维特。x核心:3.5.0 vert. x redis客户端:3.5.0 上下文 2018-06-02 17:40:55.981错误4933---[ntlop-thread-2]io.vertx.redis.impl.重新连接:没有处理程序等待消息:14751915 2018-06-02 17:41:10.937错误4933---[ntloop-thread-2]io。维特斯。雷迪斯。impl
我有以下兔子听者: 我需要将listener配置为在它处理一条消息后等待15分钟,然后再接收下一条消息。不需要在此方法中等待。我所需要的只是在处理完一条后不接收任何消息。可以通过来完成,但我不确定这是否是实现这一点的最佳方法。对于这种情况有没有rabbitmq的配置?