当前位置: 首页 > 知识库问答 >
问题:

来自哈希集中的并行流不并行运行

荣曾笑
2023-03-14

我有要并行处理的元素集合。当我使用列表时,并行性起作用。但是,当我使用set时,它不会并行运行。

public static void main(String[] args) {
    ParallelTest test = new ParallelTest();

    List<Integer> list = Arrays.asList(1,2);
    Set<Integer> set = new HashSet<>(list);

    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    System.out.println("set print");
    try {
        forkJoinPool.submit(() ->
            set.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }

    System.out.println("\n\nlist print");
    try {
        forkJoinPool.submit(() ->
            list.parallelStream().forEach(test::print)
        ).get();
    } catch (Exception e) {
        return;
    }   
}

private void print(int i){
    System.out.println("start: " + i);
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
    }
    System.out.println("end: " + i);
}
set print
start: 1
end: 1
start: 2
end: 2

list print
start: 2
start: 1
end: 1
end: 2

共有1个答案

赵佐
2023-03-14

我可以再现您看到的行为,其中的并行性与您指定的fork-join池并行性不匹配。在将fork-join池的并行度设置为10并将集合中的元素数增加到50之后,我看到基于列表的流的并行度仅上升到6,而基于集的流的并行度从未超过2。

但是,请注意,这种将任务提交到fork-join池以运行该池中的并行流的技术是一种实现“技巧”,并不保证能够工作。实际上,用于执行并行流的线程或线程池是未指定的。默认情况下,使用公共fork-join池,但在不同的环境中,最终可能使用不同的线程池。(考虑应用程序服务器中的容器。)

在java.util.Stream.AbstractTask类中,leaf_target字段决定了所做的拆分量,而拆分量又决定了可以实现的并行度。该字段的值基于forkJoinPool.getCommonPoolParallelism(),它当然使用公共池的并行性,而不是运行任务的任何池。

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "10");

再说一遍,这不是有保证的行为,将来可能会改变。但在可预见的将来,这种技术可能适用于JDK8实现。

更新2019-06-12:JDK-8190974在JDK 10中已被修复,该修复程序已被备份到即将发布的JDK 8u版本(8u222)。

 类似资料:
  • 问题内容: 我有要并行处理的元素的集合。当我使用时,并行性有效。但是,当我使用时,它不会并行运行。 我写了一个代码样本来说明问题: 这是我在Windows 7上获得的输出 我们可以看到中的第一个元素必须在处理第二个元素之前完成。对于,第二个元素在第一个元素结束之前开始。 您能否告诉我是什么原因导致此问题,以及如何使用集合避免发生此问题? 问题答案: 我可以重现您看到的行为,其中并行性与您指定的fo

  • 问题内容: 我想接受输入并对其应用并行流,然后我希望将输出作为列表。输入可以是任何列表或可以对其应用流的任何集合。 我在这里担心的是,如果我们要输出为map,我们可以从java中选择一个选项,例如 但是我看不到以线程安全的方式从并行流收集以提供列表作为输出的选项。我看到那里还有一个选择 通过这种方式,我们可以在collect方法中提供各种并发实现。但是我认为java.util.concurrent

  • 问题内容: 它是纠正与Java 8,你需要执行下面的代码确实获得从平行流Collection? 从CollectionAPI: 默认Stream parallelStream() 返回一个可能与此流作为其源的并行Stream。此方法允许返回顺序流。 从BaseStreamAPI: S parallel() 返回并行的等效流。可能由于流已经是并行的,或者因为基础流的状态被修改为并行而返回自身。 我需

  • 在Java8中,您需要执行以下代码才能从中获得并行流,这是否正确? 来自API: 默认流parallelStream()

  • 问题 我有一堆jUnit测试(许多具有自定义运行器,例如PowerMockRunner或JUnitParamsRunner)都在一些根包下(它们在的不同深度的不同子包中)。 我想收集包<code>tests<code>下的所有测试,并在不同的JVM中并行运行每个测试类。理想情况下,并行化是可配置的,但默认的number_of_cores也完全可以。请注意,我不想在自己的JVM中运行每个方法,而是要

  • 常见问题,Int Raku,如何合并,合并两个哈希? 说: 如何获取