我想要一个可以观察到的:
这是我所想的一个伪代码--我没有必要的回调来正确地执行此操作。另外,如果我能把它全部用一个可观察的或主题来包装,那就好了。
class RxEventSender {
private val publishSubject = PublishSubject.create<Action>()
val observable: Observable<Action> = publishSubject
private val bufferedActions = arrayListOf<Action>()
private var hasSubscribers = false
fun send(action: Action) {
if (hasSubscribers) {
publishSubject.onNext(action)
} else {
bufferedActions.add(action)
}
}
//Subject was subscribed to -- not a real callback
fun onSubscribed() {
hasSubscribers = true
bufferedActions.forEach {action ->
publishSubject.onNext(action)
}
bufferedActions.clear()
}
//Subject was unsubscribed -- not a real callback
fun onUnsubscribed() {
hasSubscribers = false
}
}
使用replaysubject
。如果您担心缓冲区变得太大,它有无界版本和有界版本。
有谁能推荐一个更好的解决方案吗?
本文向大家介绍system.reactive 订阅/取消订阅可观察对象(IDisposable),包括了system.reactive 订阅/取消订阅可观察对象(IDisposable)的使用技巧和注意事项,需要的朋友参考一下 示例 订阅返回IDisposable: 当您准备取消订阅时,只需处置订阅即可:
我必须从两个订阅服务器获取数据,但我总是获取第一个订阅服务器的数据。 我有一个数据共享服务: 在离开搜索组件之前,我调用update方法。 现在,我在results组件上。我得到的共享数据如下: 我的问题是:我需要共享数据来订阅另一个可观察的数据。首先,我构造了一个物体乘坐,在我调用搜索方法之后 问题是我总是从数据服务获取数据,而不是从api调用。api工作导致我在存储中拦截结果,而不是在组件中。
我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”
当我创建5个observables并用单独的订阅者订阅它们时,直觉上我认为每个订阅者都将获得其observables的相应数据,这些数据通过onNext()调用发出: 然而,我在日志中看到的是一两个“testit onnext”。 如果有任何提示,我将不胜感激。
我正在尝试实现一个RXJava2