在Android使用RxJava的时候可能需要频繁的进行线程的切换,如耗时操作放在子线程中执行,执行完后在主线程渲染界面。如下面示例代码:
deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
//执行耗时任务
return "task result";
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//渲染View
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
}
});
这是最简单的逻辑:子线程处理耗时任务,然后处理结果。
但是在实际的开发当中可能比这个更加复杂,比如有这样的逻辑,先从本地加载数据(子线程),然后界面展示本地数据(主线程),接着加载线上数据(子线程),然后渲染(主线程)。这就需要频繁的切换线程。
RxJava中通过subscribeOn和observeOn两个操作符进行线程切换。subscribeOn()主要改变的是订阅的线程,即call()执行的线程,observeOn()主要改变的是发送的线程,即onNext()执行的线程。
为了实现上面自由切换的逻辑(子线程->主线程->子线程->->主线程)
deferObservable(new Callable<String>() { //defer observable
@Override
public String call() throws Exception {
Log.d("RxThreadFragment", "defer " + Thread.currentThread().getName());
return "task result";
}
})
.observeOn(AndroidSchedulers.mainThread())//指定下面的 call 在主线程中执行
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
Log.d("RxThreadFragment", "flatMap1 " + Thread.currentThread().getName());
return Observable.just(s);
}
})
.observeOn(Schedulers.io())//指定下面的 call 在子线程中执行
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
Log.d("RxThreadFragment", "flatMap2 " + Thread.currentThread().getName());
return Observable.just(s);
}
})
.subscribeOn(Schedulers.io())//指定上面没有指定所在线程的Observable在IO线程执行
.observeOn(AndroidSchedulers.mainThread())//指定下面的 call 在主线程中执行
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
//etc
Log.d("RxThreadFragment", s + Thread.currentThread().getName());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
}
});
输出结果:
defer RxIoScheduler-2
flatMap2 main
flatMap3 RxIoScheduler-3
task result main
从上面的代码可以看出,observeOn 指定该操作符下面相邻的Observable 发射数据所在的线程。
subscribeOn 指定该操作符上面所有没有指定线程的Observable 所在的线程。
例如在刚刚的例子中,subscribeOn操作符上面有3个observable(“defer”,“flatMap1”,“flatMap2”)
由于"flatMap1","flatMap2"已经分别被observeOn指定了schedule了,所以呢,该subscribeOn只会对"defer"有效。
下面我们再来看一个例子
final Observable<String> observable1 = RxUtils.deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e("RxThreadFragment", "observable1 thread name : " + Thread.currentThread().getName());
return "observable1 Schedulers.io()";
}
}).subscribeOn(Schedulers.io());//指定上面call方法所在的线程
final Observable<String> observable2 = RxUtils.deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e("RxThreadFragment", "observable2 thread name : " + Thread.currentThread().getName());
return "observable2 AndroidSchedulers.mainThread()";
}
}).subscribeOn(Schedulers.io());//指定上面call方法所在的线程
final Observable<String> observable3 = RxUtils.deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e("RxThreadFragment", "observable3 thread name : " + Thread.currentThread().getName());
return "observable3 Schedulers.io()";
}
}).subscribeOn(Schedulers.io());//指定上面call方法所在的线程
RxUtils.deferObservable(new Callable<String>() {
@Override
public String call() throws Exception {
Log.e("RxThreadFragment", "test thread name : " + Thread.currentThread().getName());
return "test thread";
}
})
.subscribeOn(Schedulers.io())//修改上面Observable call所在的线程
.observeOn(AndroidSchedulers.mainThread())//修改下面flatMap1 call所在的线程
.flatMap(new Func1<String, Observable<String>>() {//flatMap1
@Override
public Observable<String> call(String s) {
Log.e("RxThreadFragment", "flatMap1 thread name : " + Thread.currentThread().getName());
return observable1;
}
})
.observeOn(AndroidSchedulers.mainThread())//修改下面flatMap2 call所在的线程
.flatMap(new Func1<String, Observable<String>>() {//flatMap2
@Override
public Observable<String> call(String s) {
Log.e("RxThreadFragment", "flatMap2 thread name : " + Thread.currentThread().getName());
return observable2;
}
})
.flatMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
Log.e("RxThreadFragment", "flatMap3 thread name : " + Thread.currentThread().getName());
return observable3;
}
})
.observeOn(AndroidSchedulers.mainThread())//修改下面subscribe call所在的线程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e("RxThreadFragment", "subscribe Action1 thread name : " + Thread.currentThread().getName());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
throwable.printStackTrace();
}
});
输出结果:
test thread name : RxIoScheduler-2 手动设置为后台线程
flatMap1 thread name : main 手动设置为主线程
observable1 thread name : RxIoScheduler-3 手动设置为后台线程
flatMap2 thread name : main 手动设置为主线程
observable2 thread name : RxIoScheduler-2 手动设置为后台线程
flatMap3 thread name : RxIoScheduler-2 后台线程
observable3 thread name : RxIoScheduler-3 手动设置为后台线程
subscribe Action1 thread name : main 手动设置为主线程
从这个例子中可以看出,flatMap3没有设置所在的线程,会默认使用上一个observable的线程模式, flatMap3 就是使用它上面的 observable2 的线程模式
如果上一个操作符不是flatMap,而是使用map(这样就不是返回observable2),这个时候使用的就是map call所在的线程。
通过上面两个例子我相信对RxJava线程切换应该差不多了,需要注意的是我们上面基本上都是一个subscribeOn和多个observeOn组合实现线程的自由切换的。
如果使用多个subscribeOn没有意思,只有第一个subscribeOn有效。