在开发的过程中,有一个需求要求在进行网络请求的时候,响应速度超过500毫秒就取消请求并沿用本地缓存数据。
这时候就想起Rxjava的TimeOut操作符:
* Returns an Observable that mirrors the source ObservableSource but applies a timeout policy for each emitted * item. If the next item isn't emitted within the specified timeout duration starting from its predecessor, * the resulting ObservableSource begins instead to mirror a fallback ObservableSource. (渣渣翻译:返回对每次发射数据都执行超时策略的原始Observable镜像。在指定的timeout时间中,下一个item并没有发射的话,则最后返回的ObservableSource会替代会fallback ObservableSource的镜像) @CheckReturnValue @SchedulerSupport(SchedulerSupport.COMPUTATION) public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other) { ObjectHelper.requireNonNull(other, "other is null"); return timeout0(timeout, timeUnit, other, Schedulers.computation()); }
- long timeout --> 设定的超时范围
- TimeUnit timeUnit --> 时间类型
- ObservableSource<? extends T> other --> 超时后切换的Observable
- Schedulers.computation() --> 调度器
TimeOut操作符的实现有三种:
- timeout(long timeout, TimeUnit timeUnit):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出 TimeoutException,以一个错误通知终止Observable。
- timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other):每当原始Observable发射了一项数据,computation调度器就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout 在超时时会切换到使用一个你指定的备用的 Observable。
- timeout(Function<> itemTimeoutIndicator):timeout使用一个Function对原始Observable发射的每一项进行观察,如果当这个Function执行完但原始Observable还没有发射下一个数据时,系统就会认为是超时了,timeout 就抛出 TimeoutException,以一个错误通知终止原始Observable。
本次解析的第二种方法,当超时后就切换指定的Observable:
1. 首先我们先看到对应的timeOut中return的time0()函数:
[--> Observable.java]
private Observable<T> timeout0(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other, Scheduler scheduler) { ObjectHelper.requireNonNull(timeUnit, "timeUnit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableTimeoutTimed<T>(this, timeout, timeUnit, scheduler, other)); }
2. 见最后一行,传入各种参数来创建一个ObservableTimeoutTimed<>()对象,进来看下这个类:
[>> ObservableTimeoutTimed.java]
public final class ObservableTimeoutTimed<T> extends AbstractObservableWithUpstream<T, T> { .... public ObservableTimeoutTimed(Observable<T> source, long timeout, TimeUnit unit, Scheduler scheduler, ObservableSource<? extends T> other) { super(source); this.timeout = timeout; this.unit = unit; this.scheduler = scheduler; this.other = other; } @Override protected void subscribeActual(Observer<? super T> observer) { if (other == null) { TimeoutObserver<T> parent = new TimeoutObserver<T>(observer, timeout, unit, scheduler.createWorker()); observer.onSubscribe(parent); parent.startTimeout(0L); source.subscribe(parent); } else { TimeoutFallbackObserver<T> parent = new TimeoutFallbackObserver<T>(observer, timeout, unit, scheduler.createWorker(), other); observer.onSubscribe(parent); parent.startTimeout(0L); source.subscribe(parent); } }
- 因为该类继承AbstractObservableWithUpstream,在构造函数中调用父类,是observable拥有原始上游。
- 在subscribeActual()方法中,假如有指定的observable的话执行下面的语句--> 创建一个TimeoutFallbackObserver
类的对象,设置初始timeout为0,其他的两步都为常规的订阅操作(绑定生命周期,让用户执行parent中的函数)。3. 按照国际惯例,我们继续了解TimeoutFallbackObserver<>()这个内部类:
[>> ObservableTimeoutTimed.java]
static final class TimeoutFallbackObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable, TimeoutSupport { ... TimeoutFallbackObserver(Observer<? super T> actual, long timeout, TimeUnit unit, Scheduler.Worker worker, ObservableSource<? extends T> fallback) { this.downstream = actual; this.timeout = timeout; this.unit = unit; this.worker = worker; this.fallback = fallback; this.task = new SequentialDisposable(); this.index = new AtomicLong(); this.upstream = new AtomicReference<Disposable>(); } @Override public void onSubscribe(Disposable d) { DisposableHelper.setOnce(upstream, d); } @Override public void onNext(T t) { long idx = index.get(); if (idx == Long.MAX_VALUE || !index.compareAndSet(idx, idx + 1)) { return; } task.get().dispose(); downstream.onNext(t); startTimeout(idx + 1); } void startTimeout(long nextIndex) { task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit)); } @Override public void onError(Throwable t) { if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { task.dispose(); downstream.onError(t); worker.dispose(); } else { RxJavaPlugins.onError(t); } } @Override public void onComplete() { if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { task.dispose(); downstream.onComplete(); worker.dispose(); } } @Override public void onTimeout(long idx) { if (index.compareAndSet(idx, Long.MAX_VALUE)) { DisposableHelper.dispose(upstream); ObservableSource<? extends T> f = fallback; fallback = null; f.subscribe(new FallbackObserver<T>(downstream, this)); worker.dispose(); } } @Override public void dispose() { DisposableHelper.dispose(upstream); DisposableHelper.dispose(this); worker.dispose(); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } }
- 从timeout方法的定义来看,timeout的操作都是从原始Observable执行第一次onNext()方法后才进行的
- 在onNext()方法中,看到第二的if语句,有两个功能 >> 判断是否继续执行该方法 、进行index自加1
- 在onNext()方法中,看到最后一行“startTimeout(idx + 1)”,看了下startTimeOut()函数,就会发现实现方法模式worker.schedule(new Task())跟rxjava的observable线程切换原理的实现很相似!!!
- 这时候我们马上想起在timeOut()函数中传入的“Schedulers.computation()”~~
4. 到这里,我们大概猜到timeOut操作符是利用子线程来执行Timeout的计时工作(*-*好像说了跟没说一样..)。那我们一起来分析这个scheduler从初始化到被调用的整个流程:
[ComputationScheduler.java]
public final class ComputationScheduler extends Scheduler implements SchedulerMultiWorkerSupport { ... static { MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0)); SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown")); SHUTDOWN_WORKER.dispose(); int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, Integer.getInteger(KEY_COMPUTATION_PRIORITY, Thread.NORM_PRIORITY))); THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority, true); NONE = new FixedSchedulerPool(0, THREAD_FACTORY); NONE.shutdown(); } static int cap(int cpuCount, int paramThreads) { return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads; } static final class FixedSchedulerPool implements SchedulerMultiWorkerSupport { final int cores; final PoolWorker[] eventLoops; long n; FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) { // initialize event loops this.cores = maxThreads; this.eventLoops = new PoolWorker[maxThreads]; for (int i = 0; i < maxThreads; i++) { this.eventLoops[i] = new PoolWorker(threadFactory); } } public PoolWorker getEventLoop() { int c = cores; if (c == 0) { return SHUTDOWN_WORKER; } // simple round robin, improvements to come return eventLoops[(int)(n++ % c)]; } public void shutdown() { for (PoolWorker w : eventLoops) { w.dispose(); } } @Override public void createWorkers(int number, WorkerCallback callback) { int c = cores; if (c == 0) { for (int i = 0; i < number; i++) { callback.onWorker(i, SHUTDOWN_WORKER); } } else { int index = (int)n % c; for (int i = 0; i < number; i++) { callback.onWorker(i, new EventLoopWorker(eventLoops[index])); if (++index == c) { index = 0; } } n = index; } } } public ComputationScheduler() { this(THREAD_FACTORY); } public ComputationScheduler(ThreadFactory threadFactory) { this.threadFactory = threadFactory; this.pool = new AtomicReference<FixedSchedulerPool>(NONE); start(); } @NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get().getEventLoop()); } @Override public void createWorkers(int number, WorkerCallback callback) { ObjectHelper.verifyPositive(number, "number > 0 required"); pool.get().createWorkers(number, callback); } @NonNull @Override public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit) { PoolWorker w = pool.get().getEventLoop(); return w.scheduleDirect(run, delay, unit); } @NonNull @Override public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { PoolWorker w = pool.get().getEventLoop(); return w.schedulePeriodicallyDirect(run, initialDelay, period, unit); } @Override public void start() { FixedSchedulerPool update = new FixedSchedulerPool(MAX_THREADS, threadFactory); if (!pool.compareAndSet(NONE, update)) { update.shutdown(); } } @Override public void shutdown() { for (;;) { FixedSchedulerPool curr = pool.get(); if (curr == NONE) { return; } if (pool.compareAndSet(curr, NONE)) { curr.shutdown(); return; } } } static final class EventLoopWorker extends Scheduler.Worker { ... EventLoopWorker(PoolWorker poolWorker) { this.poolWorker = poolWorker; this.serial = new ListCompositeDisposable(); this.timed = new CompositeDisposable(); this.both = new ListCompositeDisposable(); this.both.add(serial); this.both.add(timed); } @Override public void dispose() { if (!disposed) { disposed = true; both.dispose(); } } @Override public boolean isDisposed() { return disposed; } @NonNull @Override public Disposable schedule(@NonNull Runnable action) { if (disposed) { return EmptyDisposable.INSTANCE; } return poolWorker.scheduleActual(action, 0, TimeUnit.MILLISECONDS, serial); } @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (disposed) { return EmptyDisposable.INSTANCE; } return poolWorker.scheduleActual(action, delayTime, unit, timed); } } static final class PoolWorker extends NewThreadWorker { PoolWorker(ThreadFactory threadFactory) { super(threadFactory); } } }
- 首先,在静态模块内创建优先级为5的线程工厂(RxThreadFactory)
- 在构造函数中执行start()函数 >> 创建FixedSchedulerPool对象、判断当前FixedSchedulerPool不为None值的话就将其shutdown
- 在FixedSchedulerPool内部类的构造函数中,主要工作是创建PoolWoker对象
- 再来看看PoolWoker这个内部类,继承newThreadWoker,并传入ThreadFactory通过super()调用父类构造函数 >>> emm..显然易见的操作就是通过这个线程工厂和newThreadWoker()构造函数中的SchedulerPoolFactory.create()方法,利用Executors.newScheduledThreadPool()方法创建核心线程为1的子线程:
public class NewThreadWorker extends Scheduler.Worker implements Disposable { ... public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } } >>> public final class SchedulerPoolFactory { ... public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); tryPutIntoPool(PURGE_ENABLED, exec); return exec; } }
- 还记得创建TimeOutFallBackObserver<>()对象时传入的schduler.createWoker()参数吗,看下调用的createWorker()函数是怎么实现的 >> 也是返回一个EventLoopWoker()内部类的对象
- 在EventLoopWorker()内部类还有一个重要的schedule()函数,返回一个Disposable >> poolWorker.scheduleActual(action, delayTime, unit, timed),该方法实际上执行了NewThreadWorker()类的scheduleActual()函数,通过submit()或者schedule()方法执行子线程
public class NewThreadWorker extends Scheduler.Worker implements Disposable { ... @NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { /* * Creates and executes a ScheduledFuture that becomes enabled after the * given delay. (任务创建后,先等待给定delaytime后再执行) **/ f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } }
5. 到这里,应该都清楚worker.schedule()的实现原理 >> 利用线程工厂通过Exectuors创建一个子线程,子线程通过subimit或者schedule()方法执行传入的Runnable任务。切换线程的部分已经讲完,继续回到ObservableTimeoutTimed<>()上吧:
继续回到内部类TimeoutFallbackObserver()的startTimeout()函数:
void startTimeout(long nextIndex) { task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit)); }
- 刚说到worker.schedule()执行的是一个Runnable任务,现在传入的是TimeoutTask(),一起看看这个Task吧:
static final class TimeoutTask implements Runnable { final TimeoutSupport parent; final long idx; TimeoutTask(long idx, TimeoutSupport parent) { this.idx = idx; this.parent = parent; } @Override public void run() { parent.onTimeout(idx); } }
- 果不其然,TimeoutTask是引用了Runnable,并在重写的run()中调用TimoutSupport.onTimeout() >> 也就是子线程所需要执行执行的任务。接下来继续看TimeoutSupport():
interface TimeoutSupport { void onTimeout(long idx); } >>> static final class TimeoutFallbackObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable, TimeoutSupport { ... }
- TimeoutSupport是个接口,定义了一个onTimeout方法,且TimeoutFallbackObserver类引用了这个接口,那我们看下这个接口在内部类中是怎么实现的:
@Override public void onTimeout(long idx) { if (index.compareAndSet(idx, Long.MAX_VALUE)) { DisposableHelper.dispose(upstream); ObservableSource<? extends T> f = fallback; fallback = null; f.subscribe(new FallbackObserver<T>(downstream, this)); worker.dispose(); } }
- onTimeout方法会在执行Runnable的delayTime(设定的时间值)后执行。假如当前的index与传入的idx相同,代表onNext()函数在delay的这段时间内并没有再次执行发射数据(每次调用onNext()方法,index都会自增1),那就符合timeout所设定的场景了。
- 进入if()语句后,首先把upstream(原来的observable)dispose掉,把要替换的observable(fallback)再次调用subscribe(),将替换的observable与下游observer进行订阅的操作
- 最后执行worker.dispose()方法,取消在子线程执行的计时任务
- 最后的最后,我们坚持把FallbackObserver<>()看完吧:
static final class FallbackObserver<T> implements Observer<T> { final Observer<? super T> downstream; final AtomicReference<Disposable> arbiter; FallbackObserver(Observer<? super T> actual, AtomicReference<Disposable> arbiter) { this.downstream = actual; this.arbiter = arbiter; } @Override public void onSubscribe(Disposable d) { DisposableHelper.replace(arbiter, d); } @Override public void onNext(T t) { downstream.onNext(t); } @Override public void onError(Throwable t) { downstream.onError(t); } @Override public void onComplete() { downstream.onComplete(); } }
- 好吧,其实也没干啥特别的 ^ ^ 。1. 让observer获取上游observable订阅的控制权、2.当新Observable调用这些方法将对应的值传到实际代码上重写observer的方法中进行处理
6. 最后的最后的最后...假如当消息发送过程中,没有超时,但是调用了onError()或者onComplete()会怎样呢:
还是先看源码吧:
@Override public void onError(Throwable t) { if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { task.dispose(); downstream.onError(t); worker.dispose(); } else { RxJavaPlugins.onError(t); } } @Override public void onComplete() { if (index.getAndSet(Long.MAX_VALUE) != Long.MAX_VALUE) { task.dispose(); downstream.onComplete(); worker.dispose(); } }
- 假如之前没有切换过observable的话(切换的时候会把index值设为Long.MAX_VALUE)
- 关闭子线程,停止执行Runnable任务、执行observer的onError / omComplete()方法
表现上就跟普通的订阅操作是一样的。
Observable.timeOut()操作符的整个过程分析基本完了。有纰漏错误的地方,请大神们指点一二~