当前位置: 首页 > 知识库问答 >
问题:

如何在Spring服务中同时执行多个API调用而不更改main?

宗政海
2023-03-14

我需要在现有的fat代码中创建一个服务,以从4个API中获取结果,我需要合并它们并重新格式化每个响应,但由于4个调用,我不知道如何同时进行,所以速度非常慢。我也无法更改main来添加< code>Runnable或main中的此类执行器,因为它可能会对另一个代码产生滚雪球效应。

因此,目前,我制作了一个控制器来处理请求,一个从用户那里获取请求并调用5种不同服务中间件(SM)功能的服务。每个SM函数都用于调用外部API,在每个SM中,我也重新格式化了API的每个返回映射。我使用<code>java.net。HttpURLConnection来执行API调用。因此,我的API“工作”了,但不能超过4秒。这些API需要额外的OAuth,因此总共大约需要10个API调用。

由于API调用的当前返回是对象类型,我可以将其视为映射,并通过对其中的数据执行循环来重新格式化输出。因此,SM函数的代码可能与以下类似:

token = sendHttpRequest(authUrl, authRequestHeader, null, null, "GET");
Map response = sendHttpRequest(url, requestHeader, bodyParam, null, "POST");
List<Map> data = (List) ((Map) response.get("output")).get("data");
List<Map> result = new HashMap();
for(Map m : data) {
  Map temp = new HashMap();
  temp.put("name", m.get("Name"));
  temp.put("health_status", m.get("HealthStatus"));
  result.add(temp);
}

// This format is mandatory
Map finalResult = new HashMap();
finalResult.put("output", result);
finalResult.put("status", "OK");
return finalResult;

sendHttpRequest是发送请求的方法,将参数序列化为JSON,并将API输出反序列化为对象。以下是 sendHttpRequest 的样子:

CloseableHttpClient httpClient = HttpClients.custom()
                        .setSSLSocketFactory(csf)
                        .build();

HttpComponentsClientHttpRequestFactory requestFactory =
                        new HttpComponentsClientHttpRequestFactory();
requestFactory.setConnectTimeout(this.connectTimeOut);
requestFactory.setReadTimeout(this.readTimeOut);
requestFactory.setHttpClient(httpClient);

RestTemplate rt = new RestTemplate(requestFactory);
HttpEntity<Map> request = null;
if(method.equals("POST"))
    request = new HttpEntity<Map>(objBody, headers);
else if(method.equals("GET"))
    request = new HttpEntity<Map>(headers);
    
try {
    ResponseEntity<Map> response = null;
    if(method.equals("POST"))
        restTemplate.postForEntity(url, request , Map.class);
    if(method.equals("GET"))
        restTemplate.postForEntity(url, request , Map.class);
    if(this.outputStream){
        logger.debug("Output : " + response.getBody());
    }
    return response.getBody();
} catch(HttpClientErrorException e) {
    logger.debug(e.getMessage());
}

sendHttpRequest方法也是一种现有的方法,我不允许更改,除非我只是创建一个新方法来执行我的请求。

简单地说,以下是我需要做的事情:

> < li>

对于每个API调用:

  • 从外部API获取授权令牌
  • 对另一个外部API执行请求(POST/GET)以获取数据
  • 将数据重新格式化为响应的预期格式(每个格式都有自己的格式)

在所有API完成调用后,我需要做:

  • 将 API 1 和 3 的输出合并到地图/对象
  • 合并 API 2 的输出

我曾尝试使用ExecutorCompletionService调用5条SMs。我还创建了一个内部类,实现了可调用

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService completionService = new ExecutorCompletionService<>(executor);

List<Future<Map>> results = new ArrayList<>();
for(int i=1; i<6; i++) {
    // i here is used to define which api calls to be done
    results.add(completionService.submit(new CallAPIClass(paramMap, i)));
}

for (int i=0; i < results.size(); i++) {
    try {
        Map result = (Map) completionService.take().get();
        int code = (int) result.get("code");
        
        // Collect the results for each SM (SM function has described above)

    } catch (Exception e) {
        logger.debug(e.getMessage());
    }
}

// Merge the outputs.

在合并输出中,我需要构造地图,所以它是这样的:

{
  "details": {"api1": {...}, "api3": {...}},
  "list_items": [{...}, {...}, ...], // Results of sorted merged lists from api2 & api4
  "api5": [{...}, {...}, {...}, ...]
}

同时,从api响应来看,基本上我只是在存在时检索它们的所有< code>output_schema。

有没有优化和加速这个API调用的技巧,这样在调用次数相同的情况下,可以执行得更快???非常感谢任何帮助。

我已经阅读了@Anant apadmanabhan的回答,但我似乎需要更改我无法做到的主类文件。或者实际上是否可以在主类中不使用@EnableAsync而应用CompletableFuture的使用?我还想知道如何在更快的时间内完成这件事,即使使用CompletableFuture和EnableAsync使用这个流程链。

共有2个答案

蒋向笛
2023-03-14

如果所有4个api调用都是相互独立的,并且您使用的是java 8,则可以根据需要将它们提取到单独的服务层中的单独函数,并在方法上使用spring @Async注释以及ComppletableFuture作为返回类型进行并行调用。

@Service
public class TestClient {
    RestTemplate restTemplate = new RestTemplate();

    @Async
    public CompletableFuture<List<TestPojo>> getTestPojoByLanguage(String language) {
        String url = "https://test.eu/rest/v2/lang/" + language + "?fields=name";
        Country[] response = restTemplate.getForObject(url, Country[].class);

        return CompletableFuture.completedFuture(Arrays.asList(response));
    }

    @Async
    public CompletableFuture<List<TestPojo>> getCountriesByRegion(String region) {
        String url = "https://testurl.eu/rest/v2/region/" + region + "?fields=name";
        Country[] response = restTemplate.getForObject(url, Country[].class);

        return CompletableFuture.completedFuture(Arrays.asList(response));
    }
}

完整的未来指南。

阎唯
2023-03-14

你尝试的解决方案在我看来相当不错:

ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService completionService = new ExecutorCompletionService<>(executor);

List<Future<Map>> results = new ArrayList<>();
for(int i=1; i<6; i++) {
    // i here is used to define which api calls to be done
    results.add(completionService.submit(new CallAPIClass(paramMap, i)));
}

for (int i=0; i < results.size(); i++) {
    try {
        Map result = (Map) completionService.take().get();
        int code = (int) result.get("code");
        
        // Collect the results for each SM (SM function has described above)

    } catch (Exception e) {
        logger.debug(e.getMessage());
    }
}

// Merge the outputs.

我不太确定,除了可能更流畅的API之外,使用<code>CompletableFuture</code>是否会给您带来与程序性能相关的任何好处-本主题已在本文中进行了广泛讨论,请参见示例1 2 3-但这是一个可能的解决方案。

事实上,下一个代码基于我之前的一个答案,反过来又与Tomasz Nurkiewicz博客中的这篇文章密切相关。

您提供的代码的CompletableFuture对应部分如下所示:

ExecutorService executor = Executors.newFixedThreadPool(5);

// List of the different parameters to perform every external API invocations
final List<Map> smParameters = Arrays.asList(
  ...
);


// Submit invoke external task to the thread pool 
final List<CompletableFuture<Map>> futures = smParameters.stream().
  map(paramMap -> CompletableFuture.supplyAsync(() -> invokeExternalAPI(paramMap), executor)).
  collect(Collectors.<CompletableFuture<Map>>toList())
;

// The next code is based on the sequence method proposed in the blog I cited
// The idea is to turn the `List<CompletableFuture<Map>>` we have into a
// CompletableFuture<List<Map>> with the results of every single async task
final CompletableFuture<Void> allDoneFuture =
  CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
  futures.stream().
    map(future -> future.join()).
    collect(Collectors.<Map>toList())
);

// Merge the outputs.
final Map result = allDone.thenAccept(results ->
  // Merge the outputs. The results variable contains the different Mapz
  // obtained from the every different API invocation
);

请验证上面的代码,可能需要定义Map对象的不同参数的类型。

提到的调用ExternalAPI可以接受Map,其中包含执行单个API调用所需的不同参数,例如:

private Map invokeExternalAPI(Map configuration) {
  // Pass and extract from the configuration the authUrl, etcetera, everything you need to

  // Your code...

  token = sendHttpRequest(authUrl, authRequestHeader, null, null, "GET");
  Map response = sendHttpRequest(url, requestHeader, bodyParam, null, "POST");
  List<Map> data = (List) ((Map) response.get("output")).get("data");
  List<Map> result = new HashMap();
  for(Map m : data) {
    Map temp = new HashMap();
    temp.put("name", m.get("Name"));
    temp.put("health_status", m.get("HealthStatus"));
    result.add(temp);
  }

  // This format is mandatory
  Map finalResult = new HashMap();
  finalResult.put("output", result);
  finalResult.put("status", "OK");
  return finalResult;
}

我认为您不需要修改主类或任何配置,因为该解决方案完全基于Java。

请记住,这种通用方法可以定制,以适应不同的要求。

例如,根据您的评论,您似乎需要从服务中调用在不同服务中间件中实现的功能。

为了定义要同时执行的任务列表,您可以尝试以下方法,而不是我最初的建议:

List<CompletableFuture<Map>> futures = new ArrayList<>(5);

// Obtain a reference to the second middleware, and submit it
final ServiceMiddleware1 sm1 = new ServiceMiddleware1();
final CompletableFuture<Map> sm1Cf = CompletableFuture.supplyAsync(() -> sm1.doYourStuff(), executor);
futures.add(sm1Cf);

// Now obtain a reference to the second middleware, and submit it again
final ServiceMiddleware2 sm2 = new ServiceMiddleware2();
final CompletableFuture<Map> sm2Cf = CompletableFuture.supplyAsync(() -> sm2.doYourStuff(), executor);
futures.add(sm2Cf);

// the rest of service middleware. I think here a common interface
// or some kind of inheritance could be of help in the invocation

// At the end, you will get the list of futures you wanna execute in parallel

// The rest of the code is the same
final CompletableFuture<Void> allDoneFuture =
  CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
  futures.stream().
    map(future -> future.join()).
    collect(Collectors.<Map>toList())
);

// Merge the outputs.
final Map result = allDone.thenAccept(results ->
  // Merge the outputs. The results variable contains the different Mapz
  // obtained from the every different API invocation
);

要处理错误,您有几种选择。

一个显而易见的方法是处理服务中间件本身的错误,这样它就不会引发任何异常,而是在其结果<code>映射</code>中返回某种信息,如结果代码、状态等。

CompletableFuture本身也为您提供了不同的选项来处理错误。由于您可能需要对结果Map执行一些更改,因此您可以在必要时使用句柄方法。它基本上将结果和在执行与CompletableFuture关联的任务时获得的假设异常作为参数,并根据该结果和可能的错误返回具有适当自定义的新CompletableFuture。例如,在您的第4和第5个服务中间件中,这似乎会引发错误,您可以使用以下内容:

final ServiceMiddleware4 sm4 = new ServiceMiddleware4();
final CompletableFuture<Map> sm4Cf = CompletableFuture.supplyAsync(() -> sm4.doYourStuff(), executor)
  .handle((result, exception) -> {
      if (exception == null) {
        return result;
      }

      Map actualResult = new HashMap();
      actualResult.put("errorCode", "xxx")
      actualResult.put("errorMessage", exception.getMessage());
      return actualResult; 
    });
  )
;
futures.add(sm4Cf);

例如,这篇伟大的文章详细解释了进一步的错误处理方法。

所有这些方法都假设您的代码不会抛出检查过的异常。如果你需要处理这些问题,根据你的评论,你可以使用霍尔格在这个答案中发布的代码的修改版本。我们的想法是创建一个处理检查异常的方法,必要时使用适当的错误完成该方法:

public static <T> CompletableFuture<T> supplyAsync(Supplier supplier, Executor executor) {
    CompletableFuture<T> f=new CompletableFuture<>();
    CompletableFuture.runAsync(() -> {
        try { f.complete(supplier.get()); } catch(Throwable t) { f.completeExceptionally(t); }
    }, executor);
    return f;
}

然后,使用此方法提交每个服务中间件任务:

List<CompletableFuture<Map>> futures = new ArrayList<>(5);

// Obtain a reference to the second middleware, and submit it
final ServiceMiddleware1 sm1 = new ServiceMiddleware1();
final CompletableFuture<Map> sm1Cf = supplyAsync(() -> sm1.doYourStuff(), executor)
  // this method will only be executed if any exception is thrown
  .exceptionally(exception -> { 
    Map errorResult = new HashMap();
    errorResult.put("errorCode", "xxx")
    errorResult.put("errorMessage", exception.getMessage());
    return errorResult; 
  });
futures.add(sm1Cf);

// Apply a similar logic to the rest of services middlewares...

// The rest of the code is the same as above
final CompletableFuture<Void> allDoneFuture =
  CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));

final CompletableFuture<List<Map>> allDone = allDoneFuture.thenApply(v ->
  futures.stream().
    map(future -> future.join()).
    collect(Collectors.<Map>toList())
);

// Merge the outputs.
// Please, be aware that in the lambda expression results
// is a List of the different Maps obtained as the individual
// results of every single service middleware call
// I would create an object that agglutinates these results in
// the right format, as you indicated in your question. Let's call
// this container class ServiceMiddlewareResult. Then, the merge
// results code will looks like similar to this
final ServiceMiddlewareResult result = allDone.thenAccept(results -> {
  ServiceMiddlewareResult serviceMiddlewareResult = new ServiceMiddlewareResult();
  // Variable used for storing temporarily the Api 2 and 4 results
  // Parameterize it as necessary
  List tempResultsFromApi2AndApi4 = new ArrayList();
  // Honestly I don't remember if the order of the results is the
  // same as the order of the futures that generated them, my guess
  // is that not, as it depends on the actual future completion,
  // but in any way I always try thinking that the results can be 
  // in any order, so it is important that every Map contains the 
  // minimal information to identify the corresponding service 
  // middleware. With that assumption in mind, your code will look
  // similar to this:
  results.forEach(result -> {
    // The suggested idea, identify the service middleware that
    // produced the results
    String serviceMiddleware = result.get("serviceMiddleware");
    switch(serviceMiddleware) {
      // handle every case appropriately
      case 'sm1': {
        // it should be similar to sm3
        serviceMiddlewareResult.getDetails().setApi1(...);
        break;
      }

      case 'sm2':
      case 'sm4': {
        // Extract results from the Map, and add to the temporary list
        tempResultsFromApi2AndApi4.add(...)
        break;
      }

      case 'sm5': {
        // extract results and populate corresponding object
        serviceMiddlewareResult.setApi5(...);
        break;
      }
    }
  });

  List sortedResultsFromApi2AndApi4 = Collections.sort(
    sortedResultsFromApi2AndApi4, ... the appropriate comparator...
  );
  result.setListItems(sortedResultsFromApi2AndApi4);

  return result;  
});

我修改了示例,以提供一种可能的方法来合并您的结果。

如果您需要跟踪和改进整体解决方案提供的调试功能,请考虑在您的服务中间件代码中包含日志信息。

如果您以前使用过它们,作为替代,您可以尝试基于库的解决方案,如<code>RxJava</code>或<code>Project Reactor</code>等。

 类似资料:
  • 我试图在springboot上同时运行多个计划任务,但实际上它们运行队列(一个接一个,不是并行的) 这是我简单的服务: 输出: 但是,它应该是这样的: 我做错了什么? 这是我的配置:

  • 我从几个不同的API获取数据。它们是Rest和肥皂网络服务。我有一个ID,我一个接一个地传递给每个API,并获得数据作为回报。但是每个API需要几秒钟才能返回结果,因此我创建的最终响应对象需要太多时间。 我的应用程序是Spring4REST服务。并行调用所有这些API的最佳方法是什么,以便将响应时间减少到尽可能少的程度。 谢谢

  • yield 指令可以很简单的将异步控制流以同步的写法表现出来,但与此同时我们将也会需要同时执行多个任务,我们不能直接这样写: // 错误写法,effects 将按照顺序执行 const users = yield call(fetch, '/users'), repos = yield call(fetch, '/repos') 由于第二个 effect 将会在第一个 call 执行完

  • 最近,我正在从事一个项目,其中我有2个同时进行2个异步调用。由于我与Quarkus合作,我最终尝试利用叛变和垂直。x库。然而,我无法让我的代码与Unis一起工作。在下面的代码中,我可以想象两个Uni都将被调用,返回速度最快的Uni将被返回。然而,当组合uni时,似乎只返回列表中的第一个uni,即使第一个uni需要更长的时间。 由于uniFast应该首先完成,所以下面的代码应该在打印时打印出来。如何

  • 我们有一个spring boot应用程序和计划任务。 我们希望 在多个服务器上部署我们的应用程序 ,因此应用程序的多个实例。 如何配置spring在同时运行的多个实例上运行调度任务? 例如:一个应用程序在上午12点部署在第一个服务器实例中,任务计划在12点运行。同一个应用程序部署在第二个服务器实例中,时间为凌晨12点03分,由于部署任务存在差异,因此也在凌晨12点33分开始执行相同的cron表达式

  • 我将以下响应返回给用户 到目前为止,我正在进行三次连续调用来计算这个,每个调用都可以独立于其他调用运行。我尝试制作三种不同的作为: 如果我做了