我使用combineLatest()组合了3个可观测数据流。所有这些都结合在一起,以便同时显示UI中的所有数据。现在,有一个场景,其中一个可观察对象不会发出任何东西,因为获取的数据可以为null。
是否有RxJava操作符让订阅者知道由于空数据,不会有任何发射?
编辑
private fun retrieveData() {
Observable.combineLatest(getCurrentUser.execute(), getLatestGoal.execute(), getLatestLog.execute(),
Function3<User, Goal, Log, PersonalViewModel> { user, goal, log -> mapToViewModel(user, goal, log) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnSubscribe { /*todo: animation*/ }
.doOnNext { view.setViewModel(it) }
.doOnComplete { view.stopLoading() }
.doOnError { /*todo: error message*/ }
.subscribe()
}
第三个流:getLatestLog。当用户有nog日志时,execute()不会发出任何消息。当此流不发射时,整个视图将不可见。
数据从FireBase实时数据库中获取。ChildEventListener的方法如下所示:
override fun onChildAdded(dataSnapshot: DataSnapshot?, p1: String?) {
val log = dataSnapshot?.getValue(Log::class.java)
log?.let { subscriber.onNext(it) }
subscriber.onComplete()
firebaseDatabase.reference.removeEventListener(this)
}
您可以使用public final Single first(T defaultItem)方法。所以代码可能如下所示
getLatestLog.execute()
.first(someDefaultNonNullLog)
.toObservable()
如果您有Java8或一些Optionals,您可以使用此构造:
@Test
void name() {
TestScheduler scheduler = new TestScheduler();
Observable<Optional<Integer>> o1$ =
Observable.just(Optional.ofNullable(4)).mergeWith(Observable.never());
Observable<Optional<Integer>> o2$ =
Observable.just(Optional.ofNullable(2)).mergeWith(Observable.never());
Observable<Optional<Integer>> o3$ =
Observable.<Optional<Integer>>never()
.timeout(1000, TimeUnit.MILLISECONDS, scheduler)
.onErrorResumeNext(
throwable -> {
return Observable.<Optional<Integer>>never()
.mergeWith(Observable.just(Optional.empty()));
});
Observable<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> result =
Observable.combineLatest(
o1$,
o2$,
o3$,
(integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
.filter(t -> t._1.isPresent() && t._2.isPresent() && t._3.isPresent());
TestObserver<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> test =
result.test();
scheduler.advanceTimeTo(10000, TimeUnit.SECONDS);
test.assertNotComplete().assertNoErrors().assertNoValues();
}
如您所知,不允许通过可观察管道发出空值。因此,我们需要一些其他的构造来表示null。在Java8中有一个称为Optional的构造(vavr称之为Option-
在这个例子中,o3$-可观察不会发出任何东西。它也可能出错,也许这更像你的情况。我们将捕获错误(在这种情况下:超时异常)并返回一个带有Optional.empty.的可观察
在组合回调中,我们组合所有三个值。在后面的步骤中,我们过滤掉所有元组,这些元组都有有效值(可选值为Value)。
只有当三个值都已发出一个值时,才会发出一个值。
当不能使用可选类时,还可以定义无效对象,如以下示例所示:
class So51217041 {
private static Integer INVALID_VALUE = 42;
@Test
void name() {
Observable<Integer> o1$ = Observable.just(4).mergeWith(Observable.never());
Observable<Integer> o2$ = Observable.just(2).mergeWith(Observable.never());
Observable<Integer> o3$ =
Observable.<Integer>never()
.onErrorResumeNext(
throwable -> {
return Observable.<Integer>never().mergeWith(Observable.just(INVALID_VALUE));
});
Observable<Tuple3<Integer, Integer, Integer>> result =
Observable.combineLatest(
o1$,
o2$,
o3$,
(integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
.filter(t -> t._3 != INVALID_VALUE); // yeah I know, I want to compare reference, not the content
TestObserver<Tuple3<Integer, Integer, Integer>> test = result.test();
test.assertNotComplete().assertNoErrors().assertNoValues();
}
}
此外,当您希望流以INVALID或NULL开头时,组合最新至少会发出一个值,您可以使用可观察的#start To(INVALID)oder可观察的#start To(Optional.empty())。这将保证可观察的至少会发出一个值。
我有一个发出随机位/布尔值的可观察对象。我需要制作另一个可观察对象,它结合这些随机位来创建和发出随机整数。每次底层的可观察对象发出一个位时,这个可观察对象都会将该位附加到一个位字符串中,一旦该位字符串达到特定长度,这个可观察对象会将其转换为一个整数并发出它。 下面是我如何使用Android LiveData实现它的: 如何使用RxJava 2实现这一点?
我必须进行N次REST API调用并合并所有调用的结果,如果至少有一次调用失败(返回错误或超时),则会失败。我想使用RxJava,我有一些要求: 能够在某些情况下配置每个api调用的重试。我的意思是,如果我有一个重试=2,我提出3个请求,每个请求最多必须重试2次,总共最多6个请求 如果我想用一个线程发出所有请求,我需要一个异步Http客户端,不是吗? 谢谢
我有一个名为的函数,它将返回一个网站的链接。然后我做一些事情,比如: 找到(找到的是一个列表) 只要爬行返回一个有效的链接,这就可以正常工作,但有时它不会返回任何内容。因此,的值被添加到列表中。 所以我的问题是,是否可以从返回一些不会向列表中添加任何内容的内容?
我有一个单喷口和3个螺栓(a,B,C)的拓扑结构。 我试图通过模拟tuple在bolt B中失败,在C中成功,但是storm将tuple重放到所有bolt中。
考虑以下示例: 这将输出从1到5的数字,然后打印异常。 我想要实现的是使观察器保持订阅状态,并在抛出异常后继续运行,即打印从1到10的所有数字。 我尝试过使用和其他各种错误处理操作符,但正如文档中所述,它们的目的是处理可观察对象本身发出的错误。 最直接的解决方案是将的整个主体包装成一个try-catch块,但对我来说这听起来不是一个好的解决方案。在类似的Rx中。NET问题,提出的解决方案是制作一个
假设我有一个喷口,它将一个主题中的消息发送到两个流(stream1和stream2),两个bolt消耗来自这些流的消息(Bolt1->stream1,bolt2->stream2)。现在,如果bolt 2没有对元组进行确认,那么消息会只被重放到bolt 2吗?根据storm中新的消费者API(apache-storm-1.0.2)实现了一个重试机制,在这个机制中,如果numfail>maxRetr