为什么下面的代码不输出任何输出,而如果我们删除parallel,则输出0、1?
IntStream.iterate(0, i -> ( i + 1 ) % 2)
.parallel()
.distinct()
.limit(10)
.forEach(System.out::println);
尽管我知道理想情况下应该将限制放在不同的位置,但是我的问题与添加并行处理导致的差异更多有关。
真正的原因是 有序并行
.distinct()
是完整的屏障操作,如文档中所述:
保持
distinct()
并行管道的稳定性是相对昂贵的(要求操作充当一个完整的屏障,并具有大量缓冲开销),并且通常不需要稳定性。
“完全屏障操作”是指必须先执行所有上游操作,然后才能启动下游程序。Stream
API中只有两个完整的屏障操作:(.sorted()
每次)和.distinct()
(在有序并行情况下)。由于您有提供给您的非短路无限流,因此.distinct()
最终将导致无限循环。根据合同.distinct()
,不能仅以任何顺序将元素发射到下游:它应始终发射第一个重复元素。从理论上讲,可以.distinct()
更好地实现并行有序,但实现起来会更加复杂。
对于解决方案,@
user140547是正确的:.unordered()
在.distinct()
此之前将distinct()
算法切换为无序算法(仅使用共享ConcurrentHashMap
存储所有观察到的元素并将每个新元素发射到下游)之前添加。请注意,.unordered()
之后 添加.distinct()
将无济于事。
我试图理解为什么下面的Java程序给出了< code>OutOfMemoryError,而对应的程序却没有< code >。parallel()没有。 我有两个问题: > 该程序的预期输出是什么? 不带<代码>。parallel()看起来这只是输出< code>sum(1 2 3...)这意味着它只是“停留”在平面图中的第一个流,这是有意义的。 对于并行,我不知道是否有预期的行为,但我的猜测是它以
因此,我一直在尝试并行运行流,并基于API文档和我阅读的其他支持材料监视它们的行为。 我创建了两个并行流并运行,其中一个流是有序的,另一个是无序的。然后,我使用打印结果(以确保在distinct运行后看到流的最终遇到顺序),并且可以清楚地看到无序版本并不保持原始顺序,但具有较大的数据集,将明显提高并行性能。 有API注释建议,当流无序时,和操作也应该更有效地并行运行,因为您不必检索第一个元素,而可
我试图理解方法是如何精确地处理并行流的,我不理解为什么下面的代码不返回这些字符串的串联。 代码如下: 该代码仅适用于顺序流,但对于并行流,它不会返回串联。每次输出都不同。有人能解释一下那里发生了什么事吗?
示例2: 如果流被设置为,就像在第二个示例中那样,我可以想象内部工作者在等待外部工作队列中的线程可用时会阻塞,因为外部工作队列线程必须在内部流完成时阻塞,而默认线程池只有有限数量的线程。但是,似乎不会出现死锁: 两个流共享相同的默认线程池,但它们生成不同的工作单元。每个外部工作单元只能在该外部工作单元的所有内部单元完成之后才能完成,因为在每个并行流的末端有一个完成屏障。 如何通过共享的工作线程池来
相反,如果我从web服务器收集数据,为什么不直接使用相同的节点进行事件处理呢?这些操作已经由负载均衡器分布在节点上,我在web服务器上使用负载均衡器。我可以在相同的JVM实例上创建执行器,并将事件从web服务器异步发送到执行器,而不涉及任何额外的IO请求。我还可以监视web服务器中的执行器,并确保执行器处理了事件(至少一次或恰好一次处理保证)。通过这种方式,管理我的应用程序将容易得多,而且由于不需
我有一个相当复杂的过程,需要几个层次的嵌套for循环。 只针对一组特定的条件执行操作。换句话说: