当前位置: 首页 > 知识库问答 >
问题:

RxJava observeOn调度程序线程查询

狄承望
2023-03-14

我必须根据传入的请求写入文件。由于多个请求可能同时出现,我不希望多个线程试图一起覆盖文件内容,这可能会导致丢失一些数据。

因此,我尝试使用实例变量PublishSubject收集所有请求的数据。我在初始化期间订阅了publishSubject,此订阅将在应用程序的整个生命周期内保持不变。此外,我还在一个单独的线程(由Vertx事件循环提供)上观察到相同的实例,该线程调用负责编写文件的方法。

private PublishSubject<FileData> publishSubject = PublishSubject.create();

private void init() {
    publishSubject.observeOn(RxHelper.blockingScheduler(vertx)).subscribe(fileData -> writeData(fileData));
}

稍后在请求处理过程中,我调用onNext,如下所示:

handleRequest() {
   //do some task
   publishSubject.onNext(fileData);
}

我知道,当我调用onNext时,数据将排队,由操作员指定的特定线程写入文件。然而,我想了解的是

  1. 此线程是否仅针对此任务在等待状态下被阻止?或者,
  2. 当没有文件写入时,它还会用于其他活动吗?我不想让vertx事件循环中的一个线程在等待状态中浪费掉,从而使用这种方法。此外,如果有更好的方法,请提出建议

提前谢谢。

共有1个答案

艾翼
2023-03-14

实际上,RxJava会为您做到这一点,根据定义,排放将以串行方式进行:

观察者必须连续(而不是并行)向观察者发出通知。它们可能会从不同的线程发出这些通知,但在通知之间的关系之前必须有一个正式的事件发生。(可观察合同)

因此,只要您在订阅服务器上的onNext()中运行阻塞调用(并且不会手动将工作转移到其他线程),您就可以了,不会发生并行写入。

实际上,你的担忧应该来自相反的方向——背压
您应该在这里选择背压策略,好像请求会更快,然后您会处理它们(写入文件),您可能会溢出缓冲区并陷入麻烦。(考虑使用Flowable,并根据需要选择背压策略。

关于您的问题,这取决于调度器,您使用的是RxHelper。blockingScheduler(vertx)这似乎是您的自定义代码,所以我无法判断,如果调度程序以工作队列方式使用共享线程,那么它将不会保持空闲<无论如何,Rx不会为您确定这一点,调度器的责任是根据其逻辑将工作分配给某个线程。

 类似资料:
  • 但没有。在应用程序中创建的dispatcher线程使我在优化dispatcher配置时束手无策。每次重新启动应用程序时,我都看到创建了不同数量的dispatcher线程(每次启动应用程序后,我都通过线程转储检查这一点)。 甚至线程数也不等于我在Parallelism-min中定义的线程数。由于这个低线程数,我的应用程序的处理速度非常慢。一查号码。通过下面的代码: GetRuntime().Avai

  • 调度器 调度器的算法有许多种,我们将它提取出一个 trait 作为接口 os/src/algorithm/src/scheduler/mod.rs /// 线程调度器 /// /// 这里 `ThreadType` 就是 `Arc<Thread>` pub trait Scheduler<ThreadType: Clone + Eq>: Default { /// 优先级的类型 t

  • 主要内容:1 Java 线程调度程序,2 抢占式调度与时间片调度的区别1 Java 线程调度程序 Java中的线程调度程序是JVM(Java虚拟机)的一部分,它决定应该运行哪个线程。 我们无法保证线程调度程序将会选择哪个线程来运行。 一次只能在一个进程中运行一个线程。线程调度程序主要使用抢占式或时间片调度来调度线程。 2 抢占式调度与时间片调度的区别 在抢占式调度下,最高优先级的任务会一直执行,直到进入等待状态或死机状态或存在更高优先级的任务为止。 在时间分片调度下

  • 我有一个MainClass,一个Worker类和一个Supervisor类。在MainClass中,我创建了10个Worker类和一个Supervisor类,它们在不同的线程中运行。 . . 我不知道如何实现这个,因为每个线程中的条件是相互独立的,所以我不需要同步,所以我不能使用等待通知。

  • 我正在做一个项目,我们计划使用WLP (WebSphere liberty)代替传统的WAS。 代码使用 WAS 调度程序来调度活动。 liberty 是否也具有与 WAS 中存在的相同级别的调度程序支持/功能? 如何将调度程序任务从webphere迁移到自由?

  • 问题内容: 根据我一直在阅读的定义: 线程基本上是并发(同时)运行的代码段 。 但是,如何在存在线程调度程序的情况下同时运行它们? 我读到,线程调度程序基本上是从线程池中随机选择一个线程在某个时刻运行。从中我得到一个确切的时间点,只有一个可运行线程真正处于运行状态(运行)。( 所有这些均来自SCJP Sun认证程序员学习指南 )有人可以澄清吗? 这些线程是否真正同时运行? 问题答案: 但是,如何在