问题背景:
想要给多个流重复应用"一系列"相同的操作符,该怎么办???,比如,我们使用Rx+Retrofit进行网络请求时,都有遇到这样场景:要在io线程中请求数据,在主线程订阅,更新UI,所以必须频繁使用下面这样的代码:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer)
,如果我们将"让"些操作符,连续的,可重复的 应用到所有流上,
这时候compose操作符就派上用处了.
compose()操作符:
compose() 源码:
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
}
compose 区分于lift(lift是map,flatMap等操作符的根本),compose,是对ObservableSource本身进行操作的,上面的apply(this),里面this就是代表ObservableSource自己,而lift是对ObservableSource发送的数据进行操作的,
compose(transformer) 接收一个参数ObservableTransformer ,
transformer是一个接口,我们实现它,为了避免Object -> Observable的强转,我们在方法里定义了泛型,这个结合自己的返回数据和逻辑自行修改
public class RxSchedulers {
static final ObservableTransformer schedulersTransformer = new ObservableTransformer() {
@Override
public ObservableSource apply(Observable upstream) {
return upstream
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
public static <T> ObservableTransformer<T, T> applySchedulers() {
return (ObservableTransformer<T, T>) schedulersTransformer;
}
}
其中,apply方法里的签名(参数) Observable upstrem,即我们上面的this,即我们要将一系列变换应用在它上面,返回的Observable 就是应用一些列变换之后的Observable.