Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));
Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
我是这样理解的,从可观察的角度来看:
>
有人订阅了我,我应该开始发送项目
[订阅者:1][要发送的项目:1,2,3]
向订阅服务器发送项“1”
[订阅服务器:1][要发送的项:2,3]
...
但它不是这样运作的。就像它们是两个独立的可观测物在一个。这让我很困惑,为什么他们不把项目给所有的订户?
奖金:
谢了!
这是因为事实上这是两个独立的可观察物。它们是在调用subscribe()
时“生成”的。因此,你所提供的步骤在意义上是不正确的,即步骤3和4只是1和2,但在一个不同的可观察的。
但是,由于日志记录发生在线程上,您将它们视为1 1 1 2 2 2。如果您要删除observeon()
部分,那么您将看到以交织的方式发送。要查看下面的运行代码:
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
Observable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation());
//.observeOn(single);
dataStream.subscribe(i -> System.out.println("1 " + Thread.currentThread().getName() + " " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + Thread.currentThread().getName() + " " + (i - l)));
Thread.sleep(1000);
}
输出,至少在我的运行中是(请注意线程名):
1 RxComputationThreadPool-1 135376988
2 RxComputationThreadPool-2 135376988
1 RxComputationThreadPool-1 135486815
2 RxComputationThreadPool-2 135537383
1 RxComputationThreadPool-1 135560691
2 RxComputationThreadPool-2 135617580
1 RxSingleScheduler-1 186656395
1 RxSingleScheduler-1 187919407
1 RxSingleScheduler-1 187923753
2 RxSingleScheduler-1 186656790
2 RxSingleScheduler-1 187860148
2 RxSingleScheduler-1 187864889
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
ConnectableObservable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation())
.observeOn(single)
.publish();
dataStream.subscribe(i -> System.out.println("1 " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + (i - l)));
Thread.sleep(1000);
dataStream.connect();
Thread.sleep(1000);
}
您将注意到,在第一秒钟(第一次thread.sleep()
调用)没有发生任何事情,而在datastream.connect()
被调用之后,就发生了发射。
refcount()
接受ConnectableObservable,并通过计算当前订阅了多少订阅服务器来向订阅服务器隐藏调用Connect()
的需要。它所做的是在第一次订阅时调用connect()
,在最后一次取消订阅后取消对原始可观察到的订阅。
至于相互取消publish().autoconnect()
,之后您确实得到了一个可观察的,但它有一个特殊的属性,假设最初的可观察的通过Internet进行API调用(持续10秒),当您在不使用share()
的情况下使用它时,对服务器的并行查询将与这10秒内的订阅数量一样多。另一方面,对于share()
,只有一次调用。
如果共享的可观察到的工作完成得非常快(就像(1,2,3)
),您将看不到它的任何好处。
autoconnect()
/refcount()
为您提供了一个您订阅的中间可观测项,而不是原始可观测项。
如果您感兴趣,请阅读这本书:用RxJava进行反应性编程
我有这个问题,我一直在寻找,但找不到解决方案(或者也许我不能根据其他答案做出解决方案)。 我的问题是,我需要找到一种方法来等待可观察的(有自己的订户)并等待另一个可观察的(有自己的订户)完成。 场景是这样的: 奥布1- 奥布斯2 - 我主要担心的是我需要两个订阅者。在我看来,obs1 和 obs2 并行运行,但需要检查 obs1 是否以新的会话令牌完成。也许这不是RxJava的主要目的。 Obs1
当我创建5个observables并用单独的订阅者订阅它们时,直觉上我认为每个订阅者都将获得其observables的相应数据,这些数据通过onNext()调用发出: 然而,我在日志中看到的是一两个“testit onnext”。 如果有任何提示,我将不胜感激。
我正在用RxJava在Android中制作计时器。我需要在RxJava中制作一个计时器,以便每秒发出一个可观察的信号。我试过以下方法,但没有成功。有没有想过我做错了什么?
学习角得到服务和组件和可观察性。 我正在尝试在我的演示应用程序中实现暗模式。该控件由一个服务完成,该服务根据浏览器默认值(如果找到)设置暗主题。 它在应用程序组件中初始化,以便以后在应用程序中放置控制开关。 暗模式从布尔值开始工作,因此为true或false。据我所知,使用一个可观察对象是不够的,因为我希望多个订阅者都以两种方式绑定到订阅,每种方式在服务中切换这些可观察对象。到目前为止,我很肯定这
我试图用制作一个简单的马里奥游戏。因此,我需要一个,用于杀死(Mario中的主要敌人)。经过一番研究之后,我研究了 我还创建了一个 我相信(不确定我是否完全理解了这一点)这使得成为,并且我需要多个,这意味着我将有多个的 所以我有两个问题:我对有什么不了解,以及如何让多个()在死亡时调用
问题内容: 我正在使用RxJava计算Android中某些传感器数据的标准化自动相关性。奇怪的是,我的代码引发了一个异常(“ java.lang.IllegalStateException:只允许一个订阅者!”),我不确定该怎么做:我知道GroupedObservables订阅多个订阅者时可能会抛出此异常,但是我不认为我在任何地方都在使用这种东西。 在下面,您找到(最有可能)触发异常的方法: 这是