当前位置: 首页 > 知识库问答 >
问题:

缓存项提供程序的RxJava运算符

任小云
2023-03-14

我正在尝试实现一个提供程序,它按以下顺序在内存、磁盘、网络中查找项目。这样做的主要目的是,如果我有正确的本地缓存,就可以避免网络调用。有一个陷阱,因为我对网络的调用使用过滤器来获取项目,我可以从本地查询中获得10个项目,但仍然需要转到网络,因为这些项目来自不同的网络调用,具有不同的查询参数。

现在,我正在将concat与firstOrDefault一起使用,检查列表是否为null或空。我已经实现了一种方法来检查我是否已经用特定的查询调用了服务器,并在从磁盘读取时使用它返回null。

我现在需要优化提供者,以便它:

  1. 发出本地项目
  2. 如果需要就上网
  3. 发出在线项目

(现在它停在第一个好的项目列表上)。

我正在尝试takeWhile,使用一种方法,如果数据为null或空,或者如果我还没有为该查询调用服务器,则返回true。问题是,如果对该项目的检查为false,takeWhile不会发出该项目,这意味着我不会得到最后一个好项目(也是最好的项目)。

我能想到的最好的解决方案是一个操作员发出项目,直到某个条件出现,然后自己退订。我找不到一个。

编辑:一些代码解决方案1)使用firstOrDefault:如果!则不会发出本地项目!DiskService.was下载(),因为DiskService返回空列表

public Observable<List<Item>> items() {
    List<Observable> obs = new ArrayList<>();

    Observable<List<Item>> memoryObs = Observable.defer(this::getMemoryItems);

    Observable<List<Item>> diskObs = Observable.defer(this::getDiskItems);

    Observable<List<Item>> networkObs = Observable.defer(this::getNetworkItems);

    Observable<List<Item>> concat = Observable.concat(memoryObs, diskObs, networkObs;


    return concat.firstOrDefault(new ArrayList<>(), this::canAccept);
}

private boolean canAccept(List<Item> data) {
    return data != null && data.size() > 0;
}

//Method in DiskService
public boolean wasDownloaded(){
    return true if the query was performed on the server, false otherwise.
} 

解决方案2)使用takeWhile。takeWhile的问题是,Observable不会发出不检查其状态的项,这意味着我不会得到最佳列表。黑客的解决方案是将错误检查推迟到下一项,但这样即使在不必要时也会触发网络请求。在这个解决方案中,我使用了一个只包含列表的TrustedItemList和一个布尔值,它告诉被观察者是否可以信任一个非空的项目列表(对于内存和网络总是true,对于磁盘总是true,如果下载了则为true())

public Observable<List<Item>> items() {
    List<Observable> obs = new ArrayList<>();

    Observable<TrustedItemList> memoryObs = Observable.defer(this::getMemoryItems);

    Observable<TrustedItemList> diskObs = Observable.defer(this::getDiskItems);

    Observable<TrustedItemList> networkObs = Observable.defer(this::getNetworkItems);

    Observable<TrustedItemList> concat = Observable.concat(memoryObs, diskObs, networkObs;


    return concat.takeWhile(this::shouldContinueSearching)
                 .filter(trustedItemList -> trustedItemList.items != null && !trustedItemList.items.isEmpty())
                 .map(trustedItemList -> trustedItemList.items);
}

private boolean shouldContinueSearching(TrustedPoiList data) {
     return data == null || data.items == null || data.items.isEmpty() || !data.canTrustIfNotEmpty; 
}

共有1个答案

竺鸿骞
2023-03-14
匿名用户

我最终使用了一个自定义的可观察的。运营商,无耻地从运营商Takewhile复制而来,只需更改呼叫用户。onNext(t)就在订阅服务器之前。onCompleted()在onNext方法中。这样,将发出最后一个项,即在布尔检查中返回false的项。

public final class OperatorTakeWhileWithLast<T> implements Observable.Operator<T, T> {

    private final Func2<? super T, ? super Integer, Boolean> predicate;

    public OperatorTakeWhileWithLast(final Func1<? super T, Boolean> underlying) {
        this((input, index) -> {
            return underlying.call(input);
        });
    }

    public OperatorTakeWhileWithLast(Func2<? super T, ? super Integer, Boolean> predicate) {
        this.predicate = predicate;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
        Subscriber<T> s = new Subscriber<T>(subscriber, false) {
            private int counter = 0;
            private boolean done = false;

            @Override
            public void onNext(T t) {
                boolean isSelected;
                try {
                    isSelected = predicate.call(t, counter++);
                } catch (Throwable e) {
                    done = true;
                    Exceptions.throwIfFatal(e);
                    subscriber.onError(OnErrorThrowable.addValueAsLastCause(e, t));
                    unsubscribe();
                    return;
                }
                if (isSelected) {
                    subscriber.onNext(t);
                } else {
                    done = true;
                    subscriber.onNext(t); //Just added this line
                    subscriber.onCompleted();
                    unsubscribe();
                }
            }

            @Override
            public void onCompleted() {
                if (!done) {
                    subscriber.onCompleted();
                }
            }

            @Override
            public void onError(Throwable e) {
                if (!done) {
                    subscriber.onError(e);
                }
            }
        };
        subscriber.add(s);
        return s;
    }
}

我的items()方法(解决方案2)现在以以下内容结束:

return concat.lift(new OperatorTakeWhileWithLast<TrustedItemList>(this::shouldContinueSearching))  
             .filter(trustedItemList -> trustedItemList.items != null && !trustedItemList.items.isEmpty()) 
             .map(trustedItemList -> trustedItemList.items); 

 类似资料:
  • 在使用JPA时,Hazelcast是否支持Hibernate的分布式和远程二级缓存? 我有客户机/服务器体系结构,我的问题是使用JPA的数据访问操作在客户端。。。,当缓存是远程服务器时。 请给我一个代码样本,让我在两边都做?(在JPA侧和Hazelcast侧) 非常感谢!

  • 对于Jcache配置,我们需要有通用的XML配置参数(如timetolive)<我们正在使用EhCache进行开发,并可能在其他环境中使用其他符合Jsr107的缓存提供程序,如Infinispan。 是否可以让两个缓存提供程序都使用一个配置文件,并且如果需要,我们只需要为不同的环境更改一些参数? 可以在属性文件中定义这些属性并使用它们根据配置文件初始化缓存管理器吗? 我浏览了jsr107留档,但没

  • 我想为我的客户和API建立契约测试。我的API不能在本地运行,所以我希望能够在部署到生产之前,针对已部署的API临时版本运行提供程序测试。 我在网上看到的提供程序测试的大多数示例都使用了localhost。当尝试对我部署的HTTPSendpoint运行提供程序测试时,测试失败,显示。是不支持HTTPS协议,还是我遗漏了什么? 使用pact-provider-verifier cmd line工具工

  • 我使用web和缓存依赖项创建Spring Boot2.0 Starter项目: 然后我更新了Spring bootstrap类来测试REST服务缓存: 该应用程序启动失败后出现异常: 原因:java.lang.IllegalArgumentException:无法自动配置缓存管理器,请检查您的配置(缓存类型为“Caffeine”)(在org.springframework.util.assert.

  • 我们正在尝试更改我们的ASP。NET会话状态在Azure中超时,并注意到ASP。NET会话过期,命名缓存设置过期。换个网站就够了吗。为Azure的自定义会话状态提供程序配置sessionState timeout属性,还是我们需要更改服务配置中的命名缓存设置以影响会话超时? 根据MSDN,HttpSessionState。超时属性是会话状态提供程序终止会话之前请求之间允许的时间量,以分钟为单位。这

  • 我看到的是一个rxjava操作符,它等待另一个observable发出一个条目来观察一个条目。我可以用flatMap和map运算符来完成,但我只是想知道是否有一个运算符可以完成这项工作。我在找takeUntil操作员的对立面。我还想让它在等待其他可观察的项目时缓冲项目。