当前位置: 首页 > 知识库问答 >
问题:

Java中的嵌套并行流

华星驰
2023-03-14
public static void main(String[] args) {
    IntStream.range(0, 10).forEach(i -> {
        System.out.println(i);
        IntStream.range(0, 10).forEach(j -> {
            System.out.println("    " + i + " " + j);
        });
    });
}
0
    0 0
    0 1
    0 2
    0 3
    0 4
    0 5
    0 6
    0 7
    0 8
    0 9
1
    1 0
    1 1
    1 2
    1 3
    1 4
    1 5
    1 6
    1 7
    1 8
    1 9
2
    2 0
    2 1
    2 2
    2 3
...

示例2:

public static void main(String[] args) {
    IntStream.range(0, 10).parallel().forEach(i -> {
        System.out.println(i);
        IntStream.range(0, 10).parallel().forEach(j -> {
            System.out.println("    " + i + " " + j);
        });
    });
}

如果流被设置为parallel(),就像在第二个示例中那样,我可以想象内部工作者在等待外部工作队列中的线程可用时会阻塞,因为外部工作队列线程必须在内部流完成时阻塞,而默认线程池只有有限数量的线程。但是,似乎不会出现死锁:

6
5
8
    8 6
0
1
    6 2
7
    1 6
    8 5
    7 6
    8 8
2
    0 6
    0 2
    0 8
    5 2
    5 4
    5 6
    0 5
    2 6
    7 2
    7 5
    7 8
    6 4
    8 9
    1 5
 ...

两个流共享相同的默认线程池,但它们生成不同的工作单元。每个外部工作单元只能在该外部工作单元的所有内部单元完成之后才能完成,因为在每个并行流的末端有一个完成屏障。

如何通过共享的工作线程池来管理这些内部流和外部流之间的协调,而不产生任何死锁?

共有1个答案

严扬
2023-03-14

并行流后面的线程池是公共池,您可以通过forkjoinpool.commonpool()获得它。它通常使用NumberOfProcessors-1个workers。为了像您所描述的那样解决依赖关系,如果(一些)当前的工作者被阻塞并且死锁成为可能,它能够动态地创建额外的工作者。

然而,这不是你的案子的答案。

forkjoinpool中的任务有两个重要功能:

    null

当一个线程执行这样的任务a并加入子任务B时,它不只是等待阻塞子任务完成其执行,而是同时执行另一个任务C。当C完成时,线程回到A并检查B是否完成。注意,B和C可以(而且很可能是)相同的任务。如果B完成了,那么A已经成功地等待/加入了它(非阻塞!)。如果前面的解释不清楚,请参阅本指南。

现在当您使用并行流时,流的范围被递归地拆分为任务,直到任务变得如此小,以至于可以更高效地顺序执行。这些任务被放入公共池中的工作队列(每个工作者有一个)。因此,intstream.range(0,100).parallel().foreach所做的就是递归地拆分范围,直到它不再值得使用为止。每个最终任务,或者更确切地说是一组迭代,可以使用foreach中提供的代码顺序执行。此时,公共池中的工作者可以执行这些任务,直到完成所有任务,并且流可以返回为止。注意,调用线程通过加入子任务来帮助执行!

现在,在您的例子中,这些任务本身都使用了一个并行流。程序相同;将其拆分为较小的任务,并将这些任务放入公共池中的工作队列中。从forkjoinpool的角度来看,这些只是现有任务之上的附加任务。工作人员只是继续执行/加入任务,直到完成所有任务,并且外部流可以返回。

这就是您在输出中看到的:没有确定性的行为,没有固定的顺序。另外,也不能出现死锁,因为在给定的用例中,不会有阻塞线程。

您可以使用以下代码检查解释:

java prettyprint-override">    public static void main(String[] args) {
        IntStream.range(0, 10).parallel().forEach(i -> {
            IntStream.range(0, 10).parallel().forEach(j -> {
                for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
                System.out.printf("%d %d %s\n", i, j, Thread.currentThread().getName());
                for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
            });
        });
    }

您应该注意到主线程参与了内部迭代的执行,因此它不是(!)被封锁了。普通池工人只是一个接一个地挑任务,直到全部完成。

 类似资料:
  • 将“find any()”流操作放在哪里有关系吗?我应该直接在过滤器操作后放置它吗?还是嵌套的并行流是问题所在?

  • 问题内容: 首先,我想创建一个具有以下结构的自定义用户库: src: 库包1 ClassName0.java LibA.pack2 ClassName1.java 我对此没问题。后来我想将此库导入另一个项目并调用 (同时使用pack1和pack2这两个类),由于要求全名,因此将失败,即 如何一次导入整个库以同时使用pack1和pack2这两个类? Ps绝对不是所谓的“嵌套程序包”,但我不知道该如何

  • 问题内容: Java编程语言是否有任何扩展使创建嵌套函数成为可能?在很多情况下,我需要创建在另一个方法或for循环的上下文中仅使用一次的方法。到目前为止,尽管用Javascript可以很容易地完成,但我迄今仍无法完成。 例如,这无法在标准Java中完成: 问题答案: Java 8引入了lambda。 该语法可在定义了一种方法的任何接口上使用。因此,您可以将其与一起使用,但不能与一起使用。 是jav

  • 问题内容: 我必须与API进行交互,并且响应格式(根据我的阅读)似乎结构不良。我发现一个Google 网上论坛在这里回答了一个类似的问题,但是我在实现Response类来处理Gson.fromJson时遇到了麻烦。有没有我想念的例子? 问题答案: JSON对象可以由或Javabean类表示。这是一个使用Javabean的示例。 如下使用它:

  • 我有一门java课 在上面的场景中,示例具有子示例,这又是示例列表。此嵌套可以是 n 级。我想实现的是有一个示例列表,即扁平化上面的对象并将所有示例收集到最终列表中(收集所有n级示例)。一个明显的方法是递归。在Java中有什么方法可以更有效地实现它。我尝试了一些java 8概念,但它们不符合要求。

  • 如何使用JavaFX TableView实现嵌套的行(而不是列)?还是在TableView中合并行?