当前位置: 首页 > 知识库问答 >
问题:

可以观察到,当没有订阅服务器时缓冲项,然后发出它们,并在订阅时清除缓冲区?

卞经业
2023-03-14

我想要一个可以观察到的:

  1. 可以按需发出项目,但从未真正完成(一个热的可观察的?)
  2. 知道何时有订阅者
  3. 如果没有订阅服务器,它将缓冲我告诉它发出的项
  4. 订阅后,它将按顺序发出缓冲项,然后清除缓冲区,然后继续允许我发出更多项
  5. 取消订阅时(订阅服务器已释放?),它将返回缓冲。

这是我所想的一个伪代码--我没有必要的回调来正确地执行此操作。另外,如果我能把它全部用一个可观察的或主题来包装,那就好了。

class RxEventSender {
    private val publishSubject = PublishSubject.create<Action>()

    val observable: Observable<Action> = publishSubject

    private val bufferedActions = arrayListOf<Action>()

    private var hasSubscribers = false

    fun send(action: Action) {
        if (hasSubscribers) {
            publishSubject.onNext(action)
        } else {
            bufferedActions.add(action)
        }
    }

    //Subject was subscribed to -- not a real callback
    fun onSubscribed() {
        hasSubscribers = true
        bufferedActions.forEach {action ->
            publishSubject.onNext(action)
        }
        bufferedActions.clear()
    }

    //Subject was unsubscribed -- not a real callback
    fun onUnsubscribed() {
        hasSubscribers = false
    }
}

共有1个答案

孔冥夜
2023-03-14

使用replaysubject。如果您担心缓冲区变得太大,它有无界版本和有界版本。

 类似资料:
  • 有谁能推荐一个更好的解决方案吗?

  • 本文向大家介绍system.reactive 订阅/取消订阅可观察对象(IDisposable),包括了system.reactive 订阅/取消订阅可观察对象(IDisposable)的使用技巧和注意事项,需要的朋友参考一下 示例 订阅返回IDisposable: 当您准备取消订阅时,只需处置订阅即可:            

  • 我必须从两个订阅服务器获取数据,但我总是获取第一个订阅服务器的数据。 我有一个数据共享服务: 在离开搜索组件之前,我调用update方法。 现在,我在results组件上。我得到的共享数据如下: 我的问题是:我需要共享数据来订阅另一个可观察的数据。首先,我构造了一个物体乘坐,在我调用搜索方法之后 问题是我总是从数据服务获取数据,而不是从api调用。api工作导致我在存储中拦截结果,而不是在组件中。

  • 我正在尝试使用spring-integration-kafka-2.1.0。在我公司的项目中发布。但是,由于下面列出的例外情况,它不起作用:org。springframework。信息。MessageDeliveryException:Dispatcher没有频道“org”的订户。springframework。网状物上下文WebApplicationContext:/order。“奥Kafka”

  • 当我创建5个observables并用单独的订阅者订阅它们时,直觉上我认为每个订阅者都将获得其observables的相应数据,这些数据通过onNext()调用发出: 然而,我在日志中看到的是一两个“testit onnext”。 如果有任何提示,我将不胜感激。

  • 我正在尝试实现一个RXJava2