我想知道如何对REST或Web服务进行几个并行调用,然后加入响应并将其发送到调用@RestController的响应中。
类似于下面的代码构建与比较未来,但Reactor(通量,单声道)。
CompletableFuture<Company> companyCompletableFuture = CompletableFuture.supplyAsync(() -> {
return Company.find.where().eq("id", id).findUnique();
});
CompletableFuture<List<Domain>> domainsCompletableFuture = CompletableFuture.supplyAsync(() -> {
return Domain.find.where().eq("company_id", id).findList();
});
// wait for all the data
CompletableFuture allDoneFuture = CompletableFuture.allOf(companyCompletableFuture, domainsCompletableFuture);
allDoneFuture.get(); // wait for all done
company = companyCompletableFuture.get();
domain = domainsCompletableFuture.get()
您可以使用subscribeOn在新线程和zip操作符中运行
/**
* SubscribeOn just like in Rx make the pipeline run asynchronously, from the beginning to the end.
* <p>
* In this example we get the three Flux pipelines and we run async all of them.
* Once they are finish we zip the results in the order we establish in the operator.
*/
@Test
public void subscribeOn() throws InterruptedException {
Scheduler scheduler = Schedulers.newElastic("thread");
Scheduler scheduler1 = Schedulers.newElastic("thread");
Scheduler scheduler2 = Schedulers.newElastic("thread");
Flux<String> flux1 = Flux.just("hello ")
.doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
.subscribeOn(scheduler);
Flux<String> flux2 = Flux.just("reactive")
.doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
.subscribeOn(scheduler1);
Flux<String> flux3 = Flux.just(" world")
.doOnNext(value -> System.out.println("Value " + value + " on :" + Thread.currentThread().getName()))
.subscribeOn(scheduler2);
Flux.zip(flux1, flux2, flux3)
.map(tuple3 -> tuple3.getT1().concat(tuple3.getT2()).concat(tuple3.getT3()))
.map(String::toUpperCase)
.subscribe(value -> System.out.println("zip result:" + value));
Thread.sleep(1000);
}
你可以在这里看到更多Reactor的例子https://github.com/politrons/reactive
您可以从callable创建两个Mono,然后将其压缩。如果你想并行地执行callable,你还需要显式地将subscribeOn(Schedulers.parallel())添加到每个Mono
:
Mono<Integer> mono1 = Mono.fromCallable(() -> {
System.out.println(Thread.currentThread().getName());
return 123;
}).subscribeOn(Schedulers.parallel());
Mono<Integer> mono2 = Mono.fromCallable(() -> {
System.out.println(Thread.currentThread().getName());
return 321;
}).subscribeOn(Schedulers.parallel());
Tuple2<Integer, Integer> result = mono1.zipWith(mono2).block();
System.out.println(result.getT1());
System.out.println(result.getT2());
结果如下:
parallel-1
parallel-2
123
321
我将以下响应返回给用户 到目前为止,我正在进行三次连续调用来计算这个,每个调用都可以独立于其他调用运行。我尝试制作三种不同的作为: 如果我做了
我遇到了一个顺序和并行服务调用的主题。对于序列调用,我们可以使用FlatMap/MergeMap;对于并行调用,我们可以使用ForkJoin。 以下是我的问题 1)flatmap和MergeMap的区别是什么?使用这两种方法有什么特别的原因吗。 2)如何为4-5个调用添加多个序列?如果我使用多个订阅,那么flatmap和mergemap有什么区别? 3)forkjoin用于连接多个并行服务调用。如
我是SpringWebFlux的新手。 我有一个不支持批处理调用的上游服务,因此我必须为数组中的每个对象多次调用它。我还需要请求参数的实例,因为服务在响应中不返回这些属性。 例如,这是我将发送给客户的回复 我只有身份证清单
对于,您可以指定订阅时发生的自定义操作。例如
我对Spring的靴子是陌生的,在它们到来的时候学习。我有一个关于并行API调用的快速问题。 我有一个ID数组,我将把它附加到第三方APIendpoint,发出GET请求,聚合数据,并在所有3000个调用完成后从中生成一个文件。 这里的问题是Array的大小为3000,即我预计会进行3000次调用。我觉得使用for循环并迭代超过3000次没有任何意义,而且效率较低。 有谁能给我建议一个最好、最有效
本文向大家介绍C#语言Parallel.Invoke 并行调用,包括了C#语言Parallel.Invoke 并行调用的使用技巧和注意事项,需要的朋友参考一下 示例 并行调用方法或动作(并行区域)