当前位置: 首页 > 知识库问答 >
问题:

在flatMap的结果上使流并行

邹举
2023-03-14

请考虑以下简单代码:

Stream.of(1)
  .flatMap(x -> IntStream.range(0, 1024).boxed())
  .parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
  .forEach(x -> {
     System.out.println("Thread: " + Thread.currentThread().getName());
  });

很长一段时间以来,我认为即使在flatmap之后,Java也会对元素进行并行执行。但是上面的代码打印了所有的“thread:main”,这证明了我的想法是错误的。

flatmap之后使其并行的一个简单方法是收集并再次流:

Stream.of(1)
  .flatMap(x -> IntStream.range(0, 1024).boxed())
  .parallel() // Moving this before flatMap has the same effect because it's just a property of the entire stream
  .collect(Collectors.toList())
  .parallelStream()
  .forEach(x -> {
     System.out.println("Thread: " + Thread.currentThread().getName());
  });

==========关于问题的更多澄清=========

从一些回答来看,我的问题似乎传达得并不充分。正如@Andreas所说,如果我从3个元素的流开始,那么可能有3个线程在运行。

但我真正的问题是:Java Stream使用了一个通用的ForkJoinPool,它的默认大小等于内核数少一个。现在假设我有64个核心,那么我希望上面的代码会在flatmap之后看到许多不同的线程,但实际上,它只看到一个线程(在Andreas的情况下是3个线程)。顺便说一下,我确实使用了isParallel来观察流是并行的。

老实说,我问这个问题不是纯粹出于学术兴趣。我在一个项目中遇到了这个问题,该项目提供了一个用于转换数据集的长链流操作。该链以单个文件开始,并通过flatmap分解为大量元素。但显然,在我的实验中,它并没有充分利用我的机器(它有64个核心),而是只使用了一个核心(从cpu使用情况观察)。

共有1个答案

董凡
2023-03-14

我在想[...]关于flatmap的设计选择,它只在调用前并行化流,而不在调用后并行化。

你搞错了。flatmap之前和之后的所有步骤都是并行运行的,但它只是在线程之间拆分原始流。然后,flatmap操作由一个这样的线程处理,它的流不被拆分。

由于原始流只有1个元素,因此不能拆分,因此parallel不起作用。

 类似资料: