当前位置: 首页 > 知识库问答 >
问题:

RxJava-在subscribeOn()线程上发出

申高峯
2023-03-14

我有以下代码

Single.create { emitter ->
   // I/O thread here

   ThirdPartySDK.doSomeAction {
       // Main thread here

      emitter.onSuccess(someValue)
   }
}
.flatMap {
  someOtherSingle(it) // Executes on main thread
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({},{})

第三方DK。doSomeAction回调在主线程上发布,因此发射器也将在主线程上发出,而不是在订阅线程上发出(如果我在flatMap中进一步进行一些网络交互,链将失败)。

如果我在第一个之后添加观察(Schedulers.io()),它会切换到正确的线程,但是有没有办法在正确的线程上发出?我不能修改第三方SDK行为。

共有1个答案

郎鹤龄
2023-03-14

将在给定的调度程序上调用subscribeActual lambda

将线程切换到给定的调度程序。每个上游onNext调用都将从ObserveOn调度程序线程调用

正如您已经说过的,订阅只会在给定的Scheduler-Thread上调用订阅时的订阅实际方法调用。这并不意味着下游发出的将在同一个线程上。在您的情况下,将从不同的线程(例如Database/Http-ThreadPool等)调用on成功发出。

on成功将从未知线程(在您的情况下是主线程)调用。下游调用将从主线程调用。因此从主线程调用平面图。平面图中主线程上的网络调用可能会失败,因为它不允许“阻塞”主线程。

如何解决这个问题?只需在单个#create之后放置一个observeOn。主线程调用onSuces。observeOn订阅者将从主线程调用。observeOn用户将onSuccess下行呼叫(例如flatMap)重新定向到给定的observeOn调度程序线程。因此,给出了从非主循环线程调用flatMap的方法。

例子:

@Test
fun wurst() {
    val thirdPartySDKImpl = ThirdPartySDKImpl()
    Single.create<String> { emitter ->
        thirdPartySDKImpl.doSomeAction {
            emitter.onSuccess(it)
        }
    }
        // .subscribeOn(Schedulers.computation())
        // move emit from unknown thread to computation thread
        .observeOn(Schedulers.computation())
        // Single.just will be subscribe from a computation thread
        .flatMap { Single.just(123) }
        // move onSucess/ onError emit from computation thread to main-thread
        .observeOn(AndroidSchedulers.mainThread())
        // subscribe onNext / onError will be called from the main-android-thread
        .subscribe({}, {})
}

interface ThirdPartySDK {
    fun doSomeAction(callback: (v: String) -> Unit)
}

class ThirdPartySDKImpl : ThirdPartySDK {
    override fun doSomeAction(callback: (v: String) -> Unit) {
        // <- impl-detail ->
        callback("whatever")
    }

}

注意:如果create lambda不阻塞或执行一些cpu繁忙的操作,则不需要subscribeOn。如果它只订阅将从其他线程调用的回调,则不需要subscribeOn。

但有没有办法在正确的线程上发射呢?

您不应该在运算符中使用任何并发。您可能会想,您可以这样做:

    Single.create<String> { emitter ->
        thirdPartySDKImpl.doSomeAction {
            Schedulers.io().scheduleDirect {
                emitter.onSuccess(it)
            }
        }
    }

但不建议这样做,因为您可能会破坏序列化的onNext约定^1。这个示例将确保onSucess下游调用将在预期的线程上发生,但取消/取消订阅没有处理,并且可能存在其他陷阱。

如果您有一个非反应性API并且您想强制执行一些线程模型,我建议将sync包装起来。API与异步API并提供适当的观察/订阅运算符。稍后仅使用异步API。

interface ThirdPartySDKAsync {
    fun doSomeAction(): Single<String>
}

class ThirdPartySDKAsyncImpl(private val sdk: ThirdPartySDK, private val scheduler: Scheduler) :
    ThirdPartySDKAsync {
    override fun doSomeAction(): Single<String> {
        return Single.create<String> { emitter ->
            sdk.doSomeAction {
                emitter.onSuccess(it)
            }
        }.observeOn(scheduler)
    }
}

进一步阅读:https://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html

^1一次只允许一个线程调用onNext/onSuccess/onError/onComplete

 类似资料:
  • 问题内容: 我的一项活动遇到了一个奇怪的问题。从拍照/录像回来时,我正在显示一个对话框,允许用户命名相机。用户按下“确定”后,我将使用所请求的文件名发送给主题,该主题将复制文件(并显示进度对话框)。 由于某种原因,即使我调用,总是在主线程上调用执行复制的函数。 更改呼叫以解决问题。我还是想知道为什么它不起作用… 问题答案: 并且是那里最混乱的运营商。前者确保订阅副作用在指定的调度程序(线程)上发生

  • 我正在运行RxJava并创建一个主题以使用方法生成数据。我正在使用Spring。 这是我的设置: 在RxJava流上生成新数据的方式是通过Autowire private SubjectObserver SubjectObserver,然后调用SubjectObserver。发布(newDataObjGenerated) 无论我为subscribeOn()指定了什么 Schedulers.io()

  • 我在Android上使用RxJava和Retrofit2.0来处理网络请求。 在创建可观察对象时,我向其添加以下内容: 是否有一个优雅的解决方案来解决这个问题,而不必手动地用一个块包装我的实现,以便将一些东西发布到主线程?

  • 我试图在不同的IO线程中运行块和块。但是输出是这样的: 如您所见,调用似乎对流没有影响,引用调用方法的线程。 另外,请考虑下面的代码: 这些代码的问题是什么?谢了。

  • 我正在尝试学习一些RxJava和RxAndroid,我认为我遇到的问题可以很容易地使用这样的工具来解决。问题是:我们可以在一个活动中有“N”个视图,并且每个视图都用于满足某些条件。当用户按“保存”时,我们想检查所有视图中是否满足所有条件,如果不满足,请用户分别确认每个视图。所以这是我在没有RxJava的情况下如何处理这个问题的示例: 显然,我需要某种类型的监听器来确认结果,并且在条件被确认后(使用

  • 考虑一个场景,我们有一个发出字符串的流,我们想将字符串保存在文件中。 我正在使用Publish科目,这很好: 但是,这不起作用(只有被交付) 有没有办法让第二个场景也能正常工作? i、 例如,我们是否可以更改PublishSubject以确保它缓冲事件,直到订阅者使用它们? 请注意,BehaviorSubject不是一个选项,因为重新订阅会导致另一个文件保存。它没有“消费事件”的概念。 我找到了U