1. RxAndroid 2.x 简介
1.1 介绍
近几年 RxJava 逐渐成为 Android 开发的新宠,越来越多的 Android 开发者正在使用或者即将使用 RxJava 。要想在 Android 上使用 RxJava, RxAndroid 必不可少.
RxAndroid GitHub 地址
https://github.com/ReactiveX/RxAndroid
RxAndroid 也隶属于 ReactiveX 组织,它是 RxJava 在 Android 上的一个扩展。从其 GitHub 的官方主页上,可以了解到 RxAndroid 提供了一个调度程序,能够切换到 Android 主线程或者任意指定的 Looper。
RxAndroid 不能单独使用,它只有依赖 RxJava 才能使用。注意,选择所依赖的 RxJava 版本时最好用最新的版本,因为新版本会修复之前版本的 Bug 并且提供一些新的特性。
RxAndroid 的下载:
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
// Because RxAndroid releases are few and far between, it is recommended you also
// explicitly depend on RxJava's latest version for bug fixes and new features.
// (see https://github.com/ReactiveX/RxJava/releases for latest 2.x.x version)
implementation 'io.reactivex.rxjava2:rxjava:2.x.x'
1.2 使用
1.2.1 AndroidSchedulers.mainThread()
在 Android 中使用时需要新增一个调度器,用于将指定的操作切换到 Android 的主线程中运行,方便做一些更新 UI 的操作。 RxAndroid 就提供了这样的调度器。
下面例子展示了从网上获取图片然后将图片转换成 Bitmap,最后显示到 ImageView 上的
过程。这里没有使用 Android 的 AsyncTask 或 Handler,而是通过 subscribeOn, observeOn 不断地切换线程来达到目的的。
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "加载Bitmap的线程名:" + Thread.currentThread().getName());
emitter.onNext(getBitmap());
}
}) // 设置数据加载在子线程中进行
.subscribeOn(Schedulers.io())
// 设置图片在主线程中进行
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Bitmap bitmap) throws Exception {
Log.d(TAG, "Next-> 更新UI的线程名: " + Thread.currentThread().getName());
if (bitmap != null) {
imageView.setImageBitmap(bitmap);
} else {
Log.d(TAG, "Next-> 加载图片失败. 线程名:" + Thread.currentThread().getName());
}
}
});
private Bitmap getBitmap() {
HttpURLConnection connection = null;
try {
URL url = new URL("http://www.designerspics.com/wp-content/uploads/2014/09/grass_shrubs_2_free_photo.jpg");
connection = (HttpURLConnection) url.openConnection();
connection.setConnectTimeout(20000);
connection.connect();
if (connection.getResponseCode() == 200) {
return BitmapFactory.decodeStream(connection.getInputStream());
}
} catch (Exception e) {
Log.d(TAG, "getBitmap: " + e.getMessage());
} finally {
if (connection != null) connection.disconnect();
}
return null;
}
// Log日志输出结果
加载Bitmap的线程名:RxCachedThreadScheduler-1
Next-> 更新UI的线程名: main
1.2.2 AndroidSchedulers.from(Looper looper)
RxAndroid 除了可以切换到主线程,还可以使用任意指定的 Looper
Observable 创建之后先发射图片的 URL,然后再在 map 操作中获取图片的 Bitmap,最后将 Bitmap 显示到 ImageView
final String image_url = "http://www.designerspics.com/wp-content/uploads/2014/09/grass_shrubs_2_free_photo.jpg";
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
Log.d(TAG, "发射的线程名:" + Thread.currentThread().getName());
emitter.onNext(image_url);
}
}).subscribeOn(AndroidSchedulers.from(Looper.getMainLooper()))
.observeOn(Schedulers.io())
.map(new Function() {
@Override
public Bitmap apply(String s) throws Exception {
Log.d(TAG, "转换成Bitmap的线程名:" + Thread.currentThread().getName());
return getBitmap(s);
}
})
// 设置图片在主线程中进行
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer() {
@Override
public void accept(Bitmap bitmap) throws Exception {
Log.d(TAG, "Next-> 更新UI的线程名: " + Thread.currentThread().getName());
if (bitmap != null) {
imageView.setImageBitmap(bitmap);
} else {
Log.d(TAG, "Next-> 加载图片失败. 线程名:" + Thread.currentThread().getName());
}
}
});
// Log日志输出结果
发射的线程名:main
转换成Bitmap的线程名:RxCachedThreadScheduler-1
Next-> 更新UI的线程名: main
2. Retrofit 简介
Retrofit 是一个在 Android 开发中非常流行的网络框架,底层依赖 OkHttp。 Retrofit 和 OkHttp
都出自 Square 的技术团队。
Retrofit 的 GitHub 地址
https://github.com/square/retrofit
应用程序通过 Retrofit 请求网络,实际上是使用 Retrofit 接口层封装请求参数、 Header、URL 等信息,之后由 OkHttp 完成后续的请求操作,在服务端返回数据之后, OkHttp 将原始的结果交给 Retrofit, Retrofit 再根据用户的需求对结果进行解析的过程。
Retrofit 支持大多数的 Http 方法。
Retrofit 的特点如下:
(1) Retrofit 是可插拔的,允许不同的执行机制及其库用于执行 http 调用。允许 API 请求,与应用程序其余部分中任何现有线程模型或任务框架无缝组合。
Retrofit 为常见的框架提供了适配器 ( Adapter ):
RxJaval.x Observable & Single - com.squareup.retrofit2:adapter-rxjava
RxJava2.x Observable, Flowable, Single, Completable & Maybe - com.squareup.retrofit2:adapter-rxjava2
Guava ListenableFuture - com.squareup.retrofit2:adapter-guava
Java 8 CompletableFuture - com.squareup.retrofit2:adapter-java8
(2) )允许不同的序列化格式及其库,用于将 Java 类型转换为其 http 表示形式,并将 http 实体解析为 Java 类型。
Retrofit 为常见的序列化格式提供了转换器 ( Converter ):
Gson: com.squareup.retrofit2:converter-gson
Jackson: com.squareup.retrofit2:converter-jackson
Moshi: com.squareup.retrofit2:converter-moshi
Protobuf: com.squareup.retrofit2:converter-protobuf
Wire: com.squareup.retrofit2:converter-wire
Simple XML: com.squareup.retrofit2:converter-simplexml
Scalars (primitives, boxed, and String): com.squareup.retrofit2:converter-scalars
开源社区也己经为其他库和序列化格式创建了各种第三方转换器 ( Converter ):
LoganSquare - com.github.aurae.retrofit2:converter-logansquare:1.4.1
FastJson - org.ligboy.retrofit2:converter-fastjson:2.1.0 和 org.ligboy.retrofit2:converter-fastjson-android:2.1.0
OkHttp 的特点如下:
支持 HTTP2/SPDY 黑科技
socket 自动选择最优路线,并支持自动重连。
拥有自动维护的 socket 连接池,减少握手次数
拥有队列线程池,轻松写并发
拥有 Interceptors 轻松处理请求与响应(比如透明 GZIP 压缩、 LOGGING)
基于 Headers 的缓存策略。
3. Retrofit 与 RxJava 的完美配合
Retrofit 是一个网络框架,如果想尝试响应式的编程方式,则可以结合 RxJava 一起使用。Retrofit 对 RxJava1 和 RxJava2 都提供了 Adapter。
案例:将苏州市南门地区的 PM2.5、PM10、SO2 的数据展示到 App 上。在 http://pm25.in/上可以找到获取..., pm25.in 是一个公益性的网站,免费提供空气质量数据。在调用这些接口之前,
需要去该网站注册,并申请一个 AppKey
Retrofit 使用步骤如下:
添加 Retrofit 依赖
在 App 的 build.gradle 中添加所需要的 Retrofit 库,以及 RxJava2 的 adapter 库。
implementation 'com.squareup.retrofit2:retrofit:2.7.1'
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.7.1'
implementation 'org.ligboy.retrofit2:converter-fastjson-android:2.1.0'
implementation "com.squareup.okhttp3:logging-interceptor:4.3.1"
创建 RetrofitManager
一般需要创建 Retrofit 管理类,在这里创建一个名为 RetrofitManager 类,方便在
整个 App 中使用。
RetrofitManager 代码如下:
public class RetrofitManager {
private static Retrofit retrofit;
public static Retrofit retrofit() {
if (retrofit == null) {
HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor()
.setLevel(HttpLoggingInterceptor.Level.BASIC);
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.writeTimeout(30_1000, TimeUnit.MILLISECONDS)
.readTimeout(20_1000, TimeUnit.MILLISECONDS)
.connectTimeout(15_1000, TimeUnit.MILLISECONDS)
.addInterceptor(loggingInterceptor)
.build();
retrofit = new Retrofit.Builder()
.baseUrl(APIService.API_BASE_SERVER_URL)
.addConverterFactory(FastJsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.client(okHttpClient)
.build();
}
return retrofit;
}
}
创建 APIService
接下来,我们需要定义网络请求的接口。 pm25.in 提供了多个获取空气质量数据的接口,这里选取其中 3 个接口,分别是获取一个城市所有监测点的PM2.5数据、获取一个城市所有监测点的PM10数据、获取一个城市所有监测点的 SO2 数据接口。
public interface APIService {
String API_BASE_SERVER_URL = "http://www.pm25.in/";
@GET("api/querys/pm2_5.json")
Maybe> pm25(@Query("city") String city, @Query("token") String token);
@GET("api/querys/pm10.json")
Maybe> pm10(@Query("city") String city, @Query("token") String token);
@GET("api/querys/so2.json")
Maybe> so2(@Query("city") String city, @Query("token") String token);
}
在 APIService 中,每个方法返回的类型都是 Maybe 类型,其实也可以返回 Observable、Flowable 等类型。
Retrofit 的使用
下面的代码分别调用了 3 个接口,井过滤出了南门地区的相关数据。
APIService apiService = RetrofitManager.retrofit().create(APIService.class);
apiService.pm25(Constant.CITY, Constant.TOKEN)
.compose(RxJavaUtils.>maybeToMain())
.filter(new Predicate>() {
@Override
public boolean test(List pm25Models) throws Exception {
return pm25Models != null && !pm25Models.isEmpty();
}
})
.flatMap(new Function, MaybeSource>() {
@Override
public MaybeSource apply(List pm25Models) throws Exception {
for (PM25Model pm25Model : pm25Models) {
if ("南门".equals(pm25Model.position_name)) {
return Maybe.just(pm25Model);
}
}
return null;
}
})
.subscribe(new Consumer() {
@Override
public void accept(PM25Model pm25Model) throws Exception {
Log.d(TAG, "PM25.Success-> " + pm25Model);
}
}, new Consumer() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "PM25.Error-> " + throwable);
}
});
apiService.pm10(Constant.CITY, Constant.TOKEN)
.compose(RxJavaUtils.>maybeToMain())
.filter(new Predicate>() {
@Override
public boolean test(List pm10Models) throws Exception {
return pm10Models != null && !pm10Models.isEmpty();
}
})
.flatMap(new Function, MaybeSource>() {
@Override
public MaybeSource apply(List pm10Models) throws Exception {
for (PM10Model pm10Model : pm10Models) {
if ("南门".equals(pm10Model.position_name)) {
return Maybe.just(pm10Model);
}
}
return null;
}
})
.subscribe(new Consumer() {
@Override
public void accept(PM10Model pm10Model) throws Exception {
Log.d(TAG, "PM10.Success-> " + pm10Model);
}
}, new Consumer() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "PM10.Error-> " + throwable);
}
});
apiService.so2(Constant.CITY, Constant.TOKEN)
.compose(RxJavaUtils.>maybeToMain())
.filter(new Predicate>() {
@Override
public boolean test(List so2Models) throws Exception {
return so2Models != null && !so2Models.isEmpty();
}
})
.flatMap(new Function, MaybeSource>() {
@Override
public MaybeSource apply(List so2Models) throws Exception {
for (SO2Model so2Model : so2Models) {
if ("南门".equals(so2Model.position_name)) {
return Maybe.just(so2Model);
}
}
return null;
}
})
.subscribe(new Consumer() {
@Override
public void accept(SO2Model so2Model) throws Exception {
Log.d(TAG, "SO2.Error-> " + so2Model);
}
}, new Consumer() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "SO2.Error-> " + throwable);
}
});
这里还使用了 maybeToMain() 方法,它的代码如下:
@JvmStatic
fun maybeToMain(): MaybeTransformer {
return MaybeTransformer { upstream ->
upstream.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
}
它用于切换线程,返回 MaybeTransformer 对象。因为 apiService 中每个返回的方法都是
Maybe 类型,所以这里会用到 MaybeTransformer 。使用了 maybeToMain() 后 ,除网络请求是在
io() 线程中运行外,其余的操作都是在主线程中运行的.
也可以让 filter、 flatMap 操作也在 io() 线程中运行,展示数据时才切换回主线程。
接下来列举一些 Retrofit 其余常见的使用场景。
(1) 合并多个网络请求
如:需要在某一个信息流列表中插入多条广告,每一条广告都需要做一次网络请求。这时就可以考虑使用 zip 操作符,将请求信息流,以及请求的多个广告的请求合并起来,等所有请求完成之后,再用合并函数将广告插到信息流固定的位置上,最后以列表的形式呈现给用户。
APIService apiService = RetrofitManager.retrofit().create(APIService.class);
Maybe pm25ModelMaybe = apiService.pm25(Constant.CITY, Constant.TOKEN)
.compose(RxJavaUtils.>maybeToMain())
.filter(new Predicate>() {
@Override
public boolean test(List pm25Models) throws Exception {
return pm25Models != null && !pm25Models.isEmpty();
}
})
.flatMap(new Function, MaybeSource>() {
@Override
public MaybeSource apply(List pm25Models) throws Exception {
for (PM25Model pm25Model : pm25Models) {
if ("南门".equals(pm25Model.position_name)) {
return Maybe.just(pm25Model);
}
}
return null;
}
});
Maybe pm10ModelMaybe = apiService.pm10(Constant.CITY, Constant.TOKEN)
.compose(RxJavaUtils.>maybeToMain())
.filter(new Predicate>() {
@Override
public boolean test(List pm10Models) throws Exception {
return pm10Models != null && !pm10Models.isEmpty();
}
})
.flatMap(new Function, MaybeSource>() {
@Override
public MaybeSource apply(List pm10Models) throws Exception {
for (PM10Model pm10Model : pm10Models) {
if ("南门".equals(pm10Model.position_name)) {
return Maybe.just(pm10Model);
}
}
return null;
}
});
Maybe so2ModelMaybe = apiService.so2(Constant.CITY, Constant.TOKEN)
.compose(RxJavaUtils.>maybeToMain())
.filter(new Predicate>() {
@Override
public boolean test(List so2Models) throws Exception {
return so2Models != null && !so2Models.isEmpty();
}
})
.flatMap(new Function, MaybeSource>() {
@Override
public MaybeSource apply(List so2Models) throws Exception {
for (SO2Model so2Model : so2Models) {
if ("南门".equals(so2Model.position_name)) {
return Maybe.just(so2Model);
}
}
return null;
}
});
Maybe.zip(pm25ModelMaybe, pm10ModelMaybe, so2ModelMaybe, new Function3() {
@Override
public ZipObject apply(PM25Model pm25Model, PM10Model pm10Model, SO2Model so2Model) throws Exception {
Log.d(TAG, "zip-> \r\n" + pm25Model + "\r\n" + pm10Model + "\r\n" + so2Model);
ZipObject zipObject = new ZipObject();
zipObject.pm2_5 = pm25Model.pm2_5;
zipObject.pm2_5_24h = pm25Model.pm2_5_24h;
zipObject.pm2_5_quality = pm25Model.quality;
zipObject.pm10 = pm10Model.pm10;
zipObject.pm10_24h = pm10Model.pm10_24h;
zipObject.so2 = so2Model.so2;
zipObject.so2_24h = so2Model.so2_24h;
return zipObject;
}
}).subscribe(new Consumer() {
@Override
public void accept(ZipObject zipObject) throws Exception {
Log.d(TAG, "Success-> " + zipObject);
}
}, new Consumer() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.d(TAG, "Error-> " + throwable);
}
});
(2) 返回默认值
有时, 网络请求失败可以使用 onErrorReturn 操作符, 一个空的对象作为默认值。
(3) 多个网络请求嵌套使用
若是 A 请求完成之后,才能去调用 B 请求,则可以考虑使用 flatMap 操作符。
(4) 重试机制
对于一些重要的接口,需要采用重试机制。因为有些时候用户的网络环境比较差,第一次请求接口超时了,那么再一次请求可能就会成功。虽然有一定的延时,但至少返回了数据,保证了用户体验。
apiService.loadContent(params)
.retryWhen(new RetryWithDelay(3, 1000));
在这里 retryWhen 操作符与 RetryWithDelay 一起搭配使用,表示有 3 次重试机会,每次的延迟时间是 1000ms。 RetryWithDelay 是一个工具类,使用kotlin语言编写。
class RetryWithDelay(
private val maxRetries: Int,
private val retryDelayMillis: Int
) : Function, Publisher> {
private var retryCount: Int = 0
init {
this.retryCount = 0
}
override fun apply(attempts: Flowable): Publisher {
return attempts.flatMap { throwable ->
if (++retryCount <= maxRetries) {
Flowable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)
} else {
Flowable.error(throwable)
}
}
}
}
如果我的文章对您有帮助,不妨点个赞鼓励一下(^_^)