如何使用5个CompletableFutures异步执行20个可运行任务(或1个任务20次)?
这就是我得到的:
Runnable task = () -> {
long startTime = System.currentTimeMillis();
Random random = new Random();
while (System.currentTimeMillis() - startTime < 3000) {
DoubleStream.generate(() -> random.nextDouble())
.limit(random.nextInt(100))
.map(n -> Math.cos(n))
.sum();
}
System.out.println("Done");
};
for (int i = 0; i < 4; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future3 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future4 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future5 = CompletableFuture.runAsync(task);
future1.get();
future2.get();
future3.get();
future4.get();
future5.get();
}
如果我执行这段代码,我可以看到它只运行3次。异步获取():3,然后在1 for()迭代中剩下2
所以,我想做所有20个任务,尽可能异步
将以下系统属性设置为希望公共分叉联接池使用的线程数:
java.util.concurrent.ForkJoinPool.common.parallelism
见ForkJoinPool
原因是,在构建可完成的未来时,您没有指定自己的fork-join池,因此它隐式地使用
ForkJoinPool.commonPool()
完全安全
CompletableFuture
的默认执行程序是ForkJoinPool
的公共池,它具有与CPU内核数减1相匹配的默认目标并行性。因此,如果您有四个核心,最多有三个作业将异步执行。由于每5个作业强制等待完成一次,因此将执行3次并行执行,然后在每次循环迭代中执行2次并行执行。
如果您想获得一个特定的执行策略,比如您选择的并行性,最好的方法是指定一个正确配置的执行器。然后,您应该让执行程序管理并行性,而不是在循环中等待。
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks
这允许五个并行作业,但会让五个线程中的每一个在一个完成后立即拿起一个新作业,而不是等待下一个循环迭代。
但是当你说
所以,我想做所有20个任务,尽可能异步
现在还不清楚为什么你在安排了五个工作之后还要强制等待。可通过以下方式实现最大并行性:
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks
这可能会产生与作业一样多的线程,除非一个作业在调度完所有作业之前完成,因为在这种情况下,工作线程可能会选择一个新作业。
但是这个逻辑根本不需要一个CompletableFuture
。您还可以使用:
ExecutorService pool = Executors.newCachedThreadPool();
// schedule 20 jobs and return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();
但当您的工作不涉及I/O或任何其他类型的等待响应时。释放CPU后,没有必要创建比CPU内核更多的线程。配置为处理器数量的池更可取。
ExecutorService pool = Executors.newWorkStealingPool(
Runtime.getRuntime().availableProcessors());
// schedule 20 jobs at return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();
在您的特殊情况下,这可能会运行得更慢,因为当线程比内核多时,您的作业会使用html" target="_blank">系统时间来显示运行得更快,但实际上运行得更少。但是对于普通的计算任务,这将提高性能。
您可以使用allOf作为一个任务同时运行多个任务。首先,我创建了5个任务的组合(与你的问题相同),但后来我添加了10个任务(只运行了两次),获得了一半的执行时间。
for (int i = 0; i < 2; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
// and so on until ten
CompletableFuture<Void> future10 = CompletableFuture.runAsync(task);
CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2, future3, future4, future5, future6, future7, future8, future9, future10);
combined.get();
}
刚刚开始探索reactor项目及其抽象、Mono和Flux,并希望了解与Java8 barebones CompletableFuture的基本区别。 下面是我的一个简单代码: 首先,并不奇怪。通过ForkJoinPool调度函数的执行,“end”行立即打印,程序终止,因为主线程在这里确实很短--正如预期的那样。 但是将主线程阻塞在那里。此外,在函数中打印的线程名是主线程。因此,我看到的是顺序/阻
我想用Java 8-9启动线程,使用异步模式,这些是我的类和我的线程: 我有三根线。我的类包含单个方法 按以下方式设置我的%s: 正在创建线程: 最后,我的问题是我如何使用异步模式启动这三个线程。
我正在尝试转换
我正在试验Python 3.4的asyncio模块。由于没有使用asyncio的MongoDB生产就绪包,我编写了一个小包装类,在执行器中执行所有mongo查询。这是包装: 我想异步执行插入,这意味着执行它们的协程不想等待执行完成。asyncio手册声明,所以我构造了这个测试脚本: 当我运行脚本时,我得到以下结果: 应该有一行指示mongo查询已完成。当我从这个协程而不是使用异步IO运行它时,我可
这是我正在研究的完全未来的例子 首先我从SupplySync调用compose方法,在这里我执行方法composeMethod,有三毫秒的延迟,然后它将创建一个文件并返回一个字符串作为结果。完成后,我调用Run方法,它只打印一个方法,然后有一个非阻塞方法从主线程运行。 我在这里面临的问题是,主线程执行完nonblockingmethod()并在3毫秒延迟之前退出进程,而随后的composeMeth
当我使用完全的未来。allOf()组合javadoc中描述的独立可完成的未来,在提供给该方法的所有未来之后,它不能可靠地完成。例如。: 结果如下: 我希望日志“Joined”和“Completed allOf”写在“Completed f1”和“Completed f2”之后。为了让事情变得更加混乱,阵列中的未来顺序似乎是头等大事。如果我换了台词 到 结果输出更改为: 更糟糕的是,如果我多次运行完