我需要结合两个反应性出版商的结果——Mono和Flux。我尝试使用zip
和join
函数来实现这一点,但我无法满足两个特定条件:
join
实现)第一个条件的解决方案出现在结合单声道和通量条目中(粘贴在下面)。但是我无法在不阻塞链的情况下实现第二个条件——这是我想避免的。
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(50)).log();
List<String> list = flux.join(mono, (v1) -> Flux.never(), (v2) -> Flux.never(), (x, y) -> {
return x + y;
}).collectList().block();
System.out.println(list);
如果通量为空,则要取消整个操作,可以执行以下操作
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
//Uncomment below and comment out above statement for empty flux
//Flux<Integer> flux = Flux.empty();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();
//Throw exception if flux is empty
flux = flux.switchIfEmpty(Flux.error(IllegalStateException::new));
List<String> list = flux
.join(mono, s -> Flux.never() , s -> Flux.never(), (x, y) -> x + y)
//Catch exception and return nothing
.onErrorResume(s -> Flux.empty())
.collectList().block();
System.out.println(list);
如果你想让Mono完成,但又不想让连接挂起,你可以做如下操作
DirectProcessor<Integer> processor = DirectProcessor.create();
//Could omit sink, and use processor::onComplete in place of sink::complete
//But typically recommended as provides better thread safety
FluxSink<Integer> sink = processor.serialize().sink();
Flux<Integer> flux = Flux.concat(Mono.just(1).delayElement(Duration.ofMillis(100)),
Mono.just(2).delayElement(Duration.ofMillis(500))).log();
//Uncomment below and comment out above statement for empty flux
//Flux<Integer> flux = Flux.empty();
Mono<String> mono = Mono.just("a").delayElement(Duration.ofMillis(5000)).log();
List<String> list = flux
.doOnComplete(sink::complete)
.join(mono, s -> processor , s -> processor, (x, y) -> x + y).collectList().block();
System.out.println(list);
嗨,我有Flux,在迭代每个元素时,它会创建新的单声道。我也有其他单声道之外的通量。并要做到以下几点:当流量(与相应的内部单声道的结束),然后做第二个单声道。最大的挑战是单声道内部的流量从网络客户端请求中产生。作为起点,请看看“加载”方法。基本上没有webClient它的工作,但在情况下与webClient内部地图工作之后。使用Spring启动2 渐变依赖性:
更新: 一点我想要实现的内容。我有两个服务--一个通过Http返回me,另一个通过Redis返回。对于这两种情况,我有完全相同的功能-10-15个操作符链,我想要实现的是避免重复代码。 例如:
我有一个应该向用户发送电子邮件的用例。首先,我创建电子邮件正文。 然后我选择用户并向他们发送电子邮件: 我不喜欢什么 没有cache()方法,emailBody Mono会在每个迭代步骤中进行计算 要获得emailBody值,我使用emailBody。block(),但可能有一种反应方式,而不是在通量流中调用block方法
我有这个场景。我有一个分页的API,它给我过去12个月的数据。API的响应是这样的: 现在我必须收集所有的数据,然后计算所有的总和,并返回为
我是相当新的反应性编程,我使用去年Spring5 Webflux框架和玩这样的项目Reactor。我正面临一个问题,我想在继承模式中使用Mono: 我怎样才能做到这一点?我是不是做了坏事? 非常感谢
我是反应性编程概念的新手。我正在学习“学习Spring Boot2.0”,所描述的简单概念/示例是可以理解的。但是我不知道如何在更复杂的用例中使用mono/flux。spring boot,mongo和project reactor的一些例子 我的模型