当前位置: 首页 > 面试题库 >

RxJava和观察者代码的并行执行

帅煌
2023-03-14
问题内容

我正在使用RxJava Observable api使用以下代码:

Observable<Info> observable = fileProcessor.processFileObservable(processedFile.getAbsolutePath());
    observable
      .buffer(10000)
      .observeOn(Schedulers.computation())
      .subscribe(recordInfo -> {
        _logger.info("Running stage2 on thread with id : " + Thread.currentThread().getId());
          for(Info info : recordInfo) {
            // some I/O operation logic
         }
      }, 
      exception -> {
      }, 
      () -> {
      });

我的期望是观察代码,即subscribe()方法中的代码,在我指定了计算调度程序后将并行执行。相反,代码仍在单线程上按顺序执行。如何使用RxJava
api使代码并行运行。


问题答案:

RxJava在异步/多线程方面经常被误解。多线程操作的编码很简单,但是了解抽象是另一回事。

关于RxJava的一个常见问题是如何实现并行化,或如何从Observable同时发出多个项目。当然,此定义违反了Observable
Contract,该协议规定onNext()必须被顺序调用,并且一次不能由多个线程同时调用。

要实现并行性,您需要多个Observable。

这在一个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.subscribeOn(Schedulers.computation())
          .map(i -> intenseCalculation(i))
          .subscribe(val -> System.out.println("Subscriber received "
                  + val + " on "
                  + Thread.currentThread().getName()));

这在多个线程中运行:

Observable<Integer> vals = Observable.range(1,10);

vals.flatMap(val -> Observable.just(val)
            .subscribeOn(Schedulers.computation())
            .map(i -> intenseCalculation(i))
).subscribe(val -> System.out.println(val));

代码和文本来自此博客文章。



 类似资料:
  • 我使用的是RxJava和RxAndroid,我想将两个可观察到的东西结合起来,但在这两个东西之间,我需要更新UI,所以在到达订阅服务器之前,我必须在主线程中执行代码。 一个解决方案,而不是flatmapping(这是一个被接受的术语吗?)两个可观察到的,将是在更新UI之后在订阅服务器中调用下一个可观察到的,但我觉得应该有一个更优雅的解决方案,比如: 当然,很可能map不是我这里需要使用的运算符。那

  • 我是RxJava的新手,正在尝试从link执行多个观测值的并行执行示例:RxJava并行获取观测值 虽然上面链接中提供的示例是并行执行可观察对象,但是当我在foreach方法中添加一个Thread.sleep(TIME_IN_MILLISECONDS)时,系统开始一次执行一个可观察对象。请帮助我理解为什么Thread.sleep停止可观察对象的并行执行。 下面是导致多个观测值同步执行的修改示例:

  • 我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从

  • 我已经阅读了ReactiveX留档几次,仍然无法完全理解当观察者订阅可观察文件时会发生什么。 我们来看一个简单的例子: StackBlitz代码。 我的问题: 传递给可观察对象的

  • 在RxJava的世界里,我们有四种角色: Observable Observer Subscriber Subjects Observables和Subjects是两个“生产”实体,Observers和Subscribers是两个“消费”实体。

  • 问题内容: 我一直在阅读Observer模式,以保持UI处于最新状态,但仍然看不到它的用途。即使在我的特定对象中通知了我的MainActivity然后运行update();方法我仍然无法使用Pet对象来获取更新值,因为该对象是在Oncreate中创建的…而我只是无法创建新对象,因为那时变量会有所不同..这是我的实施,它似乎不起作用。 观察者/ MainActivity 可观察/宠物 问题答案: 首