当前位置: 首页 > 知识库问答 >
问题:

反应式编程:SpringWebFlux:如何构建微服务调用链?

仲孙思源
2023-03-14

Spring Boot应用程序:

@RestController接收以下有效负载:

{
  "cartoon": "The Little Mermaid",
  "characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}

我需要按以下方式进行处理:

>

使用转换后的数据向“卡通数据库”微服务发送HTTP POST请求。

我遇到的问题是:

我需要使用Spring-WebFluxMono|Flux)和Spring-Reactive-WebClient)的反应式编程(非阻塞\异步处理)范例来实现所有这些步骤,但我对该堆栈没有任何经验,我尽可能多地阅读它,加上谷歌搜索的次数很多,但仍然有一大堆未回答的问题,例如:

Q1.我已经配置了反应性webClient,它向“卡通人物”微服务发送请求:

      public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
    return WebClient.builder().baseUrl("http://cartoon-characters").build()
        .get()
        .uri("/character/{characterName}", characterName)
        .retrieve()
        .bodyToMono(Integer.class);
  }

正如你可能看到的,我有一个列表的卡通人物的名字和他们中的每一个我需要调用getCartoon特性IdbyName(字符串名称)方法,我不确定正确的选项来调用它串联,相信正确的选项:并行执行。

写了以下方法:

  public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
    .flatMap(this::getCartoonCharacterIdbyName);

return StreamSupport.stream(flux.toIterable().spliterator(), false)
    .collect(Collectors.toList());

}

但我怀疑,这段代码是否并行WebClient执行,以及代码调用flux。toIterable()阻塞线程,因此在这个实现中,我丢失了非阻塞机制。

我的假设正确吗?

我需要如何将其重写为具有并行性和非阻塞性?

Q2.在技术上是否可以将控制器接收的输入数据(我的意思是用id替换名称)以反应性方式转换:当我们使用Flux操作时

第三季度。是否有可能不仅获取转换后的数据对象,而且获取Mono


共有1个答案

邬宏扬
2023-03-14

实际上,这是一个很好的问题,因为当涉及到链接微服务时,理解WebFlux或项目反应器框架需要几个步骤。

第一个是认识到WebClient应该接收发布者并返回发布者。将其外推到4种不同的方法特征,以帮助思考。

  • 单声道-

当然,在所有情况下,它只是Publisher-

在您的示例中,您希望转到

  • (单声道)-

正如你所说,你想要

  • 单声道-

第二部分是了解链接流。你可以

  • p3(p2(p1(对象))

这将链p1-

  • o2=p1(对象)
  • o3=p2(o2)
  • 结果=p3(o3)

这段代码更易于阅读和维护,并且随着一些成熟度的提高,您逐渐理解了这段语句的价值。

我对您的示例的唯一问题是执行通量

@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    Map<Integer, CartoonCharacter> characters;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
        characters = Arrays.asList( new CartoonCharacter[] {
                new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"), 
                new CartoonCharacter(names[1].hashCode(), names[1], "Human"), 
                new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"), 
                new CartoonCharacter(names[3].hashCode(), names[3], "Fish")} 
        )
        .stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
        // TODO Auto-generated method stub
        CartoonRequest cr = CartoonRequest.builder()
        .cartoon("The Little Mermaid")
        .characterNames(Arrays.asList(names))
        .build();
        thisLocalClient
            .post()
            .uri("cartoonDetails")
            .body(Mono.just(cr), CartoonRequest.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class)
            .subscribe(System.out::println);
    }

    @Bean
    WebClient localClient() {
        return WebClient.create("http://localhost:8080/demo/");
    }

    @Autowired
    WebClient thisLocalClient;

    @PostMapping("cartoonDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
        Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
        Flux<Integer> ids = mapNamesToIds(fn);
        Flux<CartoonCharacter> details = mapIdsToDetails(ids);
        return details;
    }
    //  Service Layer Methods
    private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
        return thisLocalClient
            .post()
            .uri("findIds")
            .body(names, StringWrapper.class)
            .retrieve()
            .bodyToFlux(Integer.class);
    }
    private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
        return thisLocalClient
            .post()
            .uri("findDetails")
            .body(ids, Integer.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class);
    }
    // Services
    @PostMapping("findIds")
    Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
        return names.map(name->name.getString().hashCode());
    }
    @PostMapping("findDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
        return ids.map(characters::get);
    }
}

也:

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
    private String string;
}
@Data
@Builder
public class CartoonRequest {
    private String cartoon;
    private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
    Integer id;
    String name;
    String species;
}

 类似资料:
  • 我正在努力实现以下目标: 我有两种方法——1。公共方法1 2。公共方法2 你能帮助: 问题是,只有在使用method1.subscribe()时,控件才会转到方法1。我无法返回值或抛出异常基于状态字段从方法1返回。 当method1.map()或method1.filter()时,控件甚至不会转到方法1 当方法1返回Mono.empty(),那么控件不会切换到If清空(方法2)

  • 导航异步、非阻塞和反应性是一项非常有用的工作。。。给定2个非阻塞、无功、垂直。基于x/quarkus的微服务A和B,其中约束条件是A必须通过http与B通信。如果我想让服务保持被动(非阻塞): 我应该使用Vertex web客户端吗?文档说明它是一个异步客户机,但我假设它是vert。基于x的它也是非阻塞的吗?(我在这里区分异步和非阻塞) 任何帮助都会很好。谢谢

  • 我想在另一个Flux流中间调用Mono,向mono发送Flux参数。我正在使用SpringBoot的WebClient。 我试过这个: 但是,如何将第一个API调用的返回发送给param2?然后得到双方的回应?第一个API返回许多值,对于每个值,我需要调用第二个API。 谢谢

  • 我面临的问题是,有一个服务,我必须调用,这是一个传统的Spring启动应用程序,而不是反应性的! 下面是一个示例endpoint,它接近上述遗留系统的想法: 我知道我不能用这个来实现真正的反应性善,有没有一个快乐的非阻塞和阻塞的媒介我可以在这里实现? 谢谢

  • 我有一些微服务,应该可以在WebFlux框架上运行。每台服务器都有自己的带有Mono或Flux的API。我们使用的是Spring支持的MongoDB(Spring数据MongoDB)。 问题是外部阻塞API,我必须在我的系统中使用它。 我有一个解决办法。我可以在专用线程池中封装阻塞API调用,并在CompletableFuture中使用它。 还有别的办法解决我的问题吗?我想,那个全新的Rsocke

  • 注意:前端不能制作编排,因为它是一个封闭的产品,我们不能接触它。 提前谢了。