当前位置: 首页 > 知识库问答 >
问题:

SpringReactorWebflux调度器并行性

萧无尘
2023-03-14

对于完全非阻塞的端到端反应调用,是否建议显式调用publishOn或subscribeOn来切换调度器?对于消耗cpu或不消耗cpu的任务,总是使用并行流量来优化性能是否有利?

共有1个答案

杜志
2023-03-14

对于完全非阻塞的端到端反应调用,是否建议显式调用publishOn或subscribeOn来切换调度器?

向下游发布数据时使用publishon,而从上游使用数据时使用subscribeon。所以这真的取决于你想做什么样的工作。

对于消耗cpu或不消耗cpu的任务,总是使用并行流量来优化性能是否有利?

Flux.range(1, 10)
        .parallel(4)
        .runOn(Schedulers.parallel())
        .sequential()
        .elapsed()
        .subscribe(i -> System.out.printf(" %s ", i));

上面的代码完全是浪费,因为i几乎会立即被处理。下面的代码将比上面的代码执行得更好:

Flux.range(1, 10)
        .elapsed()
        .subscribe(i -> System.out.printf(" %s ", i));

现在考虑一下:

public static <T> T someMethodThatBlocks(T i, int ms) {
    try { Thread.sleep( ms ); }
    catch (InterruptedException e) {}
    return i;
}

// some method here
Flux.range(1, 10)
        .parallel(4)
        .runOn(Schedulers.parallel())
        .map(i -> someMethodThatBlocks(i, 200))
        .sequential()
        .elapsed()
        .subscribe(i -> System.out.printf(" %s ", i));

输出类似于:

 [210,3]  [5,1]  [0,2]  [0,4]  [196,6]  [0,8]  [0,5]  [4,7]  [196,10]  [0,9] 
 类似资料:
  • 我在学习Reactor,我想知道如何实现某种行为。假设我有一个传入消息流。每条消息都与某个实体关联,并包含一些数据。 与不同实体相关的消息可以并行处理。但是,与任何单个实体相关的消息必须一次处理一条,即在实体“abc”的消息1处理完成之前,无法开始处理实体“abc”的消息2。在处理消息的过程中,应该缓冲该实体的进一步消息。其他实体的消息可以不受阻碍地进行。可以将其视为每个实体的线程上都有这样的代码

  • Storm 现在有 4 种内置的 schedulers(调度器): DefaultScheduler, IsolationScheduler, MultitenantScheduler, ResourceAwareScheduler. Pluggable scheduler(可插拔的调度器) 你可以实现你自己的 scheduler(调度器)来替换掉默认的 scheduler(调度器),自定义分配e

  • 调度器提供了同步递增策略变化的方法。 它应以手工艺等一致性算法为基础,以确保所有执行者的一致性和一致性。 通过调度器用户们可以轻松地建立分布式集群。 调度器的方法分为两部分。 第一种是与Casbin相结合的方法。 这些方法应该在Casbin内部调用。 用户们可以使用由Casbin本身提供的更完整的api。 另一个部分是调度器本身定义的方法,包括调度器初始化方法, 和不同算法提供的不同函数,如动态资

  • 注:本节未经校验,如有问题欢迎提issue 有时需要设定将来发生的事情,这时该怎么办? ActorSystem 就能搞定一切! 在那儿你能找到 scheduler 方法,它返回一个 akka.actor.Scheduler 实例, 这个实例在每个Actor系统里是唯一的,用来在内部指定一段时间后发生的行为。 请注意定时任务是使用 ActorSystem 的 MessageDispatcher 执行

  • Akka MessageDispatcher是维持 Akka Actor “运作”的部分, 可以说它是整个机器的引擎。所有的MessageDispatcher实现也同时也是一个ExecutionContext,这意味着它们可以用来执行任何代码,例如Future(Scala)。 缺省派发器 在没有为Actor作配置的情况下,每一个ActorSystem将有一个缺省的派发器。该缺省派发器可以被配置,默

  • 调度并执行内核线程 initproc 在uCore执行完proc_init函数后,就创建好了两个内核线程:idleproc和initproc,这时uCore当前的执行现场就是idleproc,等到执行到init函数的最后一个函数cpu_idle之前,uCore的所有初始化工作就结束了,idleproc将通过执行cpu_idle函数让出CPU,给其它内核线程执行,具体过程如下: void cpu_i