Fork-Join 框架(Fork-Join framework)

优质
小牛编辑
127浏览
2023-12-01

fork-join框架允许在几个worker上中断某个任务,然后等待结果将它们组合起来。 它在很大程度上利用了多处理器机器的容量。 以下是fork-join框架中使用的核心概念和对象。

Fork

Fork是一个过程,在这个过程中,任务将自身分成较小且独立的子任务,这些子任务可以同时执行。

语法 (Syntax)

Sum left  = new Sum(array, low, mid);
left.fork();

这里Sum是RecursiveTask的子类,left.fork()将任务转换为子任务。

Join

Join是一个任务在子任务完成执行后加入子任务的所有结果的过程,否则它会一直等待。

语法 (Syntax)

left.join();

这里左边是Sum类的一个对象。

ForkJoinPool

它是一个特殊的线程池,设计用于fork-and-join任务拆分。

语法 (Syntax)

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

这是一个新的ForkJoinPool,具有4个并行级别的CPU。

RecursiveAction

RecursiveAction表示不返回任何值的任务。

语法 (Syntax)

class Writer extends RecursiveAction {
   @Override
   protected void compute() { }
}

RecursiveTask

RecursiveTask表示返回值的任务。

语法 (Syntax)

class Sum extends RecursiveTask<Long> {
   @Override
   protected Long compute() { return null; }
}

例子 (Example)

以下TestThread程序显示了基于线程的环境中Fork-Join框架的用法。

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class TestThread {
   public static void main(final String[] arguments) throws InterruptedException, 
      ExecutionException {
      int nThreads = Runtime.getRuntime().availableProcessors();
      System.out.println(nThreads);
      int[] numbers = new int[1000]; 
      for(int i = 0; i < numbers.length; i++) {
         numbers[i] = i;
      }
      ForkJoinPool forkJoinPool = new ForkJoinPool(nThreads);
      Long result = forkJoinPool.invoke(new Sum(numbers,0,numbers.length));
      System.out.println(result);
   }  
   static class Sum extends RecursiveTask<Long> {
      int low;
      int high;
      int[] array;
      Sum(int[] array, int low, int high) {
         this.array = array;
         this.low   = low;
         this.high  = high;
      }
      protected Long compute() {
         if(high - low <= 10) {
            long sum = 0;
            for(int i = low; i < high; ++i) 
               sum += array[i];
               return sum;
         } else {	    	
            int mid = low + (high - low)/2;
            Sum left  = new Sum(array, low, mid);
            Sum right = new Sum(array, mid, high);
            left.fork();
            long rightResult = right.compute();
            long leftResult  = left.join();
            return leftResult + rightResult;
         }
      }
   }
}

这将产生以下结果。

输出 (Output)

32
499500