当前位置: 首页 > 知识库问答 >
问题:

如何用RxJava2并行生成多个非阻塞服务请求

邓昊天
2023-03-14

我需要一些使用RxJava2实现并行异步调用的帮助

1) 我有多个保险公司(目前我只接受两个),我需要使用该保险公司的名称发送多个并行请求。

2)如果其中任何一个给服务器错误,那么剩余的请求不应该被阻止。

以下是我迄今为止所做的尝试;

ArrayList<String> arrInsurer = new ArrayList<>();
        arrInsurer.add(AppConstant.HDFC);
        arrInsurer.add(AppConstant.ITGI);

        RequestInterface service = getService(ServiceAPI.CAR_BASE_URL);
        for (String insurerName : arrInsurer) {
            service.viewQuote(Utils.getPrefQuoteId(QuoteListActivity.this), insurerName)
                    .subscribeOn(Schedulers.computation())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<ViewQuoteResDTO>() {
                        @Override
                        public void accept(@NonNull ViewQuoteResDTO viewQuoteResDTO) throws Exception {
                            Log.e("Demo", viewQuoteResDTO.getPremiumData().getIDV()+"");
                             updateList();
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            Log.e("Demo", throwable.getMessage());
                        }
                    });
        }

private RequestInterface getService(String baseUrl) {      
    Gson gson = new GsonBuilder()
            .setLenient()
            .create();

    return new Retrofit.Builder()
            .baseUrl(baseUrl)                
           .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create(gson))
            .build().create(RequestInterface.class);

}

现在,只有当两个请求都成功响应时,上述代码才能正常工作。但是当任何请求作为内部服务器错误给出响应时,其余请求也会被阻止。

当任何请求给出失败响应时,我得到以下日志错误;

E/Demo: HTTP 500 Aww Snap, Some thing happened at server. Please try back again later.
E/Demo: unexpected end of stream on Connection{100.xxx.xxx.xx:portNo, proxy=DIRECT@ hostAddress=/100.xxx.xxx.xx:portNo cipherSuite=none protocol=http/1.1}

如何处理此错误?

共有1个答案

丁鸿云
2023-03-14

我想像其他与Rx相关的问题一样,这有多个答案。我会给你我在我们的应用程序中使用的,并准确解决这个用例。希望有帮助。

简短版本-这依赖于mergeDelayError。在这里查看

为什么合并?因为与concat不同,它将并行执行可观察对象。为什么mergeDelayError?它会延迟错误...本质上它会执行每个可观察对象并在一切完成时传递错误。这确保即使一个或多个错误,其他错误仍将被执行。

您必须小心一些细节。事件的顺序不再保留,这意味着合并运算符可能会交错一些可观察的事件(考虑到您以前的做事方式,这应该不是问题)。据我所知,即使多个可观察对象失败,您也只会得到一个onError调用。如果这两个都可以,那么您可以尝试以下操作:

List<Observable<ViewQuoteResDTO>> observables = new ArrayList<>();
for (String insurerName : arrInsurer) {     
   observables.add(service.viewQuote(
         Utils.getPrefQuoteId(QuoteListActivity.this), insurerName));
}

Observable.mergeDelayError(observables)
          .subscribeOn(Schedulers.computation())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(/* subscriber calls if you need them */);

其思想是创建您要运行的所有可观察对象,然后使用mergeDelayError触发它们。

 类似资料:
  • 现在我们已经知道了Java NIO里面那些非阻塞特性是怎么工作的,但是要设计一个非阻塞的服务仍旧比较困难。非阻塞IO相对传统的阻塞IO给开发者带来了更多的挑战。在本节非阻塞服务的讲解中,我们一起来讨论这些会面临的主要挑战,同时也会给出一些潜在的解决方案。 查找关于设计非阻塞服务的相关资料是比较难的,本文提出的解决方案也只能是基于笔者个人的工作经验,构思。如果你有其他的解决方案或者是更好的点子,那么

  • 问题内容: 我有两个分开的阻塞队列。客户端通常使用第二个阻塞队列中的第一个来检索要处理的元素。 在某些情况下,客户端对两个阻塞队列中的元素感兴趣,无论哪个队列首先提供数据。 客户端如何并行等待两个队列? 问题答案: 您可以尝试在某种循环中使用该方法,以仅在指定时间量内等待一个队列,然后再轮询另一个队列。 除此之外,我会说在另一个线程上为每个队列运行阻塞操作并为您的主应用程序提供回调接口是另一个稍微

  • 主要内容:1 非阻塞服务器-GitHub仓库,2 无阻塞IO管道,3 非阻塞与阻塞IO管道,4 基本的无阻塞IO管道设计,5 读取部分消息,6 存储部分消息,7 编写部分消息,8 总结,9 服务器线程模型即使你了解了Java NIO非阻塞功能如何工作(Selector,Channel, Buffer等),设计一个无阻塞服务器仍然很难。与阻塞IO相比,非阻塞IO包含多个挑战。这份非阻塞服务器教程将讨论非阻塞服务器的主要挑战,并为它们描述一些潜在的解决方案。 本教程中描述的思想是围绕Java NIO

  • 我正在研究反应式编程,我怀疑它是否是非阻塞IO的Java REST web服务的实现。Java Servlet 3.1规范引入了一些接口,以实现非阻塞web请求。 我的问题是: Netty是否实现了该规范,而Tomcat、JBoss和Jetty没有实现 谢谢。

  • 我是新进入Vert. x的。我遵循了Vert. x文档和一些教程。但是我混淆了使用Vert. x实现非阻塞REST Web服务的正确方法是什么。我在Java中找到了这篇文章开发非阻塞Web应用程序,其中包含一个使用Vert. x实现非阻塞Web应用程序的示例。 此代码块包含向另一个Vertical(“todoService”:TodoServiceVerticle)发送消息。 这是“todoSer

  • 问题内容: 我在芹菜中使用Python进行大量的(〜10 / sec)API调用(包括GET,POST,PUT,DELETE)。每个请求大约需要5-10秒才能完成。 我尝试在池中运行芹菜工人,并发数为1000。 由于正在阻塞进程,每个并发连接都在等待一个请求。 如何使异步? 问题答案: 使用eventlet Monkey patching使所有纯python库都无阻塞。 补丁单库 import e