当前位置: 首页 > 知识库问答 >
问题:

项目Reactor通量的并行处理

籍利
2023-03-14

我是项目Reactor或反应式编程的新手,所以我可能做错了什么。我正在努力构建一个执行以下操作的流程:

给定类实体:

Entity {
    private Map<String, String> items;
    public Map<String, String> getItems() {
        return items;
    }
}
  1. 从DB读取实体(ListenableFuture

目前我的代码是:

handler = this;
Mono
    .fromFuture(readEntity())
    .doOnError(t -> {
        notifyError(“some err-msg” , t);
        return;
    })
    .doOnSuccess(e -> log.info("Got the Entity: " + e))
    .flatMap( e -> Flux.fromIterable(e.getItems().entrySet()))
    .all(handler::processItem)
    .consume(handler::doneProcessing);

这东西可以工作,但是处理程序::Process Item调用不会在所有项目上并发运行。我尝试在ioasyncSchedulerGroup和各种参数下使用调度和发布,但调用仍然在一个线程上串行运行。我做错了什么?

除此之外,我相信总的来说,上述内容可以改进,因此任何建议都将不胜感激。

谢谢


共有1个答案

南门祯
2023-03-14

您需要另一个平面图,该平面图分叉并连接每个单独贴图元素的计算:

Mono.fromFuture(readEntity())
.flatMap(v -> Flux.fromIterable(v.getItems().entrySet()))
.flatMap(v -> Flux.just(v)
                .publishOn(SchedulerGroup.io())
                .doOnNext(handler::processItem))
.consume(handler::doneProcessing);
 类似资料:
  • 项目Reactor3.1.5。发布 考虑这一点: 我希望订阅服务器在多个线程中运行,但它只在一个线程中运行: 留档告诉我的期望是正确的(http://projectreactor.io/docs/core/release/reference/#threading)。有人能给我解释一下那里发生了什么吗?

  • 因此,我从文档中了解到,并行通量本质上是将通量元素划分为单独的轨道。(本质上类似于分组)。就线程而言,这将是调度程序的工作。让我们考虑一下这样的情况。所有这些都将在通过runOn()方法提供的同一个调度程序实例上运行。让我们考虑如下情况: 现在让我们打大约100个电话 如果我们使用parailFlux: 因此,如果我的理解是正确的,那么它似乎非常相似。那么,平行磁通相对于磁通的优势是什么?什么时候

  • 例如,Mono#Transform方法文档表示如下: “目标单声道”是什么意思?

  • 官方文档中没有那么多信息,所以我想我误解了什么,并且错误地使用了函数。但我到底做错了什么? 更新 我发现,如果使用,就可以避免这个特殊的问题。比如: 但真正的问题是,在我的情况下,我需要根据提供的数据返回一个数字(而不是)。我可以在这里创建一个新的,但无论如何,以后我应该。例如,在final中。因此,如果我执行,那么它将失败。 但它无论如何都不起作用,困在: 注意到如果删除步骤,它就可以工作了。在

  • 我试图在Reactor顶部设计一个管道框架。 在每个阶段(不考虑第一个和最后一个阶段),我们都有转换对象的任务(即字符串到其长度或url到其HTML内容等)。举个例子: 您可以看到中间层有3个任务,每个任务将一个X对象转换为一个Y对象(顺便说一句,它始终是一个完全连接的层) 我的问题/困境:我的第一个想法是,我所需要的是通量。merge(),然后将其连接到每个订阅者。例如: 另一种选择是放置处理器

  • 在流量映射函数中,还对流量中的每一项执行。For doOnNext函数也会对流量中的每个项(发出)执行。从用户的角度看有什么不同?为什么存在两种相似的方法?可以用简单的用法来解释--轻松。