当前位置: 首页 > 工具软件 > RX > 使用案例 >

Rx- compose()操作符

方斌
2023-12-01

问题背景:
想要给多个流重复应用"一系列"相同的操作符,该怎么办???,比如,我们使用Rx+Retrofit进行网络请求时,都有遇到这样场景:要在io线程中请求数据,在主线程订阅,更新UI,所以必须频繁使用下面这样的代码:

	.subscribeOn(Schedulers.io())
	.observeOn(AndroidSchedulers.mainThread())
	.subscribe(consumer)

,如果我们将"让"些操作符,连续的,可重复的 应用到所有流上,
这时候compose操作符就派上用处了.
compose()操作符:
compose() 源码:

    public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
        return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
    }

compose 区分于lift(lift是map,flatMap等操作符的根本),compose,是对ObservableSource本身进行操作的,上面的apply(this),里面this就是代表ObservableSource自己,而lift是对ObservableSource发送的数据进行操作的,
compose(transformer) 接收一个参数ObservableTransformer ,
transformer是一个接口,我们实现它,为了避免Object -> Observable的强转,我们在方法里定义了泛型,这个结合自己的返回数据和逻辑自行修改

public class RxSchedulers {

static final ObservableTransformer schedulersTransformer = new ObservableTransformer() {
            @Override
            public ObservableSource apply(Observable upstream) {
                return upstream
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread());
            }
};

public static <T> ObservableTransformer<T, T> applySchedulers() {
    return (ObservableTransformer<T, T>) schedulersTransformer;
}

}

其中,apply方法里的签名(参数) Observable upstrem,即我们上面的this,即我们要将一系列变换应用在它上面,返回的Observable 就是应用一些列变换之后的Observable.

 类似资料: