Rx ,响应式编程,是一个使用可观察数据流进行异步编程的编程接口。Rx 的使用有助于提高工作效率, 优雅的处理复杂业务场景。
Rx 的大部分语言库由 ReactiveX 这个组织负责维护,比较流行的有 RxJava / RxJS / Rx.NET。
Rx = Observables + LINQ + Schedulers.
RxJava 是响应式编程的Java实现,有以下特性:
RxAndroid 是 RxJava 针对 Android 平台的扩展,提供响应式扩展组件,快速开发 Android 程序。
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
RxJava 3 将基准提高到了 Java 8,因此项目的编译目标设置需要更改为 Java 8。
android {
compileOptions {
sourceCompatibility = '1.8'
targetCompatibility = '1.8'
}
}
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable {
try {
for (int i=0; i<10; i++) {
Thread.sleep(1000);
emitter.onNext(i);
}
emitter.onComplete();
} catch (Exception e) {
emitter.onError(e);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(@NonNull Integer integer) {
System.out.println("onNext :" + integer);
}
@Override
public void onError(@NonNull Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
示例说明:
Obervable:基类,发送0~N个数据,不支持背压;
create:创建操作符,实现中添加需要异步处理的过程;
ObservableEmitter:被观察者的发射器,用于将数据或结果通知观察者。其中,onNext可以无限调用,观察者都能接收到;onError和onComplete是互斥的,观察者只能接收到一个;
subscribeOn:指定被观察者的调度器;
Schedulers.io():表示设置被观察者在 io 线程中执行;
observeOn:指定观察者的调度器;
AndroidSchedulers.mainThread():表示回调发生在 Android 主线程中;
subscribe:指定观察者的处理过程;
Observer:在 observeOn 指定的线程中实现观察者的处理回调;
io.reactivex.rxjava3.core.Observable:发送0~N个数据,不支持背压;
io.reactivex.rxjava3.core.Flowable:发送0~N个数据,支持Reactive-Streams和背压;
io.reactivex.rxjava3.core.Single:只能发送单个数据或者一个错误;
io.reactivex.rxjava3.core.Completable:没有发送任何数据,但只处理 onComplete 和 onError 事件;
io.reactivex.rxjava3.core.Maybe:能够发射 0 或者 1 个数据,要么成功,要么失败;
在RxJava中,数据以流的方式组织:一个源数据流后跟着若干个消费者。
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
对于 operator2 来说,operator1 是它的上流,operator3 是它的下流。
当上下流在不同的线程中,通过 Observable 发射、处理、响应数据流时,如果上流发射数据的速度快于下流接收处理数据的速度,这样对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,也不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压(backpressure)问题。
详细介绍可查阅:Android RxJava :图文详解 背压策略
Scheduler 是 RxJava 以一种极其简单的方式解决多线程问题的机制。不设置调度器的话,RxJava遵循哪个线程产生就在哪个线程消费的原则。
AndroidSchedulers.mainThread():是 RxAndroid 库提供的在 Android 平台使用的调度器,用于切换到 UI 线程;
Schedulers.io():用于IO密集型任务,例如读写SD卡文件,查询数据库,访问网络等,具有线程缓存机制,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,如果有,则复用,如果没有则创建新的线程,并加入到线程池中,如果每次都没有空闲线程使用,可以无上限的创建新线程。
Schedulers.newThread():为每一个任务创建一个新线程,不具有线程缓存机制。虽然使用 Schedulers.io() 的地方,都可以使用 Schedulers.newThread(),但是因为可以重用线程,Schedulers.io() 比 Schedulers.newThread() 的效率更高。
Schedulers.computation():用于CPU密集型计算任务,即不会被I/O等操作限制性能的耗时操作,例如xml、json文件的解析,图片的压缩取样等。默认线程数等于处理器数量。不可以用于I/O操作,因为I/O操作的等待时间会浪费CPU。buffer,debounce,delay,interval,sample,skip操作符是在该调度器中执行的。
Schedulers.trampoline():在当前线程中立刻执行,如当前线程中有任务在执行则将其暂停,等插入进来的任务执行完成之后,再继续未完成的任务。
Schedulers.single():拥有一个线程单例,所有的任务都在这一线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。
Schedulers.from(Executor executor):指定一个线程调度器,由此调度器来控制任务的执行。
Create:创建Observable.
Defer:直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个新的 Observable。
Empty:创建一个不发射任何数据但是正常终止的 Observable;
Never:创建一个不发射数据也不终止的 Observable;
Throw:创建一个不发射数据以一个错误终止的 Observable;
From:将数组、Iterable、Future、Callable、Completable、Maybe、Optional、Publisher、Runnable、Single、Stream等转换为Observable,对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。
Interval:创建一个按固定时间间隔发射整数序列的 Observable。
Just:创建一个发射指定值的Observable,接受1~10个参数,返回一个按参数列表顺序发射这些数据的Observable。
Range:创建一个发射一个范围内的有序整数序列的 Observable。
Repeat:创建一个发射特定数据重复多次的 Observable。
Start:返回一个 Observable, 它发射一个类似于函数声明的值。
Timer:创建一个在给定的时间段之后发射一个简单的数字0的 Observable。
Buffer:定期收集 Observable 的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
Map:对原始Observable发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的Observable。
FlatMap:将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据合并后放进一个单独的Observable。
GroupBy:将原始 Observable 拆分为一些 Observables 集合,它们中的每一个发射原始 Observable 数据序列的一个子序列。
Scan:对原始Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。
Window:和 Buffer 类似,但不是发射来自原始 Observable 的数据包,它发射的是 Observables,这些 Observables 中的每一个都发射原始 Observable 数据的一个自己,最后发射一个 onComplated 通知。
Debounce:仅在过了一段指定的时间还没发射数据时才发射一个数据,会过滤掉发射速率过快的数据项。
Distinct:过滤掉重复的数据项,只允许还没有发射过的数据项通过。
ElementAt:只发射指定索引位置的数据项。
Filter:指定函数过滤数据项。
First:只发射第一个数据项。
IgnoreElements:抑制原始 Observable 发射的所有数据,只允许它的终止通知(onError和onCompleted)通过。
Last:只发射最后一个数据项。
Sample:定时发射 Observable 最近发射的数据项。
Skip:跳过前面n个数据项。
SkipLast:跳过最后n个数据项。
Take:只发射前面n个数据项。
TakeLast:只发射最后n个数据项。
Zip:使用指定函数按顺序结合两个或多个 Observables 发射的数据项,然后发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。
Merge:将多个 Observables 的输出项合并为一个 Observable。
StartWith:在数据序列的开发插入一条指定的项。
CombineLatest:当两个 Observables 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。
Join:任何时候,只要在另一个 Observable 发射的数据定义的时间窗口内,这个 Observable 发射了一条数据,就结合两个 Observable 发射的数据。
SwitchOnNext:将一个发射多个 Observables 的Observable 转换成另一个单独的 Observable,后者发射那些 Observables 最近发射的数据项。
And/Then/When:使用 Pattern 和 Plan 作为中介,将两个或多个 Observable 发射的数据集合并在一起。它们的组合行为类似于zip,但是它们使用一个中间数据结构。接受两个或多个Observable,一次一个将它们的发射物合并到Pattern对象,然后操作那个Pattern对象,变换为一个Plan。随后将这些Plan变换为Observable的发射物。
catch:拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。
retry:如果原始 Observable 遇到错误,重新订阅它期望它能正常终止。它不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。Retry总是传递onNext通知给观察者,由于重新订阅,可能会造成数据项重复。