当前位置: 首页 > 工具软件 > RxJava > 使用案例 >

Rxjava 简介

伯和蔼
2023-12-01

1. 关于 Rxjava

1.1 Rxjava 定义

a library for composing asynchronous and event-based programs using observable sequences for the Java VM

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。

1.2 Rxjava 优点

随着程序逻辑变得越来越复杂,它依然能够保持简洁

<1 MB jar 包

支持 Java 8 lambada 公式

支持 Java 6+ 且 Android 2.3+

同时支持异步和同步

1.3 什么是观察者模式

观察者模式解决的问题:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。

定义:观察者采用订阅的方式,获取被观察者发出的自身状态改变的通知,从而作出反应的这样一种软件设计模式。

四个基本概念:观察者、被观察者、订阅动作、事件

1.4 Rxjava 基于观察者模式

Observable:(被观察者)发射源

Subscriber:(观察者)订阅者

Observer:(观察者)观察者

ActionX:X 的范围为 1~9 之间,它是 Rxjava 中的一个接口,有且只有一个 无参 方法 call() ,无返回值

FuncX:X 的范围为 1~9 之间,它是 Rxjava 中的一个接口,有且只有一个 含参 方法 call() ,有返回值

  • Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

  • 与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。

onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。

onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。

注意:在一个正确运行的事件序列中,* onCompleted() 和 onError() 有且只有一个*(二者互斥,即在队列中只能调用其中一个),并且是事件序列中的最后一个。

1.5 Rxjava 的订阅逻辑

Rxjava 中的订阅关系为:被观察者订阅观察者,注意不要写反了

1.6 Rxjava 的相关文档

中文手册:https://mcxiaoke.gitbooks.io/rxdocs/content/

详细教程:http://gank.io/post/560e15be2dca930e00da1083

github 源码:https://github.com/lzyzsd/Awesome-RxJava

Rejava 结合 Retrofit:http://gank.io/post/56e80c2c677659311bed9841

1.7 Rxjava 的线程控制(Schedule)

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制,介绍如下:

subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。

observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
注意: observeOn() 可以多次调用,实现程序线程的多次切换,但是 subscribeOn() 只能被调用一次

1.8 Rxjava 的变换

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。Rxjava 的变换操作指的是 Rxjava 通过 Map、flatMap 等这些操作符就能实现不同对象之间的转变。

1.9 Rxjava 的应用

Rxjava 可以跟很多框架结合起来用,例如 Retrofit、EventBus 等(RxBinding 算是融合过的)

2. Rxjava 的代码实现

2.1 添加依赖

implementation "io.reactivex.rxjava2:rxjava:2.1.16"

2.2 开始你的表演,这里一个最简单的 demo

    package com.demo.thorn.rxjavademo;

    import android.support.v7.app.AppCompatActivity;
    import android.os.Bundle;
    import android.util.Log;
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.Observer;
    import io.reactivex.disposables.Disposable;

    public class MainActivity extends AppCompatActivity {

        private static final String TAG = "JIN";

        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);

            //创建一个上游 Observable:发射源
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            });

            //创建一个下游 Observer  接收源
            Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "subscribe");
                }

                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "" + value);
                }

                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "error");
                }

                @Override
                public void onComplete() {
                    Log.d(TAG, "complete");
                }
            };

            //建立发射远和接受源之间的连接
            observable.subscribe(observer);
        }
    }
 类似资料: