假设我有这样的代码:
Collections.singletonList(10)
.parallelStream() // .stream() - nothing changes
.flatMap(x -> Stream.iterate(0, i -> i + 1)
.limit(x)
.parallel()
.peek(m -> {
System.out.println(Thread.currentThread().getName());
}))
.collect(Collectors.toSet());
输出是相同的线程名称,所以这里没有parallel
的好处--我的意思是有一个线程来完成所有的工作。
在flatmap
内部有以下代码:
result.sequential().forEach(downstream);
我理解强制sequential
属性如果“outer”流将是并行的(它们可能会阻塞),“outer”将不得不等待“flatmap”完成,反过来(因为使用了相同的公共池),但为什么总是强制这样做呢?
这是一个可以在以后的版本中改变的东西吗?
有两个不同的方面。
首先,只有一个流水线,它要么是顺序的,要么是并行的。在内部流处选择顺序或并行是无关紧要的。请注意,您在引用的代码段中看到的downdown
使用者代表整个后续流管道,因此在代码中,以.collectors.toset());
结尾,这个使用者最终将把结果元素添加到一个不是线程安全的set
实例中。因此与单个使用者并行处理内部流将中断整个操作。
如果一个外部流被拆分,引用的代码可能会同时被调用,不同的使用者会添加到不同的集合中。这些调用中的每一个都将处理外部流映射到不同内部流实例的不同元素。由于外部流仅由单个元素组成,因此不能拆分。
实现的方式也是为什么flatMap()之后的filter()在Java流中“不完全”懒惰的原因?发出,因为在内部流上调用foreach
,该内部流将把所有元素传递给下游使用者。如这个答案所示,支持懒惰和子流拆分的替代实现是可能的。但这是一种根本不同的实现方式。流实现的当前设计主要是通过使用者组合来工作的,因此最终,源拆分器(以及从源拆分器拆出的那些)接收一个consumer
,它以TryAdvance
或ForeachReflay
的形式表示整个流管道。相反,链接答案的解决方案会拆分器组合,生成一个新的spliterator
委托给源拆分器。我想,这两种方法都有优点,但我不确定OpenJDK实现在反过来工作时会损失多少。
问题内容: 假设我有以下代码: 输出是相同的线程名称,因此这里没有任何好处-我的意思是,只有一个线程可以完成所有工作。 里面是这段代码: 我知道如果“外部”流是并行的(可能会阻塞),则强制该属性,“外部”将不得不等待“ flatMap”完成,反之亦然(因为使用了相同的公共池),但是为什么 总是 强迫那个? 那是在以后的版本中 可能会 更改的事情之一吗? 问题答案: 有两个不同方面。 首先,只有一个
我已经讨论了前面的几个问题,比如java stream中的遭遇顺序保存、Brian Goetz的回答,以及javadoc for stream。reduce()和java。util。流包javadoc,但我仍然无法理解以下内容: 以这段代码为例: 为什么减少总是保持遭遇顺序? 到目前为止,经过几十次运行,产量是相同的
我有一个包含240个项目的列表,使用完整发送此列表需要1个多小时。 所以我跟着这篇文章同时发送,以尽量减少响应时间,但是里面的代码永远不会执行:
想要编写 Excel 公式,该公式将根据行的匹配项对值求和。要匹配的单元格可以多次出现,并且基于映射图例,它们应返回值的总和。 在黄色单元格中,我试图根据G3: G8中名称的匹配来计算B9: B21范围内的值的总和,根据映射图例到Item1,然后是Item2和Item 3。此外,我想考虑在日期1、日期2和日期3之间进行的总和。下面的SumProduct公式仅适用于单个日期的sum数组,而不适用于日
我对webflux比较陌生,我想找到解决方案,在有条件时避免嵌套flatMap: 如何防止这种分支条件平面映射混乱?我无法想象如果我在项目中有另一个实体。会有更多嵌套的平面地图?
在我的应用程序中,有多个企业。每个企业登录并做一些动作,如上传数据,然后Kafka生产者采取的数据,并发送到主题。另一方面,Kafka消费者使用来自主题的数据并执行业务逻辑。并保存到数据库中。在这种情况下,当单个企业登录时,一切都很完美。但当多个企业登录时,则Kafka依次消费。也就是说,我如何使过程并行?在多个客户端请求上。提前谢了。