当前位置: 首页 > 知识库问答 >
问题:

并行流中的Fork-Join池

颜阳炎
2023-03-14

我已经搜索了网上的各种文章和堆栈溢出问题,但我不能找到这个完美的答案。有许多问题与此相近,但略有不同。

我们知道Java8Streams API在内部使用Fork-Join池。

现在我的问题是如何使用Fork-Join池来划分流管道中的任务?

假设我们有以下内容:

List myList =  inputList.parallelStream().filter( x -> x>0 )
    .map(x -> x+100 ).collect(Collectors.toList());
    null
List myList2 = inputList.parallelStream().filter( x -> x>0 )
    .map(x -> x+5 ).sorted().map(x -> x+5 ).collect(Collectors.toList());

共有1个答案

陆寒
2023-03-14

首先,您必须使用Parallel使fork-join pooly处于活动状态。这个答案稍微解释了spliterator是如何执行拆分的;但简单地说,拆分是使用流元素的源完成的,并且并行处理整个流水线。在您的示例中,它是filtermap(当然,它还包括terminal操作)。

对于有状态操作--事情更加复杂。让我们以distince为例,首先看看它是如何处理顺序情况的。

通常,您可能认为非并行不同可以使用哈希集实现--您的想法是正确的。hashset可以保存已经看到的所有值,而不处理(发送到下一个操作)其他元素--理论上可以使用非并行的distince操作。但是,如果已知是排序的怎么办?想想看,这意味着我们可以保留一个将标记为seen的元素(与前面的hashset)。基本上,如果你愿意:

 1,1,2,2,3

这意味着您的有状态操作可以在单个元素之上实现--而不是hashset;代码类似于:

T seen = null;
....
if(seen == null || (!currentElement.equals(seen)){
    seen = currentElement;
    // process seen;
}

但是只有当您知道流是sorted时,这种优化才是可能的,因为这样您就知道下一个元素要么与您已经看到的元素相同,要么是一个新的元素,这对于您以前在其他操作中看到的元素是不可能的--排序操作保证了这一点。

现在是如何实现Parallel Distinct的。你基本上问这个问题:

那么如何创建线程池

同样,从流的角度看,没有任何改变,forkjoinpool使用相同数量的线程--唯一改变的显然是流实现。

简单地说,如果您的有序的,内部实现将使用LinkedHashSet(实际上是它的多个实例,因为它在这种情况下确实会减少)来保留您的顺序,如果您不关心顺序,它将使用ConcurrentHashMap--也就是说,如果源代码不是有序的(就像Set)或者您使用了显式的称为UnorderedUnordered。如果您真的想知道它是如何完成的,也可以在实现中查找sorted

因此,底线是fork Join pool不会改变基于流的实现,它使用相同的模型。另一方面,根据您所拥有的操作,Stream API可能将一些有状态数据用于有状态的中间操作,例如hashset/concurrenthashmap或单个元素等。

 类似资料:
  • fork-join框架允许在几个worker上中断某个任务,然后等待结果将它们组合起来。 它在很大程度上利用了多处理器机器的容量。 以下是fork-join框架中使用的核心概念和对象。 Fork Fork是一个过程,在这个过程中,任务将自身分成较小且独立的子任务,这些子任务可以同时执行。 语法 (Syntax) Sum left = new Sum(array, low, mid); left.

  • 主要内容:Fork,Join,ForkJoinPool,RecursiveAction,递归任务,实例框架允许在几个工作进程中断某个任务,然后等待结果组合它们。 它在很大程度上利用了多处理器机器的生产能力。 以下是框架中使用的核心概念和对象。 Fork Fork是一个进程,其中任务将其分成可以并发执行的较小且独立的子任务。 语法 这里是的子类,方法将任务分解为子任务。 Join 连接()是子任务完成执行后任务加入子任务的所有结果的过程,否则它会持续等待。 语法 这里剩下的是类的一个对象。 For

  • 问题内容: 这是我对Java 8Stream框架的理解: 东西产生了源溪 该实现负责提供 BaseStream#parallel() 方法,该方法进而返回可以并行运行其操作的Stream。 尽管有人已经找到了一种将自定义线程池与Stream框架的并行执行结合使用的方法,但是我一生无法在Java 8 API中提及默认的Java 8并行Stream实现将使用ForkJoinPool#commonPoo

  • 某些内容创建源流 实现负责提供一个baseStream#parallel()方法,该方法反过来返回一个可以并行运行其操作的流。 虽然已经有人找到了在Stream Framework的并行执行中使用自定义线程池的方法,但我在Java 8 API中找不到任何关于默认的Java 8并行流实现将使用ForkJoinPool#commonPool()的提及。(集合#ParallelStream()、Stre

  • 本文向大家介绍Java Fork/Join框架,包括了Java Fork/Join框架的使用技巧和注意事项,需要的朋友参考一下 Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程。Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能。 和任何ExecutorService接口的实现一样,Fork/Jo

  • 问题内容: 我们在Java中使用了三种不同的多线程技术 -Fork / Join pool,Executor Service和CountDownLatch 叉子/加入池 (http://www.javacodegeeks.com/2011/02/java-forkjoin-parallel- programming.html ) Fork / Join框架旨在使分治算法易于并行化。这种类型的算法非