当前位置: 首页 > 工具软件 > RxAndroid > 使用案例 >

android rxjava工具类,RxJava(十三):RxAndroid 2.x 和 Retrofit 的使用

相弘和
2023-12-01

1. RxAndroid 2.x 简介

1.1 介绍

近几年 RxJava 逐渐成为 Android 开发的新宠,越来越多的 Android 开发者正在使用或者即将使用 RxJava 。要想在 Android 上使用 RxJava, RxAndroid 必不可少.

RxAndroid GitHub 地址

https://github.com/ReactiveX/RxAndroid

RxAndroid 也隶属于 ReactiveX 组织,它是 RxJava 在 Android 上的一个扩展。从其 GitHub 的官方主页上,可以了解到 RxAndroid 提供了一个调度程序,能够切换到 Android 主线程或者任意指定的 Looper。

RxAndroid 不能单独使用,它只有依赖 RxJava 才能使用。注意,选择所依赖的 RxJava 版本时最好用最新的版本,因为新版本会修复之前版本的 Bug 并且提供一些新的特性。

RxAndroid 的下载:

implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

// Because RxAndroid releases are few and far between, it is recommended you also

// explicitly depend on RxJava's latest version for bug fixes and new features.

// (see https://github.com/ReactiveX/RxJava/releases for latest 2.x.x version)

implementation 'io.reactivex.rxjava2:rxjava:2.x.x'

1.2 使用

1.2.1 AndroidSchedulers.mainThread()

在 Android 中使用时需要新增一个调度器,用于将指定的操作切换到 Android 的主线程中运行,方便做一些更新 UI 的操作。 RxAndroid 就提供了这样的调度器。

下面例子展示了从网上获取图片然后将图片转换成 Bitmap,最后显示到 ImageView 上的

过程。这里没有使用 Android 的 AsyncTask 或 Handler,而是通过 subscribeOn, observeOn 不断地切换线程来达到目的的。

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

Log.d(TAG, "加载Bitmap的线程名:" + Thread.currentThread().getName());

emitter.onNext(getBitmap());

}

}) // 设置数据加载在子线程中进行

.subscribeOn(Schedulers.io())

// 设置图片在主线程中进行

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Bitmap bitmap) throws Exception {

Log.d(TAG, "Next-> 更新UI的线程名: " + Thread.currentThread().getName());

if (bitmap != null) {

imageView.setImageBitmap(bitmap);

} else {

Log.d(TAG, "Next-> 加载图片失败. 线程名:" + Thread.currentThread().getName());

}

}

});

private Bitmap getBitmap() {

HttpURLConnection connection = null;

try {

URL url = new URL("http://www.designerspics.com/wp-content/uploads/2014/09/grass_shrubs_2_free_photo.jpg");

connection = (HttpURLConnection) url.openConnection();

connection.setConnectTimeout(20000);

connection.connect();

if (connection.getResponseCode() == 200) {

return BitmapFactory.decodeStream(connection.getInputStream());

}

} catch (Exception e) {

Log.d(TAG, "getBitmap: " + e.getMessage());

} finally {

if (connection != null) connection.disconnect();

}

return null;

}

// Log日志输出结果

加载Bitmap的线程名:RxCachedThreadScheduler-1

Next-> 更新UI的线程名: main

1.2.2 AndroidSchedulers.from(Looper looper)

RxAndroid 除了可以切换到主线程,还可以使用任意指定的 Looper

Observable 创建之后先发射图片的 URL,然后再在 map 操作中获取图片的 Bitmap,最后将 Bitmap 显示到 ImageView

final String image_url = "http://www.designerspics.com/wp-content/uploads/2014/09/grass_shrubs_2_free_photo.jpg";

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(ObservableEmitter emitter) throws Exception {

Log.d(TAG, "发射的线程名:" + Thread.currentThread().getName());

emitter.onNext(image_url);

}

}).subscribeOn(AndroidSchedulers.from(Looper.getMainLooper()))

.observeOn(Schedulers.io())

.map(new Function() {

@Override

public Bitmap apply(String s) throws Exception {

Log.d(TAG, "转换成Bitmap的线程名:" + Thread.currentThread().getName());

return getBitmap(s);

}

})

// 设置图片在主线程中进行

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(Bitmap bitmap) throws Exception {

Log.d(TAG, "Next-> 更新UI的线程名: " + Thread.currentThread().getName());

if (bitmap != null) {

imageView.setImageBitmap(bitmap);

} else {

Log.d(TAG, "Next-> 加载图片失败. 线程名:" + Thread.currentThread().getName());

}

}

});

// Log日志输出结果

发射的线程名:main

转换成Bitmap的线程名:RxCachedThreadScheduler-1

Next-> 更新UI的线程名: main

2. Retrofit 简介

Retrofit 是一个在 Android 开发中非常流行的网络框架,底层依赖 OkHttp。 Retrofit 和 OkHttp

都出自 Square 的技术团队。

Retrofit 的 GitHub 地址

https://github.com/square/retrofit

应用程序通过 Retrofit 请求网络,实际上是使用 Retrofit 接口层封装请求参数、 Header、URL 等信息,之后由 OkHttp 完成后续的请求操作,在服务端返回数据之后, OkHttp 将原始的结果交给 Retrofit, Retrofit 再根据用户的需求对结果进行解析的过程。

Retrofit 支持大多数的 Http 方法。

Retrofit 的特点如下:

(1) Retrofit 是可插拔的,允许不同的执行机制及其库用于执行 http 调用。允许 API 请求,与应用程序其余部分中任何现有线程模型或任务框架无缝组合。

Retrofit 为常见的框架提供了适配器 ( Adapter ):

RxJaval.x Observable & Single - com.squareup.retrofit2:adapter-rxjava

RxJava2.x Observable, Flowable, Single, Completable & Maybe - com.squareup.retrofit2:adapter-rxjava2

Guava ListenableFuture - com.squareup.retrofit2:adapter-guava

Java 8 CompletableFuture - com.squareup.retrofit2:adapter-java8

(2) )允许不同的序列化格式及其库,用于将 Java 类型转换为其 http 表示形式,并将 http 实体解析为 Java 类型。

Retrofit 为常见的序列化格式提供了转换器 ( Converter ):

Gson: com.squareup.retrofit2:converter-gson

Jackson: com.squareup.retrofit2:converter-jackson

Moshi: com.squareup.retrofit2:converter-moshi

Protobuf: com.squareup.retrofit2:converter-protobuf

Wire: com.squareup.retrofit2:converter-wire

Simple XML: com.squareup.retrofit2:converter-simplexml

Scalars (primitives, boxed, and String): com.squareup.retrofit2:converter-scalars

开源社区也己经为其他库和序列化格式创建了各种第三方转换器 ( Converter ):

LoganSquare - com.github.aurae.retrofit2:converter-logansquare:1.4.1

FastJson - org.ligboy.retrofit2:converter-fastjson:2.1.0 和 org.ligboy.retrofit2:converter-fastjson-android:2.1.0

OkHttp 的特点如下:

支持 HTTP2/SPDY 黑科技

socket 自动选择最优路线,并支持自动重连。

拥有自动维护的 socket 连接池,减少握手次数

拥有队列线程池,轻松写并发

拥有 Interceptors 轻松处理请求与响应(比如透明 GZIP 压缩、 LOGGING)

基于 Headers 的缓存策略。

3. Retrofit 与 RxJava 的完美配合

Retrofit 是一个网络框架,如果想尝试响应式的编程方式,则可以结合 RxJava 一起使用。Retrofit 对 RxJava1 和 RxJava2 都提供了 Adapter。

案例:将苏州市南门地区的 PM2.5、PM10、SO2 的数据展示到 App 上。在 http://pm25.in/上可以找到获取..., pm25.in 是一个公益性的网站,免费提供空气质量数据。在调用这些接口之前,

需要去该网站注册,并申请一个 AppKey

Retrofit 使用步骤如下:

添加 Retrofit 依赖

在 App 的 build.gradle 中添加所需要的 Retrofit 库,以及 RxJava2 的 adapter 库。

implementation 'com.squareup.retrofit2:retrofit:2.7.1'

implementation 'com.squareup.retrofit2:adapter-rxjava2:2.7.1'

implementation 'org.ligboy.retrofit2:converter-fastjson-android:2.1.0'

implementation "com.squareup.okhttp3:logging-interceptor:4.3.1"

创建 RetrofitManager

一般需要创建 Retrofit 管理类,在这里创建一个名为 RetrofitManager 类,方便在

整个 App 中使用。

RetrofitManager 代码如下:

public class RetrofitManager {

private static Retrofit retrofit;

public static Retrofit retrofit() {

if (retrofit == null) {

HttpLoggingInterceptor loggingInterceptor = new HttpLoggingInterceptor()

.setLevel(HttpLoggingInterceptor.Level.BASIC);

OkHttpClient okHttpClient = new OkHttpClient.Builder()

.writeTimeout(30_1000, TimeUnit.MILLISECONDS)

.readTimeout(20_1000, TimeUnit.MILLISECONDS)

.connectTimeout(15_1000, TimeUnit.MILLISECONDS)

.addInterceptor(loggingInterceptor)

.build();

retrofit = new Retrofit.Builder()

.baseUrl(APIService.API_BASE_SERVER_URL)

.addConverterFactory(FastJsonConverterFactory.create())

.addCallAdapterFactory(RxJava2CallAdapterFactory.create())

.client(okHttpClient)

.build();

}

return retrofit;

}

}

创建 APIService

接下来,我们需要定义网络请求的接口。 pm25.in 提供了多个获取空气质量数据的接口,这里选取其中 3 个接口,分别是获取一个城市所有监测点的PM2.5数据、获取一个城市所有监测点的PM10数据、获取一个城市所有监测点的 SO2 数据接口。

public interface APIService {

String API_BASE_SERVER_URL = "http://www.pm25.in/";

@GET("api/querys/pm2_5.json")

Maybe> pm25(@Query("city") String city, @Query("token") String token);

@GET("api/querys/pm10.json")

Maybe> pm10(@Query("city") String city, @Query("token") String token);

@GET("api/querys/so2.json")

Maybe> so2(@Query("city") String city, @Query("token") String token);

}

在 APIService 中,每个方法返回的类型都是 Maybe 类型,其实也可以返回 Observable、Flowable 等类型。

Retrofit 的使用

下面的代码分别调用了 3 个接口,井过滤出了南门地区的相关数据。

APIService apiService = RetrofitManager.retrofit().create(APIService.class);

apiService.pm25(Constant.CITY, Constant.TOKEN)

.compose(RxJavaUtils.>maybeToMain())

.filter(new Predicate>() {

@Override

public boolean test(List pm25Models) throws Exception {

return pm25Models != null && !pm25Models.isEmpty();

}

})

.flatMap(new Function, MaybeSource>() {

@Override

public MaybeSource apply(List pm25Models) throws Exception {

for (PM25Model pm25Model : pm25Models) {

if ("南门".equals(pm25Model.position_name)) {

return Maybe.just(pm25Model);

}

}

return null;

}

})

.subscribe(new Consumer() {

@Override

public void accept(PM25Model pm25Model) throws Exception {

Log.d(TAG, "PM25.Success-> " + pm25Model);

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Exception {

Log.d(TAG, "PM25.Error-> " + throwable);

}

});

apiService.pm10(Constant.CITY, Constant.TOKEN)

.compose(RxJavaUtils.>maybeToMain())

.filter(new Predicate>() {

@Override

public boolean test(List pm10Models) throws Exception {

return pm10Models != null && !pm10Models.isEmpty();

}

})

.flatMap(new Function, MaybeSource>() {

@Override

public MaybeSource apply(List pm10Models) throws Exception {

for (PM10Model pm10Model : pm10Models) {

if ("南门".equals(pm10Model.position_name)) {

return Maybe.just(pm10Model);

}

}

return null;

}

})

.subscribe(new Consumer() {

@Override

public void accept(PM10Model pm10Model) throws Exception {

Log.d(TAG, "PM10.Success-> " + pm10Model);

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Exception {

Log.d(TAG, "PM10.Error-> " + throwable);

}

});

apiService.so2(Constant.CITY, Constant.TOKEN)

.compose(RxJavaUtils.>maybeToMain())

.filter(new Predicate>() {

@Override

public boolean test(List so2Models) throws Exception {

return so2Models != null && !so2Models.isEmpty();

}

})

.flatMap(new Function, MaybeSource>() {

@Override

public MaybeSource apply(List so2Models) throws Exception {

for (SO2Model so2Model : so2Models) {

if ("南门".equals(so2Model.position_name)) {

return Maybe.just(so2Model);

}

}

return null;

}

})

.subscribe(new Consumer() {

@Override

public void accept(SO2Model so2Model) throws Exception {

Log.d(TAG, "SO2.Error-> " + so2Model);

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Exception {

Log.d(TAG, "SO2.Error-> " + throwable);

}

});

这里还使用了 maybeToMain() 方法,它的代码如下:

@JvmStatic

fun maybeToMain(): MaybeTransformer {

return MaybeTransformer { upstream ->

upstream.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

}

}

它用于切换线程,返回 MaybeTransformer 对象。因为 apiService 中每个返回的方法都是

Maybe 类型,所以这里会用到 MaybeTransformer 。使用了 maybeToMain() 后 ,除网络请求是在

io() 线程中运行外,其余的操作都是在主线程中运行的.

也可以让 filter、 flatMap 操作也在 io() 线程中运行,展示数据时才切换回主线程。

接下来列举一些 Retrofit 其余常见的使用场景。

(1) 合并多个网络请求

如:需要在某一个信息流列表中插入多条广告,每一条广告都需要做一次网络请求。这时就可以考虑使用 zip 操作符,将请求信息流,以及请求的多个广告的请求合并起来,等所有请求完成之后,再用合并函数将广告插到信息流固定的位置上,最后以列表的形式呈现给用户。

APIService apiService = RetrofitManager.retrofit().create(APIService.class);

Maybe pm25ModelMaybe = apiService.pm25(Constant.CITY, Constant.TOKEN)

.compose(RxJavaUtils.>maybeToMain())

.filter(new Predicate>() {

@Override

public boolean test(List pm25Models) throws Exception {

return pm25Models != null && !pm25Models.isEmpty();

}

})

.flatMap(new Function, MaybeSource>() {

@Override

public MaybeSource apply(List pm25Models) throws Exception {

for (PM25Model pm25Model : pm25Models) {

if ("南门".equals(pm25Model.position_name)) {

return Maybe.just(pm25Model);

}

}

return null;

}

});

Maybe pm10ModelMaybe = apiService.pm10(Constant.CITY, Constant.TOKEN)

.compose(RxJavaUtils.>maybeToMain())

.filter(new Predicate>() {

@Override

public boolean test(List pm10Models) throws Exception {

return pm10Models != null && !pm10Models.isEmpty();

}

})

.flatMap(new Function, MaybeSource>() {

@Override

public MaybeSource apply(List pm10Models) throws Exception {

for (PM10Model pm10Model : pm10Models) {

if ("南门".equals(pm10Model.position_name)) {

return Maybe.just(pm10Model);

}

}

return null;

}

});

Maybe so2ModelMaybe = apiService.so2(Constant.CITY, Constant.TOKEN)

.compose(RxJavaUtils.>maybeToMain())

.filter(new Predicate>() {

@Override

public boolean test(List so2Models) throws Exception {

return so2Models != null && !so2Models.isEmpty();

}

})

.flatMap(new Function, MaybeSource>() {

@Override

public MaybeSource apply(List so2Models) throws Exception {

for (SO2Model so2Model : so2Models) {

if ("南门".equals(so2Model.position_name)) {

return Maybe.just(so2Model);

}

}

return null;

}

});

Maybe.zip(pm25ModelMaybe, pm10ModelMaybe, so2ModelMaybe, new Function3() {

@Override

public ZipObject apply(PM25Model pm25Model, PM10Model pm10Model, SO2Model so2Model) throws Exception {

Log.d(TAG, "zip-> \r\n" + pm25Model + "\r\n" + pm10Model + "\r\n" + so2Model);

ZipObject zipObject = new ZipObject();

zipObject.pm2_5 = pm25Model.pm2_5;

zipObject.pm2_5_24h = pm25Model.pm2_5_24h;

zipObject.pm2_5_quality = pm25Model.quality;

zipObject.pm10 = pm10Model.pm10;

zipObject.pm10_24h = pm10Model.pm10_24h;

zipObject.so2 = so2Model.so2;

zipObject.so2_24h = so2Model.so2_24h;

return zipObject;

}

}).subscribe(new Consumer() {

@Override

public void accept(ZipObject zipObject) throws Exception {

Log.d(TAG, "Success-> " + zipObject);

}

}, new Consumer() {

@Override

public void accept(Throwable throwable) throws Exception {

Log.d(TAG, "Error-> " + throwable);

}

});

(2) 返回默认值

有时, 网络请求失败可以使用 onErrorReturn 操作符, 一个空的对象作为默认值。

(3) 多个网络请求嵌套使用

若是 A 请求完成之后,才能去调用 B 请求,则可以考虑使用 flatMap 操作符。

(4) 重试机制

对于一些重要的接口,需要采用重试机制。因为有些时候用户的网络环境比较差,第一次请求接口超时了,那么再一次请求可能就会成功。虽然有一定的延时,但至少返回了数据,保证了用户体验。

apiService.loadContent(params)

.retryWhen(new RetryWithDelay(3, 1000));

在这里 retryWhen 操作符与 RetryWithDelay 一起搭配使用,表示有 3 次重试机会,每次的延迟时间是 1000ms。 RetryWithDelay 是一个工具类,使用kotlin语言编写。

class RetryWithDelay(

private val maxRetries: Int,

private val retryDelayMillis: Int

) : Function, Publisher> {

private var retryCount: Int = 0

init {

this.retryCount = 0

}

override fun apply(attempts: Flowable): Publisher {

return attempts.flatMap { throwable ->

if (++retryCount <= maxRetries) {

Flowable.timer(retryDelayMillis.toLong(), TimeUnit.MILLISECONDS)

} else {

Flowable.error(throwable)

}

}

}

}

如果我的文章对您有帮助,不妨点个赞鼓励一下(^_^)

 类似资料: