Android Studio 3.2 Canary 8
com.squareup:otto:1.3.8
io.reactivex:rxjava:1.3.7
kotlin 1.2.31
我试图使用otto EventBus将一个事件发送回我的活动。
但是,我使用RxJava来执行一些后台工作,并且需要在第一个事件完成后发送事件。然而事后。活动从不接收它。
此事件必须在主线程上执行此操作。RxJava在IO线程上。我不确定执行此操作的最佳方法是什么:
下面是我为执行RxJava和EventBus post的交互器编写的代码
class Interactors(private val eventBus: Bus) {
fun transmitMessage(): Completable {
return insertTransmission()
.andThen(onTransmissionChanged()) /* Send event to the activity */
.andThen(requestTransmission())
}
private fun insertTransmission(): Completable {
return Completable.fromCallable {
Thread.sleep(4000)
System.out.println("insertTransmission doing some long operation")
}
}
private fun requestTransmission(): Completable {
return Completable.fromCallable {
Thread.sleep(2000)
System.out.println("requestTransmission doing some long operation")
}
}
/* Need to send this event back to the activity/fragment */
private fun onTransmissionChanged(): Completable {
return Completable.fromCallable {
System.out.println("onTransmissionChanged send event to activity")
eventBus.post(TransmissionChanged())
}
}
}
活动:
public class HomeActivity extends AppCompatActivity {
private Bus eventBus = new Bus();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_home);
eventBus.register(this);
new Interactors(eventBus).transmitMessage()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
}
@Override
protected void onDestroy() {
eventBus.unregister(this);
super.onDestroy();
}
@Subscribe
public void onTransmissionChangedEvent(TransmissionChanged transmissionChanged) {
System.out.println("onTransmissionChangedEvent");
}
}
和EventBus类:
class TransmissionChanged
这是我运行应用程序时的输出:
insertTransmission doing some long operation
onTransmissionChanged
我不确定 eventBus.post(..)是否被阻止。实际上,这应该在主线程中完成,就像回发到活动以在UI中执行某些更新一样。
您的活动/片段在发布事件时可能未启动/附加,因此他们尚未注册到 eventBus。到那时,该事件已经发布,但没有订阅者(或者可能在其他地方还有其他订阅者)。也许您应该使用粘性事件使该事件“唤醒”,以便您的活动/片段仍然能够处理它。
为了使用EventBus事件作为RxJava代码,我做了如下操作:
public class EventBusRx {
private static EventBusRx ourInstance = new EventBusRx();
public static EventBusRx getInstance() {
return ourInstance;
}
private EventBusRx() {}
public final Subject<Integer> eventName = PublishSubject.create();`
}`
然后听这样的事件:
EventBusRx.getInstance().eventName
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext(o -> someAction())
.subscribe();
对于发布事件:
public void postSomeEvent(int eventValue) {
EventBusRx.getInstance().eventName.onNext(eventValue);
}
另外读一下RxJava的Replay,可能对你有帮助。
不允许接收者指定它希望在哪个线程上接收事件是Otto的一个缺点。它强制要求所有调用都必须在同一个线程上(默认为主线程)。由调用者决定是否在正确的线程上。我更喜欢绿色机器人的< code>EventBus。您可以通过注释来更改要接收的线程。所以,我的第一个建议是,如果你还不太喜欢Otto,可以考虑使用< code>EventBus来代替。
如果您无法修改所有事件总线代码,则可以通过分配Handler
将post回主循环器。这很快也很容易,但感觉有点像走出rx框架。
private fun onTransmissionChanged(): Completable {
return Completable.fromCallable {
System.out.println("onTransmissionChanged send event to activity")
Handler(Looper.getMainLooper()).post {
eventBus.post(TransmissionChanged())
}
}
}
如果您经常调用这个函数,您可能希望缓存< code>Handler并将其传递给< code>Interactors构造函数。
如果你想坚持使用RxJava调度程序,你可以将调度程序
传递到你的构造函数中,以指示你想要在哪里做你的后台工作,而不是使用buzzonOn
。在传输消息中
,使用它调度后台操作,同时强制 eventBus.post
到主线程,如下所示 -
class Interactors(private val eventBus: Bus, private val scheduler: Scheduler) {
fun transmitMessage(): Completable {
return insertTransmission()
.subscribeOn(scheduler)
.observeOn(AndroidSchedulers.mainThread())
.andThen(onTransmissionChanged()) /* Send event to the activity */
.observeOn(scheduler)
.andThen(requestTransmission())
}
// Rest of the class is unchanged
}
在这种情况下,您将在“主页活动”
中使用它,如下所示 -
new Interactors(eventBus, Schedulers.io()).transmitMessage()
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
您真的需要混合使用EventBus
和RxJava
吗?对我来说,这引入了额外的复杂性,但没有带来很多好处。您的用例似乎是使用Rx流的完美示例,对每次发射都做了一些工作(在您的案例中,通过onTransmissionChangedEvent()
更新UI)。
我会将< code>transmitMessage()方法改为如下形式:
fun transmitMessage(): Observable<TransmissionChanged> {
return Observable.create(ObservableOnSubscribe<TransmissionChanged> { emitter ->
insertTransmission()
emitter.onNext(TransmissionChanged()) // perform this after the long running operation from insertTransmission() has finished
requestTransmission()
emitter.onComplete() // after the long running operation from requestTransmission() has finished
})
}
我猜你需要一些额外的数据来相应地更新你的UI——这被封装在< code>TransmissionChanged类中——包括你需要的任何东西。需要注意的一点是——在RxJava 1中使用< code>Observable.create()是危险的。我不记得这样做的安全方法是什么,也没有使用RxJava 1进行实验的项目...但是在< code>Observable类中有一个html" target="_blank">工厂方法可以安全地完成这项工作。
使用上述功能,您的活动
代码也会变得更加清晰。不再需要 Otto
,因为您的所有操作都通过单个 Rx 流进行处理。
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_home);
new Interactors()
.transmitMessage()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(transmission -> onTransmissionChangedEvent(transmission),
throwable -> handleError(throwable),
() -> handleCompletion()
);
}
消息发送失败有其他原因,包括: 没有可用来向其发送消息的handlers 收件人已明确地使用fail(失败)的消息 在所有情况下回复处理程序将调用具体的失败。
发送答复处理消息时你可以在DeliveryOptions中指定超时. 如果在该时间内没有收到答复,答复处理程序将调用失败。 默认的超时时间为 30 秒。
发送一条消息将导致只有一个Handlers在接收该消息的地址注册。这是点对点的消息传递模式。这个Handlers的选择是一个非严格的循环方式。 发送一条消息时,可以send eventBus.send("news.uk.sport", "Yay! Someone kicked a ball");
我想将发送到Azure服务总线主题的事件发送到事件中心。这可能吗? 详细信息:我与我公司的另一个团队合作,该团队接收Azure Service Bus主题的第三方事件(通过webhook),并在不同的应用程序中进一步使用。 我的团队现在希望使用我们现有的事件中心并使用Azure捕获将这些事件存储到存储帐户来收听/订阅此主题。 我做了以下工作: 我在他们的Azure服务总线中创建了他们主题的订阅。我
让我们跳进 API 获取事件总线 你获取到事件总线的引用,如下所示: EventBus eb = vertx.eventBus(); 还有每个 Vert.x 实例事件总线的单个实例。 注册处理程序 这个最简单的方法来注册一个处理程序用consumer。下面是一个示例: EventBus eb = vertx.eventBus(); eb.consumer("news.uk.sport", mess
注:本节未经校验,如有问题欢迎提issue 最初设想是为了提供一种向多个actor群发消息的方法,之后EventBus被一般化为一组实现一个简单接口的可组合的特质: /** * Attempts to register the subscriber to the specified Classifier * @return true if successful and false if not