当前位置: 首页 > 知识库问答 >
问题:

为什么subscribeOn在RXJava中对PublishSubject不起作用?

能烨华
2023-03-14
fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
    for (i in 1..3) {
        Thread {
            queuSubject.onNext("$i")
        }.start()
    }
    Thread.sleep(15000)
}

我试图在不同的IO线程中运行map块和subscribe的onnext块。但是输出是这样的:

map 3 called Thread-2 
thread in subscription RxCachedThreadScheduler-2
map 2 called Thread-1 
thread in subscription RxCachedThreadScheduler-2
map 1 called Thread-0 
thread in subscription RxCachedThreadScheduler-2

如您所见,调用subscribeon似乎对publissubject流没有影响,thread-0、thread-1和thread-2引用调用onnext方法的线程。

另外,请考虑下面的代码

fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
    queuSubject.onNext("1")
    queuSubject.onNext("2")
    queuSubject.onNext("3")
    Thread.sleep(15000)
}
map 1 called main 
thread in subscription RxCachedThreadScheduler-1
map 2 called main 
thread in subscription RxCachedThreadScheduler-1
map 3 called main 
thread in subscription RxCachedThreadScheduler-1

这些代码的问题是什么?谢了。

共有1个答案

裴令秋
2023-03-14

因为subscribeon只影响源的订阅副作用。如果源在观察者订阅时就开始发出事件,则会产生这样的副作用:

Observable.just(1, 2, 3)
.subscribeOn(Schedulers.io())
.doOnNext(v -> System.out.println(Thread.currentThread() + " - " + v)
.blockingSubscribe();

PublishSubject没有订阅副作用,因为它只将信号从其onxxx方法中继到观察者的onxxx方法

但是,subscribeon具有时间效应,因为它会延迟对源的实际订阅,因此在publissubject的情况下,它可能无法及时看到注册的观察者,因为其他线程调用了它的onxxx方法。

val queuSubject = PublishSubject.create<String>()
    queuSubject
        .observeOn(Schedulers.io()) // <----------------------------------------
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
 类似资料:
  • Rxjava可以使用observeOn方法和subscribeOn方法更改observable和subscriber的线程调度。observeOn和subscribeOn使用lift方法返回一个新的observable。 lift方法将创建一个新的observable,我认为它像一个克隆方法,它将创建一个新的订阅者和一个新的OnSubscribe。 在observeOn()中:新的OnSubscr

  • 主要内容:RxJava PublishSubject类 介绍,RxJava PublishSubject类 声明,RxJava PublishSubject类 示例RxJava PublishSubject类 介绍 PublishSubject 向当前订阅的观察者和终端事件发送项目到当前或晚期观察者。 RxJava PublishSubject类 声明 RxJava PublishSubject类 示例 输出结果为:

  • 问题内容: 我在这里有点困惑。如果我将变量传递给json_decode,它将不起作用: 第一个回显正确显示了我传递的JSON字符串,例如 第二个回显显示NULL。因此,我从第一个回显中获取了字符串,并编写了以下代码: 你怎么说,它向我展示了正确解码的数组。字符串绝对相同,我什至保留转义字符。也许是问题所在? 问题答案: 看起来您的服务器已启用。无论是将其禁用或运行通过使用它之前。

  • 问题内容: 我有几列使用flex给出相等的宽度。每个都包含标签,我希望这些图像显示尺寸。 如本演示中所示,图像没有调整大小。这是为什么? 问题答案: 从规格: 该属性指定如何将替换元素的内容装配到通过其使用的高度和宽度建立的框中。 关键术语是: 根据其使用的高度和宽度安装到盒子中 图像将被替换,而不是其容器。并且由其使用的高度和宽度确定的框与图像本身有关,而不与容器有关。 因此,报废容器并使图像本

  • getUserWalable方法发出一些用户,我需要知道为什么他把. subbeOn(Schedulers.io()),而他已经在main funcion上调用它,我提供了这两种方法的截取,我知道,subbeOn将使进程发生在后台线程上,但当他调用它两次时,这会有什么不同,我不知道,因为我理解只是在getUsersWalable中调用一次就足够了

  • 问题内容: 为什么这项工作有效- 但这不是- 第二种情况下的输出为。你能解释一下输出吗? 问题答案: 该方法没有返回值。它会在适当的位置更改列表,并且由于您没有将分配给任何变量,因此只是“迷失在空间” 我没有重载所有有问题的方法,但是概念应该很清楚。