RxJava系列
https://blog.csdn.net/weixin_37730482/category_10501619.html
RxJava详解(基于1.X版本)
https://blog.csdn.net/weixin_37730482/article/details/68059274
RxBus不是一个库,而是一个文件。是基于RxJava实现的。
在RxJava中有个Subject类,它继承Observable类,同时实现了Observer接口,因此Subject可以同时担当订阅者和被订阅者的角色。
public abstract class Subject<T> extends Observable<T> implements Observer<T> {}
Subject有四个实现类
<1> PublishSubject类:先订阅后收到消息 一般使用该类创建Subject。因为符合一般的观察者流程(只有注册的观察者才能收到被观察者发送的消息)。
public final class PublishSubject<T> extends Subject<T, T> {}
<2> BehaviorSubject类:发送离订阅最近的上一个值,没有上一个值的时候会发送默认值。即可以缓存一次被观察者发送的数据。
public final class BehaviorSubject<T> extends Subject<T, T> {}
<3> ReplaySubject类:无论何时订阅,都会将所有历史订阅内容全部发出。
ublic final class ReplaySubject<T> extends Subject<T, T> {}
<4> AsyncSubject类:很少使用。
public final class AsyncSubject<T> extends Subject<T, T> {}
我们使用Subject的子类PublishSubject来创建一个Subject对象(PublishSubject只有被订阅后才会把接收到的事件立刻发送给订阅者)。在需要接收事件的地方,订阅该Subject对象,之后如果Subject对象接收到事件,则会发射给该订阅者,此时Subject对象充当被订阅者的角色。
完成了订阅,在需要发送事件的地方将事件发送给之前被订阅的Subject对象,则此时Subject对象作为订阅者接收事件,然后会立刻将事件转发给订阅该Subject对象的订阅者,以便订阅者处理相应事件,到这里就完成了事件的发送与处理。
最后就是取消订阅的操作了,RxJava中,订阅操作会返回一个Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Subscription对象,我们可以用一个CompositeSubscription存储起来,以进行批量的取消订阅。
构造方法
(1) PublishSubject类创建Subject类对象
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
public static <T> PublishSubject<T> create() {
return new PublishSubject<T>(new PublishSubjectState<T>());
}
每次都新建。所以不会缓存。只有注册过才会收到消息。
由于 Subject类是非线程安全的,所以我们通过它的 子类SerializedSubject 将 PublishSubject 转换成一个线程安全的Subject对象。之后可通过单例方法 getInstance() 进行 RxBus 的初始化。
(2) BehaviorSubject类创建Subject类对象
private RxBusBehaviorSubject() {
bus = new SerializedSubject<>(BehaviorSubject.create());
}
public static <T> BehaviorSubject<T> create() {
return create(null, false);
}
private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
if (hasDefault) {
state.setLatest(NotificationLite.next(defaultValue));
}
state.onAdded = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
o.emitFirst(state.getLatest());
}
};
state.onTerminated = state.onAdded;
return new BehaviorSubject<T>(state, state);
}
设置最后一次。所以会缓存最后一次发送的消息。
(3) ReplaySubject类创建Subject类对象
private RxBusReplaySubject() {
bus = new SerializedSubject<>(ReplaySubject.create());
}
public static <T> ReplaySubject<T> create() {
return create(16);
}
public static <T> ReplaySubject<T> create(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("capacity > 0 required but it was " + capacity);
}
ReplayBuffer<T> buffer = new ReplayUnboundedBuffer<T>(capacity);
ReplayState<T> state = new ReplayState<T>(buffer);
return new ReplaySubject<T>(state);
}
也就是说ReplaySubject类缓存全部的消息。构造方法可以传入缓存的个数,默认16个。
<1> Gradle依赖 RxJava&RxAndroid 版本须一致 即都是1.x版本
//RxJava
implementation 'io.reactivex:rxjava:1.3.2'
//RxAndroid
implementation 'io.reactivex:rxandroid:1.2.1'
<2> RxBus
package rxbus;
import android.text.TextUtils;
import android.util.Log;
import java.util.HashMap;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action1;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
public class RxBus {
private final Subject<Object, Object> bus;
private HashMap<String, CompositeSubscription> hashMap;
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
private static class SingletonHolder {
private static final RxBus defaultRxBus = new RxBus();
}
public static RxBus getInstance() {
return SingletonHolder.defaultRxBus;
}
/*
* 发送消息
*/
public void post(Object o) {
bus.onNext(o);
}
/*
* 转换为特定类型的Obserbale
*/
public <T> Observable<T> toObservable(Class<T> type) {
return bus.ofType(type);
}
/**
* 订阅
*/
public <T> Subscription doSubscription(Class<T> type, Scheduler scheduler, Action1<T> next, Action1<Throwable> error) {
if (null == type || null == scheduler || null == next || null == error) {
return null;
}
return toObservable(type)
.observeOn(scheduler)
.subscribe(next, error);
}
/**
* 保存Subscription
*/
public void saveSubscription(Object o, Subscription subscription) {
if (null == o || null == subscription) {
return;
}
if (hashMap == null) {
hashMap = new HashMap<>();
}
String key = o.getClass().getName();
if (!TextUtils.isEmpty(key)) {
if (hashMap.get(key) != null) {
hashMap.get(key).add(subscription);
} else {
CompositeSubscription compositeSubscription = new CompositeSubscription();
compositeSubscription.add(subscription);
hashMap.put(key, compositeSubscription);
}
}
}
/**
* 取消订阅
*/
public void unSubscribe(Object o) {
if (null == hashMap || null == o) {
return;
}
String key = o.getClass().getName();
Log.d("TAG", "取消订阅 key----:" + key);
if (TextUtils.isEmpty(key)) {
return;
}
if (!hashMap.containsKey(key)) {
return;
}
CompositeSubscription compositeSubscription = hashMap.get(key);
if (compositeSubscription != null) {
compositeSubscription.unsubscribe();
}
Log.d("TAG", "取消订阅remove key----:" + key);
hashMap.remove(key);
}
}
<3> 接收消息页面 子线程接收
package rxbus;
import android.content.Intent;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import com.wjn.lubandemo.R;
import rx.Subscription;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
/**
* 模拟子线程接收RxBus消息
*/
public class RxBusActivity extends AppCompatActivity {
private TextView mATextView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxbus);
findView();
getMsgWithRxBus();
}
/**
* 初始化各种View
*/
private void findView() {
mATextView = findViewById(R.id.activity_rxbus_atextview);
mATextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Intent intent = new Intent(RxBusActivity.this, RxBus1Activity.class);
startActivity(intent);
}
});
}
/**
* 接收RxBus消息
*/
private void getMsgWithRxBus() {
Subscription subscription = RxBus.getInstance()
.doSubscription(RxBusBean.class,
Schedulers.newThread(),
new Action1<RxBusBean>() {
@Override
public void call(RxBusBean rxBusBean) {
if (null == rxBusBean) {
return;
}
String id = rxBusBean.getId();
String msg = rxBusBean.getMsg();
String msgType = rxBusBean.getMsgType();
Log.d("TAG", "RxBusActivity接收 内容id:" + id + " msg:" + msg + " msgType:" + msgType);
Log.d("TAG", "RxBusActivity页面接收 线程----:" + Thread.currentThread().getName());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
RxBus.getInstance().saveSubscription(this, subscription);
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getInstance().unSubscribe(this);
}
}
<4> 接收消息页面 UI线程接收
package rxbus;
import android.content.Intent;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import com.wjn.lubandemo.R;
import rx.Subscription;
import rx.android.schedulers.AndroidSchedulers;
import rx.functions.Action1;
/**
* 模拟UI线程接收RxBus消息
*/
public class RxBus1Activity extends AppCompatActivity {
private TextView mATextView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxbus1);
findView();
getMsgWithRxBus();
}
/**
* 初始化各种View
*/
private void findView() {
mATextView = findViewById(R.id.activity_rxbus1_atextview);
mATextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Intent intent = new Intent(RxBus1Activity.this, RxBus2Activity.class);
startActivity(intent);
}
});
}
/**
* 接收RxBus消息
*/
private void getMsgWithRxBus() {
Subscription subscription = RxBus.getInstance()
.doSubscription(RxBusBean.class,
AndroidSchedulers.mainThread(),
new Action1<RxBusBean>() {
@Override
public void call(RxBusBean rxBusBean) {
if (null == rxBusBean) {
return;
}
String id = rxBusBean.getId();
String msg = rxBusBean.getMsg();
String msgType = rxBusBean.getMsgType();
Log.d("TAG", "RxBus1Activity接收 内容id:" + id + " msg:" + msg + " msgType:" + msgType);
Log.d("TAG", "RxBus1Activity页面接收 线程----:" + Thread.currentThread().getName());
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
});
RxBus.getInstance().saveSubscription(this, subscription);
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getInstance().unSubscribe(this);
}
}
<5> 发送消息页面
package rxbus;
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.view.View;
import android.widget.TextView;
import com.wjn.lubandemo.R;
public class RxBus2Activity extends AppCompatActivity {
private TextView mATextView;
private TextView mBTextView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxbus2);
findView();
}
/**
* 初始化各种View
*/
private void findView() {
mATextView = findViewById(R.id.activity_rxbus2_atextview);
mBTextView = findViewById(R.id.activity_rxbus2_btextview);
mATextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
RxBusBean rxBusBean = new RxBusBean();
rxBusBean.setId("123");
rxBusBean.setMsg("RxBus发送的UI线程消息");
rxBusBean.setMsgType("RxBusTypeMainThread");
RxBus.getInstance().post(rxBusBean);
}
});
mBTextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
new Thread(new Runnable() {
@Override
public void run() {
RxBusBean rxBusBean = new RxBusBean();
rxBusBean.setId("456");
rxBusBean.setMsg("RxBus发送的子线程消息");
rxBusBean.setMsgType("RxBusTypeNewThread");
RxBus.getInstance().post(rxBusBean);
}
}).start();
}
});
}
}
<6> 结果
UI线程发送
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息 msgType:RxBusTypeMainThread
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-1
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息 msgType:RxBusTypeMainThread
D/TAG: RxBus1Activity页面接收 线程----:main
子线程发送
D/TAG: RxBusActivity接收 内容id:456 msg:RxBus发送的子线程消息 msgType:RxBusTypeNewThread
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-1
D/TAG: RxBus1Activity接收 内容id:456 msg:RxBus发送的子线程消息 msgType:RxBusTypeNewThread
D/TAG: RxBus1Activity页面接收 线程----:main
由于EventBus具有粘性事件。所以如果想使用RxBus完全替换EventBus。那么RxBus也要具有粘性事件。其实粘性事件项目中很少用到的。因为一般的观察者模式。都是希望注册后才收到信息。这样才是最新最有用的信息。
EventBus的粘性事件
https://blog.csdn.net/weixin_37730482/article/details/71973907
上述可知可以使用
BehaviorSubject类:缓存最近的一次消息。ReplaySubject类:缓存所有的消息。
代码实现
<1> BehaviorSubject类:缓存最近的一次消息
[1] RxBusBehaviorSubject
package rxbus;
import android.text.TextUtils;
import android.util.Log;
import java.util.HashMap;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action1;
import rx.subjects.BehaviorSubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
public class RxBusBehaviorSubject {
private final Subject<Object, Object> bus;
private HashMap<String, CompositeSubscription> hashMap;
private RxBusBehaviorSubject() {
bus = new SerializedSubject<>(BehaviorSubject.create());
}
private static class SingletonHolder {
private static final RxBusBehaviorSubject defaultRxBus = new RxBusBehaviorSubject();
}
public static RxBusBehaviorSubject getInstance() {
return SingletonHolder.defaultRxBus;
}
/*
* 发送消息
*/
public void post(Object o) {
bus.onNext(o);
}
/*
* 转换为特定类型的Obserbale
*/
public <T> Observable<T> toObservable(Class<T> type) {
return bus.ofType(type);
}
/**
* 订阅
*/
public <T> Subscription doSubscription(Class<T> type, Scheduler scheduler, Action1<T> next, Action1<Throwable> error) {
if (null == type || null == scheduler || null == next || null == error) {
return null;
}
return toObservable(type)
.observeOn(scheduler)
.subscribe(next, error);
}
/**
* 保存Subscription
*/
public void saveSubscription(Object o, Subscription subscription) {
if (null == o || null == subscription) {
return;
}
if (hashMap == null) {
hashMap = new HashMap<>();
}
String key = o.getClass().getName();
if (!TextUtils.isEmpty(key)) {
if (hashMap.get(key) != null) {
hashMap.get(key).add(subscription);
} else {
CompositeSubscription compositeSubscription = new CompositeSubscription();
compositeSubscription.add(subscription);
hashMap.put(key, compositeSubscription);
}
}
}
/**
* 取消订阅
*/
public void unSubscribe(Object o) {
if (null == hashMap || null == o) {
return;
}
String key = o.getClass().getName();
Log.d("TAG", "取消订阅 key----:" + key);
if (TextUtils.isEmpty(key)) {
return;
}
if (!hashMap.containsKey(key)) {
return;
}
CompositeSubscription compositeSubscription = hashMap.get(key);
if (compositeSubscription != null) {
compositeSubscription.unsubscribe();
}
Log.d("TAG", "取消订阅remove key----:" + key);
hashMap.remove(key);
}
}
即 修改RxBus类
private RxBusBehaviorSubject() {
bus = new SerializedSubject<>(BehaviorSubject.create());
}
即:使用BehaviorSubject类创建Subject对象。其他代码不变。
[2] 两个接收页面 RxBus类替换成RxBusBehaviorSubject类。其他代码不变。
[3] 发送页面 为了测试修改了发送内容 Int类型数++。
mATextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
RxBusBean rxBusBean = new RxBusBean();
rxBusBean.setId("123");
rxBusBean.setMsg("RxBus发送的UI线程消息mCount:" + mCount);
rxBusBean.setMsgType("RxBusTypeMainThread:" + mCount);
RxBusBehaviorSubject.getInstance().post(rxBusBean);
mCount++;
}
});
[4] 结果
(1) 先注册 再发送 为了测试是否记录最后一次 发了3次。
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:1 msgType:RxBusTypeMainThread:1
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-1
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:1 msgType:RxBusTypeMainThread:1
D/TAG: RxBus1Activity页面接收 线程----:main
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:2 msgType:RxBusTypeMainThread:2
D/TAG: RxBus1Activity页面接收 线程----:main
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:2 msgType:RxBusTypeMainThread:2
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-1
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:3 msgType:RxBusTypeMainThread:3
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-1
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:3 msgType:RxBusTypeMainThread:3
D/TAG: RxBus1Activity页面接收 线程----:main
即 先注册 后发送 发送3次 接收3次 没有问题。
(2) (1)操作完成后 重新进入两个接收页面。
重新进入页面1
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:3 msgType:RxBusTypeMainThread:3
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-2
重新进入页面2
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:3 msgType:RxBusTypeMainThread:3
D/TAG: RxBus1Activity页面接收 线程----:main
即 重新进入页面 接收的是第3次也就是最后一次消息。没有问题 证明BehaviorSubject类创建Subject对象就是缓存最后一次的消息。
<2> ReplaySubject类:缓存所有的消息。
RxBusReplaySubject
package rxbus;
import android.text.TextUtils;
import android.util.Log;
import java.util.HashMap;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action1;
import rx.subjects.ReplaySubject;
import rx.subjects.SerializedSubject;
import rx.subjects.Subject;
import rx.subscriptions.CompositeSubscription;
public class RxBusReplaySubject {
private final Subject<Object, Object> bus;
private HashMap<String, CompositeSubscription> hashMap;
private RxBusReplaySubject() {
bus = new SerializedSubject<>(ReplaySubject.create());
}
private static class SingletonHolder {
private static final RxBusReplaySubject defaultRxBus = new RxBusReplaySubject();
}
public static RxBusReplaySubject getInstance() {
return SingletonHolder.defaultRxBus;
}
/*
* 发送消息
*/
public void post(Object o) {
bus.onNext(o);
}
/*
* 转换为特定类型的Obserbale
*/
public <T> Observable<T> toObservable(Class<T> type) {
return bus.ofType(type);
}
/**
* 订阅
*/
public <T> Subscription doSubscription(Class<T> type, Scheduler scheduler, Action1<T> next, Action1<Throwable> error) {
if (null == type || null == scheduler || null == next || null == error) {
return null;
}
return toObservable(type)
.observeOn(scheduler)
.subscribe(next, error);
}
/**
* 保存Subscription
*/
public void saveSubscription(Object o, Subscription subscription) {
if (null == o || null == subscription) {
return;
}
if (hashMap == null) {
hashMap = new HashMap<>();
}
String key = o.getClass().getName();
if (!TextUtils.isEmpty(key)) {
if (hashMap.get(key) != null) {
hashMap.get(key).add(subscription);
} else {
CompositeSubscription compositeSubscription = new CompositeSubscription();
compositeSubscription.add(subscription);
hashMap.put(key, compositeSubscription);
}
}
}
/**
* 取消订阅
*/
public void unSubscribe(Object o) {
if (null == hashMap || null == o) {
return;
}
String key = o.getClass().getName();
Log.d("TAG", "取消订阅 key----:" + key);
if (TextUtils.isEmpty(key)) {
return;
}
if (!hashMap.containsKey(key)) {
return;
}
CompositeSubscription compositeSubscription = hashMap.get(key);
if (compositeSubscription != null) {
compositeSubscription.unsubscribe();
}
Log.d("TAG", "取消订阅remove key----:" + key);
hashMap.remove(key);
}
}
即 修改RxBus类
private RxBusReplaySubject() {
bus = new SerializedSubject<>(ReplaySubject.create());
}
即:使用RxBusReplaySubject类创建Subject对象。其他代码不变。
[2] 两个接收页面 RxBus类替换成RxBusReplaySubject类。其他代码不变。
[3] 发送页面 为了测试修改了发送内容 Int类型数++。
mATextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
RxBusBean rxBusBean = new RxBusBean();
rxBusBean.setId("123");
rxBusBean.setMsg("RxBus发送的UI线程消息mCount:" + mCount);
rxBusBean.setMsgType("RxBusTypeMainThread:" + mCount);
RxBusReplaySubject.getInstance().post(rxBusBean);
mCount++;
}
});
[4] 结果
(1) 先注册 再发送 为了测试是否记录最后一次 发了3次。
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:1 msgType:RxBusTypeMainThread:1
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-1
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:1 msgType:RxBusTypeMainThread:1
D/TAG: RxBus1Activity页面接收 线程----:main
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:2 msgType:RxBusTypeMainThread:2
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-1
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:2 msgType:RxBusTypeMainThread:2
D/TAG: RxBus1Activity页面接收 线程----:main
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:3 msgType:RxBusTypeMainThread:3
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:3 msgType:RxBusTypeMainThread:3
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-1
D/TAG: RxBus1Activity页面接收 线程----:main
即 先注册 后发送 发送3次 接收3次 没有问题。
(2) (1)操作完成后 重新进入两个接收页面。
重新进入页面1
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:1 msgType:RxBusTypeMainThread:1
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-2
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:2 msgType:RxBusTypeMainThread:2
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-2
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:3 msgType:RxBusTypeMainThread:3
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-2
重新进入页面2
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:1 msgType:RxBusTypeMainThread:1
D/TAG: RxBus1Activity页面接收 线程----:main
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:2 msgType:RxBusTypeMainThread:2
D/TAG: RxBus1Activity页面接收 线程----:main
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息mCount:3 msgType:RxBusTypeMainThread:3
D/TAG: RxBus1Activity页面接收 线程----:main
即 重新进入页面 接收的是第1+2+3次的消息。也就是全部的消息。没有问题 证明ReplaySubject类创建Subject对象就是缓存全部的消息。