请考虑以下简单代码:
Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
很长一段时间以来,我认为即使在flatmap
之后,Java也会对元素进行并行执行。但是上面的代码打印了所有的“thread:main”,这证明了我的想法是错误的。
在flatmap
之后使其并行的一个简单方法是收集并再次流:
Stream.of(1)
.flatMap(x -> IntStream.range(0, 1024).boxed())
.parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
.collect(Collectors.toList())
.parallelStream()
.forEach(x -> {
System.out.println("Thread: " + Thread.currentThread().getName());
});
==========关于问题的更多澄清=========
从一些回答来看,我的问题似乎传达得并不充分。正如@Andreas所说,如果我从3个元素的流开始,那么可能有3个线程在运行。
但我真正的问题是:Java Stream使用了一个通用的ForkJoinPool,它的默认大小等于内核数少一个。现在假设我有64个核心,那么我希望上面的代码会在flatmap
之后看到许多不同的线程,但实际上,它只看到一个线程(在Andreas的情况下是3个线程)。顺便说一下,我确实使用了isParallel
来观察流是并行的。
老实说,我问这个问题不是纯粹出于学术兴趣。我在一个项目中遇到了这个问题,该项目提供了一个用于转换数据集的长链流操作。该链以单个文件开始,并通过flatmap
分解为大量元素。但显然,在我的实验中,它并没有充分利用我的机器(它有64个核心),而是只使用了一个核心(从cpu使用情况观察)。
我在想[...]关于flatmap
的设计选择,它只在调用前并行化流,而不在调用后并行化。
你搞错了。flatmap
之前和之后的所有步骤都是并行运行的,但它只是在线程之间拆分原始流。然后,flatmap
操作由一个这样的线程处理,它的流不被拆分。
由于原始流只有1个元素,因此不能拆分,因此parallel
不起作用。
看起来map和flatMap返回不同的类型。 返回applySchema函数(或spark 1.3中的createDataFrame)所需的org.apache.spark.rdd.rdd[org.apache.spark.sql.row]。 但是,返回org.apache.spark.rdd.rdd[Any],我不能调用applySchema()。 如何在flatMap()之后使用applySc
我有一个struct类型的对象数组 现在我想要一个数组,其中每个对象数组的所有元素组成如下所示: 结果: 紧凑型:[[1,2][1,2][1,2][1,2]]扁平:[1,2,1,2,1,2] 由于平面图在Swift 4.1中已弃用,我尝试使用compactMap,但它给出了数组数组而不是单个数组。 如何通过compactMap实现,就像我通过flatMap实现一样。
我有一个价格对象,它有两个属性,成本和税。我希望能够流式处理价格对象列表,将成本和税收相加,然后汇总总数。 有没有办法在Java8流中完全做到这一点?我正在考虑这样的逻辑: