当前位置: 首页 > 知识库问答 >
问题:

为什么多个RXJava可观察对象没有并行执行?

薛华奥
2023-03-14

我是RxJava的新手,正在尝试从link执行多个观测值的并行执行示例:RxJava并行获取观测值

虽然上面链接中提供的示例是并行执行可观察对象,但是当我在foreach方法中添加一个Thread.sleep(TIME_IN_MILLISECONDS)时,系统开始一次执行一个可观察对象。请帮助我理解为什么Thread.sleep停止可观察对象的并行执行。

下面是导致多个观测值同步执行的修改示例:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
    }

    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking()
        .forEach(x -> { try{Thread.sleep(4000);}catch(Exception ex){}; 
        System.out.println(x + " " + Thread.currentThread().getId());});
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(i);
            s.onCompleted();
        });
    }
}

在上面的示例中,我们使用了observable的subscribeOn方法,并提供了一个线程池(Schedules.io)来执行,因此每个observable的订阅将在单独的线程上进行。

共有1个答案

厉高逸
2023-03-14

实际上,在您的示例中,并行执行正在发生,您只是看得不正确,在执行工作的位置和发出通知的位置之间存在差异。

如果你将日志与线程id在Observable.create,你会注意到每个可观察是在不同的线程同时执行。

 类似资料:
  • 我正在开发利用RxJava、realm和改进的应用程序。 我需要创建非常具体的数据处理链。我需要在io调度程序上执行改装调用,然后在我的自定义单线程领域调度程序上处理提供的数据,然后将结果推送到主线程调度程序上的ui。我试图通过使用多个组合来实现这一点,包括观察(observeOn)和订阅(subscribeOn),但我无法让中间部分在调度程序(scheduler)上执行。 我的目标是这样的

  • 我使用的是RxJava和RxAndroid,我想将两个可观察到的东西结合起来,但在这两个东西之间,我需要更新UI,所以在到达订阅服务器之前,我必须在主线程中执行代码。 一个解决方案,而不是flatmapping(这是一个被接受的术语吗?)两个可观察到的,将是在更新UI之后在订阅服务器中调用下一个可观察到的,但我觉得应该有一个更优雅的解决方案,比如: 当然,很可能map不是我这里需要使用的运算符。那

  • 问题内容: 我正在使用RxJava Observable api使用以下代码: 我的期望是观察代码,即subscribe()方法中的代码,在我指定了计算调度程序后将并行执行。相反,代码仍在单线程上按顺序执行。如何使用RxJava api使代码并行运行。 问题答案: RxJava在异步/多线程方面经常被误解。多线程操作的编码很简单,但是了解抽象是另一回事。 关于RxJava的一个常见问题是如何实现并

  • 我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从

  • 我正在努力理解如何合并两个可观察对象并利用它们合并后的产品。我在mergeMap、switchMap、flatMap、大理石图等上看了无数视频,但我仍然不知道合并观测值是如何工作的。我觉得在使用RxJS的时候,我不会有效率,甚至不会正确。 我有一个要订阅的可观测值,我还想订阅代码中特定表单数组的valueChanges可观测值。但是,我需要确保只有在正确构建表单数组之后才能进行第二次订阅,否则将出

  • 我正在开发一个简单的REST应用程序,它利用RxJava向远程服务器发送请求(1)。对于REST API的每个传入请求,都会向(1)发送一个请求(使用RxJava和RxNetty)。一切正常,但现在我有了一个新的用例: 为了不让太多的请求轰炸(1),我需要实施速率限制。解决这个问题的一种方法(我假设)是将在向(1)发送请求时创建的每个可观察的(2)添加到另一个执行实际速率限制的(2)中。(2) 然