当前位置: 首页 > 知识库问答 >
问题:

RxJava运算符,它结合了两个可观察对象并在第一个可观察对象发出时立即发出

房新翰
2023-03-14

问题:我有一个功能,用户可以输入一个查询字符串,我创建了两个可观察对象,一个用于查询本地DB,另一个用于从API获取结果。这两个操作必须并行运行。我需要尽快显示DB中的结果,当API结果返回时,我需要进行检查以删除本地结果中的重复项。

我的方法:CombineTest似乎是最接近我需要的东西。但问题是,只有当两个可见光都发出结果时,它才会发出。我想我要找的是CombineLatest和Concat操作符的组合。

以下是我迄今为止所做的尝试:

public void performSearchAsync(String query) {
    Observable<List<LocalSearchResult>> localObservable = Observable.just(performLocalSearch(query))
            .subscribeOn(Schedulers.io());
    Observable<Response<List<ApiSearchResult>>> apiObservable = SearchApi.INSTANCE.getSearchResults(query)
            .subscribeOn(Schedulers.io());

    Observable.combineLatest(localObservable, apiObservable, (localSearchResults, listResponse) -> {
        SearchResultWrapper wrapper = new SearchResultWrapper();
        wrapper.localResults = localSearchResults;
        //hold all local result strings in a variable
        List<String> localResultNames = new ArrayList<>();
        for (LocalSearchResult localSearchResult : localSearchResults) {
            localResultNames.add(localSearchResult.name);
        }

        if (!listResponse.isSuccessful()) {
            return wrapper;
        }
        wrapper.apiResults = listResponse.body();

        //Do simple de-dupe
        List<ApiSearchResult> deDupedResults = new ArrayList<>();
        for (ApiSearchResult apiResult : wrapper.apiResults) {
            if (!localResultNames.contains(apiResult.name)) {
                deDupedResults.add(apiResult);
            }
        }
        wrapper.apiResults = deDupedResults;
        return wrapper;
    }).distinctUntilChanged(searchResultWrapper -> searchResultWrapper.localResults)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<SearchResultWrapper>() {
                @Override
                public void onNext(SearchResultWrapper searchResultWrapper) {
                    if (searchResultWrapper == null) return;
                    if (searchResultWrapper.localResults != null && !searchResultWrapper.localResults.isEmpty()) {
                        //add to view
                        applySearchResult(searchResultWrapper.localResults);
                    }
                    if (searchResultWrapper.apiResults == null || searchResultWrapper.apiResults.isEmpty())
                        return;
                    //add to view
                    apiSearchResultsAdapter.updateResults(searchResultWrapper.apiResults);
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onCompleted() {

                }
            });
}

static class SearchResultWrapper {
    List<LocalSearchResult> localResults;
    List<ApiSearchResult> apiResults;
}

在这里,我假设我的本地查询总是比API快,并且万一它更慢,我希望运算符仅在本地DB可观察发出后发出。我知道去重逻辑可以改进,它只是一个示例表示,但我希望它发生在我有两个结果的那个函数中。

我还希望即使API observable抛出错误,也能向用户发送并提供本地结果,尽管我不确定这种结构是否可行。

希望我的解释足够清楚,有人可以建议一个合适的方法来解决这个问题。我也在考虑编写自己的操作符,尽管我听说这很棘手。

共有1个答案

董鸣
2023-03-14

实际上你不需要把它弄得太复杂。只需使用合并conatdoOnNext就可以了。

例如(抱歉,我在这里使用kotlin):

    // actually here is a mistake. Observable.just() receive a item rather than a expression.
    // you should use Observable.fromCallable() instead.
    val localObservable = Observable.fromCallable { performLocalSearch(query) }
        .subscribeOn(Schedulers.io())


    val apiObservable = SearchApi.INSTANCE.getFoodSearchResults(query)
        .doOnNext {
            //do your check here
        }
        .subscribeOn(Schedulers.io())
        .map{
            //map it to the same type with local
        }


    localObservable.mergeWith { apiObservable}

或者,您可以使用concatWith而不是mergeWith,这样可以保证在完成localObservable后调用api。

 类似资料:
  • 我正在努力理解如何合并两个可观察对象并利用它们合并后的产品。我在mergeMap、switchMap、flatMap、大理石图等上看了无数视频,但我仍然不知道合并观测值是如何工作的。我觉得在使用RxJS的时候,我不会有效率,甚至不会正确。 我有一个要订阅的可观测值,我还想订阅代码中特定表单数组的valueChanges可观测值。但是,我需要确保只有在正确构建表单数组之后才能进行第二次订阅,否则将出

  • 上下文正在使用Couchbase在2级文档存储上实现REST CRUD服务。数据模型是指向零个或多个项目文档的索引文档。使用异步get将索引文档检索为可观察的。这后面是一个为每个项目文档检索零个或多个ID的。异步get返回一个可观察的,所以现在我正在创建的可观察的是可观察的 编辑: 你可能已经猜到我是一个被动的新手。@akarnokd的答案让我意识到我想做的是愚蠢的。解决方案是合并项目

  • 我有一个组件订阅服务中的一个可观察对象。该方法反过来订阅另一个服务中的可观察对象。我想将一个数组从最后一个服务传递回第一个服务,然后第一个服务将该数组传递回组件。更具体地说,该组件调用其本地服务,然后调用一个数据服务,该数据服务通过http客户端访问我的数据库。http客户端正在工作,数据服务将数组返回给本地服务。本地服务接收数组,但我不知道如何将该数组作为可观察对象传递回组件。以下是简短的代码块

  • Observables 是多个值的惰性推送集合。它填补了下面表格中的空白: 单个值 多个值 拉取 Function Iterator 推送 Promise Observable 示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值1、2、3,然后1秒后会推送值4,再然后是完成流: var observable = Rx.Observable.create(functio

  • 问题内容: 如果我有一个对象,希望能够观察其他几个可观察对象,而不是所有相同类型的对象。例如,我希望A能够观察B和C。B和C完全无关,除了它们都实现Observable之外。 显而易见的解决方案是仅在update方法中使用“ if instanceof”,但很快就会变得混乱,因此我想知道是否还有其他方法? 问题答案: 与以前的建议类似,您可以将更新更改为。 这样您可以添加一种方法 对于您要观察的每

  • 我正在用RxJava在Android中制作计时器。我需要在RxJava中制作一个计时器,以便每秒发出一个可观察的信号。我试过以下方法,但没有成功。有没有想过我做错了什么?