当前位置: 首页 > 知识库问答 >
问题:

RxJava:当其中一个流什么都不发出时,如何处理组合Latest()

华衡
2023-03-14

我使用combineLatest()组合了3个可观测数据流。所有这些都结合在一起,以便同时显示UI中的所有数据。现在,有一个场景,其中一个可观察对象不会发出任何东西,因为获取的数据可以为null。

是否有RxJava操作符让订阅者知道由于空数据,不会有任何发射?

编辑

private fun retrieveData() {
    Observable.combineLatest(getCurrentUser.execute(), getLatestGoal.execute(), getLatestLog.execute(),
            Function3<User, Goal, Log, PersonalViewModel> { user, goal, log -> mapToViewModel(user, goal, log) })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe { /*todo: animation*/ }
            .doOnNext { view.setViewModel(it) }
            .doOnComplete { view.stopLoading() }
            .doOnError { /*todo: error message*/ }
            .subscribe()
}

第三个流:getLatestLog。当用户有nog日志时,execute()不会发出任何消息。当此流不发射时,整个视图将不可见。

数据从FireBase实时数据库中获取。ChildEventListener的方法如下所示:

override fun onChildAdded(dataSnapshot: DataSnapshot?, p1: String?) {
                val log = dataSnapshot?.getValue(Log::class.java)
                log?.let { subscriber.onNext(it) }
                subscriber.onComplete()
                firebaseDatabase.reference.removeEventListener(this)
            }

共有2个答案

郑宇
2023-03-14

您可以使用public final Single first(T defaultItem)方法。所以代码可能如下所示

getLatestLog.execute()
.first(someDefaultNonNullLog)
.toObservable()
郎正初
2023-03-14

如果您有Java8或一些Optionals,您可以使用此构造:

  @Test
  void name() {
    TestScheduler scheduler = new TestScheduler();

    Observable<Optional<Integer>> o1$ =
        Observable.just(Optional.ofNullable(4)).mergeWith(Observable.never());
    Observable<Optional<Integer>> o2$ =
        Observable.just(Optional.ofNullable(2)).mergeWith(Observable.never());

    Observable<Optional<Integer>> o3$ =
        Observable.<Optional<Integer>>never()
            .timeout(1000, TimeUnit.MILLISECONDS, scheduler)
            .onErrorResumeNext(
                throwable -> {
                  return Observable.<Optional<Integer>>never()
                      .mergeWith(Observable.just(Optional.empty()));
                });

    Observable<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> result =
        Observable.combineLatest(
                o1$,
                o2$,
                o3$,
                (integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
            .filter(t -> t._1.isPresent() && t._2.isPresent() && t._3.isPresent());

    TestObserver<Tuple3<Optional<Integer>, Optional<Integer>, Optional<Integer>>> test =
        result.test();

    scheduler.advanceTimeTo(10000, TimeUnit.SECONDS);

    test.assertNotComplete().assertNoErrors().assertNoValues();
  }

如您所知,不允许通过可观察管道发出空值。因此,我们需要一些其他的构造来表示null。在Java8中有一个称为Optional的构造(vavr称之为Option-

在这个例子中,o3$-可观察不会发出任何东西。它也可能出错,也许这更像你的情况。我们将捕获错误(在这种情况下:超时异常)并返回一个带有Optional.empty.的可观察

在组合回调中,我们组合所有三个值。在后面的步骤中,我们过滤掉所有元组,这些元组都有有效值(可选值为Value)。

只有当三个值都已发出一个值时,才会发出一个值。

当不能使用可选类时,还可以定义无效对象,如以下示例所示:

class So51217041 {
  private static Integer INVALID_VALUE = 42;

  @Test
  void name() {
    Observable<Integer> o1$ = Observable.just(4).mergeWith(Observable.never());
    Observable<Integer> o2$ = Observable.just(2).mergeWith(Observable.never());

    Observable<Integer> o3$ =
        Observable.<Integer>never()
            .onErrorResumeNext(
                throwable -> {
                  return Observable.<Integer>never().mergeWith(Observable.just(INVALID_VALUE));
                });

    Observable<Tuple3<Integer, Integer, Integer>> result =
        Observable.combineLatest(
                o1$,
                o2$,
                o3$,
                (integer, integer2, integer3) -> Tuple.of(integer, integer2, integer3))
            .filter(t -> t._3 != INVALID_VALUE); // yeah I know, I want to compare reference, not the content

    TestObserver<Tuple3<Integer, Integer, Integer>> test = result.test();

    test.assertNotComplete().assertNoErrors().assertNoValues();
  }
}

此外,当您希望流以INVALID或NULL开头时,组合最新至少会发出一个值,您可以使用可观察的#start To(INVALID)oder可观察的#start To(Optional.empty())。这将保证可观察的至少会发出一个值。

 类似资料:
  • 我有一个发出随机位/布尔值的可观察对象。我需要制作另一个可观察对象,它结合这些随机位来创建和发出随机整数。每次底层的可观察对象发出一个位时,这个可观察对象都会将该位附加到一个位字符串中,一旦该位字符串达到特定长度,这个可观察对象会将其转换为一个整数并发出它。 下面是我如何使用Android LiveData实现它的: 如何使用RxJava 2实现这一点?

  • 我必须进行N次REST API调用并合并所有调用的结果,如果至少有一次调用失败(返回错误或超时),则会失败。我想使用RxJava,我有一些要求: 能够在某些情况下配置每个api调用的重试。我的意思是,如果我有一个重试=2,我提出3个请求,每个请求最多必须重试2次,总共最多6个请求 如果我想用一个线程发出所有请求,我需要一个异步Http客户端,不是吗? 谢谢

  • 我有一个名为的函数,它将返回一个网站的链接。然后我做一些事情,比如: 找到(找到的是一个列表) 只要爬行返回一个有效的链接,这就可以正常工作,但有时它不会返回任何内容。因此,的值被添加到列表中。 所以我的问题是,是否可以从返回一些不会向列表中添加任何内容的内容?

  • 我有一个单喷口和3个螺栓(a,B,C)的拓扑结构。 我试图通过模拟tuple在bolt B中失败,在C中成功,但是storm将tuple重放到所有bolt中。

  • 考虑以下示例: 这将输出从1到5的数字,然后打印异常。 我想要实现的是使观察器保持订阅状态,并在抛出异常后继续运行,即打印从1到10的所有数字。 我尝试过使用和其他各种错误处理操作符,但正如文档中所述,它们的目的是处理可观察对象本身发出的错误。 最直接的解决方案是将的整个主体包装成一个try-catch块,但对我来说这听起来不是一个好的解决方案。在类似的Rx中。NET问题,提出的解决方案是制作一个

  • 假设我有一个喷口,它将一个主题中的消息发送到两个流(stream1和stream2),两个bolt消耗来自这些流的消息(Bolt1->stream1,bolt2->stream2)。现在,如果bolt 2没有对元组进行确认,那么消息会只被重放到bolt 2吗?根据storm中新的消费者API(apache-storm-1.0.2)实现了一个重试机制,在这个机制中,如果numfail>maxRetr