当前位置: 首页 > 工具软件 > xBus > 使用案例 >

RxBus详解(基于2.X版本)

巫马自明
2023-12-01

RxJava系列

https://blog.csdn.net/weixin_37730482/category_10501619.html

 

RxJava详解(基于2.X版本)

https://blog.csdn.net/weixin_37730482/article/details/69280013

 

RxBus详解 基于1.X版本

https://blog.csdn.net/weixin_37730482/article/details/72772261

 

 

本章节讲述基于2.X版本的RxBus

 

 

 

一.Gradle依赖 RxJava&RxAndroid 版本须一致 即都是2.x版本

implementation 'io.reactivex.rxjava2:rxjava:2.0.1'
 
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

 

 

 

 

 

 

二.代码实现

 

<1> RxBus

package com.wjn.myapplication.rxbus;

import android.text.TextUtils;
import android.util.Log;

import java.util.HashMap;

import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

public class RxBus {

    private final Subject<Object> bus;
    private HashMap<String, CompositeDisposable> hashMap;

    private RxBus() {
        bus = PublishSubject.create().toSerialized();
    }

    public static RxBus getInstance() {
        return Holder.BUS;
    }

    private static class Holder {
        private static final RxBus BUS = new RxBus();
    }

    /**
     * 发送消息
     */

    public void post(Object obj) {
        bus.onNext(obj);
    }

    /**
     * 转换为特定类型的Obserbale
     */

    public <T> Observable<T> toObservable(Class<T> tClass) {
        return bus.ofType(tClass);
    }

    /**
     * 订阅
     */

    public <T> Disposable doDisposable(Class<T> type, Scheduler scheduler, Consumer<T> next, Consumer<Throwable> error) {
        if (null == type || null == scheduler || null == next || null == error) {
            return null;
        }

        return toObservable(type)
                .observeOn(scheduler)
                .subscribe(next, error);
    }

    /**
     * 保存Disposable
     */

    public void saveDisposable(Object o, Disposable disposable) {
        if (null == o || null == disposable) {
            return;
        }

        if (hashMap == null) {
            hashMap = new HashMap<>();
        }

        String key = o.getClass().getName();
        if (!TextUtils.isEmpty(key)) {
            if (hashMap.get(key) != null) {
                hashMap.get(key).add(disposable);
            } else {
                CompositeDisposable compositeDisposable = new CompositeDisposable();
                compositeDisposable.add(disposable);
                hashMap.put(key, compositeDisposable);
            }
        }
    }

    /**
     * 取消订阅
     */

    public void dispose(Object o) {
        if (null == hashMap || null == o) {
            return;
        }

        String key = o.getClass().getName();
        Log.d("TAG", "取消订阅 key----:" + key);
        if (TextUtils.isEmpty(key)) {
            return;
        }

        if (!hashMap.containsKey(key)) {
            return;
        }

        CompositeDisposable compositeDisposable = hashMap.get(key);
        if (compositeDisposable != null) {
            compositeDisposable.dispose();
        }
        Log.d("TAG", "取消订阅remove key----:" + key);
        hashMap.remove(key);
    }
}

 

 

 

<2> 接收消息页面 子线程接收

package com.wjn.myapplication.rxbus;

import android.content.Intent;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;

import androidx.appcompat.app.AppCompatActivity;

import com.wjn.myapplication.R;

import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;

/**
 * 拟子线程接收RxBus消息
 */

public class RxBusActivity extends AppCompatActivity {

    private TextView mATextView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxbus);
        findView();
        getMsgWithRxBus();
    }

    /**
     * 初始化各种View
     */

    private void findView() {
        mATextView = findViewById(R.id.activity_rxbus_atextview);

        mATextView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Intent intent = new Intent(RxBusActivity.this, RxBus1Activity.class);
                startActivity(intent);
            }
        });
    }

    /**
     * 接收RxBus消息
     */

    private void getMsgWithRxBus() {
        Disposable disposable = RxBus.getInstance()
                .doDisposable(RxBusBean.class,
                        Schedulers.newThread(), new Consumer<RxBusBean>() {
                            @Override
                            public void accept(RxBusBean rxBusBean) throws Exception {
                                if (null == rxBusBean) {
                                    return;
                                }

                                String id = rxBusBean.getId();
                                String msg = rxBusBean.getMsg();
                                String msgType = rxBusBean.getMsgType();

                                Log.d("TAG", "RxBusActivity接收 内容id:" + id + "   msg:" + msg + "  msgType:" + msgType);
                                Log.d("TAG", "RxBusActivity页面接收 线程----:" + Thread.currentThread().getName());
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {

                            }
                        });

        RxBus.getInstance().saveDisposable(this, disposable);
    }

    /**
     * onDestroy方法
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        RxBus.getInstance().dispose(this);
    }

}

 

 

 

<3> 接收消息页面 UI线程接收

package com.wjn.myapplication.rxbus;

import android.content.Intent;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.TextView;

import androidx.appcompat.app.AppCompatActivity;

import com.wjn.myapplication.R;

import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;

/**
 * 模拟UI线程接收RxBus消息
 */

public class RxBus1Activity extends AppCompatActivity {

    private TextView mATextView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxbus1);
        findView();
        getMsgWithRxBus();
    }

    /**
     * 初始化各种View
     */

    private void findView() {
        mATextView = findViewById(R.id.activity_rxbus1_atextview);

        mATextView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                Intent intent = new Intent(RxBus1Activity.this, RxBus2Activity.class);
                startActivity(intent);
            }
        });
    }

    /**
     * 接收RxBus消息
     */

    private void getMsgWithRxBus() {
        Disposable disposable = RxBus.getInstance()
                .doDisposable(RxBusBean.class,
                        AndroidSchedulers.mainThread(),
                        new Consumer<RxBusBean>() {
                            @Override
                            public void accept(RxBusBean rxBusBean) throws Exception {
                                if (null == rxBusBean) {
                                    return;
                                }

                                String id = rxBusBean.getId();
                                String msg = rxBusBean.getMsg();
                                String msgType = rxBusBean.getMsgType();

                                Log.d("TAG", "RxBus1Activity接收 内容id:" + id + "   msg:" + msg + "  msgType:" + msgType);
                                Log.d("TAG", "RxBus1Activity页面接收 线程----:" + Thread.currentThread().getName());
                            }
                        }, new Consumer<Throwable>() {
                            @Override
                            public void accept(Throwable throwable) throws Exception {

                            }
                        });
        RxBus.getInstance().saveDisposable(this, disposable);
    }

    /**
     * onDestroy方法
     */

    @Override
    protected void onDestroy() {
        super.onDestroy();
        RxBus.getInstance().dispose(this);
    }

}

 

 

 

<4> 发送消息页面

package com.wjn.myapplication.rxbus;

import android.os.Bundle;
import android.view.View;
import android.widget.TextView;

import androidx.appcompat.app.AppCompatActivity;

import com.wjn.myapplication.R;

public class RxBus2Activity extends AppCompatActivity {

    private TextView mATextView;
    private TextView mBTextView;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_rxbus2);
        findView();
    }

    /**
     * 初始化各种View
     */

    private void findView() {
        mATextView = findViewById(R.id.activity_rxbus2_atextview);
        mBTextView = findViewById(R.id.activity_rxbus2_btextview);

        mATextView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                RxBusBean rxBusBean = new RxBusBean();
                rxBusBean.setId("123");
                rxBusBean.setMsg("RxBus发送的UI线程消息");
                rxBusBean.setMsgType("RxBusTypeMainThread");
                RxBus.getInstance().post(rxBusBean);
            }
        });

        mBTextView.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        RxBusBean rxBusBean = new RxBusBean();
                        rxBusBean.setId("456");
                        rxBusBean.setMsg("RxBus发送的子线程消息");
                        rxBusBean.setMsgType("RxBusTypeNewThread");
                        RxBus.getInstance().post(rxBusBean);
                    }
                }).start();
            }
        });
    }

}

 

 

 

<5> 结果 

UI线程发送

D/TAG: RxBusActivity接收 内容id:123   msg:RxBus发送的UI线程消息  msgType:RxBusTypeMainThread

D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-2



D/TAG: RxBus1Activity接收 内容id:123   msg:RxBus发送的UI线程消息  msgType:RxBusTypeMainThread

D/TAG: RxBus1Activity页面接收 线程----:main

 

子线程发送

D/TAG: RxBus1Activity接收 内容id:456   msg:RxBus发送的子线程消息  msgType:RxBusTypeNewThread

D/TAG: RxBusActivity接收 内容id:456   msg:RxBus发送的子线程消息  msgType:RxBusTypeNewThread



D/TAG: RxBusActivity页面接收 线程----:RxNewThreadScheduler-2

D/TAG: RxBus1Activity页面接收 线程----:main

 

 

 

 

 

 

三.RxBus粘性事件

 

同理 RxBus详解 基于1.X版本。这里不做过多的赘述。

 

如果想记住最后一次消息

private RxBus() {
    bus = BehaviorSubject.create().toSerialized();
}

 

如果想记住全部消息

private RxBus() {
    bus = ReplaySubject.create().toSerialized();
}

 

详情

https://blog.csdn.net/weixin_37730482/article/details/72772261

 

 

 

 

 

 

 

四.1.X和2.X两个版本的对比

 

由上可知,1.X和2.X两个版本还是有一定的区别的。这主要是因为1.X和2.X版本的RxJava存在着巨大的不同。

 

详情

https://blog.csdn.net/weixin_37730482/category_10501619.html

 

那么使用RxBus的功能,两个版本有哪些不同呢,下面总结一下几个常用的不同点。

 

 

<1> 创建Subject抽象类的不同

 

(1) 1.X版本

private RxBus() {
    bus = new SerializedSubject<>(PublishSubject.create());
}

 

(2) 2.X版本

private RxBus() {
    bus = PublishSubject.create().toSerialized();
}

 

 

 

<2> 取消订阅的不同

 

(1) 1.X版本

取消订阅操作中,1.X版本的RxJava中,订阅操作会返回一个Subscription对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Subscription对象,我们可以用一个CompositeSubscription存储起来,以进行批量取消订阅

 

 

(2) 2.X版本

取消订阅操作中,2.X版本的RxJava中,订阅操作会返回一个Disposable对象,以便在合适的时机取消订阅,防止内存泄漏,如果一个类产生多个Disposable对象,我们可以用一个CompositeDisposable存储起来,以进行批量取消订阅

 

 

即:1.X版本取消订阅操作的是Subscription+CompositeSubscription。2.X版本取消订阅操作的是Disposable+CompositeDisposable

 

 

 

 

 

 

 类似资料: