当前位置: 首页 > 工具软件 > TimeOut > 使用案例 >

Rxjava2.0 超时处理 -- Observable.timeout() 操作符的源码解析

司空温书
2023-12-01

在开发的过程中,有一个需求要求在进行网络请求的时候,响应速度超过500毫秒就取消请求并沿用本地缓存数据。

这时候就想起Rxjava的TimeOut操作符:

* Returns an Observable that mirrors the source ObservableSource but applies a timeout policy for each emitted
* item. If the next item isn't emitted within the specified timeout duration starting from its predecessor,
* the resulting ObservableSource begins instead to mirror a fallback ObservableSource.
(渣渣翻译:返回对每次发射数据都执行超时策略的原始Observable镜像。在指定的timeout时间中,下一个item并没有发射的话,则最后返回的ObservableSource会替代会fallback ObservableSource的镜像)

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other) {
    ObjectHelper.requireNonNull(other, "other is null");
    return timeout0(timeout, timeUnit, other, Schedulers.computation());
}
  • long timeout --> 设定的超时范围
  • TimeUnit timeUnit --> 时间类型
  • ObservableSource<? extends T> other --> 超时后切换的Observable
  • Schedulers.computation() --> 调度器

TimeOut操作符的实现有三种:

  1. timeout(long timeout, TimeUnit timeUnit):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出 TimeoutException,以一个错误通知终止Observable。
  2. timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout 在超时时会切换到使用一个你指定的备用的 Observable。
  3. timeout(Function<> itemTimeoutIndicator):timeout使用一个Function对原始Observable发射的每一项进行观察,如果当这个Function执行完但原始Observable还没有发射下一个数据时,系统就会认为是超时了,timeout 就抛出 TimeoutException,以一个错误通知终止原始Observable。

本次解析的第二种方法,当超时后就切换指定的Observable:

1. 首先我们先看到对应的timeOut中return的time0()函数:

[--> Observable.java]

private Observable<T> timeout0(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other, Scheduler scheduler) {
        ObjectHelper.requireNonNull(timeUnit, "timeUnit is null");
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableTimeoutTimed<T>(this, timeout, timeUnit, scheduler, other));
    }

 2. 见最后一行,传入各种参数来创建一个ObservableTimeoutTimed<>()对象,进来看下这个类:

[>> ObservableTimeoutTimed.java]

public final class ObservableTimeoutTimed<T> extends AbstractObservableWithUpstream<T, T> {
    ....

    public ObservableTimeoutTimed(Observable<T> source,
            long timeout, TimeUnit unit, Scheduler scheduler, ObservableSource<? extends T> other) {
        super(source);
        this.timeout = timeout;
        this.unit = unit;
        this.scheduler = scheduler;
        this.other = other;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (other == null) {
            TimeoutObserver<T> parent = new TimeoutObserver<T>(observer, timeout, unit, scheduler.createWorker());
            observer.onSubscribe(parent);
            parent.startTimeout(0L);
            source.subscribe(parent);
        } else {
            TimeoutFallbackObserver<T> parent = new TimeoutFallbackObserver<T>(observer, timeout, unit, scheduler.createWorker(), other);
            observer.onSubscribe(parent);
            parent.startTimeout(0L);
            source.subscribe(parent);
        }
    }

 

  • 因为该类继承AbstractObservableWithUpstream,在构造函数中调用父类,是observable拥有原始上游。
  • 在subscribeActual()方法中,假如有指定的observable的话执行下面的语句--> 创建一个TimeoutFallbackObserver
    类的对象
    设置初始timeout为0其他的两步都为常规的订阅操作(绑定生命周期,让用户执行parent中的函数)

3. 按照国际惯例,我们继续了解TimeoutFallbackObserver<>()这个内部类:

[>> ObservableTimeoutTimed.java]

static final class TimeoutFallbackObserver<T> extends AtomicReference<Disposable>
    implements Observer<T>, Disposable, TimeoutSupport {

 ...

        TimeoutFallbackObserver(Observer<? super T> actual, long timeout, TimeUnit unit,
                Scheduler.Worker worker, ObservableSource<? extends T> fallback) {
            this.downstream = actual;
            this.timeout = timeout;
            this.unit = unit;
            this.worker = worker;
            this.fallback = fallback;
            this.task = new SequentialDisposable();
            this.index = new AtomicLong();
            this.upstream = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(upstream, d);
        }

        @Override
        public void onNext(T t) {
            long idx = index.get();
            if (idx == Long.MAX_VALUE || !index.compareAndSet(idx, idx + 1)) {
                return;
            }

            task.get().dispose();

            downstream.onNext(t);

            startTimeout(idx + 1);
        }

        void startTimeout(long nextIndex) {
            task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
        }

        @Override
        public void onError(Throwable t) {
            if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) {
                task.dispose();

                downstream.onError(t);

                worker.dispose();
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) {
                task.dispose();

                downstream.onComplete();

                worker.dispose();
            }
        }

        @Override
        public void onTimeout(long idx) {
            if (index.compareAndSet(idx, Long.MAX_VALUE)) {
                DisposableHelper.dispose(upstream);

                ObservableSource<? extends T> f = fallback;
                fallback = null;

                f.subscribe(new FallbackObserver<T>(downstream, this));

                worker.dispose();
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
            worker.dispose();
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
  •  从timeout方法的定义来看,timeout的操作都是从原始Observable执行第一次onNext()方法后才进行的
  • 在onNext()方法中,看到第二的if语句,有两个功能 >> 判断是否继续执行该方法 、进行index自加1
  • 在onNext()方法中,看到最后一行“startTimeout(idx + 1)”,看了下startTimeOut()函数,就会发现实现方法模式worker.schedule(new Task())跟rxjava的observable线程切换原理的实现很相似!!!
  • 这时候我们马上想起在timeOut()函数中传入的“Schedulers.computation()”~~

 4. 到这里,我们大概猜到timeOut操作符是利用子线程来执行Timeout的计时工作(*-*好像说了跟没说一样..)。那我们一起来分析这个scheduler从初始化到被调用的整个流程

[ComputationScheduler.java]

public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport {
   ...
   static {
        MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));

        SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown"));
        SHUTDOWN_WORKER.dispose();

        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_COMPUTATION_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority, true);

        NONE = new FixedSchedulerPool(0, THREAD_FACTORY);
        NONE.shutdown();
    }

    static int cap(int cpuCount, int paramThreads) {
        return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;
    }

    static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport {
        final int cores;

        final PoolWorker[] eventLoops;
        long n;

        FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
            // initialize event loops
            this.cores = maxThreads;
            this.eventLoops = new PoolWorker[maxThreads];
            for (int i = 0; i < maxThreads; i++) {
                this.eventLoops[i] = new PoolWorker(threadFactory);
            }
        }

        public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            // simple round robin, improvements to come
            return eventLoops[(int)(n++ % c)];
        }

        public void shutdown() {
            for (PoolWorker w : eventLoops) {
                w.dispose();
            }
        }

        @Override
        public void createWorkers(int number, WorkerCallback callback) {
            int c = cores;
            if (c == 0) {
                for (int i = 0; i < number; i++) {
                    callback.onWorker(i, SHUTDOWN_WORKER);
                }
            } else {
                int index = (int)n % c;
                for (int i = 0; i < number; i++) {
                    callback.onWorker(i, new EventLoopWorker(eventLoops[index]));
                    if (++index == c) {
                        index = 0;
                    }
                }
                n = index;
            }
        }
    }

    public ComputationScheduler() {
        this(THREAD_FACTORY);
    }

    public ComputationScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
        start();
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get().getEventLoop());
    }

    @Override
    public void createWorkers(int number, WorkerCallback callback) {
        ObjectHelper.verifyPositive(number, "number > 0 required");
        pool.get().createWorkers(number, callback);
    }

    @NonNull
    @Override
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) {
        PoolWorker w = pool.get().getEventLoop();
        return w.scheduleDirect(run, delay, unit);
    }

    @NonNull
    @Override
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
        PoolWorker w = pool.get().getEventLoop();
        return w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
    }

    @Override
    public void start() {
        FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory);
        if (!pool.compareAndSet(NONE, update)) {
            update.shutdown();
        }
    }

    @Override
    public void shutdown() {
        for (;;) {
            FixedSchedulerPool curr = pool.get();
            if (curr == NONE) {
                return;
            }
            if (pool.compareAndSet(curr, NONE)) {
                curr.shutdown();
                return;
            }
        }
    }

    static final class EventLoopWorker extends Scheduler.Worker {
        ...

        EventLoopWorker(PoolWorker poolWorker) {
            this.poolWorker = poolWorker;
            this.serial = new ListCompositeDisposable();
            this.timed = new CompositeDisposable();
            this.both = new ListCompositeDisposable();
            this.both.add(serial);
            this.both.add(timed);
        }

        @Override
        public void dispose() {
            if (!disposed) {
                disposed = true;
                both.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action) {
            if (disposed) {
                return EmptyDisposable.INSTANCE;
            }

            return poolWorker.scheduleActual(action, 0, TimeUnit.MILLISECONDS, serial);
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (disposed) {
                return EmptyDisposable.INSTANCE;
            }

            return poolWorker.scheduleActual(action, delayTime, unit, timed);
        }
    }

    static final class PoolWorker extends NewThreadWorker {
        PoolWorker(ThreadFactory threadFactory) {
            super(threadFactory);
        }
    }
}
  • 首先,在静态模块内创建优先级为5的线程工厂(RxThreadFactory)
  • 在构造函数中执行start()函数 >> 创建FixedSchedulerPool对象、判断当前FixedSchedulerPool不为None值的话就将其shutdown
  • 在FixedSchedulerPool内部类的构造函数中,主要工作是创建PoolWoker对象
  • 再来看看PoolWoker这个内部类,继承newThreadWoker,并传入ThreadFactory通过super()调用父类构造函数 >>> emm..显然易见的操作就是通过这个线程工厂和newThreadWoker()构造函数中的SchedulerPoolFactory.create()方法,利用Executors.newScheduledThreadPool()方法创建核心线程为1的子线程:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    ...

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
}

>>>

public final class SchedulerPoolFactory {
   ...

   public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        tryPutIntoPool(PURGE_ENABLED, exec);
        return exec;
    }
}
  • 还记得创建TimeOutFallBackObserver<>()对象时传入的schduler.createWoker()参数吗,看下调用的createWorker()函数是怎么实现的 >> 也是返回一个EventLoopWoker()内部类的对象
  • 在EventLoopWorker()内部类还有一个重要的schedule()函数,返回一个Disposable >> poolWorker.scheduleActual(action, delayTime, unit, timed),该方法实际上执行了NewThreadWorker()类的scheduleActual()函数,通过submit()或者schedule()方法执行子线程
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
   ...

   @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
            /*
             * Creates and executes a ScheduledFuture that becomes enabled after the 
             * given delay. (任务创建后,先等待给定delaytime后再执行)
            **/
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
}

 

5. 到这里,应该都清楚worker.schedule()的实现原理 >> 利用线程工厂通过Exectuors创建一个子线程,子线程通过subimit或者schedule()方法执行传入的Runnable任务。切换线程的部分已经讲完,继续回到ObservableTimeoutTimed<>()上吧:

继续回到内部类TimeoutFallbackObserver()的startTimeout()函数:

 void startTimeout(long nextIndex) {
            task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
        }
  • 刚说到worker.schedule()执行的是一个Runnable任务,现在传入的是TimeoutTask(),一起看看这个Task吧:
static final class TimeoutTask implements Runnable {

        final TimeoutSupport parent;

        final long idx;

        TimeoutTask(long idx, TimeoutSupport parent) {
            this.idx = idx;
            this.parent = parent;
        }

        @Override
        public void run() {
            parent.onTimeout(idx);
        }
    }
  • 果不其然,TimeoutTask是引用了Runnable,并在重写的run()中调用TimoutSupport.onTimeout() >> 也就是子线程所需要执行执行的任务。接下来继续看TimeoutSupport():
interface TimeoutSupport {
   void onTimeout(long idx);
}
>>>
static final class TimeoutFallbackObserver<T> extends AtomicReference<Disposable>
    implements Observer<T>, Disposable, TimeoutSupport {
    
    ...
}
  • TimeoutSupport是个接口,定义了一个onTimeout方法,且TimeoutFallbackObserver类引用了这个接口,那我们看下这个接口在内部类中是怎么实现的:
        @Override
        public void onTimeout(long idx) {
            if (index.compareAndSet(idx, Long.MAX_VALUE)) {
                DisposableHelper.dispose(upstream);

                ObservableSource<? extends T> f = fallback;
                fallback = null;

                f.subscribe(new FallbackObserver<T>(downstream, this));

                worker.dispose();
            }
        }
  • onTimeout方法会在执行Runnable的delayTime(设定的时间值)后执行。假如当前的index与传入的idx相同,代表onNext()函数在delay的这段时间内并没有再次执行发射数据(每次调用onNext()方法,index都会自增1),那就符合timeout所设定的场景了。
  • 进入if()语句后,首先把upstream(原来的observable)dispose掉,把要替换的observable(fallback)再次调用subscribe(),将替换的observable与下游observer进行订阅的操作
  • 最后执行worker.dispose()方法,取消在子线程执行的计时任务
  • 最后的最后,我们坚持把FallbackObserver<>()看完吧:
    static final class FallbackObserver<T> implements Observer<T> {

        final Observer<? super T> downstream;

        final AtomicReference<Disposable> arbiter;

        FallbackObserver(Observer<? super T> actual, AtomicReference<Disposable> arbiter) {
            this.downstream = actual;
            this.arbiter = arbiter;
        }

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.replace(arbiter, d);
        }

        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }
    }
  •  好吧,其实也没干啥特别的 ^ ^ 。1. 让observer获取上游observable订阅的控制权、2.当新Observable调用这些方法将对应的值传到实际代码上重写observer的方法中进行处理

6. 最后的最后的最后...假如当消息发送过程中,没有超时,但是调用了onError()或者onComplete()会怎样呢:

还是先看源码吧:

        @Override
        public void onError(Throwable t) {
            if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) {
                task.dispose();

                downstream.onError(t);

                worker.dispose();
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) {
                task.dispose();

                downstream.onComplete();

                worker.dispose();
            }
        }
  • 假如之前没有切换过observable的话(切换的时候会把index值设为Long.MAX_VALUE
  • 关闭子线程,停止执行Runnable任务、执行observer的onError / omComplete()方法 

表现上就跟普通的订阅操作是一样的。

 

Observable.timeOut()操作符的整个过程分析基本完了。有纰漏错误的地方,请大神们指点一二~

 

 

 类似资料: