RxJava系列
https://blog.csdn.net/weixin_37730482/category_10501619.html
RxJava详解(基于2.X版本)
https://blog.csdn.net/weixin_37730482/article/details/69280013
RxBus详解 基于1.X版本
https://blog.csdn.net/weixin_37730482/article/details/72772261
本章节讲述基于2.X版本的RxBus
implementation 'io.reactivex.rxjava2:rxjava:2.0.1'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
<1> RxBus
package com.wjn.myapplication.rxbus;
import android.text.TextUtils;
import android.util.Log;
import java.util.HashMap;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
public class RxBus {
private final Subject<Object> bus;
private HashMap<String, CompositeDisposable> hashMap;
private RxBus() {
bus = PublishSubject.create().toSerialized();
}
public static RxBus getInstance() {
return Holder.BUS;
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
/**
* 发送消息
*/
public void post(Object obj) {
bus.onNext(obj);
}
/**
* 转换为特定类型的Obserbale
*/
public <T> Observable<T> toObservable(Class<T> tClass) {
return bus.ofType(tClass);
}
/**
* 订阅
*/
public <T> Disposable doDisposable(Class<T> type, Scheduler scheduler, Consumer<T> next, Consumer<Throwable> error) {
if (null == type || null == scheduler || null == next || null == error) {
return null;
}
return toObservable(type)
.observeOn(scheduler)
.subscribe(next, error);
}
/**
* 保存Disposable
*/
public void saveDisposable(Object o, Disposable disposable) {
if (null == o || null == disposable) {
return;
}
if (hashMap == null) {
hashMap = new HashMap<>();
}
String key = o.getClass().getName();
if (!TextUtils.isEmpty(key)) {
if (hashMap.get(key) != null) {
hashMap.get(key).add(disposable);
} else {
CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(disposable);
hashMap.put(key, compositeDisposable);
}
}
}
/**
* 取消订阅
*/
public void dispose(Object o) {
if (null == hashMap || null == o) {
return;
}
String key = o.getClass().getName();
Log.d("TAG", "取消订阅 key----:" + key);
if (TextUtils.isEmpty(key)) {
return;
}
if (!hashMap.containsKey(key)) {
return;
}
CompositeDisposable compositeDisposable = hashMap.get(key);
if (compositeDisposable != null) {
compositeDisposable.dispose();
}
Log.d("TAG", "取消订阅remove key----:" + key);
hashMap.remove(key);
}
}
<2> 接收消息页面 子线程接收
package com.wjn.myapplication.rxbus;
import android.content.Intent;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import androidx.appcompat.app.AppCompatActivity;
import com.wjn.myapplication.R;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
/**
* 拟子线程接收RxBus消息
*/
public class RxBusActivity extends AppCompatActivity {
private TextView mATextView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxbus);
findView();
getMsgWithRxBus();
}
/**
* 初始化各种View
*/
private void findView() {
mATextView = findViewById(R.id.activity_rxbus_atextview);
mATextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Intent intent = new Intent(RxBusActivity.this, RxBus1Activity.class);
startActivity(intent);
}
});
}
/**
* 接收RxBus消息
*/
private void getMsgWithRxBus() {
Disposable disposable = RxBus.getInstance()
.doDisposable(RxBusBean.class,
Schedulers.newThread(), new Consumer<RxBusBean>() {
@Override
public void accept(RxBusBean rxBusBean) throws Exception {
if (null == rxBusBean) {
return;
}
String id = rxBusBean.getId();
String msg = rxBusBean.getMsg();
String msgType = rxBusBean.getMsgType();
Log.d("TAG", "RxBusActivity接收 内容id:" + id + " msg:" + msg + " msgType:" + msgType);
Log.d("TAG", "RxBusActivity页面接收 线程----:" + Thread.currentThread().getName());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
RxBus.getInstance().saveDisposable(this, disposable);
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getInstance().dispose(this);
}
}
<3> 接收消息页面 UI线程接收
package com.wjn.myapplication.rxbus;
import android.content.Intent;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;
import androidx.appcompat.app.AppCompatActivity;
import com.wjn.myapplication.R;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
/**
* 模拟UI线程接收RxBus消息
*/
public class RxBus1Activity extends AppCompatActivity {
private TextView mATextView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxbus1);
findView();
getMsgWithRxBus();
}
/**
* 初始化各种View
*/
private void findView() {
mATextView = findViewById(R.id.activity_rxbus1_atextview);
mATextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Intent intent = new Intent(RxBus1Activity.this, RxBus2Activity.class);
startActivity(intent);
}
});
}
/**
* 接收RxBus消息
*/
private void getMsgWithRxBus() {
Disposable disposable = RxBus.getInstance()
.doDisposable(RxBusBean.class,
AndroidSchedulers.mainThread(),
new Consumer<RxBusBean>() {
@Override
public void accept(RxBusBean rxBusBean) throws Exception {
if (null == rxBusBean) {
return;
}
String id = rxBusBean.getId();
String msg = rxBusBean.getMsg();
String msgType = rxBusBean.getMsgType();
Log.d("TAG", "RxBus1Activity接收 内容id:" + id + " msg:" + msg + " msgType:" + msgType);
Log.d("TAG", "RxBus1Activity页面接收 线程----:" + Thread.currentThread().getName());
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
}
});
RxBus.getInstance().saveDisposable(this, disposable);
}
/**
* onDestroy方法
*/
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getInstance().dispose(this);
}
}
<4> 发送消息页面
package com.wjn.myapplication.rxbus;
import android.os.Bundle;
import android.view.View;
import android.widget.TextView;
import androidx.appcompat.app.AppCompatActivity;
import com.wjn.myapplication.R;
public class RxBus2Activity extends AppCompatActivity {
private TextView mATextView;
private TextView mBTextView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxbus2);
findView();
}
/**
* 初始化各种View
*/
private void findView() {
mATextView = findViewById(R.id.activity_rxbus2_atextview);
mBTextView = findViewById(R.id.activity_rxbus2_btextview);
mATextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
RxBusBean rxBusBean = new RxBusBean();
rxBusBean.setId("123");
rxBusBean.setMsg("RxBus发送的UI线程消息");
rxBusBean.setMsgType("RxBusTypeMainThread");
RxBus.getInstance().post(rxBusBean);
}
});
mBTextView.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
new Thread(new Runnable() {
@Override
public void run() {
RxBusBean rxBusBean = new RxBusBean();
rxBusBean.setId("456");
rxBusBean.setMsg("RxBus发送的子线程消息");
rxBusBean.setMsgType("RxBusTypeNewThread");
RxBus.getInstance().post(rxBusBean);
}
}).start();
}
});
}
}
<5> 结果
UI线程发送
D/TAG: RxBusActivity接收 内容id:123 msg:RxBus发送的UI线程消息 msgType:RxBusTypeMainThread
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-2
D/TAG: RxBus1Activity接收 内容id:123 msg:RxBus发送的UI线程消息 msgType:RxBusTypeMainThread
D/TAG: RxBus1Activity页面接收 线程----:main
子线程发送
D/TAG: RxBus1Activity接收 内容id:456 msg:RxBus发送的子线程消息 msgType:RxBusTypeNewThread
D/TAG: RxBusActivity接收 内容id:456 msg:RxBus发送的子线程消息 msgType:RxBusTypeNewThread
D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-2
D/TAG: RxBus1Activity页面接收 线程----:main
同理 RxBus详解 基于1.X版本。这里不做过多的赘述。
如果想记住最后一次消息。
private RxBus() {
bus = BehaviorSubject.create().toSerialized();
}
如果想记住全部消息。
private RxBus() {
bus = ReplaySubject.create().toSerialized();
}
详情
https://blog.csdn.net/weixin_37730482/article/details/72772261
由上可知,1.X和2.X两个版本还是有一定的区别的。这主要是因为1.X和2.X版本的RxJava存在着巨大的不同。
详情
https://blog.csdn.net/weixin_37730482/category_10501619.html
那么使用RxBus的功能,两个版本有哪些不同呢,下面总结一下几个常用的不同点。
<1> 创建Subject抽象类的不同
(1) 1.X版本
private RxBus() {
bus = new SerializedSubject<>(PublishSubject.create());
}
(2) 2.X版本
private RxBus() {
bus = PublishSubject.create().toSerialized();
}
<2> 取消订阅的不同
(1) 1.X版本
取消订阅操作中,1.X版本的RxJava中,订阅操作会返回一个Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Subscription对象,我们可以用一个CompositeSubscription存储起来,以进行批量的取消订阅。
(2) 2.X版本
取消订阅操作中,2.X版本的RxJava中,订阅操作会返回一个Disposable对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Disposable对象,我们可以用一个CompositeDisposable存储起来,以进行批量的取消订阅。
即:1.X版本取消订阅操作的是Subscription+CompositeSubscription。2.X版本取消订阅操作的是Disposable+CompositeDisposable。