当前位置: 首页 > 知识库问答 >
问题:

使用 RxJava 和事件总线将事件发送回活动/片段

田仲卿
2023-03-14
Android Studio 3.2 Canary 8
com.squareup:otto:1.3.8
io.reactivex:rxjava:1.3.7
kotlin 1.2.31

我试图使用otto EventBus将一个事件发送回我的活动。

但是,我使用RxJava来执行一些后台工作,并且需要在第一个事件完成后发送事件。然而事后。活动从不接收它。

此事件必须在主线程上执行此操作。RxJava在IO线程上。我不确定执行此操作的最佳方法是什么:

下面是我为执行RxJava和EventBus post的交互器编写的代码

class Interactors(private val eventBus: Bus) {
    fun transmitMessage(): Completable {
        return insertTransmission()
                .andThen(onTransmissionChanged()) /* Send event to the activity */
                .andThen(requestTransmission())
    }

    private fun insertTransmission(): Completable {
        return Completable.fromCallable {
            Thread.sleep(4000)
            System.out.println("insertTransmission doing some long operation")
        }
    }

    private fun requestTransmission(): Completable {
        return Completable.fromCallable {
            Thread.sleep(2000)
            System.out.println("requestTransmission doing some long operation")
        }
    }

    /* Need to send this event back to the activity/fragment */
    private fun onTransmissionChanged(): Completable {
        return Completable.fromCallable {
            System.out.println("onTransmissionChanged send event to activity")
            eventBus.post(TransmissionChanged())
        }
    }
}

活动:

public class HomeActivity extends AppCompatActivity {
    private Bus eventBus = new Bus();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_home);

        eventBus.register(this);

        new Interactors(eventBus).transmitMessage()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();
    }

    @Override
    protected void onDestroy() {
        eventBus.unregister(this);
        super.onDestroy();
    }

    @Subscribe
    public void onTransmissionChangedEvent(TransmissionChanged transmissionChanged) {
        System.out.println("onTransmissionChangedEvent");
    }
}

和EventBus类:

class TransmissionChanged

这是我运行应用程序时的输出:

insertTransmission doing some long operation
onTransmissionChanged

我不确定 eventBus.post(..)是否被阻止。实际上,这应该在主线程中完成,就像回发到活动以在UI中执行某些更新一样。

共有3个答案

易俊友
2023-03-14

您的活动/片段在发布事件时可能未启动/附加,因此他们尚未注册到 eventBus。到那时,该事件已经发布,但没有订阅者(或者可能在其他地方还有其他订阅者)。也许您应该使用粘性事件使该事件“唤醒”,以便您的活动/片段仍然能够处理它。

为了使用EventBus事件作为RxJava代码,我做了如下操作:

public class EventBusRx {
    private static EventBusRx ourInstance = new EventBusRx();

    public static EventBusRx getInstance() {
        return ourInstance;
    }

    private EventBusRx() {}

    public final Subject<Integer> eventName = PublishSubject.create();`
}`

然后听这样的事件:

EventBusRx.getInstance().eventName
            .subscribeOn(AndroidSchedulers.mainThread())
            .doOnNext(o -> someAction())
            .subscribe();

对于发布事件:

public void postSomeEvent(int eventValue) {
    EventBusRx.getInstance().eventName.onNext(eventValue);
}

另外读一下RxJava的Replay,可能对你有帮助。

羊舌和安
2023-03-14

不允许接收者指定它希望在哪个线程上接收事件是Otto的一个缺点。它强制要求所有调用都必须在同一个线程上(默认为主线程)。由调用者决定是否在正确的线程上。我更喜欢绿色机器人的< code>EventBus。您可以通过注释来更改要接收的线程。所以,我的第一个建议是,如果你还不太喜欢Otto,可以考虑使用< code>EventBus来代替。

如果您无法修改所有事件总线代码,则可以通过分配Handler将post回主循环器。这很快也很容易,但感觉有点像走出rx框架

private fun onTransmissionChanged(): Completable {
    return Completable.fromCallable {
        System.out.println("onTransmissionChanged send event to activity")
        Handler(Looper.getMainLooper()).post {
            eventBus.post(TransmissionChanged())
        }
    }
}

如果您经常调用这个函数,您可能希望缓存< code>Handler并将其传递给< code>Interactors构造函数。

如果你想坚持使用RxJava调度程序,你可以将调度程序传递到你的构造函数中,以指示你想要在哪里做你的后台工作,而不是使用buzzonOn。在传输消息中,使用它调度后台操作,同时强制 eventBus.post 到主线程,如下所示 -

class Interactors(private val eventBus: Bus, private val scheduler: Scheduler) {
    fun transmitMessage(): Completable {
        return insertTransmission()
                .subscribeOn(scheduler)
                .observeOn(AndroidSchedulers.mainThread())
                .andThen(onTransmissionChanged()) /* Send event to the activity */
                .observeOn(scheduler)
                .andThen(requestTransmission())
    }
    // Rest of the class is unchanged

}

在这种情况下,您将在“主页活动”中使用它,如下所示 -

new Interactors(eventBus, Schedulers.io()).transmitMessage()
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe();
白吕恭
2023-03-14

您真的需要混合使用EventBusRxJava吗?对我来说,这引入了额外的复杂性,但没有带来很多好处。您的用例似乎是使用Rx流的完美示例,对每次发射都做了一些工作(在您的案例中,通过onTransmissionChangedEvent()更新UI)。

我会将< code>transmitMessage()方法改为如下形式:

fun transmitMessage(): Observable<TransmissionChanged> {
    return Observable.create(ObservableOnSubscribe<TransmissionChanged> { emitter ->
        insertTransmission()

        emitter.onNext(TransmissionChanged())  // perform this after the long running operation from insertTransmission() has finished

        requestTransmission()

        emitter.onComplete()   // after the long running operation from requestTransmission() has finished
    })
}

我猜你需要一些额外的数据来相应地更新你的UI——这被封装在< code>TransmissionChanged类中——包括你需要的任何东西。需要注意的一点是——在RxJava 1中使用< code>Observable.create()是危险的。我不记得这样做的安全方法是什么,也没有使用RxJava 1进行实验的项目...但是在< code>Observable类中有一个html" target="_blank">工厂方法可以安全地完成这项工作。

使用上述功能,您的活动代码也会变得更加清晰。不再需要 Otto,因为您的所有操作都通过单个 Rx 流进行处理。

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_home);

    new Interactors()
        .transmitMessage()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(transmission -> onTransmissionChangedEvent(transmission),
                throwable -> handleError(throwable),
                () -> handleCompletion()
        );
}
 类似资料:
  • 消息发送失败有其他原因,包括: 没有可用来向其发送消息的handlers 收件人已明确地使用fail(失败)的消息 在所有情况下回复处理程序将调用具体的失败。

  • 发送答复处理消息时你可以在DeliveryOptions中指定超时. 如果在该时间内没有收到答复,答复处理程序将调用失败。 默认的超时时间为 30 秒。

  • 发送一条消息将导致只有一个Handlers在接收该消息的地址注册。这是点对点的消息传递模式。这个Handlers的选择是一个非严格的循环方式。 发送一条消息时,可以send eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");

  • 我想将发送到Azure服务总线主题的事件发送到事件中心。这可能吗? 详细信息:我与我公司的另一个团队合作,该团队接收Azure Service Bus主题的第三方事件(通过webhook),并在不同的应用程序中进一步使用。 我的团队现在希望使用我们现有的事件中心并使用Azure捕获将这些事件存储到存储帐户来收听/订阅此主题。 我做了以下工作: 我在他们的Azure服务总线中创建了他们主题的订阅。我

  • 让我们跳进 API 获取事件总线 你获取到事件总线的引用,如下所示: EventBus eb = vertx.eventBus(); 还有每个 Vert.x 实例事件总线的单个实例。 注册处理程序 这个最简单的方法来注册一个处理程序用consumer。下面是一个示例: EventBus eb = vertx.eventBus(); eb.consumer("news.uk.sport", mess

  • 注:本节未经校验,如有问题欢迎提issue 最初设想是为了提供一种向多个actor群发消息的方法,之后EventBus被一般化为一组实现一个简单接口的可组合的特质: /** * Attempts to register the subscriber to the specified Classifier * @return true if successful and false if not