当前位置: 首页 > 知识库问答 >
问题:

RxJava调度器-线程行为和饥饿?

楚丰羽
2023-03-14

我对observeon()subscribeon()使用了RXJava。我知道他们不是在一条流上并行化排放。换句话说,单个排放流只会放在一个线程上,对吗?我下面的测试似乎表明了这一点。我的理解是,您还必须flatmap()调度器,如.flatmap(v->observable.just(v).subscribeon(Schedulers.Computation()),以在单个流上并行化发射。

同样,如果是这种情况,那么调度器会发生线程饥饿吗?如果我的计算调度程序有5个线程,但我有超过5个长时间运行的异步流正在处理,有没有可能出现饥饿?或者这不太可能仅仅是因为RXJava的性质?

public class Test {
    public static void main(String[] args) {


        Observable<String> airports = Observable.just("ABQ", "HOU", 
            "PHX", "DAL", "DFW", "AUS","SAN","LAX","JFK");


        airports.subscribeOn(Schedulers.io()).map(Test::stall)
        .subscribe(s -> System.out.println("Sub1 " + s +
                " " + Thread.currentThread().getName()));

        airports.subscribeOn(Schedulers.io()).map(Test::stall)
        .subscribe(s -> System.out.println("Sub2 " + s +
                " " + Thread.currentThread().getName()));

        sleep();
    }

    private static String stall(String str) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return str;
    }

    private static void sleep() {
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

共有1个答案

荀金鹏
2023-03-14

使用flatmap时,可能一个异步源被其他异步源淹没,无法在自己的源上取得进展。然而,在实践中,我没有看到这种情况发生,这是由于OS和JVM的打嗝给了足够的喘息空间,也是因为flatmap本身的背压和仲裁。如果您担心这种情况太多,可以使用maxconcurrent参数和flatmap重载,并限制并发订阅的数量。

RxJava主要是以非阻塞的方式编写的,因此当需要合并或组合源代码时,它们不会真正等待对方。

计算调度程序是一个单线程执行程序池,并以循环方式分配给调用方。不知道标准执行者的公平性。

 类似资料:
  • 我需要帮助设计基于多线程的应用程序,包括动态url创建和线程处理。 我在我的应用程序中使用了一个Spring调度器,它每30秒调度一次。从这个调度方法中,我调用了一些基于服务的api,它在循环中,而且我需要每个API有一个线程池执行器,上面有一个线程处理。 由于这个过程是从计划方法开始的,所以每次创建新的线程池时,这就是问题所在。你可以在代码中看到。 我想要的是,如果对于任何一个应用编程接口,如果

  • 我必须根据传入的请求写入文件。由于多个请求可能同时出现,我不希望多个线程试图一起覆盖文件内容,这可能会导致丢失一些数据。 因此,我尝试使用实例变量PublishSubject收集所有请求的数据。我在初始化期间订阅了publishSubject,此订阅将在应用程序的整个生命周期内保持不变。此外,我还在一个单独的线程(由Vertx事件循环提供)上观察到相同的实例,该线程调用负责编写文件的方法。 稍后在

  • 调度器在多线程环境中用于与 Observable 操作符一起工作。 根据Reactive,Scheduler 用于调度运算符链如何应用于不同的线程。 默认情况下,一个 Observable 和你应用到它的操作符链将在调用它的 Subscribe 方法的同一个线程上完成它的工作,并通知它的观察者。SubscribeOn 运算符通过指定 Observable 应在其上运行的不同调度程序来更改此行为。O

  • 在RxJava中,有5种不同的调度程序可供选择: > immediate():创建并返回在当前线程上立即执行工作的调度程序。 trampoline():创建并返回一个调度程序,该调度程序将当前线程上的工作排队,以便在当前工作完成后执行。 newThread():创建并返回一个调度程序,该调度程序为每个工作单元创建一个新线程。 计算():创建并返回一个用于计算工作的调度程序。这可以用于事件循环、处理

  • 调度器 调度器的算法有许多种,我们将它提取出一个 trait 作为接口 os/src/algorithm/src/scheduler/mod.rs /// 线程调度器 /// /// 这里 `ThreadType` 就是 `Arc<Thread>` pub trait Scheduler<ThreadType: Clone + Eq>: Default { /// 优先级的类型 t