当前位置: 首页 > 知识库问答 >
问题:

RxJava:动态观察集

郎嘉树
2023-03-14

我有一个中心班,叫它central。它可以添加1-N个观测值。我需要动态添加这些,然后知道最终的onComplete()何时执行。

怎么做?

代码示例:

public class Central {
  public void addObservable(Observable o){
    // Add the new observable to subscriptions
  }
}

使现代化

我已经研究这个有一段时间了。使用@DaveMoten的答案,我已经接近了。

下面是我的方法,可以随意添加一个新的observable,并在所有observable都完成时收到通知(伪代码):

class central {
  PublishSubject<Observable<T>> subject = PublishSubject.create();
  int commandCount = 0;
  int concatCount = 0;
  addObservable(Observable newObs){
    commandCount++;
    concatCount++;
    subject.concatMap(o -> 
          o.doOnCompleted(
                  () -> { 
                    concatCount--;
                    LOG.warn("inner completed, concatCount: " + concatCount);
                  })
          ).doOnNext(System.out::println)
            .doOnCompleted(() -> { 
                System.out.println("OUTER completed");
            } ) .subscribe();

    onNext(newObs);
    subject.onCompleted(); // If this is here, it ends prematurely (after first onComplete)
    // if its not, OUTER onComplete never happens

    newObs.subscribe((e) ->{},
    (error) -> {},
    () -> {
        commandCount--;
        LOG.warn("inner end: commandCount: " + commandCount);
    });
  }
}
// ... somewhere else in the app:
Observable ob1 = Observable.just(t1);
addObservable.addObservable(ob1);
// ... and possibly somewhere else:
Observable ob2 = Observable.just(t2, t3);
addObservable(ob2);

// Now, possibly somewhere else:
ob1.onNext(1);
// Another place:
ob2.onNext(2);

日志记录看起来是这样的:

19:51:16.248 [main] WARN  execute: commandCount: 1
19:51:17.340 [main] WARN  execute: commandCount: 2
19:51:23.290 [main] WARN  execute: commandCount: 3
9:51:26.969 [main] WARN   inner completed, concatCount: 2
19:51:27.004 [main] WARN  inner end: commandCount: 2
19:51:27.008 [main] WARN  inner completed, concatCount: 1
19:51:27.009 [main] WARN  inner end: commandCount: 1
19:51:51.745 [ProcessIoCompletion0] WARN  inner completed, concatCount: 0
19:51:51.750 [ProcessIoCompletion0] WARN  inner completed, concatCount: -1
19:51:51.751 [ProcessIoCompletion0] WARN  inner end: commandCount: 0
19:51:51.752 [ProcessIoCompletion0] WARN  inner completed, concatCount: -2
19:51:51.753 [ProcessIoCompletion0] WARN  inner completed, concatCount: -3

更新:我添加了一些计数器,这表明我不明白concatMap发生了什么。你可以看到lambda订阅的观测者本身正确地倒计时到0,但是concatMap oncomplete倒计时到-3!而外部的完整永远不会发生。

共有1个答案

丌官瀚
2023-03-14

使用PublishSubject

PublishSubject<Observable<T>> subject = 
    PublishSubject.create();
subject
    // protect against concurrent calls to subject (optional)
    .serialize()
    .concatMap(o -> 
      o.doOnCompleted(() -> System.out.println("inner completed")))
    .doOnNext(System.out::println)
    .doOnCompleted(() -> System.out.println("completed"))
    .subscribe(subscriber);

subject.onNext(Observable.just(t1));
subject.onNext(Observable.just(t2, t3));
subject.onCompleted();

  .
 类似资料:
  • 我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从

  • 在android 6.0.1 Samsung s6 Edge+上的测试 当device screen脱机并从debug中拔出时,可观察到的只是停止发射项目。如果设备打开,则开始发射对象。另一个问题是,在停止接收项目之前,我会按照相同项目的顺序随机地得到2/3个重复调用 ____________________________edit_________________________________

  • 我做了一个RxJava2实验,包括以下步骤: 从对象列表创建一个可观察对象。 使用EquiMapSing()将可观察对象发出的每个项目映射到SingleSource。在平图单()中,对可观察对象发出的每个项目执行异步操作。 收集ListB中的所有项目,使用Observable.toList() 结果:由于异步操作,ListB的项目顺序与ListA不同。 因此,flatMapSingle()的工作原

  • 我需要将另一个改装请求中的图像合并到其特定系列中。听起来很简单的任务把我带进了反应式的地狱,没有一丝希望。 具体来说,我还通过可观察创建服务,因为我需要获取身份验证令牌来创建服务。 我的尝试基于以下答案:https://stackoverflow.com/a/28418503/2192545. 我有点不知所措。我只是在Observable的Func2部分得到了“无法推断函数接口类型”。在IDE中,

  • 所以我有一个制片人/出版商,它是CompletableSubject 我想要一个供订阅者/观察者使用的只读版本,但和都没有发射任何东西 我错过了什么? 输出: 我找到了奇怪的解决方法 输出:

  • 问题内容: 给定汽车清单(),我可以这样做: 有没有办法我可以从一个到一个序列? 像没有参数的东西 问题答案: 您可以这样映射到: 请注意,flatMapping可能不会保留源可观察的顺序。如果订单对您很重要,请使用。