当前位置: 首页 > 知识库问答 >
问题:

为什么ParallelStream不会在递归中使用所有Commonpool的线程?

吕淮晨
2023-03-14

当我运行以下代码时,8个可用线程中只有2个运行,有人能解释为什么会这样吗?我如何以这样一种方式更改代码,它将利用所有8个线程?

package il.co.roy;

import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class Tree<T>
{
    private final T data;
    private final Set<Tree<T>> subTrees;

    public Tree(T data, Set<Tree<T>> subTrees)
    {
        this.data = data;
        this.subTrees = subTrees;
    }

    public Tree(T data)
    {
        this(data, new HashSet<>());
    }

    public Tree()
    {
        this(null);
    }

    public T getData()
    {
        return data;
    }

    public Set<Tree<T>> getSubTrees()
    {
        return subTrees;
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o)
            return true;
        if (o == null || getClass() != o.getClass())
            return false;
        Tree<?> tree = (Tree<?>) o;
        return Objects.equals(data, tree.data) &&
                Objects.equals(subTrees, tree.subTrees);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(data, subTrees);
    }

    @Override
    public String toString()
    {
        return "Tree{" +
                "data=" + data +
                ", subTrees=" + subTrees +
                '}';
    }

    public void sendCommandAll()
    {
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] sending command to " + data);
        try
        {
            Thread.sleep(5000);
        } catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        if (data != null)
            System.out.println("[" + Thread.currentThread().getName() + "] tree with data " + data + " got " + true);
        subTrees.parallelStream()
//              .map(Tree::sendCommandAll)
                .forEach(Tree::sendCommandAll);
//              .reduce(true, (aBoolean, aBoolean2) -> aBoolean && aBoolean2);
    }
}
package il.co.roy;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main
{
    public static void main(String... args)
    {
        System.out.println("Processors: " + Runtime.getRuntime().availableProcessors());


        final Tree<Integer> root = new Tree<>(null,
                Set.of(new Tree<>(1,
                        IntStream.range(2, 7)
                                        .boxed()
                                        .map(Tree::new)
                                        .collect(Collectors.toSet()))));

        root.sendCommandAll();

//      IntStream.generate(() -> 1)
//              .parallel()
//              .forEach(i ->
//              {
//                  System.out.println(Thread.currentThread().getName());
//                  try
//                  {
//                      Thread.sleep(5000);
//                  } catch (InterruptedException e)
//                  {
//                      e.printStackTrace();
//                  }
//              });
    }
}
root (data is `null`)
  |- 1
     |- 2
     |- 3
     |- 4
     |- 5
     |- 6

处理器:8
[main]向1
[main]树发送命令,其中数据1为真
[main]向6
[forkjoinpool.commonpool-worker-2]发送命令到5
[main]树,其中数据6为真
[forkjoinpool.commonpool-worker-2]树,其中数据5为真
[forkjoinpool.commonpool-worker-2]发送命令到4
[forkjoinpool.commonpool-worker-2]树,其中数据4为真-worker-2]数据为3的树为真
[forkjoinpool.commonpool-worker-2]向2发送命令
[forkjoinpool.commonpool-worker-2]数据为2的树为真

(记录在案,当我在main.java中执行注释的代码时,JVM将使用commonpool可用的所有7(+1)个线程)

如何改进代码?

共有1个答案

汝昀
2023-03-14

正如本答案的后半部分所解释的,处理hashmaphashset时的线程利用率取决于支持数组中元素的分布,后者取决于hashcodes。特别是当与(默认)容量相比,元素数量较少时,这可能会导致工作分裂。

一个简单的解决方法是使用new ArrayList<>(subTrees).parallelStream()代替subTrees.parallelStream()

但是请注意,您的方法在处理子节点之前执行当前节点的实际工作(在这个示例中使用sleep),这也降低了潜在的并行性。

您可以使用

public void sendCommandAll() {
    if(subTrees.isEmpty()) {
        actualSendCommand();
        return;
    }
    List<Tree<T>> tmp = new ArrayList<>(subTrees.size() + 1);
    tmp.addAll(subTrees);
    tmp.add(this);
    tmp.parallelStream().forEach(t -> {
        if(t != this) t.sendCommandAll(); else t.actualSendCommand();
    });
}

private void actualSendCommand() {
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] sending command to " + data);
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    if (data != null)
        System.out.println("[" + Thread.currentThread().getName()
                         + "] tree with data " + data + " got " + true);
}

这允许在处理子节点的同时处理当前节点。

 类似资料:
  • 参考Java的Fork/Join vs ExecutorService-何时使用哪个?,传统的线程池通常用于处理许多独立请求;用于处理连贯/递归任务,其中一个任务可能会产生另一个子任务并稍后加入。 那么,为什么Java-8的默认使用而不是传统的执行器? 在许多情况下,我们在或之后使用,然后提交一个函数式接口作为参数。从我的角度来看,这些任务是独立的,不是吗?

  • output指示在1s暂停之前执行16个流元素,然后再执行16个元素,依此类推。因此,即使forkjoinpool是用100个线程创建的,也只有16个线程被使用。 当我使用超过23个线程时,就会出现这种模式:

  • 将并行流执行提交到您自己的forkJoinpool:yourfjp.submit(()->stream.parallel().foreach(doSomething)); 所以,我这样做了: 我创建了一组线程名,以查看创建了多少线程,并记录了池中活动线程的数量,这两个数字都不超过16,所以这意味着这里的并行度不超过16(为什么甚至是16?)。如果我不使用forkJoinPool,我得到4作为并行度

  • 问题内容: 因此,我在闲逛时使用了递归,我发现使用递归的循环比常规的while循环要慢得多,我想知道是否有人知道为什么。我已经包括了我下面所做的测试: 但是,在上一次测试中,我注意到如果删除该语句,则表明速度略有提高,因此我想知道if语句是否是造成循环速度差异的原因? 问题答案: 您已将函数编写为尾递归。在许多命令式和函数式语言中,这将触发尾部递归消除,在这种情况下,编译器用简单的JUMP替换了C

  • Java 8中的默认“paralellStream()”使用公共的ForkJoinPool,如果在提交任务时公共池线程耗尽,这可能是一个延迟问题。然而,在许多情况下,有足够的CPU功率可用,并且任务足够短,因此这不是一个问题。如果我们确实有一些长期运行的任务,这当然需要仔细考虑,但对于这个问题,我们假设这不是问题所在。 然而,用实际上不做任何CPU限制工作的I/O任务填充是一种引入瓶颈的方法,即使

  • 我有一个(co?)递归函数对,它们处理元组列表,并根据一些开始和结束条件将它们折叠成批处理。 我做得不多,所以我可能很愚蠢。 我已经修改了一个简单的非尾部递归版本,通过明确引入一个“tot”参数来构成当前折叠状态,我认为这是尾部递归的,但我在大输入上得到了可怕的堆栈溢出。。。。(在调试器和(调试)中)。exe) 作为一个明确的折叠,可能有更好的方法来做到这一点...但这几乎不是重点,重点是为什么它