我需要在现有的fat代码中创建一个服务,以从4个API中获取结果,我需要合并它们并重新格式化每个响应,但由于4个调用,我不知道如何同时进行,所以速度非常慢。我也无法更改main来添加< code>Runnable或main中的此类执行器,因为它可能会对另一个代码产生滚雪球效应。
因此,目前,我制作了一个控制器来处理请求,一个从用户那里获取请求并调用5种不同服务中间件(SM)功能的服务。每个SM函数都用于调用外部API,在每个SM中,我也重新格式化了API的每个返回映射。我使用<code>java.net。HttpURLConnection来执行API调用。因此,我的API“工作”了,但不能超过4秒。这些API需要额外的OAuth,因此总共大约需要10个API调用。
由于API调用的当前返回是对象
类型,我可以将其视为映射
,并通过对其中的数据执行循环来重新格式化输出。因此,SM函数的代码可能与以下类似:
token = sendHttpRequest(authUrl, authRequestHeader, null, null, "GET");
Map response = sendHttpRequest(url, requestHeader, bodyParam, null, "POST");
List<Map> data = (List) ((Map) response.get("output")).get("data");
List<Map> result = new HashMap();
for(Map m : data) {
Map temp = new HashMap();
temp.put("name", m.get("Name"));
temp.put("health_status", m.get("HealthStatus"));
result.add(temp);
}
// This format is mandatory
Map finalResult = new HashMap();
finalResult.put("output", result);
finalResult.put("status", "OK");
return finalResult;
sendHttpRequest
是发送请求的方法,将参数序列化为JSON,并将API输出反序列化为对象
。以下是 sendHttpRequest
的样子:
CloseableHttpClient httpClient = HttpClients.custom()
.setSSLSocketFactory(csf)
.build();
HttpComponentsClientHttpRequestFactory requestFactory =
new HttpComponentsClientHttpRequestFactory();
requestFactory.setConnectTimeout(this.connectTimeOut);
requestFactory.setReadTimeout(this.readTimeOut);
requestFactory.setHttpClient(httpClient);
RestTemplate rt = new RestTemplate(requestFactory);
HttpEntity<Map> request = null;
if(method.equals("POST"))
request = new HttpEntity<Map>(objBody, headers);
else if(method.equals("GET"))
request = new HttpEntity<Map>(headers);
try {
ResponseEntity<Map> response = null;
if(method.equals("POST"))
restTemplate.postForEntity(url, request , Map.class);
if(method.equals("GET"))
restTemplate.postForEntity(url, request , Map.class);
if(this.outputStream){
logger.debug("Output : " + response.getBody());
}
return response.getBody();
} catch(HttpClientErrorException e) {
logger.debug(e.getMessage());
}
sendHttpRequest
方法也是一种现有的方法,我不允许更改,除非我只是创建一个新方法来执行我的请求。
简单地说,以下是我需要做的事情:
> < li>
对于每个API调用:
在所有API完成调用后,我需要做:
我曾尝试使用ExecutorCompletionService
调用5条SMs。我还创建了一个内部类,实现了可调用
。
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService completionService = new ExecutorCompletionService<>(executor);
List<Future<Map>> results = new ArrayList<>();
for(int i=1; i<6; i++) {
// i here is used to define which api calls to be done
results.add(completionService.submit(new CallAPIClass(paramMap, i)));
}
for (int i=0; i < results.size(); i++) {
try {
Map result = (Map) completionService.take().get();
int code = (int) result.get("code");
// Collect the results for each SM (SM function has described above)
} catch (Exception e) {
logger.debug(e.getMessage());
}
}
// Merge the outputs.
在合并输出中,我需要构造地图,所以它是这样的:
{
"details": {"api1": {...}, "api3": {...}},
"list_items": [{...}, {...}, ...], // Results of sorted merged lists from api2 & api4
"api5": [{...}, {...}, {...}, ...]
}
同时,从api响应来看,基本上我只是在存在时检索它们的所有< code>output_schema。
有没有优化和加速这个API调用的技巧,这样在调用次数相同的情况下,可以执行得更快???非常感谢任何帮助。
我已经阅读了@Anant apadmanabhan的回答,但我似乎需要更改我无法做到的主类文件。或者实际上是否可以在主类中不使用@EnableAsync而应用CompletableFuture
的使用?我还想知道如何在更快的时间内完成这件事,即使使用CompletableFuture和EnableAsync使用这个流程链。
如果所有4个api调用都是相互独立的,并且您使用的是java 8,则可以根据需要将它们提取到单独的服务层中的单独函数,并在方法上使用spring @Async
注释以及ComppletableFuture
作为返回类型进行并行调用。
@Service
public class TestClient {
RestTemplate restTemplate = new RestTemplate();
@Async
public CompletableFuture<List<TestPojo>> getTestPojoByLanguage(String language) {
String url = "https://test.eu/rest/v2/lang/" + language + "?fields=name";
Country[] response = restTemplate.getForObject(url, Country[].class);
return CompletableFuture.completedFuture(Arrays.asList(response));
}
@Async
public CompletableFuture<List<TestPojo>> getCountriesByRegion(String region) {
String url = "https://testurl.eu/rest/v2/region/" + region + "?fields=name";
Country[] response = restTemplate.getForObject(url, Country[].class);
return CompletableFuture.completedFuture(Arrays.asList(response));
}
}
完整的未来指南。
你尝试的解决方案在我看来相当不错:
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService completionService = new ExecutorCompletionService<>(executor);
List<Future<Map>> results = new ArrayList<>();
for(int i=1; i<6; i++) {
// i here is used to define which api calls to be done
results.add(completionService.submit(new CallAPIClass(paramMap, i)));
}
for (int i=0; i < results.size(); i++) {
try {
Map result = (Map) completionService.take().get();
int code = (int) result.get("code");
// Collect the results for each SM (SM function has described above)
} catch (Exception e) {
logger.debug(e.getMessage());
}
}
// Merge the outputs.
我不太确定,除了可能更流畅的API之外,使用<code>CompletableFuture</code>是否会给您带来与程序性能相关的任何好处-本主题已在本文中进行了广泛讨论,请参见示例1 2 3-但这是一个可能的解决方案。
事实上,下一个代码基于我之前的一个答案,反过来又与Tomasz Nurkiewicz博客中的这篇文章密切相关。
您提供的代码的CompletableFuture
对应部分如下所示:
ExecutorService executor = Executors.newFixedThreadPool(5);
// List of the different parameters to perform every external API invocations
final List<Map> smParameters = Arrays.asList(
...
);
// Submit invoke external task to the thread pool
final List<CompletableFuture<Map>> futures = smParameters.stream().
map(paramMap -> CompletableFuture.supplyAsync(() -> invokeExternalAPI(paramMap), executor)).
collect(Collectors.<CompletableFuture<Map>>toList())
;
// The next code is based on the sequence method proposed in the blog I cited
// The idea is to turn the `List<CompletableFuture<Map>>` we have into a
// CompletableFuture<List<Map>> with the results of every single async task
final CompletableFuture<Void> allDoneFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
futures.stream().
map(future -> future.join()).
collect(Collectors.<Map>toList())
);
// Merge the outputs.
final Map result = allDone.thenAccept(results ->
// Merge the outputs. The results variable contains the different Mapz
// obtained from the every different API invocation
);
请验证上面的代码,可能需要定义Map
对象的不同参数的类型。
提到的调用ExternalAPI
可以接受Map
,其中包含执行单个API调用所需的不同参数,例如:
private Map invokeExternalAPI(Map configuration) {
// Pass and extract from the configuration the authUrl, etcetera, everything you need to
// Your code...
token = sendHttpRequest(authUrl, authRequestHeader, null, null, "GET");
Map response = sendHttpRequest(url, requestHeader, bodyParam, null, "POST");
List<Map> data = (List) ((Map) response.get("output")).get("data");
List<Map> result = new HashMap();
for(Map m : data) {
Map temp = new HashMap();
temp.put("name", m.get("Name"));
temp.put("health_status", m.get("HealthStatus"));
result.add(temp);
}
// This format is mandatory
Map finalResult = new HashMap();
finalResult.put("output", result);
finalResult.put("status", "OK");
return finalResult;
}
我认为您不需要修改主类或任何配置,因为该解决方案完全基于Java。
请记住,这种通用方法可以定制,以适应不同的要求。
例如,根据您的评论,您似乎需要从服务中调用在不同服务中间件中实现的功能。
为了定义要同时执行的任务列表,您可以尝试以下方法,而不是我最初的建议:
List<CompletableFuture<Map>> futures = new ArrayList<>(5);
// Obtain a reference to the second middleware, and submit it
final ServiceMiddleware1 sm1 = new ServiceMiddleware1();
final CompletableFuture<Map> sm1Cf = CompletableFuture.supplyAsync(() -> sm1.doYourStuff(), executor);
futures.add(sm1Cf);
// Now obtain a reference to the second middleware, and submit it again
final ServiceMiddleware2 sm2 = new ServiceMiddleware2();
final CompletableFuture<Map> sm2Cf = CompletableFuture.supplyAsync(() -> sm2.doYourStuff(), executor);
futures.add(sm2Cf);
// the rest of service middleware. I think here a common interface
// or some kind of inheritance could be of help in the invocation
// At the end, you will get the list of futures you wanna execute in parallel
// The rest of the code is the same
final CompletableFuture<Void> allDoneFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
futures.stream().
map(future -> future.join()).
collect(Collectors.<Map>toList())
);
// Merge the outputs.
final Map result = allDone.thenAccept(results ->
// Merge the outputs. The results variable contains the different Mapz
// obtained from the every different API invocation
);
要处理错误,您有几种选择。
一个显而易见的方法是处理服务中间件本身的错误,这样它就不会引发任何异常,而是在其结果<code>映射</code>中返回某种信息,如结果代码、状态等。
CompletableFuture本身也为您提供了不同的选项来处理错误。由于您可能需要对结果Map
执行一些更改,因此您可以在必要时使用句柄
方法。它基本上将结果和在执行与CompletableFuture
关联的任务时获得的假设异常作为参数,并根据该结果和可能的错误返回具有适当自定义的新CompletableFuture
。例如,在您的第4和第5个服务中间件中,这似乎会引发错误,您可以使用以下内容:
final ServiceMiddleware4 sm4 = new ServiceMiddleware4();
final CompletableFuture<Map> sm4Cf = CompletableFuture.supplyAsync(() -> sm4.doYourStuff(), executor)
.handle((result, exception) -> {
if (exception == null) {
return result;
}
Map actualResult = new HashMap();
actualResult.put("errorCode", "xxx")
actualResult.put("errorMessage", exception.getMessage());
return actualResult;
});
)
;
futures.add(sm4Cf);
例如,这篇伟大的文章详细解释了进一步的错误处理方法。
所有这些方法都假设您的代码不会抛出检查过的异常。如果你需要处理这些问题,根据你的评论,你可以使用霍尔格在这个答案中发布的代码的修改版本。我们的想法是创建一个处理检查异常的方法,必要时使用适当的错误完成该方法:
public static <T> CompletableFuture<T> supplyAsync(Supplier supplier, Executor executor) {
CompletableFuture<T> f=new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try { f.complete(supplier.get()); } catch(Throwable t) { f.completeExceptionally(t); }
}, executor);
return f;
}
然后,使用此方法提交每个服务中间件任务:
List<CompletableFuture<Map>> futures = new ArrayList<>(5);
// Obtain a reference to the second middleware, and submit it
final ServiceMiddleware1 sm1 = new ServiceMiddleware1();
final CompletableFuture<Map> sm1Cf = supplyAsync(() -> sm1.doYourStuff(), executor)
// this method will only be executed if any exception is thrown
.exceptionally(exception -> {
Map errorResult = new HashMap();
errorResult.put("errorCode", "xxx")
errorResult.put("errorMessage", exception.getMessage());
return errorResult;
});
futures.add(sm1Cf);
// Apply a similar logic to the rest of services middlewares...
// The rest of the code is the same as above
final CompletableFuture<Void> allDoneFuture =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
futures.stream().
map(future -> future.join()).
collect(Collectors.<Map>toList())
);
// Merge the outputs.
// Please, be aware that in the lambda expression results
// is a List of the different Maps obtained as the individual
// results of every single service middleware call
// I would create an object that agglutinates these results in
// the right format, as you indicated in your question. Let's call
// this container class ServiceMiddlewareResult. Then, the merge
// results code will looks like similar to this
final ServiceMiddlewareResult result = allDone.thenAccept(results -> {
ServiceMiddlewareResult serviceMiddlewareResult = new ServiceMiddlewareResult();
// Variable used for storing temporarily the Api 2 and 4 results
// Parameterize it as necessary
List tempResultsFromApi2AndApi4 = new ArrayList();
// Honestly I don't remember if the order of the results is the
// same as the order of the futures that generated them, my guess
// is that not, as it depends on the actual future completion,
// but in any way I always try thinking that the results can be
// in any order, so it is important that every Map contains the
// minimal information to identify the corresponding service
// middleware. With that assumption in mind, your code will look
// similar to this:
results.forEach(result -> {
// The suggested idea, identify the service middleware that
// produced the results
String serviceMiddleware = result.get("serviceMiddleware");
switch(serviceMiddleware) {
// handle every case appropriately
case 'sm1': {
// it should be similar to sm3
serviceMiddlewareResult.getDetails().setApi1(...);
break;
}
case 'sm2':
case 'sm4': {
// Extract results from the Map, and add to the temporary list
tempResultsFromApi2AndApi4.add(...)
break;
}
case 'sm5': {
// extract results and populate corresponding object
serviceMiddlewareResult.setApi5(...);
break;
}
}
});
List sortedResultsFromApi2AndApi4 = Collections.sort(
sortedResultsFromApi2AndApi4, ... the appropriate comparator...
);
result.setListItems(sortedResultsFromApi2AndApi4);
return result;
});
我修改了示例,以提供一种可能的方法来合并您的结果。
如果您需要跟踪和改进整体解决方案提供的调试功能,请考虑在您的服务中间件代码中包含日志信息。
如果您以前使用过它们,作为替代,您可以尝试基于库的解决方案,如<code>RxJava</code>或<code>Project Reactor</code>等。
我试图在springboot上同时运行多个计划任务,但实际上它们运行队列(一个接一个,不是并行的) 这是我简单的服务: 输出: 但是,它应该是这样的: 我做错了什么? 这是我的配置:
我从几个不同的API获取数据。它们是Rest和肥皂网络服务。我有一个ID,我一个接一个地传递给每个API,并获得数据作为回报。但是每个API需要几秒钟才能返回结果,因此我创建的最终响应对象需要太多时间。 我的应用程序是Spring4REST服务。并行调用所有这些API的最佳方法是什么,以便将响应时间减少到尽可能少的程度。 谢谢
yield 指令可以很简单的将异步控制流以同步的写法表现出来,但与此同时我们将也会需要同时执行多个任务,我们不能直接这样写: // 错误写法,effects 将按照顺序执行 const users = yield call(fetch, '/users'), repos = yield call(fetch, '/repos') 由于第二个 effect 将会在第一个 call 执行完
最近,我正在从事一个项目,其中我有2个同时进行2个异步调用。由于我与Quarkus合作,我最终尝试利用叛变和垂直。x库。然而,我无法让我的代码与Unis一起工作。在下面的代码中,我可以想象两个Uni都将被调用,返回速度最快的Uni将被返回。然而,当组合uni时,似乎只返回列表中的第一个uni,即使第一个uni需要更长的时间。 由于uniFast应该首先完成,所以下面的代码应该在打印时打印出来。如何
我们有一个spring boot应用程序和计划任务。 我们希望 在多个服务器上部署我们的应用程序 ,因此应用程序的多个实例。 如何配置spring在同时运行的多个实例上运行调度任务? 例如:一个应用程序在上午12点部署在第一个服务器实例中,任务计划在12点运行。同一个应用程序部署在第二个服务器实例中,时间为凌晨12点03分,由于部署任务存在差异,因此也在凌晨12点33分开始执行相同的cron表达式
我将以下响应返回给用户 到目前为止,我正在进行三次连续调用来计算这个,每个调用都可以独立于其他调用运行。我尝试制作三种不同的作为: 如果我做了