当前位置: 首页 > 知识库问答 >
问题:

在带有两个未来的循环中使用CompletableFuture来合并每个循环迭代

卜鹏
2023-03-14

我有如下代码:

testMethod(List<String> ids)  {
    List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>();
    
    for(String id : ids) {
        CompletableFuture<ResultOne> resultOne = AynchOne(id);
        CompletableFuture<ResultTwo> resultTwo = AynchTwo(id);
        
    CompletableFuture<ResultThree> resultThree =  resultOne.thenCombine(resultTwo, (ResultOne a, ResultTwo b) -> computeCombinedResultThree(a, b)); 
    
    resultThreeList.add(resultThree);
    }
    // PROCESS RESULTS HERE
}

class ResultOne {
    boolean goodResult;
    String id;

    ResultOne(String promId) {
        this.goodResult = true;
        this.id = promId;
    }
}

class ResultTwo {
    boolean goodResult;
    String id;

    ResultTwo(String promId) {
        this.goodResult = true;
        this.id = promId;
    }

class ResultThree() {
        boolean goodResult;
        String = id;
    }

private ResultThree computeCombinedResultThree(ResultOne r1,  ResultTwo r2) { 
   ResultThree resultThree = new ResultThree();
    resultThree.id = r1.id;
    resultThree.goodResult = r1.goodResult && r2.goodResult;

    return resultThree;
}

,我需要能够将结果resultOne和resultTwo放在一起,这样对于每次迭代,在完成整个同步执行时,我有一个(我猜)数组或映射,我可以随后处理,其中数组中的一个对象有相应的id,该id有一个true或false(表示两个布尔值与单独对象的和)。

根据读者的反馈,我已经完成了代码,可以合并两个原始期货,并组合每次迭代的所有结果以获得整个期货循环。此时我只需要处理结果。

我想也许我需要另一个完整的未来?这个可能是这样的(放在上面,我这里有“//过程结果”):

CompletableFuture<Void> future = resultThreeList
  .thenRun(() -> forwardSuccesses(resultThreeList));

future.get();

ForwardSuccessfulls()将遍历resultThreeList,将成功的ID转发给另一个进程,但不是这样做的。谢谢你的建议。谢谢

共有3个答案

谢诚
2023-03-14

for循环中,你会立即得到CompletableFutures作为回报。在后台,一些魔法发生了,你想等到两者都完成。

因此,在两个CompletableFuture都返回后,通过调用CompletableFuture引起阻塞等待。获取一个长值和一个时间单位。如果只调用get而不调用任何参数,您将永远等待。

明智地选择超时和JDK。JDK 8可能不提供get with timeout。此外,JDK 8不再受支持。JDK 11现在是长期支持,最近的编译器不再将JDK 8作为目标。

我真的敦促你们阅读关于CompletableFuture的肮脏细节,以及它与Future的区别,尤其是关于线程控制(比如取消)的细节。由于不知道CompletableFuture的底层提供者,我还假设查询一个ID是对资源的浪费,吞吐量非常有限。但这是另一个问题。

令狐跃
2023-03-14

在我看来,你不需要三个不同的resulttoneResultTwoResultTwo三个类,因为它们定义了相同的类型,所以我将为Result替换它们。

假设您只想转发成功,我在Result类中添加了一个简短的isGoodResult()方法,用作流的谓词:

class Result {
    public boolean goodResult;
    public String id;
// ...
    public boolean isGoodResult() {
        return this.goodResult;
    }
}

我还建议去掉循环,将其替换为流,以使代码更流畅。

forwardSuccess应该严格接受列表吗

void testMethod(List<String> ids)  {
    final List<Result> results = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .map(CompletableFuture::join)
        .filter(Result::isGoodResult)
        .collect(Collectors.toList());

    // PROCESS RESULTS HERE
    forwardSuccesses(results);
}

应该转发成功偷懒,接受CompletableFuture

void testMethod(List<String> ids)  {
    final List<CompletableFuture<Result>> futures = ids.stream()
        .parallel()
        .map(id -> asynchOne(id).thenCombine(
            asynchTwo(id), 
            (r1, r2) -> computeCombinedResult(r1, r2)))
        .collect(Collectors.toList());

    final CompletableFuture<List<Result>> asyncResults =
    CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new))
            .thenApply(__ -> futures 
                    .stream()
                    .map(CompletableFuture::join)
                    .filter(Result::isGoodResult)
                    .collect(Collectors.toList()));

    // PROCESS RESULTS HERE
    forwardSuccessesAsync(asyncResults);
}

刘俊语
2023-03-14

这就是你到现在为止所取得的成绩:

List<CompletableFuture<ResultThree>> resultThreeList = new ArrayList<>(ids.size());
for (String id : ids) {
    CompletableFuture<ResultOne> resultOne = aynchOne(id);
    CompletableFuture<ResultTwo> resultTwo = aynchTwo(id);

    CompletableFuture<ResultThree> resultThree = resultOne.thenCombine(resultTwo, this::computeCombinedResultThree);
    resultThreeList.add(resultThree);
}

现在你需要做的就是转换这个列表

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.allOf(resultThreeList.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> resultThreeList.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList())
                );

或者用类似的东西

CompletableFuture<List<ResultThree>> combinedCompletables =
        CompletableFuture.supplyAsync(() -> resultThreeList.stream().map(this::safeGet).collect(Collectors.toList()));

其中,safeGet是一个只调用future的方法。get()并捕获可能发生的异常——由于这些异常,您不能在lambda中调用get()

现在,您可以使用然后accept()处理此列表:

try {
    combinedCompletables.thenAccept(this::forwardSuccesses).get(30, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
    e.printStackTrace();
}

同样,捕捉到的异常是由于调用了get()

附带说明,我真的不明白为什么有三个结果类,因为您所需要的——至少对于这部分代码——是id和结果状态。我会为此引入一个接口(Result?),并且只处理它。

 类似资料:
  • 问题内容: 学生的姓名(String [])和相应的标记(int [])存储在不同的数组中。 如何使用Java中的每个循环一起遍历两个数组? 一种简单的方法是在同一循环中使用索引变量。有什么好办法吗? 问题答案: 潜在的问题实际上是您应该将两个数组绑定在一起,并且仅跨一个数组进行迭代。 这是一个非常简单的演示-您应该使用getter和setter,还应该使用a 而不是数组,但这证明了这一点:

  • 考虑这个简单的C++函数来计算数组的前缀和: 它是4个融合的UOP1,这个CPU可以支持4个融合的OPs/周期。 有通过和携带的依赖链,每个都是一个循环,但是这些UOP可以到4个ALU端口中的任何一个,所以似乎不太可能冲突。融合的需要转到p6,这是一个更令人担忧的问题,但我只测量到p6的1.1 UOPS/迭代。这将解释每次迭代1.1个循环,但不是1.4个循环。如果我将循环展开2倍,端口压力会低得多

  • while,翻译成中文是“当...的时候”,这个单词在英语中,常常用来做为时间状语,while ... someone do somthing,这种类型的说法是有的。在python中,它也有这个含义,不过有点区别的是,“当...时候”这个条件成立在一段范围或者时间间隔内,从而在这段时间间隔内让python做好多事情。就好比这样一段情景: while 年龄大于60岁:-------->当年龄大于60

  • 我尝试将一个循环转换为一个循环在flutter中等待循环。我想要循环的元素是Firebase实时数据库中的快照。 我的函数如下所示: 我尝试了不同的方法,但是我没有让函数工作。 第一次尝试: 错误: 未处理的异常:键入'_InternalLinkedHashMap 第二次尝试: 错误: 未处理的异常:键入'_InternalLinkedHashMap 在所有的尝试中,我都得到了相同的错误。但是上述

  • 我在C语言课程考试前练习一些算法问题,我被这个问题卡住了(至少3个小时甚至4个小时),我不知道如何回答: 您有两个已排序的循环单链接列表,必须合并它们并返回新循环链接列表的标题,而不创建任何新的额外节点。返回的列表也应该进行排序。 节点结构为: 我尝试了很多方法(递归和非递归),但都没有解决问题。 谢谢你的帮助。

  • 问题内容: 我正在尝试将此for循环重写为for每个循环。 这就是我尝试过的 谁能指出我正确的方向?谢谢。 问题答案: 我认为您想得太多… :)