当前位置: 首页 > 知识库问答 >
问题:

有条件地将Mono与Flux结合

芮岳
2023-03-14

我需要结合两个反应性出版商的结果——Mono和Flux。我尝试使用zipjoin函数来实现这一点,但我无法满足两个特定条件:

  1. 结果包含的元素应与通量发射的元素一样多,但相应的Mono源只应调用一次(仅此条件可通过join实现)
  2. 当通量为空时,链应在不等待单一元素的情况下完成

第一个条件的解决方案出现在结合单声道和通量条目中(粘贴在下面)。但是我无法在不阻塞链的情况下实现第二个条件——这是我想避免的。

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
        Mono.just(2).delayElement(Duration.ofMillis(500))).log();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();

List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
    return x + y;
}).collectList().block();

System.out.println(list);

共有1个答案

亢建白
2023-03-14

如果通量为空,则要取消整个操作,可以执行以下操作

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
            Mono.just(2).delayElement(Duration.ofMillis(500))).log();

//Uncomment below and comment out above statement for empty flux

//Flux<Integer> flux = Flux.empty();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();

//Throw exception if flux is empty
flux = flux.switchIfEmpty(Flux.error(IllegalStateException::new));

List<String> list = flux
        .join(mono, s -> Flux.never() , s -> Flux.never(), (x, y) -> x + y)
        //Catch exception and return nothing
        .onErrorResume(s -> Flux.empty())
        .collectList().block();

    System.out.println(list);

如果你想让Mono完成,但又不想让连接挂起,你可以做如下操作

DirectProcessor<Integer> processor = DirectProcessor.create();
//Could omit sink, and use processor::onComplete in place of sink::complete
//But typically recommended as provides better thread safety
FluxSink<Integer> sink = processor.serialize().sink();

Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
    Mono.just(2).delayElement(Duration.ofMillis(500))).log();

//Uncomment below and comment out above statement for empty flux

//Flux<Integer> flux = Flux.empty();

Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();

List<String> list = flux
     .doOnComplete(sink::complete)
     .join(mono, s -> processor , s -> processor, (x, y) -> x + y).collectList().block();

System.out.println(list);
 类似资料:
  • 嗨,我有Flux,在迭代每个元素时,它会创建新的单声道。我也有其他单声道之外的通量。并要做到以下几点:当流量(与相应的内部单声道的结束),然后做第二个单声道。最大的挑战是单声道内部的流量从网络客户端请求中产生。作为起点,请看看“加载”方法。基本上没有webClient它的工作,但在情况下与webClient内部地图工作之后。使用Spring启动2 渐变依赖性:

  • 更新: 一点我想要实现的内容。我有两个服务--一个通过Http返回me,另一个通过Redis返回。对于这两种情况,我有完全相同的功能-10-15个操作符链,我想要实现的是避免重复代码。 例如:

  • 我有一个应该向用户发送电子邮件的用例。首先,我创建电子邮件正文。 然后我选择用户并向他们发送电子邮件: 我不喜欢什么 没有cache()方法,emailBody Mono会在每个迭代步骤中进行计算 要获得emailBody值,我使用emailBody。block(),但可能有一种反应方式,而不是在通量流中调用block方法

  • 我有这个场景。我有一个分页的API,它给我过去12个月的数据。API的响应是这样的: 现在我必须收集所有的数据,然后计算所有的总和,并返回为

  • 我是相当新的反应性编程,我使用去年Spring5 Webflux框架和玩这样的项目Reactor。我正面临一个问题,我想在继承模式中使用Mono: 我怎样才能做到这一点?我是不是做了坏事? 非常感谢

  • 我是反应性编程概念的新手。我正在学习“学习Spring Boot2.0”,所描述的简单概念/示例是可以理解的。但是我不知道如何在更复杂的用例中使用mono/flux。spring boot,mongo和project reactor的一些例子 我的模型