当前位置: 首页 > 知识库问答 >
问题:

具有多个订阅者和事件的RxJava并发

乐健
2023-03-14

我正在寻找一种将多个订阅者附加到RxJava可观察流的方法,每个订阅者异步处理发出的事件。

我第一次尝试使用。flatMap(),但这似乎对任何后续订阅服务器都不起作用。所有订阅服务器都在同一线程上处理事件。

.flatMap(s -> Observable.just(s).subscribeOn(Schedulers.newThread()))

最终工作的是通过每次创建一个新的可观察的来消耗新线程中的每个事件:

Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
            .subscribe(j -> {
                Observable.just(j)
                        .subscribeOn(Schedulers.newThread())
                        .subscribe(i -> {
                            try {
                                Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
                        });
            });

输出:

s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-2=>2
s1=>RxNewThreadScheduler-3=>3

以及多个订阅者的最终结果:

ConnectableObservable<String> e = Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
            .publish();

    e.subscribe(j -> {
        Observable.just(j)
                .subscribeOn(Schedulers.newThread())
                .subscribe(i -> {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
                });
    });

    e.subscribe(j -> {
        Observable.just(j)
                .subscribeOn(Schedulers.newThread())
                .subscribe(i -> {
                    try {
                        Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                    System.out.println("s2=>" + Thread.currentThread().getName() + "=>" + i);
                });
    });

    e.connect();

输出:

s2=>RxNewThreadScheduler-4=>2
s1=>RxNewThreadScheduler-1=>1
s1=>RxNewThreadScheduler-3=>2
s2=>RxNewThreadScheduler-6=>3
s2=>RxNewThreadScheduler-2=>1
s1=>RxNewThreadScheduler-5=>3

然而,这似乎有点笨拙。有没有更优雅的解决方案,或者RxJava不是一个很好的用例?

共有3个答案

窦英武
2023-03-14

flatMap中的Observable上的doOnNext将产生与您相同的输出。

始终以顺序方式调用onNext(),因此在平面图之后使用doOnNext也不适用。由于同样的原因,在最终订阅中写入操作在您的案例中不起作用。

下面的代码是使用RxJava2编写的。在RxJava的版本1中,您必须在线程周围添加try-catch块。睡眠。

ConnectableObservable<String> e = Observable.just("1", "2", "3").publish();

e.flatMap(
      s -> Observable.just(s)
          .subscribeOn(Schedulers.newThread())
          .doOnNext(i -> {  // <<<<<<
              Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
              System.out.println("s1=>" + Thread.currentThread().getName() + "=>" + i);
          }))
  .subscribe();

e.flatMap(
      s -> Observable.just(s)
          .subscribeOn(Schedulers.newThread())
          .doOnNext(i -> {  // <<<<<<
              Thread.sleep(ThreadLocalRandom.current().nextInt(100, 500));
              System.out.println("s2=>" + Thread.currentThread().getName() + "=>" + i);
          }))
  .subscribe();

e.connect();
郎仰岳
2023-03-14

如果我正确理解了rx合约,你正在试图做一些事情,这是违背它的。

让我们看看合同

RxJava Observable的约定是事件(onNext()、onCompleted()、onEr ror())永远不能同时发出。换言之,单个可观察流必须始终序列化且线程安全。每个事件都可以从不同的线程发出,只要这些发出不是并发的。这意味着onNext()不能相互离开或同时执行。如果onNext()仍在一个线程上执行,则另一个线程无法再次开始调用它(交错)--Tomasz Nurkiewicz在RxJava反应式编程中的应用

在我看来,您试图通过在外部订阅中使用嵌套订阅来打破合同。对订阅者的onNext调用不再序列化。

为什么不将“异步”工作负载从订阅者移动到flatMap操作符并订阅新的observable:

    ConnectableObservable<String> stringObservable = Observable.from(Arrays.asList(new String[]{"1", "2", "3"}))
            .flatMap(s -> {
                return Observable.just(s).subscribeOn(Schedulers.computation());
            })
            .publish();

    stringObservable
            .flatMap(s -> {
                // do More asyncStuff depending on subscription
                return Observable.just(s).subscribeOn(Schedulers.newThread());
            })
            .subscribe(s -> {
                // use result here
            });

    stringObservable
            .subscribe(s -> {
                // use immediate result here.
            });

    stringObservable.connect();
秦伯寅
2023-03-14

使用<代码>。平面图-

 类似资料:
  • 我正在尝试了解RxJava并发的一些细节,但我不确定我的想法是否正确。我对SubscribeOn/观察的工作原理有很好的了解,但我正在尝试确定池调度程序的一些细节。为此,我正在考虑尽可能简单地实现一个1-N生产者-消费者链,其中消费者的数量与CPU一样多。 根据文档,Schedulers.computation()由与内核一样多的线程池支持。但是,根据Reactive合约,运算符只能获得顺序调用。

  • 我的应用程序有几个部分需要对从其他地方触发的事件做出反应,所以我首先想到的是事件总线。这些是我看到的要求: 订阅者方法应该是类型安全的 实现接口(如

  • 当我创建5个observables并用单独的订阅者订阅它们时,直觉上我认为每个订阅者都将获得其observables的相应数据,这些数据通过onNext()调用发出: 然而,我在日志中看到的是一两个“testit onnext”。 如果有任何提示,我将不胜感激。

  • 由于Guava的留档很短,我在这里问它: 是否有办法将事件分派给多个订阅者,或者事件总是由第一个合适的订阅者使用? 如果是后者,为了添加这样的功能或在自己的应用程序中实现整个事件总线逻辑,是否最好扩展EventBus类?

  • 我有一个类处理一个图像,这可能是一个缓慢的过程。当工作完成时,该类包含图像的一些特性,如主色。 我有许多其他的代码想知道主颜色,当他们要求它时,它可能是或可能没有准备好。 我还没有找到使用RXJava2实现这一点的简单方法。有人能帮帮我吗? null ReplaySubject似乎有一些我正在寻找的属性,但我不确定如何正确地实现它。

  • 问题内容: 我正在使用RxJava计算Android中某些传感器数据的标准化自动相关性。奇怪的是,我的代码引发了一个异常(“ java.lang.IllegalStateException:只允许一个订阅者!”),我不确定该怎么做:我知道GroupedObservables订阅多个订阅者时可能会抛出此异常,但是我不认为我在任何地方都在使用这种东西。 在下面,您找到(最有可能)触发异常的方法: 这是