当前位置: 首页 > 知识库问答 >
问题:

WebFlux链接调用多个服务和响应聚合

丌官坚秉
2023-03-14

我最近开始使用WebFlux,需要关于如何链接多个服务和聚合响应的建议。这4个服务及其响应POJO类似于以下示例:

class Response1{
   String a1;
   String a2;
}

class Response2{
   String b1;
}

class Response3{
   String c1;
}

class Response4{
   String d1;
}

以及4项服务的签名:

Flux<Response1> service1(); 
Flux<Response2> service2(String a1); //output field of Response1(service 1)
Flux<Response3> service3(String b1); //output field of Response2(service 2)
Mono<Response4> service4(String a2); //output field of Response1(service 1)

因此,需要为Flux中的每个响应1调用service2,为每个响应2调用service3。模型之间的关系是:

Response1 <1-----*>Response2 (1 to many), 
Response2 <1-----*>Response3 (1 to many),
Response1 <1-----1>Response4 (1 to 1)

聚合的最终响应应类似于(JSON):

[
  {
    "a1": "",
    "a2": "",
    "d1": "",
    "response2s": [
      {
        "b1": "",
        "response3s": [
          {
            "c1": ""
          }
        ]
      }
    ]
  }
]

所以首先我需要调用Service1,然后为每个响应调用service2,然后为每个响应调用service3(由service2返回)。另外,为service1返回的每个响应1调用service4(可以与service2和service3调用并行调用)。为了更新聚合的最终响应,我添加了两个额外的POJO以允许存储子响应,例如(相关位):

public class AggResponse extends Response1{
    List<AggResponse2> response2s;// populated from service2 response
    String d1; // populated from service4 response

    public void add(AggResponse2 res2){
        if(response2s == null)
            response2s = new ArrayList<>();
        response2s.add(res2);
    }
}

public class AggResponse2 extends Response2{
    List<Response3> response3s;// populated from service3 response

    public void add(Response3 res3) {
        if (response3s == null)
            response3s = new ArrayList<>();
        response3s.add(res3);
    }
}

如何最好地进行链接,以便保留以前的响应数据,并在组合运算符时保留Accessponse对象中的所有数据?我尝试了以下几点:

public Flux<AggResponse> aggregate() {
    return services.service1()
            .map(res1 -> new AggResponse(res1.getA1(), res1.getA2()))
            .flatMap(aggRes -> services.service2(aggRes.getA1())
                    .map(res2 -> {
                        AggResponse2 aggRes2 = new AggResponse2(res2.getB1());
                        aggRes.add(aggRes2);
                        return aggRes2;
                    })
                    .flatMap(aggRes2 -> services.service3(aggRes2.getB1())
                            .map(res3 -> {
                                aggRes2.add(res3);
                                return res3;
                            })
                            .reduce(aggRes2, (a, aggRes3) -> aggRes2)
                    )
                    .reduce(aggRes, (a, aggRes2) -> aggRes)
            )
            .flatMap(aggRes -> services.service4(aggRes.getA1())
                    .map(res4 -> {
                        aggRes.setD1(res4.getD1());
                        return aggRes;
                    })
            );
}

然而,我得到了以下不完整的答复:

[ {
  "a1" : "a1v1",
  "a2" : "a2v1"
} ]

我看到所有服务都被调用,就像我看到日志一样。两个问题:1。为什么看不到聚合的响应,会减少丢失它?2.有没有更好的方法来实现这一点?

共有2个答案

冀俊良
2023-03-14

使用Flux.CombineTest()看起来更简单一些。

 service1().flatMap(response1 -> Flux.combineLatest(
      service2Service3(response1.a1).collectList(), // call service2 which call service3
      service4(response1.a2),                       // call service4
      (aggResponse2, response4)->{
                    //log.info(agg);
                    //log.info(response4);
                    FinalResponse agg = FinalResponse.builder()
                            .a1(response1.a1)
                            .a2(response1.a2)
                            .d1(response4.d1)
                            .response2s(aggResponse2)
                            .build();
                    return agg;
                })
            )

这是完整的源代码。

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.*;
import lombok.extern.log4j.Log4j2;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;

@Log4j2
public class CombiningTests {
    @Data
    @AllArgsConstructor
    @ToString
    static class Response1{
        String a1;
        String a2;
    }

    @Data
    @AllArgsConstructor
    @ToString
    static class Response2{
        String b1;
        public Response2(){
        }
    }
    @Data
    @AllArgsConstructor
    @ToString
    static class Response3{
        String c1;
    }
    @Data
    @AllArgsConstructor
    @ToString
    static class Response4{
        String d1;
    }

    @Data
    @ToString(callSuper=true)
    static class AggResponse2 extends Response2{
        List<Response3> response3s;
        public AggResponse2(String b1, List<Response3> response3s) {
            super(b1);
            this.response3s = response3s;
        }
    }

    @Data
    @ToString
    @Builder
    static class FinalResponse {
        final String a1;
        final String a2;
        final String d1;
        final List<AggResponse2> response2s;
    }

    static Flux<Response1> service1(){
        //
        return Flux
                .just(new Response1("a1", "a2"))
                .delayElements(Duration.ofMillis(4));
    }
    static Flux<Response2> service2(String a1){
        //
        return Flux
                .just(new Response2("b1-" + a1), new Response2("b2-" + a1))
                .delayElements(Duration.ofMillis(3));
    }
    static Flux<Response3> service3(String b1){
        //
        return Flux
                .just(new Response3("c1-" + b1), new Response3("c2-" + b1))
                .delayElements(Duration.ofMillis(5));
    }
    static Mono<Response4> service4(String a2){
        //
        return Mono
                .just(new Response4("d1-" + a2))
                .delayElement(Duration.ofMillis(8));
    }

    static Flux<AggResponse2> service2Service3(String a1){
        return service2(a1)
                .flatMap(
                        x2->service3(x2.b1)
                                .collectList()
                                .map(x3->new AggResponse2(x2.b1, x3))
                );
    }
    /**
     * service1() 1 -----> * service2() 1 --> * service3()
     *                |--> 1 service4()
     */
    @Test
    void testComplexCombineLatest(){
        ObjectMapper objectMapper = new ObjectMapper();
        service1().flatMap(response1 -> Flux.combineLatest(
                service2Service3(response1.a1).collectList(), // call service2 which call service3
                service4(response1.a2),                       // call service4
                (aggResponse2, response4)->{
                    //log.info(agg);
                    //log.info(response4);
                    FinalResponse agg = FinalResponse.builder()
                            .a1(response1.a1)
                            .a2(response1.a2)
                            .d1(response4.d1)
                            .response2s(aggResponse2)
                            .build();
                    return agg;
                })
            ).doOnNext(e->{
                    try {
                        String json = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(e);
                        System.out.println("JSON = " + json);
                    } catch (Exception ex) {
                        //e.printStackTrace();
                    }
            }).blockLast();
    }
}

以下是json格式的执行结果:

JSON = {
  "a1" : "a1",
  "a2" : "a2",
  "d1" : "d1-a2",
  "response2s" : [ {
    "b1" : "b1-a1",
    "response3s" : [ {
      "c1" : "c1-b1-a1"
    }, {
      "c1" : "c2-b1-a1"
    } ]
  }, {
    "b1" : "b2-a1",
    "response3s" : [ {
      "c1" : "c1-b2-a1"
    }, {
      "c1" : "c2-b2-a1"
    } ]
  } ]
}

乐正烨熠
2023-03-14

如果您不想等待service2next信号,您可以使用merge方法。大概是这样的:

return service1().flatMap(response1 ->
        Flux.merge(service23Agg(response1.a1), service4Agg(response1.a2))
                .reduce((aggResponse, aggResponse2) -> new AggResponse(
                        response1.a1,
                        response1.a2,
                        Optional.ofNullable(aggResponse.d1)
                                .orElse(aggResponse2.d1),
                        Optional.ofNullable(aggResponse.response2s)
                                .orElse(aggResponse2.response2s))));

实用程序类和方法:

class AggContainer {
    final String b1;
    final List<Response3> response3s;

    AggContainer(String b1, List<Response3> response3s) {
        this.b1 = b1;
        this.response3s = response3s;
    }
}

class AggResponse {
    final String a1;
    final String a2;
    final String d1;
    final List<AggContainer> response2s;

    AggResponse(String a1, String a2, String d1, List<AggContainer> response2s) {
        this.a1 = a1;
        this.a2 = a2;
        this.d1 = d1;
        this.response2s = response2s;
    }

    AggResponse(String d1) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = d1;
        this.response2s = null;
    }

    AggResponse(List<AggContainer> response2s) {
        this.a1 = null;
        this.a2 = null;
        this.d1 = null;
        this.response2s = response2s;
    }
}

private Mono<AggResponse> service23Agg(String a1) {
    return service2(a1).flatMap(response2 -> service3(response2.b1).collectList()
            .map(response3s -> new AggContainer(response2.b1, response3s)))
            .collectList()
            .map(AggResponse::new);
}

private Mono<AggResponse> service4Agg(String a2) {
    return service4(a2).map(response4 -> new AggResponse(response4.d1));
}

在异步环境中,您应该非常小心处理可变集合。避免在反应性管道内更换。

 类似资料:
  • 我们有一个登录后显示的用户仪表板。仪表板由多个小部件组成。每个小部件从单独的restful服务中提取内容。例如:新闻/邮件/问题/警报。每个小部件在加载到页面上后调用服务。这样就有多个webservice调用。 有没有办法减少多次通话。 它的工作方式是当第一次加载页面时,服务应该在单个调用中返回所有小部件的聚合数据。 每个服务也应该独立可用,以便可以用于刷新单个小部件和其他系统集成。 注意:小部件

  • 我正在为一个Rails应用程序进行搜索引擎优化。 该网站是完全本地化的,我下面的这篇谷歌文章,以添加hirang备用链接到网页。

  • 如何测试接收并返回

  • 我构建了一个基于文件下载器RMI客户端-服务器的应用程序。在这方面,我不了解不同客户端的工作。文件位于服务器端,RMI在其一侧有等效的代理服务器(称为存根)。我为这个应用程序使用了10个客户端和1个服务器。 问题1-我的问题是当多个客户端在从注册表查找后同时进行RMI调用时,客户端以什么顺序为他们播种/提供文件?意味着底层服务算法---? 我无法理解这一点,当我执行代码时,我只看到相应的文件正在同

  • 我在Scala上使用Play 2.5,我创建了一个类,可以多次调用外部web服务。 外部Web服务在某些条件下被调用并得到ok或nok的简单响应。如果可以,那么我应该更新内部对象状态,如果可以,我现在什么也不做。 这是我的类,它将String的列表作为参数,并返回要在控制器中处理的对象的Future列表。 是列表类型的列表,但我希望它只是一个简单的响应列表。 1)如何简化和纠正我的代码以获得响应列

  • 我在架构“ID”中的链接服务器(宿主服务器)上有一个名为“Application”的表。我正在尝试使用< code>sp_help查看相同的详细信息。 但我做不到。问题是:1.sp_help位于schema中。(很明显!)2.但我的表位于另一个名为的模式中。(例如,从ID.Application中选择*) > 我已经将链接服务器添加到我的本地sql server management studio