当前位置: 首页 > 知识库问答 >
问题:

完全未来-并行运行多个Rest调用并获得不同的结果

景景胜
2023-03-14

我有一个相当普遍或独特的要求。例如,我有以下AcCount细节列表:

列表

class AccountDetails {
    String bankAccountId;
    String mortgageAccountId;
    Integer noOfTrans;
    String addressLine;
    String externalLink;   
}

除了bankAccountId之外,上述所有字段都是从外部REST服务调用中提取的。我想并行调用所有REST服务并更新列表中的每个对象:

因此,它如下所示:

对于每个accountDetails

  • 调用抵押REST服务并更新martgageAccountId字段(REST返回MortgageInfo对象)

我希望所有上述调用并行进行,并且对于列表中的每个AccCountMore对象。如果有例外,我想优雅地处理它。请注意,上面的每个REST服务返回不同的自定义对象

我对如何使用完全未来链接实现这一点感到困惑。不确定allOfthenCompine(只需要两个),或thenComment应该使用以及如何将所有这些放在一起。

有什么例子/想法吗?


共有3个答案

翟黎明
2023-03-14
匿名用户

如果我简单地将您的account类划分为:

class Account {
  String fieldA;
  String fieldB;
  String fieldC;

  Account(String fieldA, String fieldB, String fieldC) {
    this.fieldA = fieldA;
    this.fieldB = fieldB;
    this.fieldC = fieldC;
  }
}

然后您可以使用CompletableFuture#allOf(…)等待所有可完成未来的结果,每个字段更新一次,然后分别从这些未来检索结果。我们不能使用allOf的结果,因为它不返回任何内容(void)。

Account account = CompletableFuture.allOf(cfA, cfB, cfC)
    .thenApply(ignored -> {
      String a = cfA.join();
      String b = cfB.join();
      String c = cfC.join();
      return new Account(a, b, c);
    }).join(); // or get(...) with timeout

我们可以在thenApply中使用连接,因为所有可完成的期货都在这个阶段完成。您可以修改上面的代码块以适应您的逻辑,例如更新字段而不是创建新对象。请注意,当可完成的未来异常完成时,上面的连接()可以引发异常。您可以将您的可完成的未来更改为在将未来提交给allOf(...)之前正确地处理(),或者在使用连接()之前询问它是否为完备异常():

CompletableFuture.allOf(cfA, cfB, cfC)
    .thenRun(() -> {
      if (!cfA.isCompletedExceptionally()) {
        account.fieldA = cfA.join();
      }
      if (!cfB.isCompletedExceptionally()) {
        account.fieldB = cfB.join();
      }
      if (!cfC.isCompletedExceptionally()) {
        account.fieldC = cfC.join();
      }
    }).join(); // or get(...) with timeout

在一个完成阶段中更新字段的好处是这些操作在同一个线程中完成,因此您不必担心并发修改。

晏树
2023-03-14

既然您已经标记了springboot,我想您应该使用它,并且您的服务是在springframework中编写的。然后我提供了一个与spring框架相关的答案。

首先,我创建了一个接口,用于将RESTAPI实现为异步的。

public interface AsyncRestCall<T> {
   /** this is a hypothetical method with hypothetical params!*/
   CompletableFuture<T> call(String bankAccountId); 
   String type();
}

然后,您可以为您的服务实施以下内容:

正如您在这个实现中看到的,我使用了MortgageRest,它表示Mortgage的rest服务。

 @Service
 public class MortgageService implements AsyncRestCall<MortgageInfo> {

   private final MortgageRest mortgageRest;

   @Autowired
   public MortgageService(MortgageRest mortgageRest) {
       this.mortgageRest = mortgageRest;
   }

   @Override
   public CompletableFuture<MortgageInfo> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(() -> mortgageRest.service(bankAccountId));
    }

   @Override
   public String type() {
      return "mortgage";
   } 
} 

抵押贷款余额:

@Service
public class MortgageRest {
  private RestTemplate restTemplate;
  public MortgageRest(RestTemplate restTemplate) {
     this.restTemplate = restTemplate;
  }
  public MortgageInfo service(String bankAccountId) {
     return new MortgageInfo("123455" + bankAccountId);
  }
}

对于其他Rest服务这样做。

@Service
public class TransactionService implements AsyncRestCall<Transactions> {

   private final TransactionRest transactionRest;

   public TransactionService(TransactionRest transactionRest) {
      this.transactionRest = transactionRest;
   } 

   @Override
   public CompletableFuture<Transactions> call(String bankAccountId) {
       return CompletableFuture.supplyAsync(transactionRest::service);
   }

   @Override
   public String type() {
       return "transactions";
   } 
} 

TransactionRest:

 @Service
 public class TransactionRest {

   public Transactions service() {
       return new Transactions(12);
   }
 }

现在,您需要访问所有asynchrestcall实现。对于这个porpus,您可以声明一个类,如下所示:

@Service
public class RestCallHolder {

  private final List<AsyncRestCall> asyncRestCalls;

  public RestCallHolder(List<AsyncRestCall> asyncRestCalls) {
      this.asyncRestCalls = asyncRestCalls;
  }

  public List<AsyncRestCall> getAsyncRestCalls() {
      return asyncRestCalls;
  }
}

AccountDetailService(你可以说出你喜欢的东西)使用completablefuture将rest服务称为并行服务。

在这个服务器中,每个bank AcCountId的其余调用将存储在Map中

@Service
public class AccountDetailService {

  private final RestCallHolder restCallHolder;

  public AccountDetailService(RestCallHolder restCallHolder) {
      this.restCallHolder = restCallHolder;
  }

  public List<AccountDetail> update(List<AccountDetail> accountDetails) {
     Map<String, Map<String, Object>> result = new HashMap<>();
     List<AccountDetail> finalAccountDetails = new ArrayList<>();

     accountDetails.forEach(accountDetail -> {
          List<CompletableFuture> futures = restCallHolder.getAsyncRestCalls()
                    .stream()
                    .map(rest -> rest.call(accountDetail.getBankAccountId()))
                    .collect(Collectors.toList());

     CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                 .thenAccept(aVoid -> { 
                    Map<String, Object> res = restCallHolder.getAsyncRestCalls()
                              .stream()
                              .map(rest -> new AbstractMap.SimpleEntry<>(rest.type(),
                                  rest.call(accountDetail.getBankAccountId()).join()))
                              .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
                           result.put(accountDetail.getBankAccountId(), res);
                      }
                   ).handle((aVoid, throwable) -> {
                      return null; // handle the exception here 
             }).join();
            }
    );

      accountDetails.forEach(accountDetail -> finalAccountDetails.add(AccountDetail.builder()
             .bankAccountId(accountDetail.getBankAccountId())
             .mortgageAccountId(((MortgageInfo) result.get(accountDetail.getBankAccountId()).get("mortgage")).getMortgageAccountId())
             .noOfTrans(((Transactions) result.get(accountDetail.getBankAccountId()).get("transactions")).getNoOfTrans())
             .build()));
     return finalAccountDetails;
   }
 }

融伯寅
2023-03-14
AccountDetails accountDetails = new AccountDetails();

CompletableFuture.allOf(
                        CompletableFuture.
                                supplyAsync(() -> //CALL MORTAGE INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setMortgageAccountId(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setNoOfTrans(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME INFO REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setAddressLine(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                        CompletableFuture.
                                supplyAsync(() -> //CALL SOME OTHER REST, executor).
                                thenAccept(x -> {
                                    accountDetails.setExternalLink(x.getReqdField())
                                }).
                                handle(//HANDLE GRACEFULLY),
                ).join();
 类似资料:
  • 我编写了两个功能文件,每个功能文件打开不同的浏览器URL,例如一个是open google。com和secnd一个开放的亚马逊。但事实并非如此。 两个浏览器都打开了谷歌。通用域名格式。此外,它不能与浏览器交互,任何编码到浏览器的操作都不会执行。此外,关闭第一个浏览器会导致第二个浏览器出现空指针异常。 cucumber版本6我从AbstractCucumberTesNG继承开始。然后我创建登录。功能

  • 我的makefile中的一个目标是一个非常耗时的CPU任务。但是我可以分割工作负载并并行运行任务几次,以加快整个过程。 我的问题是make不会等待所有过程完成。 考虑一下这个简单的脚本,名为“代码> MyTask.SH <代码>: 现在,让我们从bash脚本调用它,并使用等待所有任务完成: 产出如预期: 但是在Makefile中尝试相同的方法时: 它不起作用: 当然,我可以创建多个目标,这些目标可

  • 我试图用Cucumber,用不同的参数并行运行多个testng套件。对于每个tesng套件,我试图传递不同的浏览器、testinfo等等。我想通过maven命令行选项来实现这一点。我关注了https://rationaleemotions.wordpress.com/2016/03/29/parallel-execution-of-multiple-testng-suites/#comment-1

  • 问题内容: 我有要并行处理的元素的集合。当我使用时,并行性有效。但是,当我使用时,它不会并行运行。 我写了一个代码样本来说明问题: 这是我在Windows 7上获得的输出 我们可以看到中的第一个元素必须在处理第二个元素之前完成。对于,第二个元素在第一个元素结束之前开始。 您能否告诉我是什么原因导致此问题,以及如何使用集合避免发生此问题? 问题答案: 我可以重现您看到的行为,其中并行性与您指定的fo

  • 返回的流量 返回一个 如果您不能回答我的问题,请至少告诉我如何并行地执行多个API调用,并在WebClient中等待结果

  • 但是,testCase2不处理异常并引发错误。我是不是漏掉了什么?抱歉,我是新手。