我有一个<代码>列表
这些要求应该同时提出。所有请求完成后,我想有一个<code>列表
所以我创建了一个方法来发出HTTP请求
public Flux<JsonNode> performGetRequest(String url) {
WebClient webClient = WebClient.create(String.format("%s%s", API_BASE_URL, url));
return webClient.get()
.retrieve()
.bodyToFlux(JsonNode.class);
}
上面的方法是这样调用的
public List<CustomModel> fetch(List<String> urls) {
return Flux.fromIterable(urls)
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(this::performGetRequest)
.flatMap(jsonNode -> Flux.fromIterable(customDeserialize(jsonNode)))
.sequential()
.collectList()
.flatMapMany(Flux::fromIterable)
.collectList()
.block();
}
对于每个响应,我都使用一个自定义方法来反序列化响应
private List<CustomModel> customDeserialize(final JsonNode jsonNodeResponse) {
List<CustomModel> customModelList = new ArrayList<>();
for (JsonNode block : jsonNodeResponse) {
// deserialize the response, create an instance of CustomModel class
// and add it to customModelList
}
return customModelList;
}
问题是,即使我使用了< code>parallel()方法,整个过程也可能不是并行运行的。完成的时间说明我做错了。
我错过了什么吗?
我不能100%确定这是否是这里的问题,但我注意到在使用WebClient
和ParallelFlux
时,WebClient
仅返回响应的Publisher
(体ToMono
/体ToFlux
),而不是实际请求。
考虑使用Fluc. defer
/Mono.defer
包装远程调用,以获取已经用于请求的Publisher
,例如:
.flatMap(url -> Flux.defer(() -> performGetRequest(url)))
问题是,即使我使用了parallel()方法,整个过程也可能不是并行运行的。完成的时间说明我做错了。
我错过了什么吗?
由于您正在调用<code>block</code>,我将假设您正在运行一个MVC servlet应用程序,该应用程序仅将<code>WebClient</code〕用于rest调用。
如果您没有运行完整的webflux应用程序,那么您的应用程序将启动一个单独的事件循环,该循环将处理计划的所有事件。如果运行一个完整的webflux应用程序,您将获得与运行机器上的内核一样多的事件循环。
通过使用< code>parallel,反应器文件说明:
要获得ParallelFlux,您可以在任何Flux上使用并行()
运算符。就其本身而言,此方法不会并行化工作。相反,它将工作负载划分为“rails”(默认情况下,与CPU内核一样多的rails)。
为了告诉产生的ParallelFlux在哪里运行每个rails(并通过扩展来并行运行rails ),您必须使用runOn(Scheduler)。注意,对于并行工作,有一个推荐的专用调度器:Schedulers.parallel()。
您正在创建一个有界弹性
调度程序,该调度程序未针对并行工作进行优化。
但我想提一下,您正在做async i/o
而不是并行
工作,这是非常重要的一点。当您并行运行时,您很可能不会获得任何性能提升,因为您的大部分i/o都会触发请求,然后只是等待响应。
<code>ParellelFlux</code>将确保所有的cpu内核都被使用,但也有一些惩罚。有一个设置时间来确保所有核心都能开始工作,然后需要完成的工作不是cpu密集型的,他们只会发出1000个请求,然后所有线程都完成了,必须等待响应。
需要在内核上设置工作线程,需要将信息发送到每个内核,检索等。
当您有CPU密集型工作时,并行获得了大部分好处,其中每个事件都需要在多个内核上执行繁重的计算。但是对于async
工作,常规的Flux
很可能就足够了。
以下是Reactor开发人员之一Simon Baslé关于在Reactor中运行I / O工作,并行与异步
另外值得一提的是,有界弹性
调度器被调整为阻塞工作,作为纯 webflux 应用程序中常规 servlet 行为的回退。
您正在servlet应用程序中运行webflow,因此您获得的好处可能不如webflow应用程序那么全面。
我正在使用Java VertX框架,并尝试使用VertX WebClient和一个简单的HTTP请求加载多个JSON对象。我想并行地做这件事,这样可以加快进程。 我有一个endpoint对象: 在另一个类中,我有以下应该并行处理的函数(源代码): 我不知道如何继续下去。VertX WebClient强制我使用异步处理程序,这意味着我不能直接返回JsonObject。
问题内容: 我有一个Android应用程序,当前正在使用库来发出网络请求并使用来显示下载的图像。 我想测试的功能,由于我需要运行大量请求(数千个),因此我有点担心并行执行。处理的并行请求,将并行运行的请求限制为四个,而其他请求排队等待执行。在文档中,我找不到任何方法来处理并发请求的数量,而且我怀疑此类详细信息留给了此库中的开发人员。 这样对吗?如果是这样,是否有任何面向Android的实现/库可用
我在多个线程上运行以下方法: HTTP请求是并行处理的吗?以这种方式发出请求是否会阻止其他线程发送请求,直到第一个响应到达?
问题内容: 我正在使用Node.js运行服务器,并且需要从正在运行的另一台服务器()请求数据。我需要向数据服务器发出许多请求(〜200)并收集数据(响应大小从〜20Kb到〜20Mb不等)。每个请求都是独立的,我想将响应保存为以下形式的一个巨大数组: 请注意,项目的顺序并不重要,理想情况下,它们应该以数据可用的顺序填充数组。 现在,当运行该程序时,它将显示: 现在,由于文件的大小如此可变,我期望它可
问题内容: 问题 我有两个Apis。Api 1为我提供了一个项目列表,Api 2为我提供了我从Api 1获得的每个项目的更详细信息。到目前为止,我解决它的方式导致性能下降。 问题 借助Retrofit和RxJava,可以快速有效地解决此问题。 我的方法 当下,我的解决方案如下所示: 步骤1:从Api 1 执行改造。 第2步:我遍历此项目,并向Api 2请求每个项目。 步骤3:对每个项目依次执行改造
1.3 新版功能. 默认情况下,Fabric 会默认 顺序 执行所有任务(详细信息参见 Execution strategy ),这篇文档将介绍 Fabric 如何在多个主机上 并行 执行任务,包括 Fabric 参数设置、任务独立的装饰器,以及命令行全局控制。 它是如何运转的 由于 Fabric 1.x 并不是完全线程安全(以及为了更加通用,任务函数之间并不会产生交互),该功能的实现是基于 Py
我试图使用Jackson库读取Kafka主题中的字符串,并从另一个流执行连接。 这是一个包含两个数据流的示例代码。我想对这些消息流执行连接操作。 例如,传入的流是: 连接条件是。我如何在Flink中实现这一点? 数据流 1: 数据流 2:
我在spring mvc 3.2.2中使用apache http客户端同步发送5个get请求,如图所示。 如何异步(并行)发送所有这些内容并等待请求返回,以便从所有 GET 请求返回已解析的有效负载字符串?