我是项目Reactor或反应式编程的新手,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程:
给定类实体:
Entity {
private Map<String, String> items;
public Map<String, String> getItems() {
return items;
}
}
ListenableFuture
目前我的代码是:
handler = this;
Mono
.fromFuture(readEntity())
.doOnError(t -> {
notifyError(“some err-msg” , t);
return;
})
.doOnSuccess(e -> log.info("Got the Entity: " + e))
.flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
.all(handler::processItem)
.consume(handler::doneProcessing);
这东西可以工作,但是
处理程序::Process Item
调用不会在所有项目上并发运行。我尝试在io
和async
SchedulerGroup
和各种参数下使用调度和发布
,但调用仍然在一个线程上串行运行。我做错了什么?
除此之外,我相信总的来说,上述内容可以改进,因此任何建议都将不胜感激。
谢谢
您需要另一个平面图,该平面图分叉并连接每个单独贴图元素的计算:
Mono.fromFuture(readEntity())
.flatMap(v -> Flux.fromIterable(v.getItems().entrySet()))
.flatMap(v -> Flux.just(v)
.publishOn(SchedulerGroup.io())
.doOnNext(handler::processItem))
.consume(handler::doneProcessing);
项目Reactor3.1.5。发布 考虑这一点: 我希望订阅服务器在多个线程中运行,但它只在一个线程中运行: 留档告诉我的期望是正确的(http://projectreactor.io/docs/core/release/reference/#threading)。有人能给我解释一下那里发生了什么吗?
因此,我从文档中了解到,并行通量本质上是将通量元素划分为单独的轨道。(本质上类似于分组)。就线程而言,这将是调度程序的工作。让我们考虑一下这样的情况。所有这些都将在通过runOn()方法提供的同一个调度程序实例上运行。让我们考虑如下情况: 现在让我们打大约100个电话 如果我们使用parailFlux: 因此,如果我的理解是正确的,那么它似乎非常相似。那么,平行磁通相对于磁通的优势是什么?什么时候
例如,Mono#Transform方法文档表示如下: “目标单声道”是什么意思?
官方文档中没有那么多信息,所以我想我误解了什么,并且错误地使用了函数。但我到底做错了什么? 更新 我发现,如果使用,就可以避免这个特殊的问题。比如: 但真正的问题是,在我的情况下,我需要根据提供的数据返回一个数字(而不是)。我可以在这里创建一个新的,但无论如何,以后我应该。例如,在final中。因此,如果我执行,那么它将失败。 但它无论如何都不起作用,困在: 注意到如果删除步骤,它就可以工作了。在
我试图在Reactor顶部设计一个管道框架。 在每个阶段(不考虑第一个和最后一个阶段),我们都有转换对象的任务(即字符串到其长度或url到其HTML内容等)。举个例子: 您可以看到中间层有3个任务,每个任务将一个X对象转换为一个Y对象(顺便说一句,它始终是一个完全连接的层) 我的问题/困境:我的第一个想法是,我所需要的是通量。merge(),然后将其连接到每个订阅者。例如: 另一种选择是放置处理器
在流量映射函数中,还对流量中的每一项执行。For doOnNext函数也会对流量中的每个项(发出)执行。从用户的角度看有什么不同?为什么存在两种相似的方法?可以用简单的用法来解释--轻松。