Fork-Join 框架(Fork-Join framework)
优质
小牛编辑
127浏览
2023-12-01
fork-join框架允许在几个worker上中断某个任务,然后等待结果将它们组合起来。 它在很大程度上利用了多处理器机器的容量。 以下是fork-join框架中使用的核心概念和对象。
Fork
Fork是一个过程,在这个过程中,任务将自身分成较小且独立的子任务,这些子任务可以同时执行。
语法 (Syntax)
Sum left = new Sum(array, low, mid);
left.fork();
这里Sum是RecursiveTask的子类,left.fork()将任务转换为子任务。
Join
Join是一个任务在子任务完成执行后加入子任务的所有结果的过程,否则它会一直等待。
语法 (Syntax)
left.join();
这里左边是Sum类的一个对象。
ForkJoinPool
它是一个特殊的线程池,设计用于fork-and-join任务拆分。
语法 (Syntax)
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
这是一个新的ForkJoinPool,具有4个并行级别的CPU。
RecursiveAction
RecursiveAction表示不返回任何值的任务。
语法 (Syntax)
class Writer extends RecursiveAction {
@Override
protected void compute() { }
}
RecursiveTask
RecursiveTask表示返回值的任务。
语法 (Syntax)
class Sum extends RecursiveTask<Long> {
@Override
protected Long compute() { return null; }
}
例子 (Example)
以下TestThread程序显示了基于线程的环境中Fork-Join框架的用法。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestThread {
public static void main(final String[] arguments) throws InterruptedException,
ExecutionException {
int nThreads = Runtime.getRuntime().availableProcessors();
System.out.println(nThreads);
int[] numbers = new int[1000];
for(int i = 0; i < numbers.length; i++) {
numbers[i] = i;
}
ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
System.out.println(result);
}
static class Sum extends RecursiveTask<Long> {
int low;
int high;
int[] array;
Sum(int[] array, int low, int high) {
this.array = array;
this.low = low;
this.high = high;
}
protected Long compute() {
if(high - low <= 10) {
long sum = 0;
for(int i = low; i < high; ++i)
sum += array[i];
return sum;
} else {
int mid = low + (high - low)/2;
Sum left = new Sum(array, low, mid);
Sum right = new Sum(array, mid, high);
left.fork();
long rightResult = right.compute();
long leftResult = left.join();
return leftResult + rightResult;
}
}
}
}
这将产生以下结果。
输出 (Output)
32
499500