Flux.fromIterable(actions)
.parallel()
.runOn(Schedulars.elastic())
.flatMap(request -> someRemoteCall)
.sequential()
.subscribe();
我通常使用三种可能的方法来实现这一点:
flatmap()
的3参数版本,第二个参数是mapperonerror
-例如。.FlatMap(request->someRemoteCall(),x->mono.empty(),null)
;onerrorresume(x->mono.empty())
作为单独调用忽略任何错误;.onerrorResume(myexception.class,x->mono.empty()))
忽略某种类型的错误。第二个是我倾向于默认使用的,因为我发现这一点最清楚。
我用的是Spring助焊剂。我需要从不同的来源组装一个物体。如何确保两个流都返回了所需的数据? 比如:
我本可以使用而不是第二个,因为它不映射任何东西,但我不确定这是否是peek方法的可接受用法。 我也可以在第二个中使用一个有状态映射器来只运行一次,而不是用索引压缩,我想这是可以接受的,因为我已经使用了一个有状态谓词...
因此,我从文档中了解到,并行通量本质上是将通量元素划分为单独的轨道。(本质上类似于分组)。就线程而言,这将是调度程序的工作。让我们考虑一下这样的情况。所有这些都将在通过runOn()方法提供的同一个调度程序实例上运行。让我们考虑如下情况: 现在让我们打大约100个电话 如果我们使用parailFlux: 因此,如果我的理解是正确的,那么它似乎非常相似。那么,平行磁通相对于磁通的优势是什么?什么时候
我正在进行一个新的反应项目,其中有很多文件处理IO正在进行。如果我以命令式阻塞的方式编写IO代码,然后将它们包装成一个单声道,在boundedElastic调度器上发布它们,这是否足够?boundedElastic池大小会限制并发操作的数量吗? 如果这不是正确的方法,你能展示一个如何使用Reactor将字节写入文件的例子吗?
添加行为(副作用)在流量终止后触发,要么成功完成下游,要么出现错误。 这里有一个简单的例子来再现我面临的问题: 标志简单地说明了在和调用之间的代码可能引发错误。数字1到4表示为了本例的目的,我尝试插入调用的位置,。 null 有人能解释一下上面的事情吗?这是图书馆的故意行为吗? 注意,我使用的是Reactor堆芯3.3.8。
正如您所看到的,在平面映射之后,我应该得到从到的连续数字的有序流。我拆分了一次拆分器,所以它应该跳到某个中间位置。接下来,我从它中消耗一个元素,并再拆分一次。之后,我打印所有剩余的元素。我希望我将有几个连续的元素从流尾(可能零元素,也会很好)。然而,我得到的是和,然后突然跳转到。 我知道目前在JDK中拆分器不是这样使用的:它们总是在遍历之前拆分。但是,官方文档并没有明确禁止在之后调用。 当我使用直