当前位置: 首页 > 知识库问答 >
问题:

使用concat()和first()实现RxJava并发调用的缓存

朱修德
2023-03-14

我正在使用RxJava和concat()first()操作符:

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .first();
}

cachedEntities返回从缓存列表构建的可观察,而networkEntities方法使用Retrofit获取实体。

除非两个用户快速订阅getEntities()返回的可观察对象,否则这非常有效。我猜在进行第二次订阅时,第一次订阅的网络请求没有完成。在这种情况下,执行了两个网络请求。我想避免。

我尝试创建一个单线程调度程序,以便仅在第一次调用结束时执行第二次调用,但没有运气:

 mSingleThreadScheduler = Schedulers.from(Executors.newSingleThreadExecutor());

以及:

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .subscribeOn(mSingleThreadScheduler)
            .first();
}

我曾尝试将subscribeOn调用放在可观察链的较低位置,但我得到了相同的结果。

有什么提示吗?

共有3个答案

祝花蜂
2023-03-14

这似乎是一个可观察到的具有多个订阅者的相对常见的用例。您需要类似的东西

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .first()
            .replay(1)            
}

有关更深入的解释,请参阅此问题的答案。

壤驷棋
2023-03-14

给予

public Observable<List<Entity>> getEntities() {
    invalidateCacheIfNeeded();
    return Observable
            .concat(cachedEntities(), networkEntities())
            .first();
}

您应该创建一个AsyncSubject

private Observable<List<Entity>> networkEntities() {
    return mSubject
            .map(Data::getEntities);
}

你的网络通话应该是这样的

public Observable<Data> getDataFromNetwork() {
    return networkOperation()
            .subscribeOn(mSingleThreadScheduler)
            .subscribe(mSubject);
}
白青青
2023-03-14

我认为使方法线程安全不是一个好主意。因为它会阻塞整个方法从而降低性能。因此建议将数据结构设为线程安全的。在您的情况下,您正在方法中使用List

public Observable<List<Entity>> getEntities() {

}

使用CopyOnWriteArrayList而不是List。它是线程安全的。

public Observable<CopyOnWriteArrayList<Entity>> getEntities() {

}

希望它会起作用。

 类似资料:
  • 我试图使用RxJava和Rome的改型,在你建议使用组件拱之前(在这个机会是不可能的,项目是在和50%和只需要继续与拱清理)。 所以问题是这个。我有一个返回POJO的web服务。类似于这样: null 相互作用者 回调 演示文稿 演示者 查看

  • 使用RxJava,我需要将一个项目流缓冲到3个组中,但如果传入项目之间的间隔超过500ms,则刷新缓冲区。 bufferWithTimeOrCount()操作符正是我想要的,但它似乎只针对RxJS和Rx实现。NET,我需要使用RxJava来实现这一点。 是否有方法复制bufferWithTimeOrCount()的行为,并获得我对现有RxJava 1的期望。x运算符? 尝试每隔500毫秒发出一个新

  • 问题内容: 我试图使用LinkedHashMap实现LRU缓存。在LinkedHashMap的文档(http://docs.oracle.com/javase/7/docs/api/java/util/LinkedHashMap.html)中,它表示: 请注意,如果将密钥重新插入到映射中,则插入顺序不会受到影响。 但是当我做以下推 输出是 这表明重新插入确实影响了订单。有人知道任何解释吗? 问题答

  • 问题内容: 我想在Web Java应用程序中实现重量级对象的简单缓存。但是我不知道该怎么做。 我是否缺少某些东西或ConcurrentHashMap方法(putIfAbsent等)还不够,是否需要额外的同步? 是否有更好的简单API(在内存存储中,没有外部配置)来执行此操作? P. 问题答案: 如果为要缓存的内容临时拥有多个实例是安全的,则可以执行“无锁”缓存,如下所示: 多个线程可以“竞争”来创

  • [//]: # ( 此处删除了setDefer特性,因为支持setDefer的客户端都推荐用一键协程化了。 ) 使用子协程(go)+通道(channel)实现并发请求。 !>建议先看概览,了解协程基本概念再看此节。 实现原理 在onRequest中需要并发两个http请求,可使用go函数创建2个子协程,并发地请求多个URL 并创建了一个chan,使用use闭包引用语法,传递给子协程 主协程循环调用

  • 我正在MVP解决方案中使用RxJava,我希望实现以下场景: > 如果由于任何原因不成功(没有互联网-服务器无法访问-服务器内部错误)显示适当的消息,但也使用缓存的数据填充视图。 制约因素: > 我不想使用任何额外的回调(RX可以做到这一切) 我不想直接从演示者访问本地回购 我尝试的内容: 在我的回购中: 在我的演讲者: 正如我们所知,当我使用时,可观察的源会发生变化,但永远不会发出错误。 如何先