减少
运算符在可观察的末尾发出值(完成时)。
我正在寻找一种在切换图
中使用减少
的方法。当外部可观察对象发出值或完成时,我想要无限内部可观察值的sum
。
@Test
public void emit_value_when_switchmap() throws InterruptedException {
Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
.switchMapMaybe(
l -> Observable.interval(0, 500, TimeUnit.MILLISECONDS)
.reduce(Long::sum)
.map(a -> a + ": Final")
)
.subscribe(e -> System.out.println(e));
Thread.sleep(10000);
}
此图说明了想要的行为:
//events: --------x-----1----2---1---x-----3--0--------x-1---1----|
//result: ---------------------------4-----------------3----------2
这可能不是最好的解决方法,但目前它可以解决问题,直到有人想出更奇特的方法来解决您的用例。
请看一下我的测试,我想它解决了你的问题:
环境:(gradle--groovy)
implementation "io.reactivex.rxjava2:rxjava:2.2.8"
testImplementation "org.junit.jupiter:junit-jupiter-api:5.7.0"
testImplementation "org.junit.jupiter:junit-jupiter-params:5.7.0"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.7.0"
测试3正在从源可观察对象发出。每次发出新值时,都会订阅内部可观察对象。当发出新值时,内部可观察对象完成并将值推送到下游。然后将通过订阅新的内部流来处理新发出的值。
@Test
public void takeWhileReduce() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<Integer> source = PublishSubject.create();
Observable<Long> publish = source.publish(
multicast -> {
return multicast.flatMap(
o -> {
return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler) //
.takeUntil(multicast)
.reduce(Long::sum)
.toObservable();
},
1);
});
TestObserver<Long> test = publish.test();
source.onNext(42);
scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);
// action - push next value - flatMapped value will complete and push value
source.onNext(42);
// assert - values emitted: 0,1,2,3
test.assertValuesOnly(6L);
// next value is flatMapped
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
// action - push next value - flatMapped value will complete and push value
source.onNext(42);
// assert - values emitted: 0,1,2
test.assertValuesOnly(6L, 3L);
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
// action - push next value - flatMapped value will complete and push value
source.onNext(42);
// assert - values emitted: 0,1
test.assertValuesOnly(6L, 3L, 1L);
}
我是Rx新手,我一直想知道如何正确地一个接一个地发出值(本例中是延迟加载图像)。在不遇到并发问题的情况下,最好的方法是什么? 这是我的代码: 谢谢
我正在与之通信的服务器可以选择将多个调用加入到一个中。所以假设我加入2。。n个调用到一个调用中,响应可以检索0。。n个错误。有没有办法在一次一次性使用中避免多个错误?
我有以下代码: 第三方DK。doSomeAction回调在主线程上发布,因此发射器也将在主线程上发出,而不是在订阅线程上发出(如果我在flatMap中进一步进行一些网络交互,链将失败)。 如果我在第一个之后添加,它会切换到正确的线程,但是有没有办法在正确的线程上发出?我不能修改行为。
所以我有一个制片人/出版商,它是CompletableSubject 我想要一个供订阅者/观察者使用的只读版本,但和都没有发射任何东西 我错过了什么? 输出: 我找到了奇怪的解决方法 输出:
考虑一个场景,我们有一个发出字符串的流,我们想将字符串保存在文件中。 我正在使用Publish科目,这很好: 但是,这不起作用(只有被交付) 有没有办法让第二个场景也能正常工作? i、 例如,我们是否可以更改PublishSubject以确保它缓冲事件,直到订阅者使用它们? 请注意,BehaviorSubject不是一个选项,因为重新订阅会导致另一个文件保存。它没有“消费事件”的概念。 我找到了U
switchmap的rxjava文档定义相当模糊,它链接到与FlatMap相同的页面。这两个操作员有什么不同?