我试图实现类似于Akka Streams statefulMapConcat的东西...基本上,我有一个像这样的分数通量:
分数(LocalDate,整数分数)
我想采取这些和排放一个聚合每天:
ScoreAggregate(LocalDate日期、整数scoreCount、整数totalScore)
所以我有一个聚合器,它保留了我在处理前设置的一些内部状态,我想在返回单声道的聚合器上平面图。如果日期发生变化,聚合器只会发出一个带有值的Mono,所以你每天只能得到一个。
ScoreAggregator aggregator = ...
Flux<Score> scoreFlux = ...
scoreFlux.flatMap(aggregator::addScore)
所以我的问题是。。。当scoreFlux
完成时,如何发出最后一个元素?聚合器将有一些最后一天的数据尚未发布,我需要发送这些数据。
回应评论作为回答,这样就不会显示为未回答:
所以我的问题是。。。当scoreFlux完成时,如何发出最后一个元素?
您只需使用concatWith()
即可在原始流量完成后连接所需的发布服务器。如果您只希望在原始发布者完成时对其进行评估,请确保将其包装在Mono中。延迟()
,这将阻止先发制人的执行。
问题内容: 我有一个MonoA。对象A包含两个列表。我想直接创建两个助焊剂。没有block(),这可能吗? 问题答案: 使用Mono.flatMapMany()方法:
问题内容: 我正在将用RxJava 1.x编写的小项目转换为Reactor3.x。一切都很好,除了我找不到如何用合适的替代品代替。我有,我需要将其转换为。 问题答案: 在Reactor 3中,根据原始来源(数组,可迭代等),运算符被专门化为几个变体。 在您的情况下使用。
我有下面的反应代码使用通量在反应堆核心: 正如您所看到的,对于我的流程的外部源(FluxSink.OverflowStrategy.Latest),我对此进行了背压处理。但是,我还想为我的进程配置redis(redishashreactiveCommands.hmset(key,map))的背压,因为它对我的进程来说可能是一个比外部源更大的瓶颈。我希望我需要为redis部分创建另一个flux并将它
问题内容: 我正在为spring-boot应用程序使用自定义类 BodyExtractor.java 上面的代码适用于较小的有效负载,但不适用于较大的有效负载,我认为这是因为我仅使用读取单个通量值,并且不确定如何组合和读取全部。 我是电抗器的新手,所以我对通量/单声道不了解很多技巧。 问题答案: 我能够通过使用和使它工作
我正在学习jboss Weld Event教程中的Weld Event,我想写一个观察事件并在事件被激发时打印helloword的示例。 这是我的代码: 它不起作用,给出以下异常信息: 容器中似乎没有可以初始化的bean 那么我该怎么做才能使它运行,我的beans.xml是空的 也许我应该在beans.xml中做些什么? 或者我应该编写一个实现事件接口的Java类? 任何内容都将适用。