RXAndroid 起源于RXJava,体现了响应式的编程规范,可以方便地处理异步数据流,强调对数据的观察与处理的流程
RxAndroid 我们真正要使用的其实是Rx,安不安卓无所谓,Rx即reactivex,翻译过来就是反应式,应用过来就是体现了响应式编程的思想,一个事件下来从开始到结束都能很方便地在链式结构中查看,能知道每一步的变换情况,让这个事件变成一个整体,对于后期拓展或修改都很方便,如果没有响应式编程的思想,一个需求写下来将是零散的,每一步的结构并不明朗,对于定位问题或拓展需求将带来巨大的麻烦
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
使用的总体思路:事件从一个起点出发(Observable),经过各种变换(map等),到达终点接收(Observer)
Observable
被观察者,事件由被观察者发出
Observer
观察者,事件由观察者接收
Subscribe
订阅,将观察者与被观察者建立关系,返回Disposable对象,可用于取消订阅,事件可保持在CompositeDisposable 中统一处理,事件从subscriber触发执行,到达onSubscribe接口处理初始化工作,然后有被观察者调用创建事件操作符开始事件传递,每一次事件触发一个onNext,所有的事件完成触发onComplete
SubscriberWith
会把方法参数返回回去接收的是ResourceSubscriber,
而ResourceSubscriber实现了Disposable接口所以,一般subscribeWith用到使用Rx请求接口的这种情况,订阅后把请求参数返回回去,可以添加到CompositeDisposable中方便绑定Activity生命周期取消
AndroidObservable
针对Android的拓展,可绑定activity(bindActivity())或fragment,生命周期结束时会停止发送消息
Flowable
支持背压的被观察者,为处理事件太多处理不过来导致的内存溢出问题
FlowableSubscriber
支持背压的观察者,通过request告知发送多少数据,背压报错会默认回调到onError,程序不会崩溃
Flowable.just("1","2","3").subscribe(new FlowableSubscriber<String>() {
Subscription sub;
@Override
public void onSubscribe(@NonNull Subscription s) {
sub=s;
s.request(1);//代表接收的次数,如果次数小于总的next次数,不会回调onComplete反之结束会调用onComplete
}
@Override
public void onNext(String s) {
sub.request(1);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
Single
被观察者,只支持发送一次事件
Maybe
被观察者 能发送0或1个事件,要么成功,要么失败
Completable
被观察者 只发送完成或错误事件
操作类型 | 操作符 |
---|---|
创建操作 | create,defer,empty,never,throw,from,interval,just,range,repeat,start,timer |
变换操作 | map,buffer,flatMap,groupBy,scan,window |
过滤操作 | fliter,debounce,distinct,elementAt,first,ignoreElements,last,sample,skip,skipLast,take,takeLast |
组合操作 | and,then,when,combineLatest,join,Merge,startWith,switch,zip,compose |
错误处理 | catch,retry |
辅助操作 | delay,do,materialize,dematerialize,observerOn,subscribeOn,serialize,subscribe,timeInterval,timeout,timestamp,using |
条件操作 | all,amb,contains,defaultlfEmpty,sequenceEqual,skipUntil,skipWhile,takeUntil,takeWhile |
算术集合操作 | average,concat,count,max,min,reduce,sum |
转换操作 | to |
连接操作 | connect,publish,refCount,replay |
create
创建一个被观察者,通过emitter操作发送事件
Observable<String> observable=Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("1");
emitter.onError(new Throwable());
emitter.onComplete();
}
});
fromXXX()
将一个源快速发射出去,这个源可以是列表,可以是action,类似于observable中T的角色
Observable<String> observable=Observable.fromArray(list);
megreArray
与concatArray
是将多个源混合发送,相当于共用一个接收者的意思,不同的是megre
不保证发送的顺序,concat
保证发射的顺序,同时concat
在每组结束后要显示调用onComplete
。
Publisher[] sources = new Publisher[localPaths.size()];
sources[0]=new Publisher<String>(){
@Override
public void subscribe(Subscriber<? super String> observer){
observer.onNext("1");
observer.onComplete();
}
}
....
Flowable.concatArray(sources).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(new FlowableSubscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(sources.length);
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
just
快速创建一个被观察者,发射一个just内的事件然后结束
Observable<String> observable=Observable.just("111");
Consumer
快速创建一个观察者,只实现接收功能
Consumer<String> consumer=new Consumer<String>() {
@Override
public void accept(String s) throws Throwable {
text.setText(s);
}
};
Action
快速创建一个观察者,只实现完成功能
Action action=new Action() {
@Override
public void run() throws Throwable {
doOver();
}
};
observable.subscribe(consumer,null,action);
compose
组合封装常用的固定操作
subscribeOn
指定被观察者的发送线程,传入一个线程调度器
observeOn
指定观察者的接收线程,传入一个线程调度器
Schedulers
线程调度器,包含4种调度模式
1.Schedulers.io() IO操作,内部是个无数量上限的线程池实现,可重用空闲线程
2.Schedulers.newThread() 开启新线程
3.Schedulers.trampoline() 当前线程,如果当前线程有任务立即停止,执行完该任务后继续
4.Schedulers.computation() 计算所使用,固定的线程数,等于CPU核数
5.AndroidSchedulers.mainThread() Android主线程调度器,针对Android的拓展
6.Schedulers.from()指定线程池
下方指定发送在IO线程,接收在主线程,也可以多次指定,表面调用一次后就会切换一次
observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer,null,action);
map()
基本变换操作符,传入一个function对象,通过实现apply方法将事件转换对象发射出去
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Throwable {
return null;
}
})
flatmap()
扁平映射,将数据放入Observables集合
scan()
扫描操作
fliter()
过滤变换操作,传入predicate对象,实现test接口,返回false表示拦截此次发射
.filter(new Predicate<Integer>() {
@Override
public boolean test(Integer integer) throws Throwable {
return false;
}
})
doOnNext()
执行该方法再执行下一步,执行顺序与书写的位置有关,不会改变正在发射的数据
.doOnNext(new Consumer<String>() {
@Override
public void accept(String integer) throws Throwable {
}
})
take()
最多保留操作,限制最多保留几个事件
.take(2)
zip()
打包变换操作,将2个不同的的订阅源数据通过apply变换得到数据发射出去,适用于多个数据源结合显示的情况,2个数据源有一边发射的数据少时,按最少的处理,多发的将不会被匹配上
Observable.zip(observable, observable, new BiFunction<String, String, Integer>() {
@Override
public Integer apply(String s, String s2) throws Throwable {
return null;
}
}).subscribe(consumer);
针对AndroidUI操作给出一系列简单使用的操作符
GitHub地址 RxBinding
导入Rxbinding库,无需再导入rxjava和rxAndroid
api 'com.jakewharton.rxbinding4:rxbinding:4.0.0'
导入时发现导入4.0.0版本不能引用RxView,导入2.0.0就能用?暂时不清楚原因,有大神知道的望评论告知
rxbinding库主要是对UI操作事件封装成被观察者,使用订阅的形式对事件进行操作,主要的操作有:
throttleFirst
设定指定时间内只响应一次事件,处理点击抖动
textChanges
文本变动事件,可用于输入监听,结合其他操作符如combineLatest可用于判空设置按钮是否可点
debounce
设置多长事件未改变就执行,一般用于判断输入完成执行搜索等动作
Rxjava3文档级教程
Rxjava subscribe()和subscribeWith()使用场景分析
Rxbinding 的基本使用
RxJava Compose的作用
observeOn()与subscribeOn()的详解