我有下面的反应代码使用通量在反应堆核心:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> redisHashReactiveCommands.hmset(key, map))
//.flatMap(... //want to store same data async into kafka with its own back pressure handling)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
.doOnComplete(() -> log.debug("On completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
正如您所看到的,对于我的流程的外部源(FluxSink.OverflowStrategy.Latest),我对此进行了背压处理。但是,我还想为我的进程配置redis(redishashreactiveCommands.hmset(key,map))的背压,因为它对我的进程来说可能是一个比外部源更大的瓶颈。我希望我需要为redis部分创建另一个flux并将它与这个flux链接起来,但是我如何实现这一点,因为.FlatMap只工作于单个项而不是项流?
同样,我也想存储相同的发射项目到Kafka,但链接FlapMap的似乎不起作用…有没有一种简单的方法将所有这些链接在一组函数调用中(外部源->我的流程,我的流程->redis,我的流程->kafka)?
如果您对主序列中的结果对象不感兴趣,可以在FlatMap
中合并两个保存。您还必须在flatMap中移动订阅和日志,以便将它们放在内部保存发布服务器上:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> Mono.when(
redisHashReactiveCommands.hmset(key, map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),
kafkaReactiveCommand.something(map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s)),
))
//... this results in a Mono<Void>
.doOnComplete(() -> log.debug("Both redis and kafka completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
或者,如果您确信两个进程都发出一个结果元素或一个错误,您可以通过将when
替换为zip
来将这两个结果合并到一个tuple2
中。
问题内容: 我正在将用RxJava 1.x编写的小项目转换为Reactor3.x。一切都很好,除了我找不到如何用合适的替代品代替。我有,我需要将其转换为。 问题答案: 在Reactor 3中,根据原始来源(数组,可迭代等),运算符被专门化为几个变体。 在您的情况下使用。
问题内容: 我有一个MonoA。对象A包含两个列表。我想直接创建两个助焊剂。没有block(),这可能吗? 问题答案: 使用Mono.flatMapMany()方法:
问题内容: 我正在为spring-boot应用程序使用自定义类 BodyExtractor.java 上面的代码适用于较小的有效负载,但不适用于较大的有效负载,我认为这是因为我仅使用读取单个通量值,并且不确定如何组合和读取全部。 我是电抗器的新手,所以我对通量/单声道不了解很多技巧。 问题答案: 我能够通过使用和使它工作
我试图实现类似于Akka Streams statefulMapConcat的东西...基本上,我有一个像这样的分数通量: 我想采取这些和排放一个聚合每天: 所以我有一个聚合器,它保留了我在处理前设置的一些内部状态,我想在返回单声道的聚合器上平面图。如果日期发生变化,聚合器只会发出一个带有值的Mono,所以你每天只能得到一个。 所以我的问题是。。。当完成时,如何发出最后一个元素?聚合器将有一些最后
webflux包中发生了有趣的事情。然而,我在源头的旅程并没有解决以下问题。 假设我有以下单声道(或通量): 我在webfilter中使用类似的构造,用租户和用户数据丰富管道。然后在控制器中使用如下构造: hello mono的上下文填充在world mono中。我试图弄清楚这是如何做到的,也是为了单元测试的目的。 最后,这仍然是一个谜。我试图用单声道/通量对象上可用的常规方法来做到这一点,但是我