当前位置: 首页 > 知识库问答 >
问题:

用Project Reactor并行调用rest o web服务?

充煌
2023-03-14

我想知道如何对REST或Web服务进行几个并行调用,然后加入响应并将其发送到调用@RestController的响应中。

类似于下面的代码构建与比较未来,但Reactor(通量,单声道)。

CompletableFuture<Company> companyCompletableFuture = CompletableFuture.supplyAsync(() -> {
     return  Company.find.where().eq("id", id).findUnique();  
});

CompletableFuture<List<Domain>> domainsCompletableFuture = CompletableFuture.supplyAsync(() -> {
     return Domain.find.where().eq("company_id", id).findList();
});

// wait for all the data
CompletableFuture allDoneFuture = CompletableFuture.allOf(companyCompletableFuture, domainsCompletableFuture);

allDoneFuture.get(); // wait for all done
company = companyCompletableFuture.get();
domain = domainsCompletableFuture.get()

共有2个答案

爱繁
2023-03-14

您可以使用subscribeOn在新线程和zip操作符中运行

 /**
     * SubscribeOn just like in Rx make the pipeline run asynchronously, from the beginning to the end.
     * <p>
     * In this example we get the three Flux pipelines and we run async all of them.
     * Once they are finish we zip the results in the order we establish in the operator.
     */
    @Test
    public void subscribeOn() throws InterruptedException {
        Scheduler scheduler = Schedulers.newElastic("thread");
        Scheduler scheduler1 = Schedulers.newElastic("thread");
        Scheduler scheduler2 = Schedulers.newElastic("thread");

        Flux<String> flux1 = Flux.just("hello ")
                .doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
                .subscribeOn(scheduler);
        Flux<String> flux2 = Flux.just("reactive")
                .doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
                .subscribeOn(scheduler1);
        Flux<String> flux3 = Flux.just(" world")
                .doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
                .subscribeOn(scheduler2);
        Flux.zip(flux1, flux2, flux3)
                .map(tuple3 -> tuple3.getT1().concat(tuple3.getT2()).concat(tuple3.getT3()))
                .map(String::toUpperCase)
                .subscribe(value -> System.out.println("zip result:" + value));
        Thread.sleep(1000);

    }

你可以在这里看到更多Reactor的例子https://github.com/politrons/reactive

郭辉
2023-03-14

您可以从callable创建两个Mono,然后将其压缩。如果你想并行地执行callable,你还需要显式地将subscribeOn(Schedulers.parallel())添加到每个Mono

Mono<Integer> mono1 = Mono.fromCallable(() -> {
    System.out.println(Thread.currentThread().getName());
    return 123;
}).subscribeOn(Schedulers.parallel());

Mono<Integer> mono2 = Mono.fromCallable(() -> {
    System.out.println(Thread.currentThread().getName());
    return 321;
}).subscribeOn(Schedulers.parallel());

Tuple2<Integer, Integer> result = mono1.zipWith(mono2).block();

System.out.println(result.getT1());
System.out.println(result.getT2());

结果如下:

parallel-1
parallel-2
123
321
 类似资料:
  • 我将以下响应返回给用户 到目前为止,我正在进行三次连续调用来计算这个,每个调用都可以独立于其他调用运行。我尝试制作三种不同的作为: 如果我做了

  • 我遇到了一个顺序和并行服务调用的主题。对于序列调用,我们可以使用FlatMap/MergeMap;对于并行调用,我们可以使用ForkJoin。 以下是我的问题 1)flatmap和MergeMap的区别是什么?使用这两种方法有什么特别的原因吗。 2)如何为4-5个调用添加多个序列?如果我使用多个订阅,那么flatmap和mergemap有什么区别? 3)forkjoin用于连接多个并行服务调用。如

  • 我是SpringWebFlux的新手。 我有一个不支持批处理调用的上游服务,因此我必须为数组中的每个对象多次调用它。我还需要请求参数的实例,因为服务在响应中不返回这些属性。 例如,这是我将发送给客户的回复 我只有身份证清单

  • 对于,您可以指定订阅时发生的自定义操作。例如

  • 我对Spring的靴子是陌生的,在它们到来的时候学习。我有一个关于并行API调用的快速问题。 我有一个ID数组,我将把它附加到第三方APIendpoint,发出GET请求,聚合数据,并在所有3000个调用完成后从中生成一个文件。 这里的问题是Array的大小为3000,即我预计会进行3000次调用。我觉得使用for循环并迭代超过3000次没有任何意义,而且效率较低。 有谁能给我建议一个最好、最有效

  • 本文向大家介绍C#语言Parallel.Invoke 并行调用,包括了C#语言Parallel.Invoke 并行调用的使用技巧和注意事项,需要的朋友参考一下 示例 并行调用方法或动作(并行区域)