当前位置: 首页 > 面试题库 >

RxJava合并请求序列

景震博
2023-03-14
问题内容

问题

我有两个Apis。Api 1为我提供了一个项目列表,Api 2为我提供了我从Api 1获得的每个项目的更详细信息。到目前为止,我解决它的方式导致性能下降。

问题

借助Retrofit和RxJava,可以快速有效地解决此问题。

我的方法

当下,我的解决方案如下所示:

步骤1:Single<ArrayList<Information>>从Api 1 执行改造。

第2步:我遍历此项目,并向Api 2请求每个项目。

步骤3:Single<ExtendedInformation>对每个项目依次执行改造退货

步骤4:在完全执行Api 2的所有调用之后,我为所有包含信息和扩展信息的项创建一个新对象。

我的密码

 public void addExtendedInformations(final Information[] informations) {
        final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
        final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
            @Override
            public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
                informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
                if (informationDetailArrayList.size() >= informations.length){
                    listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
                }
            }
        };

        for (Information information : informations) {
            getExtendedInformation(ratingRequestListener, information);
        }
    }

    public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
        Single<ExtendedInformation> repos = service.findForTitle(information.title);
        disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
            @Override
            public void onSuccess(ExtendedInformation extendedInformation) {
                    ratingRequestListener.onDownloadFinished(information, extendedInformation);
            }

            @Override
            public void onError(Throwable e) {
                ExtendedInformation extendedInformation = new ExtendedInformation();
                ratingRequestListener.onDownloadFinished(extendedInformation, information);
            }
        }));
    }

    public interface RatingRequestListener {

        void onDownloadFinished(Information information, ExtendedInformation extendedInformation);

    }

问题答案:

tl; dr 异步地或在调度程序上使用concatMapEagerflatMap执行子调用。

很长的故事

我不是android开发人员,所以我的问题将仅限于纯RxJava(版本1和版本2)。

如果我看对图片,所需的流程是:

some query param 
  \--> Execute query on API_1 -> list of items
          |-> Execute query for item 1 on API_2 -> extended info of item1
          |-> Execute query for item 2 on API_2 -> extended info of item1
          |-> Execute query for item 3 on API_2 -> extended info of item1
          ...
          \-> Execute query for item n on API_2 -> extended info of item1
  \----------------------------------------------------------------------/
      |
      \--> stream (or list) of extended item info for the query param

假设Retrofit为

interface Api1 {
    @GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}

interface Api2 {
    @GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}

如果项目的顺序不重要,则flatMap只能使用:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

仅当 改造生成器配置有

  • 要么使用异步适配器(调用将在okhttp内部执行程序中排队)。我个人认为这不是一个好主意,因为您无法控制此执行器。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
    
  • 或使用基于调度程序的适配器(调用将在RxJava调度程序上进行调度)。这是我的首选,因为您明确选择了使用哪个调度程序,因此很可能是IO调度程序,但是您可以自由尝试其他调度程序。

    .addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
    

原因是flatMap将订阅由创建的每个可观察对象api2.extendedInfo(...),并将它们合并到结果可观察对象中。因此结果将按接收顺序显示。

如果 改装客户端 没有 设置为异步或组对调度运行,也可以设置一个:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

这种结构几乎与先前的执行相同,它 在本地 指示api2.extendedInfo应该在每个调度程序上运行。

可以调整的maxConcurrency参数flatMap以控制您想同时执行多少个请求。尽管我对此会保持谨慎,但您不想同时运行所有查询。通常,默认值maxConcurrency足够好(128)。

现在,如果原始查询的顺序很重要
concatMap通常是运算符,它flatMap按顺序执行相同的操作,但是顺序执行,如果代码需要等待所有子查询执行,则结果会很慢。尽管解决方案与相比又迈出了一步concatMapEager,这一步将按顺序订阅可观察的内容,并根据需要缓冲结果。

假设改造客户端是异步的或在特定的调度程序上运行:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()))
    .subscribe(...)

或者,如果必须在本地设置调度程序:

api1.items(queryParam)
    .flatMap(itemList -> Observable.fromIterable(itemList)))
    .concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
    .subscribe(...)

也可以在此运算符中调整并发性。

另外,如果Api返回Flowable,则可以.parallel在RxJava
2.1.7中使用仍处于beta状态的Api。但是结果却是不规则的,我还不知道有没有排序的方法(还可以吗?)。

api.items(queryParam) // Flowable<Item>
   .parallel(10)
   .runOn(Schedulers.io())
   .map(item -> api2.extendedInfo(item.id()))
   .sequential();     // Flowable<ItemExtended>


 类似资料:
  • 设置您的本地环境 步骤1: 复制 步骤2: 构建 步骤3: 分支 进行更改 步骤4: 编写代码 步骤5: 提交更改 提交代码说明的指导 步骤6:变基 步骤7: 测试 步骤8: 推送代码 步骤9: 新建一个合并代码请求 步骤10: 讨论和更新 批准和请求更改工作流程 步骤11: 执行合并 持续集成测试 设置您的本地环境 步骤1: 复制 在 GitHub 上复制项目到你的账号并把项目克隆到本地。 $

  • GitLab可以引用提交消息中的特定问题来解决特定的问题。 在本章中,我们将讨论如何在GitLab中引用问题: 步骤(1): 要引用问题,您需要创建问题的问题编号。 要创建问题,请参阅创建问题章节。 步骤(2): 要查看创建的问题,请单击Issues选项卡下的List选项: 步骤(3): 在对本地存储库进行更改之前,请使用以下命令检查它是否为最新版本: 命令从远程服务器下载最新的更改并直接集成到当

  • 主要内容:合并请求的步骤合并请求可用于对项目其他人员之间所做的代码进行交换,与他们讨论更改。 合并请求的步骤 步骤(1): 在创建新的合并请求之前,应该在GitLab中创建一个分支。 您可以参考本章创建分支: 步骤(2): 登录到您的GitLab帐户并转到项目部分下的项目: 步骤(3): 点击选项卡,然后点击New merge request 按钮: 步骤(4): 要合并请求,请从下拉列表中选择源分支和目标分支,然后单击

  • 我有两个可观测值,一个返回1个元素,另一个返回多个元素。我的目标是在不阻塞的情况下将它们合并在一起,以构建如下对象: 我试过压缩、合并和合并,但似乎都不是解决方案。

  • 我必须进行N次REST API调用并合并所有调用的结果,如果至少有一次调用失败(返回错误或超时),则会失败。我想使用RxJava,我有一些要求: 能够在某些情况下配置每个api调用的重试。我的意思是,如果我有一个重试=2,我提出3个请求,每个请求最多必须重试2次,总共最多6个请求 如果我想用一个线程发出所有请求,我需要一个异步Http客户端,不是吗? 谢谢

  • 主要内容:RxJava 合并运算符 介绍,RxJava 合并运算符 示例RxJava 合并运算符 介绍 以下是用于从多个 Observable 创建单个 Observable 的运算符。 运算符 描述 And/Then/When 使用模式和计划中介组合项目集。 CombineLatest 通过指定的函数组合每个 Observable 发出的最新项并发出结果项。 Join 如果在第二个 Observable 发射项目的时间范围内发送,则组合两个 Observable 发