当前位置: 首页 > 知识库问答 >
问题:

理解链式可完成期货的并行执行

张勇
2023-03-14

我有一个关于Java流和链式可完成期货如何执行的问题。

我的问题是:如果我运行下面的代码,调用执行(),列表中有10个项目需要大约11秒才能完成(列表中的项目数加1)。这是因为我有两个线程并行工作:第一个执行digItUp操作,一旦完成,第二个执行fillItBackIn操作,第一个开始处理digItUp列表中的下一个项目。

如果我注释掉第36行(.collect(Collectors.toList())),那么execute()方法需要大约20秒才能完成。Thread不平行运行;对于列表中的每个项目,digItUp操作完成,然后在处理列表中的下一个项目之前,fillItBackIn操作按顺序完成。

我不清楚为什么排除(。收集(Collectors.toList()))会改变这种行为。有人能解释一下吗?

完整课程:

package com.test;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class SimpleExample {

    private final ExecutorService diggingThreadPool = Executors.newFixedThreadPool(1);
    private final ExecutorService fillingThreadPool = Executors.newFixedThreadPool(1);

    public SimpleExample() {

    }

    public static void main(String[] args) {
        List<Double> holesToDig = new ArrayList<>();
        Random random = new Random();
        for (int c = 0; c < 10; c++) {
            holesToDig.add(random.nextDouble(1000));
        }
        new SimpleExample().execute(holesToDig);
    }

    public void execute(List<Double> holeVolumes) {
        long start = System.currentTimeMillis();
        holeVolumes.stream()
                .map(volume -> {
                    CompletableFuture<Double> digItUpCF = CompletableFuture.supplyAsync(() -> digItUp(volume), diggingThreadPool);
                    return digItUpCF.thenApplyAsync(volumeDugUp -> fillItBackIn(volumeDugUp), fillingThreadPool);
                })
                .collect(Collectors.toList())
                .forEach(cf -> {
                    Double volume = cf.join();
                    System.out.println("Dug a hole and filled it back in.  Net volume: " + volume);
                });
        System.out.println("Dug up and filled back in " + holeVolumes.size() + " holes in " + (System.currentTimeMillis() - start) + " ms");
    }

    public Double digItUp(Double volume) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        System.out.println("Dug hole with volume " + volume);
        return volume;
    }

    public Double fillItBackIn(Double volumeDugUp) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        System.out.println("Filled back in hole of volume " + volumeDugUp);
        return 0.0;
    }
}

共有1个答案

单于旭东
2023-03-14

原因是collect(Collectors.toList())是一个终端操作,因此它会触发流管道(请记住,流是惰性计算的)。因此,当您调用collect时,所有CompletableFuture实例都会被构造并放置在列表中。这意味着有一个CompletableFuture链,其中每一个都是由两个阶段组成的链,我们称之为X和Y。

每当第一个线程执行器完成一个X阶段时,它就可以自由地处理下一个组合的CompletableFuture的X阶段,而另一个线程执行器则处理上一个CompletableFuture的阶段Y。这是我们直觉预期的结果。

另一方面,当您不调用collect时,则在本例中,forEach是终端操作。然而,在这种情况下,流中的每个元素都是按顺序处理的(以确认是否尝试切换到parallelStream()),因此对于第一个CompletableFuture,会执行阶段X和Y。只有当前一个流元素的阶段Y完成时,forEach才会移动到流管道中的第二个元素,只有到那时,新的CompletableFuture才会从原始的Double值映射。

 类似资料:
  • 我有一个可完成期货的列表,我想从第一个期货开始,如果有任何完成例外,我想尝试列表中的下一个期货,依此类推,直到我耗尽了我所有的期货。如果任何一个期货成功了,我想就此止步,而不使用列表中的下一个期货。我如何做到这一点?到目前为止,我已经尝试过: 但是当我测试这种方法时,我看到当未来完成失败时,会抛出异常,并且不会尝试下一组期货。 编辑: 这就是样本的样子

  • 问题内容: 我遇到了一个奇怪的情况。我不满意,运行以下代码时出现意外结果: 没有抛出异常(即使使用),我看到的是控制台输出为 现在,很明显,此代码没有实际的生产价值,但这是一种情况的表示,其中您的代码具有未知数量的嵌套,其中每个嵌套或其中的一些嵌套将无法执行。 任何解释(以及有关如何修复的示例)将不胜感激 问题答案: 之所以不起作用,是因为在您的简单测试中,VM在所有任务完成之前就退出了。 当您致

  • 现在,我有三个函数:updateFieldFromCollection1()、

  • 我有一个关于CompletableFuture方法的问题: JavaDoc的意思是: 返回一个新的完成阶段,当此阶段正常完成时,将使用此阶段的结果作为所提供函数的参数来执行该阶段。有关异常完成的规则,请参阅完成阶段文档。 穿线呢?这将在哪个线程中执行?如果未来由线程池来完成呢?

  • 我正在尝试实现Slueth,用于spring boot微服务的分布式跟踪,这些微服务通过消息传递通道相互通信。 其中一个微服务是一个调度器,它接收一天内创建的新消费者。然后,它以异步方式为每个消费者的数据运行分组过程。 现在,我使用traceableExeucutorService将为调度程序线程生成的sleuth跟踪传递给每个使用者的子线程。 跟踪配置 调度程序服务 这最终会为每个消费者使用相同

  • 我有3种方法需要并行运行,因为它们彼此独立,并在最后合并每种方法的结果,并将其作为响应发送。我还需要处理异常。 在不同的帖子中,我找到了下面的代码并进行了相应的修改。 上述各项是否应正确并行运行?我知道这需要更多的时间,我想确保我做对了。