当前位置: 首页 > 知识库问答 >
问题:

如何在RxJava中正确转换多播可观察对象

酆勇
2023-03-14

假设我有一个要转换为反应流的事件发出数据源。数据源由资源(例如,定期发送更新状态的套接字)绑定,所以我希望共享对该资源的单个订阅。使用带有回放的单个observable(新订阅者可以立即获得当前值)和refCount操作符似乎非常适合这样做。例如,这是他的MyDataProvider的外观:

private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
    // Open my resource here and emit data into observable
})
    .doOnDispose(() -> {
        // Close my resource here
    })
    .replay(1)
    .refCount();

public Observable<MyData> getMyDataObservable() {
    return myDataObservable;
}

然而,现在假设我有另一个数据源,它需要第一个数据源的结果来计算其自身的值:

private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
    .flatMap(myData -> {
        // Call another data source and return the result here
    })

public Observable<AnotherData> getAnotherDataObservable() {
    return anotherDataObservable;
}

在这里,我的设置开始崩溃。第一个可观察到的多播仅在refCount操作员之前有效。之后,一切都再次单播。这意味着,如果对另一个数据提供者进行了两次单独的订阅,则会调用两次flatMap操作符。我认为有两种解决方法,但我不喜欢这两种方法:

对于我来说,最简单的解决方法似乎是在进行多播操作之前,将myDataObservable的单播变体保存在某个地方,然后在另一个DataObservable中执行该多播操作。然而,如果这两个Observable位于不同的模块中,这种解决方法会使代码非常不美观,要求MyDataProvider公开两个看起来返回相同数据的不同观测值。

第二个解决方法似乎是在antherDataWatable中再次应用那些replayrefCount运算符。但这会导致效率低下,因为myDataWatable中的第一个多播运算符已经应用,但现在什么也不做,除了浪费内存和CPU周期。

这两种解决方法还涉及将另一个数据提供者耦合到MyDataProvider。如果将来MyDataProvider发生变化,不再需要多播,我还必须更新另一个数据提供程序,从中删除多播html" target="_blank">操作符。

解决这个问题的更优雅的方法是什么?我能更好地构建它以完全避免这个问题吗?

共有3个答案

闾丘照
2023-03-14

您可以分割单播流和多播流,但这是多余的。我认为第二种方法更好,顺便说一句,replayrefcount操作符实际上是在做事情,不是浪费。

调用启用多播时,您正在将myDataObservable的可观测转换为可观测
然后使用refcount()内部订阅,这也为后续订阅提供了一个单点;在这一点之后,一切都再次单播。

在另一个可观察的数据中,您真正想要实现的是相同的,因此,请执行与在myDataObservable中完全相同的操作。

狄鸿禧
2023-03-14

您可以使用“publish().refCount()”串联来允许共享单个订阅者。由于它们经常使用,因此它们有一个别名share()。

您也可以使用ConnectableWatable。但是在使用ConnectableWatables重播时要小心。

如果您在将可观察对象转换为可连接可观察对象之前将重播运算符应用于可观察对象,则生成的可连接可观察对象将始终向任何未来的观察者发出相同的完整序列,即使是那些在可连接可观察对象开始向其他订阅观察者发出项目后订阅的观察者。如文档所述:

曹经业
2023-03-14

关于您的第一种方法,在当前设置中,您的另一个DataObservable使用myDataObservable,据我所知,它们是逻辑耦合的,因为它们使用相同的源。因此,您需要为它们提供一些基本的共享逻辑。我将其提取到一个公共模块中,该模块将公开observable的单播版本,然后使myDataObservable和另一个DataObservable在不同的模块中使用,每个模块都添加了多播逻辑

另一种选择是,让一个类来监视您的资源,方法是像在myDataObservable中那样订阅它,在onNext中进行处理,并发布带有主题的映射结果,即如果您希望始终能够访问上次发布的值,则使用BehavioralSubject,并使用另一个主题发布原始结果。客户端将订阅该主题,并将获得仅在监控类中计算一次的映射值或原始值。

P、 在订阅之前,记住在你的主题中添加背压策略。

如果这些选项不适合您,请思考避免多次调用flatMap是否真的很重要?您的代码非常简单,这是一个重要的指标。如果flatMap不重,您可以让它运行多次。

 类似资料:
  • 问题内容: 给定汽车清单(),我可以这样做: 有没有办法我可以从一个到一个序列? 像没有参数的东西 问题答案: 您可以这样映射到: 请注意,flatMapping可能不会保留源可观察的顺序。如果订单对您很重要,请使用。

  • 我正在尝试开发我的第一个RxJava例子 我有一个带有文本框和三个按钮的主要活动。第一个按钮初始化单独类中的整数。第二个按钮订阅一个可观察量,该可观察量假定正在观察整数。第三个按钮将整数的值减小 1。 这是我的密码 和班级 当我尝试使用 订阅时,它只是给了我 的值(即 6),然后它给了我完成! 然后我尝试使用,认为我需要使用,只是而不是,但后来我得到了一个返回的空的,然后再次完成! 有人能帮助我从

  • 问题内容: 我使用侦听器作为回调来观察Android的异步操作,但是我认为用RxJava替换此侦听器可能很棒,我是使用此库的新手,但是我真的很喜欢它,并且我始终将其与Android项目一起使用。 这是我的重构代码: 一个简单的回调: 和“观察者”: 谢谢! 问题答案: 例如,您可以使用 Observable.fromCallable 来创建可观察数据。 然后使用您的数据 使用了rxjava 1.x

  • 问题内容: 我正在尝试将项目设置为表视图,但是setitems方法需要一个可观察的列表,而我的模型中却有一个可观察的集合.FXCollections实用程序类没有给定可观察的集合来创建可观察的列表的方法。类强制转换异常(按预期)。 目前,我正在使用这种代码 而且我有一些问题: 在表中进行编辑是否会按预期更新基础集? 这是这样做的“正确”方法吗 简而言之,我需要样式指南或最佳做法,以便在可观察集和可

  • 我正在从事一个涉及Hystrix的项目,我决定使用RxJava。现在,忘记Hystrix的其余部分,因为我相信主要问题是我完全搞砸了正确编写可观察代码。 需要:我需要一种方法来返回一个代表多个可观察对象的可观察对象,每个可观察对象都运行一个用户任务。我希望该可观察对象能够返回任务的所有结果,甚至错误。 问题:可观测流会因错误而消亡。如果我有三个任务,而第二个任务引发了一个异常,那么即使第三个任务成