Android - RxJava2 极简入门

上官彬
2023-12-01

定义

RxJava 提供一套异步编程的 API,这套 API 是基于观察者模式的,而且是链式调用的,所以使用 RxJava 编写的代码的逻辑会非常简洁。

相关基础

观察者模式

当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。应用场景示例:微信公众号的订阅功能。

观察者模式简单实现

参考: 设计模式-通过微信公众号示例讲解观察者模式

使用场景

引入依赖

implementation 'io.reactivex.rxjava2:rxjava:2.2.10'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

1、Scheduler线程切换

这种场景经常会在后台线程取数据,主线程展示取回来的数据

Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe{
        Log.e(TAG, it.toString())
    }

2、使用debounce做textSearch

常见的使用场景是联想搜索。 使用debounce减少频繁的网络请求。避免每输入(删除)一个字就做一次联想

// 
val subscribe4 = RxUtil.textChanges(et_meter_address)
        .debounce(1, TimeUnit.SECONDS)//1s 后才发送给观察者
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            Log.e(TAG, "-------模糊搜索---------")
            Log.e(TAG, it.toString())
        }
disposables.add(subscribe4)

3、Retrofit 结合 RxJava 做网络请求框架

例子:FrameDemo

4、RxJava 代替 EventBus 进行数据传递:RxBus

Android-使用 RxJava2 封装 RxBus

5、使用combineLatest合并最近N个结点

例如:注册的时候所有输入信息(邮箱、密码、电话号码等)合法才点亮注册按钮。

// 按钮设置默认不可用
btn_commit.isEnabled = false

// 监听 editText 变化,这里 textChanges 不能返回 Any
val et1 = RxUtil.textChanges(et_meter_address)
val et2 = RxUtil.textChanges(et_commod)

val subscribe7 = Observable.combineLatest(et1, et2, BiFunction<CharSequence?, CharSequence?, Boolean?> { t1, t2 ->
    val addressValid = t1.isNotEmpty()
    val commondValid = t2.isNotEmpty()
    Log.e(TAG, addressValid.toString())
    Log.e(TAG, commondValid.toString())

    // 传递检验结果
    addressValid && commondValid
}).subscribe {
	// 设置按钮的可用状态
    btn_commit.isEnabled = it!!
}
disposables.add(subscribe7)

6、使用merge, concat 合并两个数据源。

例如一组数据来自网络,一组数据来自文件,需要合并两组数据一起展示。

// 4 merge合并数据
val serverData = Observable.create<String> {
    it.onNext("hello")
}
val localData = Observable.create<String>{
    it.onNext("word")
}
val subscribe3 = Observable.merge(serverData, localData)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            Log.e(TAG, "-----------merge合并后的数据-------------------")
            Log.e(TAG, it)
        }
disposables.add(subscribe3)

// 5 concat 合并数据
val serverData1 = Observable.create<String> {
    it.onNext("hello")
    // 需要发送 onComplete,否则只收到第一个数据(测试的时候发现,具体原因还未了解)
    it.onComplete()
}
val localData1 = Observable.create<String>{
    it.onNext("word")
    // 需要发送 onComplete,否则只收到第一个数据(测试的时候发现,具体原因还未了解)
    it.onComplete()
}
val subscribe2 = Observable.concat(serverData1, localData1)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            Log.e(TAG, "-----------concat 合并后的数据-------------------")
            Log.e(TAG, it)
        }
disposables.add(subscribe2)

7、使用concat和first做缓存

依次检查memory、disk和network中是否存在数据,任何一步一旦发现数据后面的操作都不执行。

val memoryData = Observable.create<String>{ it.onComplete() }

val diskData = Observable.create<String>{
    it.onNext("2")
    it.onComplete()
}

val netWordData = Observable.create<String>{
    it.onNext("3")
    it.onComplete()
}

val subscribe1 = Observable.concat(memoryData, diskData, netWordData)
        .firstElement()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            Log.e(TAG, "-----------concat firstElement 任何一个被观察者有数据则后面的操作都不执行-------------------")
            Log.e(TAG, it)
        }
disposables.add(subscribe1)

8、使用timer做定时操作。当有“x秒后执行y操作”类似的需求的时候,想到使用timer

例如:2秒后输出日志“hello world”,然后结束。

val subscribe = Observable.timer(2, TimeUnit.SECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            Log.e(TAG, "-----------每隔2秒后输出 hello-------------------")
            Log.e(TAG, "hello")

        }
disposables.add(subscribe)

9、使用 interval 做周期性操作。当有“每隔xx秒后执行yy操作”类似的需求的时候,想到使用 interval

intervalRange 可以指定执行次数,间隔时间

val subscribe = Observable.intervalRange(0,5,1,1, TimeUnit.SECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            Log.e(TAG, "-----------每隔1秒输出 hello,5秒后结束-------------------")
            Log.e(TAG, "hello")

        }
disposables.add(subscribe)

10、使用throttleFirst防止按钮重复点击

ps:debounce也能达到同样的效果

val subscribe6 = RxUtil.clickView(btn_double_click)
        .throttleFirst(1, TimeUnit.SECONDS)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            Log.e(TAG, "连续点击,控制间隔一秒处理一次结果")
        }
disposables.add(subscribe6)

11、RxJava进行数组、list的遍历

val list = mutableListOf<Int>(1,2,3,4,5,6)
val subscribe5 = Observable.fromIterable(list)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe {
            Log.e(TAG, "-----------遍历 for 集合-------------------")
            Log.e(TAG, it.toString())
        }
disposables.add(subscribe5)

12、解决嵌套回调(callback hell)问题

有个 personList 集合,对象里面有 phoneList 集合 集合里面是各个手机。
输出所有人的每个手机信息

  1. 双层for嵌套输出

  2. 使用 flatMap


data class Person(val name:String, val phoneList:MutableList<Phone>)
data class Phone(val name: String,val price:String)


val persons = mutableListOf<Person>()
val phones1 = mutableListOf<Phone>()
val phones2 = mutableListOf<Phone>()
val phones3 = mutableListOf<Phone>()
persons.add(Person("张三1",phones1))
persons.add(Person("张三2",phones2))
persons.add(Person("张三3",phones3))

phones1.add(Phone("小米1","3000"))
phones2.add(Phone("小米2","3000"))
phones3.add(Phone("小米3","3000"))

val subscribe8 = Observable.fromIterable(persons)
        .flatMap {
            Observable.fromIterable(it.phoneList)
        }.subscribe {
            Log.e(TAG, "---------- flatMap 输出 phone-------------------")
            Log.e(TAG, it.toString())
        }
// 用于在 onDestory 移除
disposables.add(subscribe8)

RxUtil 处理按钮重复点击、EditText监听问题

import android.text.Editable
import android.text.TextWatcher
import android.view.View;
import android.widget.EditText
import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.ObservableOnSubscribe


object RxUtil{

    // 防抖事件
    fun clickView(view: View?): Observable<Any> {
        return Observable.create(ViewClickOnSubscribe(view!!))
    }

    // 联想搜索
    fun textChanges(view: EditText?): Observable<String> {
        return Observable.create(ViewChangeOnSubscribe(view!!))
    }

    private class ViewChangeOnSubscribe(val view: EditText) : ObservableOnSubscribe<String> {

        override fun subscribe(emitter: ObservableEmitter<String>) {
            val textChange = object :TextWatcher{
                override fun afterTextChanged(s: Editable?) {}

                override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {}

                override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
                    if (!emitter.isDisposed) {
                        //发送消息
                        emitter.onNext(s.toString())
                    }
                }
            }

            view.addTextChangedListener(textChange)
        }

    }

    private class ViewClickOnSubscribe(val view: View) : ObservableOnSubscribe<Any> {

        override fun subscribe(emitter: ObservableEmitter<Any>) {
            val onClickListener = View.OnClickListener {
                //订阅没取消
                if (!emitter.isDisposed) {
                    //发送消息
                    emitter.onNext(1)
                }
            }
            view.setOnClickListener(onClickListener)
        }

    }
}

总结

熟悉观察者模式

熟悉 RxJava 不同操作符对应的使用场景

  • 结合 Retrofit 实现网络请求功能
  • 事件总线功能
  • 点击防抖 throttleFirst
  • 联想搜索 debounce
  • 集合遍历 from
  • 数据合并 merge concat
  • 解决嵌套 flapMap
  • 定时操作 timer
  • 周期操作 interval
  • 表单校验 combineLatest
  • 线程切换 Scheduler

参考

 类似资料: