a library for composing asynchronous and event-based programs using observable sequences for the Java VM
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
随着程序逻辑变得越来越复杂,它依然能够保持简洁
<1 MB jar 包
支持 Java 8 lambada 公式
支持 Java 6+ 且 Android 2.3+
同时支持异步和同步
观察者模式解决的问题:A 对象(观察者)对 B 对象(被观察者)的某种变化高度敏感,需要在 B 变化的一瞬间做出反应。
定义:观察者采用订阅的方式,获取被观察者发出的自身状态改变的通知,从而作出反应的这样一种软件设计模式。
四个基本概念:观察者、被观察者、订阅动作、事件
Observable:(被观察者)发射源
Subscriber:(观察者)订阅者
Observer:(观察者)观察者
ActionX:X 的范围为 1~9 之间,它是 Rxjava 中的一个接口,有且只有一个 无参 方法 call() ,无返回值
FuncX:X 的范围为 1~9 之间,它是 Rxjava 中的一个接口,有且只有一个 含参 方法 call() ,有返回值
Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
注意:在一个正确运行的事件序列中,* onCompleted() 和 onError() 有且只有一个*(二者互斥,即在队列中只能调用其中一个),并且是事件序列中的最后一个。
Rxjava 中的订阅关系为:被观察者订阅观察者,注意不要写反了
中文手册:https://mcxiaoke.gitbooks.io/rxdocs/content/
详细教程:http://gank.io/post/560e15be2dca930e00da1083
github 源码:https://github.com/lzyzsd/Awesome-RxJava
Rejava 结合 Retrofit:http://gank.io/post/56e80c2c677659311bed9841
可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制,介绍如下:
subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
注意: observeOn() 可以多次调用,实现程序线程的多次切换,但是 subscribeOn() 只能被调用一次
所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。Rxjava 的变换操作指的是 Rxjava 通过 Map、flatMap 等这些操作符就能实现不同对象之间的转变。
Rxjava 可以跟很多框架结合起来用,例如 Retrofit、EventBus 等(RxBinding 算是融合过的)
implementation "io.reactivex.rxjava2:rxjava:2.1.16"
package com.demo.thorn.rxjavademo;
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
public class MainActivity extends AppCompatActivity {
private static final String TAG = "JIN";
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//创建一个上游 Observable:发射源
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
//创建一个下游 Observer 接收源
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
//建立发射远和接受源之间的连接
observable.subscribe(observer);
}
}