当前位置: 首页 > 知识库问答 >
问题:

Rxjava:发生switchmap时发出reduce值

夏飞掣
2023-03-14

减少运算符在可观察的末尾发出值(完成时)。

我正在寻找一种在切换图中使用减少的方法。当外部可观察对象发出值或完成时,我想要无限内部可观察值的sum

@Test
public void emit_value_when_switchmap() throws InterruptedException {

    Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
            .switchMapMaybe(
                    l -> Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                            .reduce(Long::sum)
                            .map(a -> a + ": Final")
            )
            .subscribe(e -> System.out.println(e));


    Thread.sleep(10000);
}

此图说明了想要的行为:

//events: --------x-----1----2---1---x-----3--0--------x-1---1----|  
//result: ---------------------------4-----------------3----------2  

共有1个答案

杭镜
2023-03-14

这可能不是最好的解决方法,但目前它可以解决问题,直到有人想出更奇特的方法来解决您的用例。

请看一下我的测试,我想它解决了你的问题:

环境:(gradle--groovy)

implementation "io.reactivex.rxjava2:rxjava:2.2.8"
testImplementation "org.junit.jupiter:junit-jupiter-api:5.7.0"
testImplementation "org.junit.jupiter:junit-jupiter-params:5.7.0"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.7.0"

测试3正在从源可观察对象发出。每次发出新值时,都会订阅内部可观察对象。当发出新值时,内部可观察对象完成并将值推送到下游。然后将通过订阅新的内部流来处理新发出的值。

  @Test
  public void takeWhileReduce() {
    TestScheduler scheduler = new TestScheduler();
    PublishSubject<Integer> source = PublishSubject.create();

    Observable<Long> publish = source.publish(
        multicast -> {
          return multicast.flatMap(
              o -> {
                return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler) //
                    .takeUntil(multicast)
                    .reduce(Long::sum)
                    .toObservable();
              },
              1);
        });

    TestObserver<Long> test = publish.test();

    source.onNext(42);

    scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);
    // assert - values emitted: 0,1,2,3
    test.assertValuesOnly(6L);

    // next value is flatMapped
    scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1,2
    test.assertValuesOnly(6L, 3L);

    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1
    test.assertValuesOnly(6L, 3L, 1L);
  }
 类似资料:
  • 我是Rx新手,我一直想知道如何正确地一个接一个地发出值(本例中是延迟加载图像)。在不遇到并发问题的情况下,最好的方法是什么? 这是我的代码: 谢谢

  • 我正在与之通信的服务器可以选择将多个调用加入到一个中。所以假设我加入2。。n个调用到一个调用中,响应可以检索0。。n个错误。有没有办法在一次一次性使用中避免多个错误?

  • 我有以下代码: 第三方DK。doSomeAction回调在主线程上发布,因此发射器也将在主线程上发出,而不是在订阅线程上发出(如果我在flatMap中进一步进行一些网络交互,链将失败)。 如果我在第一个之后添加,它会切换到正确的线程,但是有没有办法在正确的线程上发出?我不能修改行为。

  • 所以我有一个制片人/出版商,它是CompletableSubject 我想要一个供订阅者/观察者使用的只读版本,但和都没有发射任何东西 我错过了什么? 输出: 我找到了奇怪的解决方法 输出:

  • 考虑一个场景,我们有一个发出字符串的流,我们想将字符串保存在文件中。 我正在使用Publish科目,这很好: 但是,这不起作用(只有被交付) 有没有办法让第二个场景也能正常工作? i、 例如,我们是否可以更改PublishSubject以确保它缓冲事件,直到订阅者使用它们? 请注意,BehaviorSubject不是一个选项,因为重新订阅会导致另一个文件保存。它没有“消费事件”的概念。 我找到了U

  • switchmap的rxjava文档定义相当模糊,它链接到与FlatMap相同的页面。这两个操作员有什么不同?