当前位置: 首页 > 知识库问答 >
问题:

RxJava,一个可以观察到的多个订阅者:publish().autoconnect()

陆飞捷
2023-03-14
Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3

我是这样理解的,从可观察的角度来看:

>

  • 有人订阅了我,我应该开始发送项目
    [订阅者:1][要发送的项目:1,2,3]

    向订阅服务器发送项“1”
    [订阅服务器:1][要发送的项:2,3]

    ...

    但它不是这样运作的。就像它们是两个独立的可观测物在一个。这让我很困惑,为什么他们不把项目给所有的订户?

    奖金:

    谢了!

  • 共有1个答案

    宇文俊明
    2023-03-14

    这是因为事实上这是两个独立的可观察物。它们是在调用subscribe()时“生成”的。因此,你所提供的步骤在意义上是不正确的,即步骤3和4只是1和2,但在一个不同的可观察的。

    但是,由于日志记录发生在线程上,您将它们视为1 1 1 2 2 2。如果您要删除observeon()部分,那么您将看到以交织的方式发送。要查看下面的运行代码:

    @Test
    public void test() throws InterruptedException {
        final Scheduler single = Schedulers.single();
        final long l = System.nanoTime();
        Observable<Long> dataStream =
                Observable.just(1, 2, 3)
                        .map(i -> System.nanoTime())
                        .subscribeOn(Schedulers.computation());
                        //.observeOn(single);
    
        dataStream.subscribe(i -> System.out.println("1  " + Thread.currentThread().getName() + " " + (i - l)));
        dataStream.subscribe(i -> System.out.println("2  " + Thread.currentThread().getName() + " " + (i - l)));
    
        Thread.sleep(1000);
    }
    

    输出,至少在我的运行中是(请注意线程名):

    1  RxComputationThreadPool-1 135376988
    2  RxComputationThreadPool-2 135376988
    1  RxComputationThreadPool-1 135486815
    2  RxComputationThreadPool-2 135537383
    1  RxComputationThreadPool-1 135560691
    2  RxComputationThreadPool-2 135617580
    
    1  RxSingleScheduler-1 186656395
    1  RxSingleScheduler-1 187919407
    1  RxSingleScheduler-1 187923753
    2  RxSingleScheduler-1 186656790
    2  RxSingleScheduler-1 187860148
    2  RxSingleScheduler-1 187864889
    
    @Test
    public void test() throws InterruptedException {
        final Scheduler single = Schedulers.single();
        final long l = System.nanoTime();
        ConnectableObservable<Long> dataStream =
                Observable.just(1, 2, 3)
                        .map(i -> System.nanoTime())
                        .subscribeOn(Schedulers.computation())
                        .observeOn(single)
                        .publish();
    
        dataStream.subscribe(i -> System.out.println("1  " + (i - l)));
        dataStream.subscribe(i -> System.out.println("2  " + (i - l)));
    
        Thread.sleep(1000);
        dataStream.connect();
        Thread.sleep(1000);
    
    }
    

    您将注意到,在第一秒钟(第一次thread.sleep()调用)没有发生任何事情,而在datastream.connect()被调用之后,就发生了发射。

    refcount()接受ConnectableObservable,并通过计算当前订阅了多少订阅服务器来向订阅服务器隐藏调用Connect()的需要。它所做的是在第一次订阅时调用connect(),在最后一次取消订阅后取消对原始可观察到的订阅。

    至于相互取消publish().autoconnect(),之后您确实得到了一个可观察的,但它有一个特殊的属性,假设最初的可观察的通过Internet进行API调用(持续10秒),当您在不使用share()的情况下使用它时,对服务器的并行查询将与这10秒内的订阅数量一样多。另一方面,对于share(),只有一次调用。

    如果共享的可观察到的工作完成得非常快(就像(1,2,3)),您将看不到它的任何好处。

    autoconnect()/refcount()为您提供了一个您订阅的中间可观测项,而不是原始可观测项。

    如果您感兴趣,请阅读这本书:用RxJava进行反应性编程

     类似资料:
    • 我有这个问题,我一直在寻找,但找不到解决方案(或者也许我不能根据其他答案做出解决方案)。 我的问题是,我需要找到一种方法来等待可观察的(有自己的订户)并等待另一个可观察的(有自己的订户)完成。 场景是这样的: 奥布1- 奥布斯2 - 我主要担心的是我需要两个订阅者。在我看来,obs1 和 obs2 并行运行,但需要检查 obs1 是否以新的会话令牌完成。也许这不是RxJava的主要目的。 Obs1

    • 当我创建5个observables并用单独的订阅者订阅它们时,直觉上我认为每个订阅者都将获得其observables的相应数据,这些数据通过onNext()调用发出: 然而,我在日志中看到的是一两个“testit onnext”。 如果有任何提示,我将不胜感激。

    • 我正在用RxJava在Android中制作计时器。我需要在RxJava中制作一个计时器,以便每秒发出一个可观察的信号。我试过以下方法,但没有成功。有没有想过我做错了什么?

    • 学习角得到服务和组件和可观察性。 我正在尝试在我的演示应用程序中实现暗模式。该控件由一个服务完成,该服务根据浏览器默认值(如果找到)设置暗主题。 它在应用程序组件中初始化,以便以后在应用程序中放置控制开关。 暗模式从布尔值开始工作,因此为true或false。据我所知,使用一个可观察对象是不够的,因为我希望多个订阅者都以两种方式绑定到订阅,每种方式在服务中切换这些可观察对象。到目前为止,我很肯定这

    • 我试图用制作一个简单的马里奥游戏。因此,我需要一个,用于杀死(Mario中的主要敌人)。经过一番研究之后,我研究了 我还创建了一个 我相信(不确定我是否完全理解了这一点)这使得成为,并且我需要多个,这意味着我将有多个的 所以我有两个问题:我对有什么不了解,以及如何让多个()在死亡时调用

    • 问题内容: 我正在使用RxJava计算Android中某些传感器数据的标准化自动相关性。奇怪的是,我的代码引发了一个异常(“ java.lang.IllegalStateException:只允许一个订阅者!”),我不确定该怎么做:我知道GroupedObservables订阅多个订阅者时可能会抛出此异常,但是我不认为我在任何地方都在使用这种东西。 在下面,您找到(最有可能)触发异常的方法: 这是