我在spring中有一个服务,它需要使用十种不同的方法获取数据。
我希望这些方法并行执行,以执行一些DB操作并返回到父线程。但是父线程应该等到所有响应出现,然后返回响应。
在我当前的方法中,我使用反应式mono异步执行所有方法,但主线程不等待订阅者方法完成。
下面是我订阅的两种方法
private Mono<BaseResponse> getProfileDetails(long profileId){
return new Mono<BaseResponse>() {
@Override
public void subscribe(Subscriber<? super BaseResponse> s) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// DB Operation
System.out.println("Inside getProfileDetails");
s.onNext(new BaseResponse());
}
};
}
private Mono<Address> getAddressDetails(long profileId){
return new Mono<Address>() {
@Override
public void subscribe(Subscriber<? super Address> s) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// DB Operation
System.out.println("Inside getAddressDetails");
s.onNext(new Address());
}
};
}
下面是我的主要方法
public BaseResponse getDetails(long profileId){
ExecutorService executors = Executors.newFixedThreadPool(2);
Mono<BaseResponse> profileDetail = this.getProfileDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));
Mono<BaseResponse> addressDetail = this.getAddressDetails(profileId).subscribeOn(Schedulers.fromExecutor(executors));
List<BaseResponse> list = new ArrayList<>();
profileDetail.mergeWith(addressDetail)
.subscribe(consumer -> {
list.add(consumer);
});
System.out.println("list: "+new Gson().toJson(list));
executors.shutdown();
return response;
}
以下是我的输出:
list: []
Inside getProfileDetails
Inside getAddressDetails
我的输出显示,主线程没有等待订阅服务器完成其任务,因此我如何处理这种情况?
我假设您的getProfileDetails()
和getAddressDetails()
方法只是占位符,因为它们在编写时没有多大意义。
也就是说,如果这是您在这里的整个应用程序,并且您真的只想在完成之前进行阻止,那么您也可以将当前的subscribe()
调用更改为doOnNext()
,然后只需blockLast()
:
profileDetail.mergeWith(addressDetail)
.doOnNext(consumer -> {
list.add(consumer);
})
.blockLast();
在反应性应用程序中,阻塞反应性线程通常是不明智的,这是有充分理由的,但在这种情况下,您实际上只是想在完全退出之前进行阻塞,所以我看不出有什么负面影响。
我试图利用固有的WSO2ESB主题发布到jms队列。我已经创建了主题,并提供了一个订阅者URL:jms:/topictest?transport.jms.destinationtype=queue。然而,当我将消息发布到主题时,它不能被传递到队列。日志生成以下内容 “系统无法从jms:/queue?destination=topictest URL推断传输信息。” 另外,我似乎不知道如何发布到WS
我有一个使用ActiveMQ的JMS生产者/订阅者的简单Spring应用程序,配置如下: 我试过所有可能的解决办法,但没有一个奏效。我们非常感谢任何帮助
我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”
我基本上需要从python服务器向设备发送命令,设备将发布对主题的回复,我需要捕获回复服务器端。要从服务器发布到设备,我正在使用boto3物联网数据模块。但是我如何订阅另一个主题以从设备获得回复?似乎没有办法使用aws python库。我需要使用像paho这样的遗传MQTT客户机吗? 谢谢你。
这似乎是最简单的解决办法。让我们看看流程: 第三方向RESTful API发送请求,以获取Windows Azure服务总线连接字符串-凭据-。 一旦拥有连接字符串,第三方就会连接到Windows服务总线,并开始从某个主题订阅接收消息。注意:连接字符串是在服务器端加密的,只能由接受的客户端解密。 优点 null null 第三方请求一个类似于RESTful的TCP API,以便订阅一些Window
来自第三次订阅的消息会发生什么情况,是否会在TTL之后发送到死信队列 有没有办法找出消息未被使用的订阅