当前位置: 首页 > 知识库问答 >
问题:

一旦创建了三个单声道,就并行执行它们,等待所有单声道完成,并以特定的顺序/逻辑收集结果

经慈
2023-03-14

我是SpringWebFlux的新手,所以请温柔一点。。。我很抱歉,如果我错过了一些明显的东西,但我试图寻找在线的例子,每次我结束与顺序调用。

我有这样的情况:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class);

响应是我的项目中的一个类,但是对于这个例子,我们可以将它们视为单个列表的简单容器。

我想:

  • 并行执行它们(一旦我将它们分配给单数1/单数2/单数3,通过调用。
  • 当一切都完成后,保存响应,以呼吸1,呼吸2,呼吸3
  • 如果res1有结果(列表不是空的)返回res1...
  • ...如果res2有结果(列表不是空的)返回res2...
  • 返回res3(即使是空的)

我如何做到这一点?(因为我失败过多次)

我的第一次尝试是:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
ParallelFlux.from(mono1, mono2, mono3).then().block(); // im not sure if this really execute them in parallel
Response resp1 = mono1.block();
Response resp2 = mono2.block();
Response resp3 = mono3.block();
if (resp1.isNotEmpty()) {
    return resp1;
}
if (resp2.isNotEmpty()) {
    return resp2;
}
return resp3;

这似乎不起作用,ParallelFlux.from(mon1,mon2,mon3).然后(). block()真的并行运行这些monos吗?为什么我需要ParallelFlux?我不能在创建每个单声道时就说“在单独的线程上运行这个单声道”吗?每个. block()实际上重做调用......就像它在执行单声道一样...为什么?

更新:

通过阅读评论,我将代码更改为:

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());

Tuple3<Response, Response, Response> all = Mono.zip(mono1, mono2, mono3).block();

Response resp1 = all.getT1();
Response resp2 = all.getT2();
Response resp3 = all.getT3();

if (resp1.hasMessages()) {
    return resp1;
}
if (resp2.hasMessages()) {
    return resp2;
}
return resp3;

现在它似乎起作用了。我需要做其他事情还是我同意这个解决方案?我是否也应该更改Mono.zip(mon1,mon2,mon3). block()Mono.zip(mon1,mon2,mon3)。P. S.我现在再次阅读文档,我认为我应该使用Schedulers.elastic()代替Schedulers.parallel()。


共有1个答案

羊和光
2023-03-14

创建单声道不会自动执行它。您需要一个终端操作符,如订阅来触发执行(订阅不是终端操作符。除非您想将执行延迟到另一个线程池,否则您不需要它。默认情况下,它使用默认线程池)。如果您希望多个单声道并行运行,您可以使用zip操作符。

Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());

return Mono.zip(mono1, mono2, mono3)
        .map(t -> {
            if (t.getT1().isEmpty()) {
                if (t.getT2().isEmpty()) {
                    return t.getT3();
                } else {
                    return t.getT2();
                }
            } else {
                return t.getT1();
            }
        });

注意:调用它不会执行并给出结果。它会返回一个mono,您可以在其上调用subscribe()以获得结果。

 类似资料:
  • 我不熟悉Reactor框架,并尝试在现有的实现中使用它。LocationProfileService和InventoryService都返回一个Mono,并并行执行,彼此之间没有依赖关系(来自MainService)。在LocationProfileService中-发出了4个查询,最后2个查询依赖于第一个查询。 有什么更好的方法来写这个?我看到调用按顺序执行,而其中一些应该并行执行。正确的做法是

  • 我面临的情况是,我必须使用2 Mono,其中第二个将依赖于第一个的Id字段,并在第一个Mono的主体中返回第二个的响应。 例如: 然后将结果返回为 我试过了 但像这样,我只能返回第二个单声道的响应。 通过尝试Map或Flatmap,它只在第二个单声道上起作用。 请提出建议。

  • 我最近一直在学习使用Java中的reactor库和Spring框架进行反应式编程,并且在很大程度上我已经能够掌握它。然而,我发现自己有好几次遇到同样的情况,我想知道我哪里出了问题。 我正在努力解决的问题的要点是,我经常想用mono做一些事情,比如找到一些补充数据,然后将其添加回原始mono中。zip函数在我看来是一个理想的候选函数,但最终我订阅了两次原始mono,这不是我的意图。 这里有一个人为的

  • 我有一个类,它从通量源填充地图。我希望能够动态地更新thedata(),但我希望能够返回解析为当前或挂起数据的Mono。 基本上如果数据!=null和no in progress flux,返回Mono。仅返回(数据),否则返回最终将发出数据的单声道。 编辑:这是目前为止我能做的最好的了 但是仍然存在一个问题,在第一次完成updateTheData()之前,

  • 我正在尝试将bash脚本迁移到Python。 bash脚本并行运行多个OS命令,然后在继续之前等待它们完成,即: 命令 我希望使用Python子进程实现同样的目标。这可能吗?如何等待subprocess.call命令完成后再继续?

  • 给定以下monos: 双: 和: 具有相同的输出: 和之间有什么区别,在这种情况下? 从https://projectreactor.io/docs/core/release/reference/index.html#which-operator: [如果你]有一个序列,但[你]对值不感兴趣,并且[你]想在最后切换到另一个单声道,[使用]。 [如果您]希望通过将发布者从1个Mono和任何源端协调到