Rxjava源码分析之IO.Reactivex.Observable

施靖
2023-12-01

Rxjava 源码系列目录

  1. Rxjava源码分析之IO.Reactivex.Observer
  2. Rxjava源码分析之IO.Reactivex.CompositeDisposable
  3. Rxjava源码分析之IO.Reactivex.Observable

博客创建时间:2020.04.26
博客更新时间:2021.04.12

以Android studio build=4.1.3,gradle=6.5,SdkVersion 30来分析讲解。如图文和网上其他资料不一致,可能是别的资料版本较低而已


前言

Observable是一个non-backpressured,基于响应class的多值提供工厂方法,中间运算操作以及同步异步响应流。

它的许多操作接收ObservableSource作为参数 (这类non-backpressured flows 基本响应接口类,Observable也实现了)。

Observable的操作默认情况下以128个元素的缓冲区运行,该大小可以通过修改系统的参数进行改变。 因为大多数操作都能重载,所以可以显示的设置内部的缓冲区大小。

所有的运算符操作可能在特定计算机中无法使用运行


常用方法

Observable的源码几千行实在是太多了,只能找几个主要的运算符操作来源码分析了。主要的操作运算符使用方法将进行逐步更新补充完整。

1. create

 /**
     * TODO
     * Provides an API (via a cold {@code Observable}) that bridges the reactive world with the callback-style world.
     * 提供一个API(通过冷{@code Observable}),将反应式世界与回调式世界联系起来。
     * <p>
     * Whenever an {@link Observer} subscribes to the returned {@code Observable},
     * the provided {@link ObservableOnSubscribe} callback is invoked with a fresh instance of an {@link ObservableEmitter}
     * that will interact only with that specific {@code Observer}.
     * 每当{@link Observer}订阅返回的{@code Observable}时,ObservableEmitter将会
     * 调用ObservableOnSubscribe提供的回调,并且只会与特定的Observer进行互动
     * <p>
     * If this {@code Observer} disposes the flow (making {@link ObservableEmitter#isDisposed} return {@code true}),
     * 如果Observer处理了flow,使得ObservableEmitter#isDisposed=true
     * other observers subscribed to the same returned {@code Observable} are not affected.
     * 其他的observers订阅返回来的同一个Observable将不受影响
     * <p>
     * You should call the {@code ObservableEmitter}'s {@code onNext}, {@code onError} and {@code onComplete} methods in a serialized fashion.
     * 你应该以序列化的方式调用ObservableEmitter的    onNext、onError或onComplete方法
     * The rest of its methods are thread-safe.  其他的方法是线程安全的
     * <p>
     * TODO 某些特定的计算机,Create方法不会运行
     *
     * @param source the emitter that is called when an {@code Observer} subscribes to the returned {@code Observable}
     *               当Observer订阅返回的Observable时调用的发射器。
     */
    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

2. defer


    /**
     * Returns an {@code Observable} that calls an {@link ObservableSource} factory to create an {@code ObservableSource} for each new {@link Observer}that subscribes.
     * 返回一个Observable,ObservableSource工厂方法来创建一个ObservableSource为每一个订阅的Observer
     * That is, for each subscriber, the actual {@code ObservableSource} that subscriber observes is determined by the factory function.
     * 也就是说对每个订阅者来说,订阅者的观察的实际的ObservableSource通过工厂功能决定
     * <p>
     * The {@code defer} operator allows you to defer or delay emitting items from an {@code ObservableSource} until such time as an
     * {@code Observer} subscribes to the {@code ObservableSource}.
     * defer操作符允许你通过ObservableSource推迟或者延迟发射items,直到Observer订阅观察该ObservableSource
     * <p>
     * This allows an {@code Observer} to easily obtain updates or a refreshed version of the sequence.
     * 该方法允许能够轻松地获取更新或者版本刷新
     * <p>
     * TODO  同理,特定计算机该操作符无法生效
     *
     * @param supplier the {@code ObservableSource} factory function to invoke for each {@code Observer} that subscribes to the
     *                 resulting {@code Observable}
     *                 {@code ObservableSource}工厂函数,用于为每个订阅了*结果{@code Observable}的{@code Observer}调用
     */
    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> defer(@NonNull Supplier<? extends ObservableSource<? extends T>> supplier) {
        Objects.requireNonNull(supplier, "supplier is null");
        return RxJavaPlugins.onAssembly(new ObservableDefer<>(supplier));
    }

3. empty

   * Returns an {@code Observable} that emits no items to the {@link Observer} and immediately invokes its
     * {@link Observer#onComplete onComplete} method.
     * 返回一个{@code Observable},它不会向{@link Observer}发送任何项item,并立即调用其* {@link Observer#onComplete}方法。
     * <p>
     * TODO  同理,特定计算机该操作符无法生效
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    @SuppressWarnings("unchecked")
    @NonNull
    public static <T> Observable<T> empty() {
        return RxJavaPlugins.onAssembly((Observable<T>) ObservableEmpty.INSTANCE);
    }

4. error

    /**
     * Returns an {@code Observable} that invokes an {@link Observer}'s {@link Observer#onError onError} method when the
     * {@code Observer} subscribes to it.
     * <p>
     * <img width="640" height="221" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/error.supplier.png" alt="">
     * TODO  同理,特定计算机该操作符无法生效
     *
     * @param supplier a {@link Supplier} factory to return a {@link Throwable} for each individual {@code Observer}
     *                 Supplier工厂产生一个Throwable提供给每个Observer,这样可以在OnError方法中使用
     */
    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> error(@NonNull Supplier<? extends Throwable> supplier) {
        Objects.requireNonNull(supplier, "supplier is null");
        return RxJavaPlugins.onAssembly(new ObservableError<>(supplier));
    }


总结

Observable的多种创建和使用,为Rxjava带来了个多样的选择和强大的功能。
本测试Demo源码
gitee:https://gitee.com/luofaxin/RxJava3Analysis.git
github:https://github.com/l424533553/RxJava3Analysis.git


相关链接

  1. Rxjava源码分析之IO.Reactivex.Observer
  2. Rxjava源码分析之IO.Reactivex.CompositeDisposable
  3. Rxjava源码分析之IO.Reactivex.Observable

扩展链接:

  1. 最通俗易懂的教你使用RxJava3(一)
  2. 最通俗易懂的教你使用RxJava3(二)
  3. 最通俗易懂的教你使用RxJava3(三)

扩展训练:

  1. Observable几个常用的方法
  2. Observable的几种常见创建方式

博客书写不易,您的点赞收藏是我前进的动力,千万别忘记点赞、 收藏 ^ _ ^ !

 类似资料: