给定一个可观测的源S,我如何要求RxJava/Rx生成可观测的D,即:
大理石图:
我想使用:
我更喜欢最简单的答案,即使用标准操作员(如果可能的话)。
我的两分钱是,你可以用throttleFirst和throttletest解决这个问题,然后将它们合并在一起。
public class ThrottledEmitter {
public Observable<Integer> createEmitter(Observable<Integer> source, Scheduler scheduler) {
Observable<Integer> first = source.throttleFirst(200, TimeUnit.MILLISECONDS, scheduler);
Observable<Integer> last = source.throttleLatest(200, TimeUnit.MILLISECONDS, scheduler)
.withLatestFrom(first, (f, s) -> new Integer[]{f, s})
.filter(array -> array[0] != array[1])
.map(array -> array[0]);
return first.mergeWith(last);
}
@Test
public void VerifyEmitter() {
TestScheduler testScheduler = new TestScheduler();
Subject<Integer> subject = PublishSubject.create();
Observable<Integer> emitter = createEmitter(subject, testScheduler);
TestObserver<Integer> tObserver = emitter.test();
subject.onNext(100);
subject.onNext(200);
testScheduler.advanceTimeBy(199, TimeUnit.MILLISECONDS);
subject.onNext(400);
testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
testScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subject.onNext(500);
testScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subject.onNext(600);
subject.onNext(700);
testScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
subject.onNext(800);
subject.onNext(800);
testScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
tObserver.assertValueAt(0, 100);
tObserver.assertValueAt(1, 400);
tObserver.assertValueAt(2, 500);
tObserver.assertValueAt(3, 600);
tObserver.assertValueAt(4, 700);
tObserver.assertValueAt(5, 800);
tObserver.assertValueAt(6, 800);
tObserver.assertValueCount(7);
}
}
这还将确保发出的事件基于标识是唯一的。两个流中的相同事件具有相同的标识,因为事件源相同。
如果一个仅限于标准操作符,则可以通过使用发布并在两种收集模式之间切换来实现:直接和随时间缓冲。在后一种模式中,如果缓冲区为空,则切换回直接模式:
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import io.reactivex.*;
import io.reactivex.schedulers.TestScheduler;
public class ThrottleSampleTest {
@Test
public void test() {
TestScheduler tsch = new TestScheduler();
Flowable.fromArray(
100, // should emit 100 at T=100
110, 120, 130, 150, // should emit 150 at T=200
250, 260, // should emit 260 at T=300
400 // should emit 400 at T=400
)
.flatMap(v -> Flowable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
.compose(throttleFirstSample(100, TimeUnit.MILLISECONDS, tsch))
.subscribe(v ->
System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS))
);
tsch.advanceTimeBy(1, TimeUnit.SECONDS);
}
static final Exception RESTART_INDICATOR = new Exception();
static <T> FlowableTransformer<T, T> throttleFirstSample(
long time, TimeUnit unit, Scheduler scheduler) {
return f ->
f
.publish(g ->
g
.take(1)
.concatWith(
g
.buffer(time, unit, scheduler)
.map(v -> {
if (v.isEmpty()) {
throw RESTART_INDICATOR;
}
return v.get(v.size() - 1);
})
)
.retry(e -> e == RESTART_INDICATOR)
)
;
}
}
编辑:另一种方法是使用自定义运算符:
@Test
public void testObservable() {
TestScheduler tsch = new TestScheduler();
Observable.fromArray(
100, // should emit 100 at T=100
110, 120, 130, 150, // should emit 150 at T=200
250, 260, // should emit 260 at T=300
400 // should emit 400 at T=400
)
.flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
.compose(throttleFirstSampleObservable(100, TimeUnit.MILLISECONDS, tsch))
.subscribe(v -> System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS)));
tsch.advanceTimeBy(1, TimeUnit.SECONDS);
}
static <T> ObservableTransformer<T, T> throttleFirstSampleObservable(
long time, TimeUnit unit, Scheduler scheduler) {
return f -> new Observable<T>() {
@Override
protected void subscribeActual(Observer<? super T> observer) {
f.subscribe(new ThrottleFirstSampleObserver<T>(
observer, time, unit, scheduler.createWorker()));
}
};
}
static final class ThrottleFirstSampleObserver<T>
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {
private static final long serialVersionUID = 205628968660185683L;
static final Object TIMEOUT = new Object();
final Observer<? super T> actual;
final Queue<Object> queue;
final Worker worker;
final long time;
final TimeUnit unit;
Disposable upstream;
boolean latestMode;
T latest;
volatile boolean done;
Throwable error;
volatile boolean disposed;
ThrottleFirstSampleObserver(Observer<? super T> actual,
long time, TimeUnit unit, Worker worker) {
this.actual = actual;
this.time = time;
this.unit = unit;
this.worker = worker;
this.queue = new ConcurrentLinkedQueue<Object>();
}
@Override
public void onSubscribe(Disposable d) {
upstream = d;
actual.onSubscribe(this);
}
@Override
public void onNext(T t) {
queue.offer(t);
drain();
}
@Override
public void onError(Throwable e) {
error = e;
done = true;
drain();
}
@Override
public void onComplete() {
done = true;
drain();
}
@Override
public boolean isDisposed() {
return upstream.isDisposed();
}
@Override
public void dispose() {
disposed = true;
upstream.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
latest = null;
}
}
@Override
public void run() {
queue.offer(TIMEOUT);
drain();
}
void drain() {
if (getAndIncrement() != 0) {
return;
}
int missed = 1;
Observer<? super T> a = actual;
Queue<Object> q = queue;
for (;;) {
for (;;) {
if (disposed) {
q.clear();
latest = null;
return;
}
boolean d = done;
Object v = q.poll();
boolean empty = v == null;
if (d && empty) {
if (latestMode) {
T u = latest;
latest = null;
if (u != null) {
a.onNext(u);
}
}
Throwable ex = error;
if (ex != null) {
a.onError(ex);
} else {
a.onComplete();
}
worker.dispose();
return;
}
if (empty) {
break;
}
if (latestMode) {
if (v == TIMEOUT) {
T u = latest;
latest = null;
if (u != null) {
a.onNext(u);
worker.schedule(this, time, unit);
} else {
latestMode = false;
}
} else {
latest = (T)v;
}
} else {
latestMode = true;
a.onNext((T)v);
worker.schedule(this, time, unit);
}
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
本文向大家介绍rx-java onBackpressureXXX运算子,包括了rx-java onBackpressureXXX运算子的使用技巧和注意事项,需要的朋友参考一下 示例 大多数开发人员在应用程序失败时会遇到背压,MissingBackpressureException并且异常通常指向observeOn运算符。实际原因通常是对的非背压使用PublishSubject,timer()或者i
我有以下问题: 我需要一张单人床 我尝试过使用zip运算符,因为这是一种常见的方法,但zip的问题是它会等待来自两个来源的数据——这意味着每次我从Room获得新数据时,zip不会传播任何新的emmision,因为它还需要来自Retrofit的新数据。 我目前的解决方案是这个,使用组合最新: 这是有效的,但有几个小问题。首先,UiModel有多个排放。这是组合最新的预期——我从数据库中获取第一个包含
本文向大家介绍Ruby数组和splat(*)运算符,包括了Ruby数组和splat(*)运算符的使用技巧和注意事项,需要的朋友参考一下 示例 的*操作者可用于解压缩变量和数组,使得它们可以作为一个独立的参数的方法进行传递。 如果尚未将单个对象包装在Array中,则可以使用它: 在上面的示例中,该wrap_in_array方法接受一个参数value。 如果value为Array,则将其元素解压缩,并
字段1,为true 字段2,真 字段3,false 字段4,false 字段5,false 结果是: {Field1,Field2,Field3,Field4,Field5} {Field1,Field2,,Field4,Field5} {Field1,Field2,,,Field5} {Field1,Field2,,,} {Field1,Field2,Field3,,Field5} {Field
这是可行的,但当我删除将源代码转换为BlockingObservable的时,程序执行并结束时没有输出。 我通常查看大理石图来正确理解事情:http://reactivex.io/documentation/operators/zip.html 在最后一句中,它说:它只会发射出与可观察源发射出的项数一样多的项,而可观察源发射出的项数最少。 这是否意味着Observable在不到1秒的时间内发出所有
主要内容:RxJava 合并运算符 介绍,RxJava 合并运算符 示例RxJava 合并运算符 介绍 以下是用于从多个 Observable 创建单个 Observable 的运算符。 运算符 描述 And/Then/When 使用模式和计划中介组合项目集。 CombineLatest 通过指定的函数组合每个 Observable 发出的最新项并发出结果项。 Join 如果在第二个 Observable 发射项目的时间范围内发送,则组合两个 Observable 发