当前位置: 首页 > 知识库问答 >
问题:

Rx:组合ThrottleFirst和Sample运算符

颜欣怡
2023-03-14

给定一个可观测的源S,我如何要求RxJava/Rx生成可观测的D,即:

  1. 毫无延迟地从S发出第一个项目
  2. 在发射每个项目之后和发射下一个项目L之前等待至少T秒,其中L是S在等待期间发射的最后一个项目
  3. 如果S在等待期间T(从点#2开始)没有产生任何项目,则在它在S中应用后立即发出下一个项目

大理石图:

我想使用:

  • 示例运算符,但它不满足#3的要求
  • Debounce运算符,但它也不满足#3的要求
  • ThrottleFirst运算符,但它不满足要求#2,因为它不记住L(而Sample会记住L)

我更喜欢最简单的答案,即使用标准操作员(如果可能的话)。

共有2个答案

乜昆
2023-03-14

我的两分钱是,你可以用throttleFirst和throttletest解决这个问题,然后将它们合并在一起。

public class ThrottledEmitter {
    public Observable<Integer> createEmitter(Observable<Integer> source, Scheduler scheduler) {
        Observable<Integer> first = source.throttleFirst(200, TimeUnit.MILLISECONDS, scheduler);

        Observable<Integer> last = source.throttleLatest(200, TimeUnit.MILLISECONDS, scheduler)
                .withLatestFrom(first, (f, s) -> new Integer[]{f, s})
                .filter(array -> array[0] != array[1])
                .map(array -> array[0]);

        return first.mergeWith(last);
    }

    @Test
    public void VerifyEmitter() {
        TestScheduler testScheduler = new TestScheduler();
        Subject<Integer> subject = PublishSubject.create();
        Observable<Integer> emitter = createEmitter(subject, testScheduler);
        TestObserver<Integer> tObserver = emitter.test();

        subject.onNext(100);
        subject.onNext(200);
        testScheduler.advanceTimeBy(199, TimeUnit.MILLISECONDS);
        subject.onNext(400);
        testScheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        testScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
        subject.onNext(500);
        testScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
        subject.onNext(600);
        subject.onNext(700);
        testScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);
        subject.onNext(800);
        subject.onNext(800);
        testScheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS);

        tObserver.assertValueAt(0, 100);
        tObserver.assertValueAt(1, 400);
        tObserver.assertValueAt(2, 500);
        tObserver.assertValueAt(3, 600);
        tObserver.assertValueAt(4, 700);
        tObserver.assertValueAt(5, 800);
        tObserver.assertValueAt(6, 800);
        tObserver.assertValueCount(7);
    }

}

这还将确保发出的事件基于标识是唯一的。两个流中的相同事件具有相同的标识,因为事件源相同。

邵阳辉
2023-03-14

如果一个仅限于标准操作符,则可以通过使用发布并在两种收集模式之间切换来实现:直接和随时间缓冲。在后一种模式中,如果缓冲区为空,则切换回直接模式:

import java.util.concurrent.TimeUnit;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.schedulers.TestScheduler;

public class ThrottleSampleTest {

    @Test
    public void test() {
        TestScheduler tsch = new TestScheduler();

        Flowable.fromArray(
                100,                // should emit 100 at T=100
                110, 120, 130, 150, // should emit 150 at T=200 
                250, 260,           // should emit 260 at T=300
                400                 // should emit 400 at T=400
        )
        .flatMap(v -> Flowable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
        .compose(throttleFirstSample(100, TimeUnit.MILLISECONDS, tsch))
        .subscribe(v -> 
            System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS))
        );

        tsch.advanceTimeBy(1, TimeUnit.SECONDS);
    }

    static final Exception RESTART_INDICATOR = new Exception();

    static <T> FlowableTransformer<T, T> throttleFirstSample(
            long time, TimeUnit unit, Scheduler scheduler) {
        return f ->
            f
            .publish(g ->
                g
                .take(1)
                .concatWith(
                    g
                    .buffer(time, unit, scheduler)
                    .map(v -> {
                        if (v.isEmpty()) {
                            throw RESTART_INDICATOR;
                        }
                        return v.get(v.size() - 1);
                    })
                )
                .retry(e -> e == RESTART_INDICATOR)
            )
        ;
    }
}

编辑:另一种方法是使用自定义运算符:

@Test
public void testObservable() {
    TestScheduler tsch = new TestScheduler();

    Observable.fromArray(
            100,                // should emit 100 at T=100
            110, 120, 130, 150, // should emit 150 at T=200 
            250, 260,           // should emit 260 at T=300
            400                 // should emit 400 at T=400
    )
    .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
    .compose(throttleFirstSampleObservable(100, TimeUnit.MILLISECONDS, tsch))
    .subscribe(v -> System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS)));

    tsch.advanceTimeBy(1, TimeUnit.SECONDS);
}

static <T> ObservableTransformer<T, T> throttleFirstSampleObservable(
        long time, TimeUnit unit, Scheduler scheduler) {
    return f -> new Observable<T>() {
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            f.subscribe(new ThrottleFirstSampleObserver<T>(
                observer, time, unit, scheduler.createWorker()));
        }
    };
}

static final class ThrottleFirstSampleObserver<T> 
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {

    private static final long serialVersionUID = 205628968660185683L;

    static final Object TIMEOUT = new Object();

    final Observer<? super T> actual;

    final Queue<Object> queue;

    final Worker worker;

    final long time;

    final TimeUnit unit;

    Disposable upstream;

    boolean latestMode;

    T latest;

    volatile boolean done;
    Throwable error;

    volatile boolean disposed;

    ThrottleFirstSampleObserver(Observer<? super T> actual, 
            long time, TimeUnit unit, Worker worker) {
        this.actual = actual;
        this.time = time;
        this.unit = unit;
        this.worker = worker;
        this.queue = new ConcurrentLinkedQueue<Object>();
    }

    @Override
    public void onSubscribe(Disposable d) {
        upstream = d;
        actual.onSubscribe(this);
    }

    @Override
    public void onNext(T t) {
        queue.offer(t);
        drain();
    }

    @Override
    public void onError(Throwable e) {
        error = e;
        done = true;
        drain();
    }

    @Override
    public void onComplete() {
        done = true;
        drain();
    }

    @Override
    public boolean isDisposed() {
        return upstream.isDisposed();
    }

    @Override
    public void dispose() {
        disposed = true;
        upstream.dispose();
        worker.dispose();
        if (getAndIncrement() == 0) {
            queue.clear();
            latest = null;
        }
    }

    @Override
    public void run() {
        queue.offer(TIMEOUT);
        drain();
    }

    void drain() {
        if (getAndIncrement() != 0) {
            return;
        }

        int missed = 1;
        Observer<? super T> a = actual;
        Queue<Object> q = queue;

        for (;;) {

            for (;;) {
                if (disposed) {
                    q.clear();
                    latest = null;
                    return;
                }


                boolean d = done;
                Object v = q.poll();
                boolean empty = v == null;

                if (d && empty) {
                    if (latestMode) {
                        T u = latest;
                        latest = null;
                        if (u != null) {
                            a.onNext(u);
                        }
                    }
                    Throwable ex = error;
                    if (ex != null) {
                        a.onError(ex);
                    } else {
                        a.onComplete();
                    }
                    worker.dispose();
                    return;
                }

                if (empty) {
                    break;
                }

                if (latestMode) {
                    if (v == TIMEOUT) {
                        T u = latest;
                        latest = null;
                        if (u != null) {
                            a.onNext(u);
                            worker.schedule(this, time, unit);
                        } else {
                            latestMode = false;
                        }
                    } else {
                        latest = (T)v;
                    }
                } else {
                    latestMode = true;
                    a.onNext((T)v);
                    worker.schedule(this, time, unit);
                }
            }

            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
 类似资料:
  • 本文向大家介绍rx-java onBackpressureXXX运算子,包括了rx-java onBackpressureXXX运算子的使用技巧和注意事项,需要的朋友参考一下 示例 大多数开发人员在应用程序失败时会遇到背压,MissingBackpressureException并且异常通常指向observeOn运算符。实际原因通常是对的非背压使用PublishSubject,timer()或者i

  • 我有以下问题: 我需要一张单人床 我尝试过使用zip运算符,因为这是一种常见的方法,但zip的问题是它会等待来自两个来源的数据——这意味着每次我从Room获得新数据时,zip不会传播任何新的emmision,因为它还需要来自Retrofit的新数据。 我目前的解决方案是这个,使用组合最新: 这是有效的,但有几个小问题。首先,UiModel有多个排放。这是组合最新的预期——我从数据库中获取第一个包含

  • 本文向大家介绍Ruby数组和splat(*)运算符,包括了Ruby数组和splat(*)运算符的使用技巧和注意事项,需要的朋友参考一下 示例 的*操作者可用于解压缩变量和数组,使得它们可以作为一个独立的参数的方法进行传递。 如果尚未将单个对象包装在Array中,则可以使用它: 在上面的示例中,该wrap_in_array方法接受一个参数value。 如果value为Array,则将其元素解压缩,并

  • 字段1,为true 字段2,真 字段3,false 字段4,false 字段5,false 结果是: {Field1,Field2,Field3,Field4,Field5} {Field1,Field2,,Field4,Field5} {Field1,Field2,,,Field5} {Field1,Field2,,,} {Field1,Field2,Field3,,Field5} {Field

  • 这是可行的,但当我删除将源代码转换为BlockingObservable的时,程序执行并结束时没有输出。 我通常查看大理石图来正确理解事情:http://reactivex.io/documentation/operators/zip.html 在最后一句中,它说:它只会发射出与可观察源发射出的项数一样多的项,而可观察源发射出的项数最少。 这是否意味着Observable在不到1秒的时间内发出所有

  • 主要内容:RxJava 合并运算符 介绍,RxJava 合并运算符 示例RxJava 合并运算符 介绍 以下是用于从多个 Observable 创建单个 Observable 的运算符。 运算符 描述 And/Then/When 使用模式和计划中介组合项目集。 CombineLatest 通过指定的函数组合每个 Observable 发出的最新项并发出结果项。 Join 如果在第二个 Observable 发射项目的时间范围内发送,则组合两个 Observable 发