当前位置: 首页 > 知识库问答 >
问题:

为什么并行流不使用ForkJoinPool的所有线程?[副本]

蓬新
2023-03-14

将并行流执行提交到您自己的forkJoinpool:yourfjp.submit(()->stream.parallel().foreach(doSomething));

所以,我这样做了:

import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import com.google.common.collect.Sets;

public class Main {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        ForkJoinPool forkJoinPool = new ForkJoinPool(1000);

        IntStream stream = IntStream.range(0, 999999);

        final Set<String> thNames = Collections.synchronizedSet(new HashSet<String>());

        forkJoinPool.submit(() -> {
            stream.parallel().forEach(n -> {

                System.out.println("Processing n: " + n);
                try {
                    Thread.sleep(500);
                    thNames.add(Thread.currentThread().getName());
                    System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
        }).get();
    }
}

我创建了一组线程名,以查看创建了多少线程,并记录了池中活动线程的数量,这两个数字都不超过16,所以这意味着这里的并行度不超过16(为什么甚至是16?)。如果我不使用forkJoinPool,我得到4作为并行度,这是根据我拥有的处理器数量。

为什么它给我16而不是1000?

共有1个答案

马坚
2023-03-14

更新

最初,这个答案是一个详细的解释,声称forkjoinpool施加了反压力,甚至没有达到规定的并行度级别,因为总是有空闲的工作人员可以处理流。

那是不正确的.

for (int i = 0; i < 1_000_000; ++i) {
   forkJoinPool.submit(() -> {
      try {
         Thread.sleep(1);
         thNames.add(Thread.currentThread().getName());
         System.out.println("Size: " + thNames.size() + " activeCount: " + forkJoinPool.getActiveThreadCount() + " parallelism: " + forkJoinPool.getParallelism());
      } catch (Exception e) {
         throw new RuntimeException(e);
      }
   });
}
 类似资料:
  • 参考Java的Fork/Join vs ExecutorService-何时使用哪个?,传统的线程池通常用于处理许多独立请求;用于处理连贯/递归任务,其中一个任务可能会产生另一个子任务并稍后加入。 那么,为什么Java-8的默认使用而不是传统的执行器? 在许多情况下,我们在或之后使用,然后提交一个函数式接口作为参数。从我的角度来看,这些任务是独立的,不是吗?

  • output指示在1s暂停之前执行16个流元素,然后再执行16个元素,依此类推。因此,即使forkjoinpool是用100个线程创建的,也只有16个线程被使用。 当我使用超过23个线程时,就会出现这种模式:

  • 问题内容: 假设我有如下代码: 使用时没有问题,但是当我使用并行时它会加倍。例如: Q1 :为什么并行性会在其中发生两倍? Q2 :如何避免这种奇怪的行为? 问题答案: 如果在工作线程中引发了异常,则Fork / Join池通常会尝试在调用者线程内重新创建异常,并将原始异常设置为其原因。这就是您认为的“加倍”。当您仔细查看堆栈跟踪时,您会注意到这两个异常之间的差异。 公用池在这方面没有什么不同。但

  • 阅读Java8的并行流API:https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html 还不清楚在使用这个流式API的并行性时,如何调优要使用的线程数? 计划在一个非常特定的机器类型和一致的数据类型上运行它,所以我想我可以在一组不同的设置上对它进行基准测试,然后使用最佳数量的线程。

  • 当我运行以下代码时,8个可用线程中只有2个运行,有人能解释为什么会这样吗?我如何以这样一种方式更改代码,它将利用所有8个线程? 处理器:8 [main]向1 [main]树发送命令,其中数据1为真 [main]向6 [forkjoinpool.commonpool-worker-2]发送命令到5 [main]树,其中数据6为真 [forkjoinpool.commonpool-worker-2]树

  • 在Java8中,可以设置一个定制的forkJoinPool供并行流使用,而不是公共池。 我的问题是它在技术上是如何发生的? 流以任何方式都不知道它被提交给了自定义的forkJoinpool并且没有直接访问它的权限。那么最终如何使用正确的线程来处理流的任务呢? 我试着看源代码,但没有用。我的最佳猜测是在提交时的某个点设置了某个threadLocal变量,然后在稍后由流使用。如果是这样的话,为什么语言