我最近开始使用WebFlux,需要关于如何链接多个服务和聚合响应的建议。这4个服务及其响应POJO类似于以下示例:
class Response1{
String a1;
String a2;
}
class Response2{
String b1;
}
class Response3{
String c1;
}
class Response4{
String d1;
}
以及4项服务的签名:
Flux<Response1> service1();
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)
因此,需要为Flux中的每个响应1调用service2,为每个响应2调用service3。模型之间的关系是:
Response1 <1-----*>Response2 (1 to many),
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)
聚合的最终响应应类似于(JSON):
[
{
"a1": "",
"a2": "",
"d1": "",
"response2s": [
{
"b1": "",
"response3s": [
{
"c1": ""
}
]
}
]
}
]
所以首先我需要调用Service1,然后为每个响应调用service2,然后为每个响应调用service3(由service2返回)。另外,为service1返回的每个响应1调用service4(可以与service2和service3调用并行调用)。为了更新聚合的最终响应,我添加了两个额外的POJO以允许存储子响应,例如(相关位):
public class AggResponse extends Response1{
List<AggResponse2> response2s;// populated from service2 response
String d1; // populated from service4 response
public void add(AggResponse2 res2){
if(response2s == null)
response2s = new ArrayList<>();
response2s.add(res2);
}
}
和
public class AggResponse2 extends Response2{
List<Response3> response3s;// populated from service3 response
public void add(Response3 res3) {
if (response3s == null)
response3s = new ArrayList<>();
response3s.add(res3);
}
}
如何最好地进行链接,以便保留以前的响应数据,并在组合运算符时保留Accessponse对象中的所有数据?我尝试了以下几点:
public Flux<AggResponse> aggregate() {
return services.service1()
.map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
.flatMap(aggRes -> services.service2(aggRes.getA1())
.map(res2 -> {
AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
aggRes.add(aggRes2);
return aggRes2;
})
.flatMap(aggRes2 -> services.service3(aggRes2.getB1())
.map(res3 -> {
aggRes2.add(res3);
return res3;
})
.reduce(aggRes2, (a, aggRes3) -> aggRes2)
)
.reduce(aggRes, (a, aggRes2) -> aggRes)
)
.flatMap(aggRes -> services.service4(aggRes.getA1())
.map(res4 -> {
aggRes.setD1(res4.getD1());
return aggRes;
})
);
}
然而,我得到了以下不完整的答复:
[ {
"a1" : "a1v1",
"a2" : "a2v1"
} ]
我看到所有服务都被调用,就像我看到日志一样。两个问题:1。为什么看不到聚合的响应,会减少丢失它?2.有没有更好的方法来实现这一点?
使用Flux.CombineTest()
看起来更简单一些。
service1().flatMap(response1 -> Flux.combineLatest(
service2Service3(response1.a1).collectList(), // call service2 which call service3
service4(response1.a2), // call service4
(aggResponse2, response4)->{
//log.info(agg);
//log.info(response4);
FinalResponse agg = FinalResponse.builder()
.a1(response1.a1)
.a2(response1.a2)
.d1(response4.d1)
.response2s(aggResponse2)
.build();
return agg;
})
)
这是完整的源代码。
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.*;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.time.Duration;
import java.util.List;
@Log4j2
public class CombiningTests {
@Data
@AllArgsConstructor
@ToString
static class Response1{
String a1;
String a2;
}
@Data
@AllArgsConstructor
@ToString
static class Response2{
String b1;
public Response2(){
}
}
@Data
@AllArgsConstructor
@ToString
static class Response3{
String c1;
}
@Data
@AllArgsConstructor
@ToString
static class Response4{
String d1;
}
@Data
@ToString(callSuper=true)
static class AggResponse2 extends Response2{
List<Response3> response3s;
public AggResponse2(String b1, List<Response3> response3s) {
super(b1);
this.response3s = response3s;
}
}
@Data
@ToString
@Builder
static class FinalResponse {
final String a1;
final String a2;
final String d1;
final List<AggResponse2> response2s;
}
static Flux<Response1> service1(){
//
return Flux
.just(new Response1("a1", "a2"))
.delayElements(Duration.ofMillis(4));
}
static Flux<Response2> service2(String a1){
//
return Flux
.just(new Response2("b1-" + a1), new Response2("b2-" + a1))
.delayElements(Duration.ofMillis(3));
}
static Flux<Response3> service3(String b1){
//
return Flux
.just(new Response3("c1-" + b1), new Response3("c2-" + b1))
.delayElements(Duration.ofMillis(5));
}
static Mono<Response4> service4(String a2){
//
return Mono
.just(new Response4("d1-" + a2))
.delayElement(Duration.ofMillis(8));
}
static Flux<AggResponse2> service2Service3(String a1){
return service2(a1)
.flatMap(
x2->service3(x2.b1)
.collectList()
.map(x3->new AggResponse2(x2.b1, x3))
);
}
/**
* service1() 1 -----> * service2() 1 --> * service3()
* |--> 1 service4()
*/
@Test
void testComplexCombineLatest(){
ObjectMapper objectMapper = new ObjectMapper();
service1().flatMap(response1 -> Flux.combineLatest(
service2Service3(response1.a1).collectList(), // call service2 which call service3
service4(response1.a2), // call service4
(aggResponse2, response4)->{
//log.info(agg);
//log.info(response4);
FinalResponse agg = FinalResponse.builder()
.a1(response1.a1)
.a2(response1.a2)
.d1(response4.d1)
.response2s(aggResponse2)
.build();
return agg;
})
).doOnNext(e->{
try {
String json = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(e);
System.out.println("JSON = " + json);
} catch (Exception ex) {
//e.printStackTrace();
}
}).blockLast();
}
}
以下是json格式的执行结果:
JSON = {
"a1" : "a1",
"a2" : "a2",
"d1" : "d1-a2",
"response2s" : [ {
"b1" : "b1-a1",
"response3s" : [ {
"c1" : "c1-b1-a1"
}, {
"c1" : "c2-b1-a1"
} ]
}, {
"b1" : "b2-a1",
"response3s" : [ {
"c1" : "c1-b2-a1"
}, {
"c1" : "c2-b2-a1"
} ]
} ]
}
如果您不想等待service2
的next
信号,您可以使用merge
方法。大概是这样的:
return service1().flatMap(response1 ->
Flux.merge(service23Agg(response1.a1), service4Agg(response1.a2))
.reduce((aggResponse, aggResponse2) -> new AggResponse(
response1.a1,
response1.a2,
Optional.ofNullable(aggResponse.d1)
.orElse(aggResponse2.d1),
Optional.ofNullable(aggResponse.response2s)
.orElse(aggResponse2.response2s))));
实用程序类和方法:
class AggContainer {
final String b1;
final List<Response3> response3s;
AggContainer(String b1, List<Response3> response3s) {
this.b1 = b1;
this.response3s = response3s;
}
}
class AggResponse {
final String a1;
final String a2;
final String d1;
final List<AggContainer> response2s;
AggResponse(String a1, String a2, String d1, List<AggContainer> response2s) {
this.a1 = a1;
this.a2 = a2;
this.d1 = d1;
this.response2s = response2s;
}
AggResponse(String d1) {
this.a1 = null;
this.a2 = null;
this.d1 = d1;
this.response2s = null;
}
AggResponse(List<AggContainer> response2s) {
this.a1 = null;
this.a2 = null;
this.d1 = null;
this.response2s = response2s;
}
}
private Mono<AggResponse> service23Agg(String a1) {
return service2(a1).flatMap(response2 -> service3(response2.b1).collectList()
.map(response3s -> new AggContainer(response2.b1, response3s)))
.collectList()
.map(AggResponse::new);
}
private Mono<AggResponse> service4Agg(String a2) {
return service4(a2).map(response4 -> new AggResponse(response4.d1));
}
在异步环境中,您应该非常小心处理可变集合。避免在反应性管道内更换。
我们有一个登录后显示的用户仪表板。仪表板由多个小部件组成。每个小部件从单独的restful服务中提取内容。例如:新闻/邮件/问题/警报。每个小部件在加载到页面上后调用服务。这样就有多个webservice调用。 有没有办法减少多次通话。 它的工作方式是当第一次加载页面时,服务应该在单个调用中返回所有小部件的聚合数据。 每个服务也应该独立可用,以便可以用于刷新单个小部件和其他系统集成。 注意:小部件
我正在为一个Rails应用程序进行搜索引擎优化。 该网站是完全本地化的,我下面的这篇谷歌文章,以添加hirang备用链接到网页。
如何测试接收并返回
我构建了一个基于文件下载器RMI客户端-服务器的应用程序。在这方面,我不了解不同客户端的工作。文件位于服务器端,RMI在其一侧有等效的代理服务器(称为存根)。我为这个应用程序使用了10个客户端和1个服务器。 问题1-我的问题是当多个客户端在从注册表查找后同时进行RMI调用时,客户端以什么顺序为他们播种/提供文件?意味着底层服务算法---? 我无法理解这一点,当我执行代码时,我只看到相应的文件正在同
我在Scala上使用Play 2.5,我创建了一个类,可以多次调用外部web服务。 外部Web服务在某些条件下被调用并得到ok或nok的简单响应。如果可以,那么我应该更新内部对象状态,如果可以,我现在什么也不做。 这是我的类,它将String的列表作为参数,并返回要在控制器中处理的对象的Future列表。 是列表类型的列表,但我希望它只是一个简单的响应列表。 1)如何简化和纠正我的代码以获得响应列
我在架构“ID”中的链接服务器(宿主服务器)上有一个名为“Application”的表。我正在尝试使用< code>sp_help查看相同的详细信息。 但我做不到。问题是:1.sp_help位于schema中。(很明显!)2.但我的表位于另一个名为的模式中。(例如,从ID.Application中选择*) > 我已经将链接服务器添加到我的本地sql server management studio