当前位置: 首页 > 知识库问答 >
问题:

无法将可完成未来的结果输入响应对象

邹弘
2023-03-14

我使用Java8的Java.util.concurrent的CompletableFuture接口调用了四个API。我想执行多个rest调用,组合结果并返回一个JSON。

public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {

    Map<String, Map<String, Object>> response = new HashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(6);
    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("mani");
        return aClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("gani");
        return bClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("priya");
        return cClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("ravi");
        return dClient.transform(keys, requestObj);
    }, executor).thenApply(s -> {
        putIntoResponse(response, s);
        return true;
    });

    return response;
}

private void putIntoResponse(Map<String, Map<String, Object>> response, List<Map<String, Object>> s) {
    if(s.size() > 0) {
        for (Map<String, Object> maps : s) {
            if (maps != null && maps.containsKey("abcd")) {
                String abcd = maps.get("abcd").toString();
                if(!response.containsKey(abcd))
                    response.put(maps.get("abcd").toString(), maps);
                else {
                    for (Map.Entry<String, Object> entry: maps.entrySet()) {
                        response.get(abcd).put(entry.getKey(), entry.getValue());
                    }
                }
            }
        }
    }
}

来自API的响应:

{ 
   {
    "abcd": 1,
    "cde": 2
   },
   { 
    "abcd": 2,
     "cde": 3
   }
}

将上述响应解析为:

{
   "1" : {
    "abcd": 1,
    "cde": 2
   },
   "2":{ 
    "abcd": 2,
     "cde": 3
   }

}

共有1个答案

耿俊
2023-03-14

我认为您的问题在于CompletableFuture.supplyAsync()没有被阻塞,因此您的代码立即向前移动,而不是等待应用异步操作。

代码是以异步方式执行的,在本例中,这意味着告诉CompletableFuture.supplyAsync()中的操作要执行,然后进一步操作。只有当SupplyAsync中的代码完成执行时,才调用thenApply()部分,这很可能发生在您已经返回响应之后。

如果希望在返回响应之前等待所有CompletableFuture.join()方法完成执行。

首先尝试重构代码,去掉theenapply()部分,以便您的~completablefutures`得到部分响应。

然后将所有CompletableFutures分配给一些变量(在我的示例中是MyFirstFutureMySecondFuture等)。

之后,使用对所有CompletableFutures应用join()方法,并对每个结果应用putinResponse方法。

例如:

public Map<String, Map<String, Object>> getAllValuesInParallel2(RequestObj requestObj) {

    Map<String, Map<String, Object>> response = new HashMap<>();
    ExecutorService executor = Executors.newFixedThreadPool(6);
    CompletableFuture<List<Map<String, Object>>> myFuture1 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("mani");
        return aClient.transform(keys, requestObj);
    }, executor);

    CompletableFuture<List<Map<String, Object>>> myFuture2 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("gani");
        return bClient.transform(keys, requestObj);
    }, executor);

    CompletableFuture<List<Map<String, Object>>> myFuture3 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("priya");
        return cClient.transform(keys, requestObj);
    }, executor);

    CompletableFuture<List<Map<String, Object>>> myFuture4 = CompletableFuture.supplyAsync(() -> {
        List<Template> keys = new ArrayList<>();
        keys.add("ravi");
        return dClient.transform(keys, requestObj);
    }, executor);

    Stream.of(myFuture1, myFuture2, myFuture3, myFuture4)
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .forEachOrdered(s -> putIntoResponse(response, s));


    return response;
}
 类似资料:
  • 我正在尝试将方法的调用/结果链接到下一个调用。我得到了编译时错误方法,因为if无法从前一次调用中获得objB的引用。 如何将上一次调用的结果传递给下一个链?我是不是完全误解了这个过程?

  • 我有以下代码,即从返回CompletableFuture的web API获取给定时间间隔的报告。如果超过了返回报告的行数,时间间隔将被分成两半,并为两半调用API。这将递归重复,直到行数满足条件。 我想得到完全未来 我的代码在不需要进行间隔拆分时运行正常。如果需要递归调用,它将只返回空列表,递归调用将在稍后异步执行。 我也尝试过使用类似的东西,但没用: 如果我知道自己做错了什么,我会心存感激。谢谢

  • 我要做的是异步计算树结构的深度,我将有树的第一层,我想启动一个异步线程来分别计算每个节点的深度。 在计算过程中,树中显然可能有一个分叉,在这一点上,我想踢一个额外的线程来计算那个分支。 我已经得到了这个工作,但我需要做一些整理逻辑,当所有这些未来完成。但我对这一过程中产生的额外的可完成的未来感到困扰。 我会用什么方法来保存所有开始的CompletableFutures+那些动态创建的,并且在执行任

  • 我有一个实例列表。 如何将他们转变成这样一个未来:

  • 我已将自动完成功能应用于两个。为此,我使用了自动完成计算器。我观察到它的速度减慢到我甚至无法输入一个字符的程度。有什么解决办法吗? 谢谢

  • 问题内容: 信封:Akka 2.1,scala版本2.10.M6,JDK 1.7,u5 现在是我的问题:我有: 现在在第一行中,我有一个Future对象的Future,有什么方法可以在不阻塞当前线程的情况下将其转换为Future? Akka有什么方法吗?据我检查,我还没有发现…第一次发帖....不好意思的格式和组织…:〜P 问题答案: 简短答案(英语):flatMap dat sh!t 较短的答案

  • 我有几个单独的操作,我想以与JS中相同的风格链接这些操作。 所以目前我有这样的代码: 可以在一个链中重写吗?有点:

  • 问题内容: 我尽力而为,但没有找到任何文章和博客可以清楚地比较和,并且提供了很好的分析。 因此,如果任何人都可以向我解释或指向这样的博客或文章,那对我来说真的非常好。 问题答案: 无论 ListenableFuture 和 CompletableFuture 有超过它的父类的优势 未来 通过允许呼叫者在这样或那样的回调“注册”当异步动作已经完成被调用。 使用 Future, 您可以执行以下操作: