当前位置: 首页 > 知识库问答 >
问题:

RxJava2-间隔和调度程序

墨星鹏
2023-03-14

假设我有一个间隔,我给了它一个计算调度器。这样地:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .flatMap { ... }

那么,平面图{...}中发生的一切是否也会被调度在计算线程上?

在Observable.interval的源代码中,它说:

 * @param scheduler
 * the Scheduler on which the waiting happens and items are emitted

作为RxJava的初学者,我很难理解这个评论。我知道间隔计时器/等待逻辑发生在计算线程上。但是,关于要发出的项目的最后一部分是否也意味着发出的项目将在同一个线程上使用?还是需要观察?这样地:

Observable
    .interval(0, 1, TimeUnit.SECONDS, computationScheduler)
    .observeOn(computationScheduler)
    .flatMap { ... }

如果我想在计算线程上处理emits,是否需要进行观察?

共有1个答案

郁明诚
2023-03-14

这很容易验证:只需打印当前线程即可查看运算符在哪个线程上执行:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这将始终打印:

on subscribe: main
--> 1
on flatmap: main
on subscribe: main
--> 2
on flatmap: main
on subscribe: main
--> 3
on flatmap: main
on subscribe: main
--> 4
on flatmap: main
on subscribe: main
--> 5
on flatmap: main
on subscribe: main
--> 6
on flatmap: main
on subscribe: main
--> 7
on flatmap: main
on subscribe: main
--> 8
on flatmap: main
on subscribe: main
--> 9

按顺序处理,因为所有事情都发生在一个线程中-

观察将更改下游执行线程:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .observeOn(Schedulers.computation())
    .flatMap(e -> {
         System.out.println("on flatmap: " + Thread.currentThread().getName());
         return Observable.just(e).map(x -> "--> " + x);
     })
     .observeOn(Schedulers.io())
     .subscribe(s -> {
         System.out.println("on subscribe: " + Thread.currentThread().getName());
         System.out.println(s);
      });

每次执行的结果都不同,但flatmap和subscribe将在不同的线程中处理:

on flatmap: RxComputationThreadPool-1
on subscribe: RxCachedThreadScheduler-1

间隔将充当观察并更改下游执行线程(调度程序):

Observable.interval(0, 1, TimeUnit.SECONDS, Schedulers.computation())
    .flatMap(e -> {
        System.out.println("on flatmap: " + Thread.currentThread().getName());
        return Observable.just(e).map(x -> "--> " + x);
    })
    .subscribe(s -> {
        System.out.println("on subscribe: " + Thread.currentThread().getName());
        System.out.println(s);
    });

这一次执行是在计算调度程序的一个线程中顺序执行的:

on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 0
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 1
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 2
on flatmap: RxComputationThreadPool-1
on subscribe: RxComputationThreadPool-1
--> 3
...

默认情况下,间隔将使用计算调度程序,您不需要将其作为参数传递,也不需要观察

 类似资料:
  • 问题内容: 我正在寻找一个节点作业计划,该计划将允许我以不同的时间间隔计划许多任务。例如, 每30秒调用一次函数A 每60秒调用一次功能B 每7天调用一次函数C 我还希望能够启动和停止该过程。 到目前为止,我已经看过: 稍后 -语法使我感到困惑,显然您也不能安排一个月以上的任务 议程 -似乎是最有前途的,但是我对数据库功能感到困惑 时间计划 -太简单了,无法启动和停止 我发现后者的语法令人困惑。

  • 我使用RxJava2 Android网络的网络调用。我面临的问题是,当我试图通过命中API时,有时它不会给出任何响应,而当我试图通过命中API时,它总是给出回应 和

  • 本文向大家介绍浅析javascript的间隔调用和延时调用,包括了浅析javascript的间隔调用和延时调用的使用技巧和注意事项,需要的朋友参考一下 用 setInterval方法可以以指定的间隔实现循环调用函数,直到clearInterval方法取消循环 用clearInterval方法取消循环时,必须将setInterval方法的调用赋值给一个变量,然后clearInterval方法引用该变

  • 我有一组不重叠的,不相邻的区间,例如[{10,15},{30,35},{20,25}]。它们没有排序,但如果需要,我可以对它们进行排序。 现在,我得到了一些新的区间,例如{5,32},并希望生成一组新的区间来描述差异:这个新区间所覆盖的范围不在该集合中。在这个例子中,答案是:[{5,9},{16,19},{26,29}]。 计算这个的快速算法是什么?请注意,集合中通常有1个,有时有2个,很少有3个

  • 我正在与石英时间表和一切工作完美根据要求。但有一件事我想实现,即我希望我的下一个作业执行将触发(currentFinishTime+intervalOfScheduler) 间隔为30秒的作业执行示例: 请帮助我解决我的问题。

  • 我正在尝试理解这些调度算法: 先到先得(FCFS) 最短作业优先(SJF) 最短剩余时间(SRT) 循环赛(RR) 因此,给定一些输入: FCFS将安排为。 我似乎无法弄清楚其余的。有人可以帮助我解释差异吗? 我试过谷歌搜索,但我为SJF得到的结果有点令人困惑。