RxJava
提供一套异步编程的 API
,这套 API
是基于观察者模式的,而且是链式调用的,所以使用 RxJava
编写的代码的逻辑会非常简洁。
当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知依赖它的对象。观察者模式属于行为型模式。应用场景示例:微信公众号的订阅功能。
implementation 'io.reactivex.rxjava2:rxjava:2.2.10'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
这种场景经常会在后台线程取数据,主线程展示取回来的数据
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe{
Log.e(TAG, it.toString())
}
常见的使用场景是联想搜索。 使用
debounce
减少频繁的网络请求。避免每输入(删除)一个字就做一次联想
//
val subscribe4 = RxUtil.textChanges(et_meter_address)
.debounce(1, TimeUnit.SECONDS)//1s 后才发送给观察者
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.e(TAG, "-------模糊搜索---------")
Log.e(TAG, it.toString())
}
disposables.add(subscribe4)
例子:FrameDemo
例如:注册的时候所有输入信息(邮箱、密码、电话号码等)合法才点亮注册按钮。
// 按钮设置默认不可用
btn_commit.isEnabled = false
// 监听 editText 变化,这里 textChanges 不能返回 Any
val et1 = RxUtil.textChanges(et_meter_address)
val et2 = RxUtil.textChanges(et_commod)
val subscribe7 = Observable.combineLatest(et1, et2, BiFunction<CharSequence?, CharSequence?, Boolean?> { t1, t2 ->
val addressValid = t1.isNotEmpty()
val commondValid = t2.isNotEmpty()
Log.e(TAG, addressValid.toString())
Log.e(TAG, commondValid.toString())
// 传递检验结果
addressValid && commondValid
}).subscribe {
// 设置按钮的可用状态
btn_commit.isEnabled = it!!
}
disposables.add(subscribe7)
例如一组数据来自网络,一组数据来自文件,需要合并两组数据一起展示。
// 4 merge合并数据
val serverData = Observable.create<String> {
it.onNext("hello")
}
val localData = Observable.create<String>{
it.onNext("word")
}
val subscribe3 = Observable.merge(serverData, localData)
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.e(TAG, "-----------merge合并后的数据-------------------")
Log.e(TAG, it)
}
disposables.add(subscribe3)
// 5 concat 合并数据
val serverData1 = Observable.create<String> {
it.onNext("hello")
// 需要发送 onComplete,否则只收到第一个数据(测试的时候发现,具体原因还未了解)
it.onComplete()
}
val localData1 = Observable.create<String>{
it.onNext("word")
// 需要发送 onComplete,否则只收到第一个数据(测试的时候发现,具体原因还未了解)
it.onComplete()
}
val subscribe2 = Observable.concat(serverData1, localData1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.e(TAG, "-----------concat 合并后的数据-------------------")
Log.e(TAG, it)
}
disposables.add(subscribe2)
依次检查memory、disk和network中是否存在数据,任何一步一旦发现数据后面的操作都不执行。
val memoryData = Observable.create<String>{ it.onComplete() }
val diskData = Observable.create<String>{
it.onNext("2")
it.onComplete()
}
val netWordData = Observable.create<String>{
it.onNext("3")
it.onComplete()
}
val subscribe1 = Observable.concat(memoryData, diskData, netWordData)
.firstElement()
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.e(TAG, "-----------concat firstElement 任何一个被观察者有数据则后面的操作都不执行-------------------")
Log.e(TAG, it)
}
disposables.add(subscribe1)
例如:2秒后输出日志“hello world”,然后结束。
val subscribe = Observable.timer(2, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.e(TAG, "-----------每隔2秒后输出 hello-------------------")
Log.e(TAG, "hello")
}
disposables.add(subscribe)
intervalRange 可以指定执行次数,间隔时间
val subscribe = Observable.intervalRange(0,5,1,1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.e(TAG, "-----------每隔1秒输出 hello,5秒后结束-------------------")
Log.e(TAG, "hello")
}
disposables.add(subscribe)
ps:debounce也能达到同样的效果
val subscribe6 = RxUtil.clickView(btn_double_click)
.throttleFirst(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.e(TAG, "连续点击,控制间隔一秒处理一次结果")
}
disposables.add(subscribe6)
val list = mutableListOf<Int>(1,2,3,4,5,6)
val subscribe5 = Observable.fromIterable(list)
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
Log.e(TAG, "-----------遍历 for 集合-------------------")
Log.e(TAG, it.toString())
}
disposables.add(subscribe5)
有个 personList 集合,对象里面有 phoneList 集合 集合里面是各个手机。
输出所有人的每个手机信息
双层for嵌套输出
使用 flatMap
data class Person(val name:String, val phoneList:MutableList<Phone>)
data class Phone(val name: String,val price:String)
val persons = mutableListOf<Person>()
val phones1 = mutableListOf<Phone>()
val phones2 = mutableListOf<Phone>()
val phones3 = mutableListOf<Phone>()
persons.add(Person("张三1",phones1))
persons.add(Person("张三2",phones2))
persons.add(Person("张三3",phones3))
phones1.add(Phone("小米1","3000"))
phones2.add(Phone("小米2","3000"))
phones3.add(Phone("小米3","3000"))
val subscribe8 = Observable.fromIterable(persons)
.flatMap {
Observable.fromIterable(it.phoneList)
}.subscribe {
Log.e(TAG, "---------- flatMap 输出 phone-------------------")
Log.e(TAG, it.toString())
}
// 用于在 onDestory 移除
disposables.add(subscribe8)
import android.text.Editable
import android.text.TextWatcher
import android.view.View;
import android.widget.EditText
import io.reactivex.Observable
import io.reactivex.ObservableEmitter
import io.reactivex.ObservableOnSubscribe
object RxUtil{
// 防抖事件
fun clickView(view: View?): Observable<Any> {
return Observable.create(ViewClickOnSubscribe(view!!))
}
// 联想搜索
fun textChanges(view: EditText?): Observable<String> {
return Observable.create(ViewChangeOnSubscribe(view!!))
}
private class ViewChangeOnSubscribe(val view: EditText) : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
val textChange = object :TextWatcher{
override fun afterTextChanged(s: Editable?) {}
override fun beforeTextChanged(s: CharSequence?, start: Int, count: Int, after: Int) {}
override fun onTextChanged(s: CharSequence?, start: Int, before: Int, count: Int) {
if (!emitter.isDisposed) {
//发送消息
emitter.onNext(s.toString())
}
}
}
view.addTextChangedListener(textChange)
}
}
private class ViewClickOnSubscribe(val view: View) : ObservableOnSubscribe<Any> {
override fun subscribe(emitter: ObservableEmitter<Any>) {
val onClickListener = View.OnClickListener {
//订阅没取消
if (!emitter.isDisposed) {
//发送消息
emitter.onNext(1)
}
}
view.setOnClickListener(onClickListener)
}
}
}