我是SpringWebFlux的新手,所以请温柔一点。。。我很抱歉,如果我错过了一些明显的东西,但我试图寻找在线的例子,每次我结束与顺序调用。
我有这样的情况:
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class);
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class);
响应是我的项目中的一个类,但是对于这个例子,我们可以将它们视为单个列表的简单容器。
我想:
我如何做到这一点?(因为我失败过多次)
我的第一次尝试是:
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
ParallelFlux.from(mono1, mono2, mono3).then().block(); // im not sure if this really execute them in parallel
Response resp1 = mono1.block();
Response resp2 = mono2.block();
Response resp3 = mono3.block();
if (resp1.isNotEmpty()) {
return resp1;
}
if (resp2.isNotEmpty()) {
return resp2;
}
return resp3;
这似乎不起作用,ParallelFlux.from(mon1,mon2,mon3).然后(). block()真的并行运行这些monos吗?为什么我需要ParallelFlux?我不能在创建每个单声道时就说“在单独的线程上运行这个单声道”吗?每个. block()实际上重做调用......就像它在执行单声道一样...为什么?
更新:
通过阅读评论,我将代码更改为:
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Tuple3<Response, Response, Response> all = Mono.zip(mono1, mono2, mono3).block();
Response resp1 = all.getT1();
Response resp2 = all.getT2();
Response resp3 = all.getT3();
if (resp1.hasMessages()) {
return resp1;
}
if (resp2.hasMessages()) {
return resp2;
}
return resp3;
现在它似乎起作用了。我需要做其他事情还是我同意这个解决方案?我是否也应该更改Mono.zip(mon1,mon2,mon3). block()
在Mono.zip(mon1,mon2,mon3)。P. S.我现在再次阅读文档,我认为我应该使用Schedulers.elastic()代替Schedulers.parallel()。
创建单声道不会自动执行它。您需要一个终端操作符,如订阅
或块
来触发执行(订阅
不是终端操作符。除非您想将执行延迟到另一个线程池,否则您不需要它。默认情况下,它使用默认线程池)。如果您希望多个单声道并行运行,您可以使用zip操作符。
Mono<Response> mono1 = webclient1.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono2 = webclient2.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
Mono<Response> mono3 = webclient3.post()...usual_stuffs....bodyToMono(Response.class).subscribeOn(Schedulers.parallel());
return Mono.zip(mono1, mono2, mono3)
.map(t -> {
if (t.getT1().isEmpty()) {
if (t.getT2().isEmpty()) {
return t.getT3();
} else {
return t.getT2();
}
} else {
return t.getT1();
}
});
注意:调用它不会执行并给出结果。它会返回一个mono,您可以在其上调用subscribe()
以获得结果。
我不熟悉Reactor框架,并尝试在现有的实现中使用它。LocationProfileService和InventoryService都返回一个Mono,并并行执行,彼此之间没有依赖关系(来自MainService)。在LocationProfileService中-发出了4个查询,最后2个查询依赖于第一个查询。 有什么更好的方法来写这个?我看到调用按顺序执行,而其中一些应该并行执行。正确的做法是
我面临的情况是,我必须使用2 Mono,其中第二个将依赖于第一个的Id字段,并在第一个Mono的主体中返回第二个的响应。 例如: 然后将结果返回为 我试过了 但像这样,我只能返回第二个单声道的响应。 通过尝试Map或Flatmap,它只在第二个单声道上起作用。 请提出建议。
我最近一直在学习使用Java中的reactor库和Spring框架进行反应式编程,并且在很大程度上我已经能够掌握它。然而,我发现自己有好几次遇到同样的情况,我想知道我哪里出了问题。 我正在努力解决的问题的要点是,我经常想用mono做一些事情,比如找到一些补充数据,然后将其添加回原始mono中。zip函数在我看来是一个理想的候选函数,但最终我订阅了两次原始mono,这不是我的意图。 这里有一个人为的
我有一个类,它从通量源填充地图。我希望能够动态地更新thedata(),但我希望能够返回解析为当前或挂起数据的Mono。 基本上如果数据!=null和no in progress flux,返回Mono。仅返回(数据),否则返回最终将发出数据的单声道。 编辑:这是目前为止我能做的最好的了 但是仍然存在一个问题,在第一次完成updateTheData()之前,
我正在尝试将bash脚本迁移到Python。 bash脚本并行运行多个OS命令,然后在继续之前等待它们完成,即: 命令 我希望使用Python子进程实现同样的目标。这可能吗?如何等待subprocess.call命令完成后再继续?
给定以下monos: 双: 和: 具有相同的输出: 和之间有什么区别,在这种情况下? 从https://projectreactor.io/docs/core/release/reference/index.html#which-operator: [如果你]有一个序列,但[你]对值不感兴趣,并且[你]想在最后切换到另一个单声道,[使用]。 [如果您]希望通过将发布者从1个Mono和任何源端协调到