我是反应编程的新手,并尝试使用project reactor模拟下面的用例,但我发现将响应从一个服务调用传递到另一个依赖的服务有点困难。如有任何建议或参考,将不胜感激。
响应getDetails(Request inputRequest){
//Call two external services parallel based on the incoming request
Response1 = callExternalService1(inputRequest)
Response2 = callExternalService2(inputRequest)
//Call one external service based on the Response1 and Response2
Response3 = callExternalService3(Response1,Response2);
//Call three external service parallel based on the Response1, Response2 and Response3
Response4 = callExternalService4(Response1,Response2,Response3);
Response5 = callExternalService5(Response1,Response2,Response3);
Response6 = callExternalService6(Response1,Response2,Response3);
//Last service call
Response finalResponse= callLastExternalService(Response4,Response5,Response6);
return finalResponse;
}
我尝试了下面的示例,它对一个服务调用起作用,但不能将响应传递给其他依赖的服务调用。
Mono<Response> getDetails(Request inputRequest){
return Mono.just(inputRequest.getId())
.flatMap(id->{
DbResponse res = getDBCall(id).block();
if(res == null){
return Mono.error(new DBException(....));
}
return Mono.zip(callExternalService1(res),callExternalService2(inputRequest));
}).flatMap(response->{
Response extser1 = response.getT1();
Response extser2 = response.getT2();
//any exceptions?
return Mono.zip(Mono.just(extser1),Mono.just(extser2),callExternalService3();
}).flatMap(response->callExternalService4(response.getT1(),response.getT2(),response.getT3())
});
}
private Mono<DbResponse> getDBCall(String id) {
return Mono.fromCallable(()->dbservice.get(id))
.subscribeOn(Schedulers.boundedElastic());
}
如果您有n个呼叫,并且希望在steplock中移动(也就是说,如果您有来自所有呼叫的响应,则向前移动),请使用zip
。例如:
Mono.zip(call1, call2)
.flatMap(tuple2 -> {
ResponseEntity<?> r1 = tuple2.getT1(); //response from call1
ResponseEntity<?> r2 = tuple2.getT2(); //response from call2
return Mono.zip(Mono.just(r1), Mono.just(r2), call3);
})
.flatMap(tuple3 -> {
//at this point, you have r1, r2, r3. tuple3.getT1() response from call 1
return Mono.zip(call4, call5, call6); //tuple3.getT2() response from call 2, tuple3.getT3() response from call3
})
.flatMap(tuple3 -> callLastService);
注意:如果是伪代码,不会马上编译
你可以扩展上面的内容来回答你自己的问题。请注意,由于call1
和call2
是独立的,您可以使用subscribeON(Schedulers.BoundedeLastic())
并行运行它们
编辑:回答两个后续问题:
>
不需要使用block()
订阅,因为flatmap
会急切地订阅您的内部流。您可以执行以下操作:
Mono.just(inputRequest.getId())
.flatMap(a -> getDBCall(a).switchIfEmpty(Mono.defer(() -> Mono.error(..))))
注意:mono.callable(..)
如果调用对象返回空流,则返回空流。这就是switchifempty
的原因
问题内容: 我需要使用标题检查是否打开了一个外部窗口(另一个Java程序,但不受我正在使用的程序控制),如果打开了,则根据Java中的用户命令将其最大化或最小化(我只知道窗口的标题,什么也不知道)。Google只说我可以用来获取窗口句柄并使用该句柄对其进行操作,但是我找不到如何执行此操作。 我可以在此处找到有关如何使用JNI的参考: 在JavaSwing中,如何获取对窗口的Win32窗口句柄(hw
我正在考虑两个小的Spring Boot应用程序: 应用1:运行在http://localhost:8081上的小型web服务,实现一个简单的Spring控制器来响应/camel上的GET请求。访问http://localhost:8081/camel时,该服务仅返回“Hello World”。 应用程序2:一个小型应用程序,它应该执行对应用程序1的GET请求,并打印出对控制台的响应(在本例中为“
我想从Python中为大约8000个文件启动一个外部命令。每个文件都独立于其他文件进行处理。唯一的限制是在处理完所有文件后继续执行。我有4个物理核,每个物理核有2个逻辑核(返回8)。我的想法是使用一个由四个并行独立进程组成的池,这些进程将在8个内核中的4个上运行。这样我的机器就可以同时使用了。 以下是我一直在做的事情: 连续处理一批100个文件需要120秒。上述多处理版本(函数)只需20秒即可完成
我希望托管在 Openshift 集群中的应用程序面向外部 REST API,而无需在客户端应用程序中硬编码 IP/PORT,并且能够在不重新交付应用程序的情况下更改 IP/PORT。 我设法通过ConfigMap做到了这一点,但我看到也可能通过OpenShift doc中的服务做到这一点。 然而,我没有设法理解它是如何工作的。我做了以下操作: 在第一个curl中,使用endpoint中定义的地址
我试图处理一个用户输入,并允许只输入浮动。可以输入的浮点数是无限的,但是如果连续输入两个非浮点数,程序将结束。当程序结束时,它将打印所有数字的和。 问题是,每当我运行这个时,它立即运行while循环,将计数增加到2,并中断循环。在取消之前,您只能输入一个非浮点数。 编辑:正如你们中的一些人所指出的,count应该在while循环之前初始化
我正在传递多部分文件与其他用户信息。无法将类型的属性值转换为属性嵌套异常为 下面的代码我已经试过了 控制器类 @RequestMapping(value=RestMappingURLS.user.saveUser,headers={“Content-Type=Multipart/Mixed”,“Content-Type=Multipart/Form-Data”})public RestRespon