当前位置: 首页 > 知识库问答 >
问题:

在另一个通量结束后执行并行通量

宗政卓
2023-03-14

顺便说一句,我还在学习WebLUX;我不知道这是不是可能的,或者我有错误的方法,但考虑到这种平行的通量。

Flux<String> enablers = Flux.fromIterable(enablersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.getAMono(string, entity, element))
                .sequential();

调用具有webclient请求的方法(service.getamono)

webClient.post()
              .uri(url)
              .headers(headers -> headers.addAll(httpHeaders))
              .body(BodyInserters.fromObject(request))
              .retrieve()
              .bodyToMono(entity2.class);

我需要等待enablers Flux的流结束,并处理它内部的所有响应,原因是如果其中一个给我错误或否定的响应,我不会运行这个其他并行Flux的阻塞器

Flux<String> blockers = Flux.fromIterable(blockersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.callAMono(string, entity, element))
                .sequential();

我想过“zip”方法,但这一个合并了两个响应,并不是我想要的,如果任何人可以帮助我这样做。

更新

enablers. //handle enablers response and if error return a custom Mono<response> with .reduce

如果使能器的句柄没有错误,则继续到.thenmany和其他flux

共有1个答案

樊博雅
2023-03-14

我在第一个flux中找到了条件any来实现它的方法,如下所示

Flux.fromIterable(enablersList)
                .parallel()
                .runOn(Schedulers.elastic())
                .flatMap(element -> service.getAMono(string, entity, element))
                .sequential()
                .any(element -> *stuff here)//condition
                .flatMap(condition->{
                        if(condition.equals(Boolean.FALSE)){
                           return Flux.fromIterable(blockersList)
                                                   .parallel()
                                                   .runOn(Schedulers.elastic())
                                                   .flatMap(element -> service.callAMono(string, entity, element))
                                                   .sequential()
                                                   .reduce(**stuff here)// handle noError response and return;
                          }
                          return Mono.just(**stuff here);//handle error response and return
                 });

如果有其他的方法来做这件事,我会很高兴你把它贴在这里谢谢,:D

 类似资料:
  • 我有一个LegacyAcCountDto,我需要从两个不同的来源建立一个列表。一个是本地JPA存储库,另一个是Web服务调用。Web服务版本具有JPA数据源不可用的帐户状态。我需要并行执行两个调用,当它们都完成时,我需要找到Web服务列表的legacyId,并用从Web服务中提取的帐户状态填充列表。整个想法是返回一个包含完整DTO的列表。我不需要把它保存回网络服务或JPA回购 DTO: merge

  • 我在同一个drl文件中有两个Drools规则,如下所示: 我的想法是用第一条规则处理所有处于临界状态的事件。然后使用第二个规则,如果任何阀门有警报,这是由“如果临界”规则设置的,发送一个短信。 你知道吗?用口水可能吗?

  • 问题内容: 我有以下方法: 在这里,我依次调用三种方法,这依次命中数据库并获取我的结果,然后对从数据库命中获得的结果进行后处理。我知道如何通过使用并发调用这三种方法。但是我想用Java 8 来实现。有人可以指导我如何通过并行流实现相同目标吗? 编辑 我只想通过Stream并行调用方法。 问题答案: 您可以利用这种方式:

  • 问题内容: 我有一个http服务器(使用启动),我想做一些操作。 我该怎么做(在Linux上)?在ctrl-C的情况下可以进行那些操作吗? 我不熟悉Unix信号,因此答案可能很简单。 问题答案: 您可以使用信号包订购TERM和INT信号。但是请注意,只有在明确终止进程时才发送这些信号。正常退出(由流程本身启动)不涉及任何信号。我认为,对于正常退出,只需在主例程中执行某些操作即可(该例程应该生成工作

  • 我试图在多个android设备的chrome浏览器上运行我的测试。我使用的量角器与鸦片。我启动2 appium服务器具有不同的端口,即。,和,并指定具有2个设备信息的多功能。我尝试了两个,模拟器和真实的设备。当我运行配置文件时,两个Appium服务器都试图访问一个设备,即使我指定了设备的UDID。我对量角器非常陌生。有人能帮我吗?下面是我的配置文件 config.js

  • 我正在尝试用Mono中的值填充Flux中的对象。当我尝试这样做时,它只是忽略了我的“设置”操作。我假设这是因为Flux正在并行工作,而Mono没有。我该如何解决这个问题? 以下是一些日志 如您所见,我正在尝试将国家代码设置为代理。