在代码下面执行
List.of(1, 2, 3, 4).stream()
.map(
integer -> {
System.out.println(
"Before parallel operator : " + Thread.currentThread().getName() + " : " + integer);
return integer * 2;
})
.parallel()
.map(
integer -> {
System.out.println(
" After parallel operator : " + Thread.currentThread().getName() + " : " + integer);
return integer * 2;
})
.forEach(
integer -> {
System.out.println(" For Each : " + Thread.currentThread().getName() + " : " + integer);
});
输出:
Before parallel operator : main : 3
Before parallel operator : ForkJoinPool.commonPool-worker-19 : 2
Before parallel operator : ForkJoinPool.commonPool-worker-23 : 1
Before parallel operator : ForkJoinPool.commonPool-worker-5 : 4
After parallel operator : main : 6
After parallel operator : ForkJoinPool.commonPool-worker-23 : 2
After parallel operator : ForkJoinPool.commonPool-worker-19 : 4
After parallel operator : ForkJoinPool.commonPool-worker-5 : 8
For Each : ForkJoinPool.commonPool-worker-19 : 8
For Each : main : 12
For Each : ForkJoinPool.commonPool-worker-23 : 4
For Each : ForkJoinPool.commonPool-worker-5 : 16
除了3号元素,其他都是平行的吗?想了解并行运算符在后续调用中行为吗?
并行运算符在哪里开始,并行如何继续?
在调用终端操作(例如 forEach
或 collect
)之前,不会处理流,稍后会对此进行更多介绍。因此,回答您的问题,“并行运算符在哪里启动,并行性如何继续?
文档怎么说?
文件中明确说明了这一点:
可以使用基流修改流的模式。sequential()和基流。parallel()操作。最新的顺序或并行模式设置适用于整个流管道的执行
一个小演示
现在考虑以下代码(原谅我的System.out
,这是出于演示目的)。如果我们在并行
和顺序
之间切换,整个管道都会发生变化,而不仅仅是后续运算符。
System.out.println("=== Creating stream s1 as 1,2,3,4");
var s1 = List.of(1, 2, 3, 4).stream();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("=== s2 results of applying map to s1");
var s2 = s1.map(integer -> integer * 2);
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("=== s3 results of applying parallel to s2");
var s3 = s2.parallel();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("=== s4 results of applying map to s3");
var s4 = s3.map(integer -> integer * 2);
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("s4 is parallel? " + s4.isParallel());
System.out.println("=== s5 results of applying sequential to s4");
var s5 = s4.sequential();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("s4 is parallel? " + s4.isParallel());
System.out.println("s5 is parallel? " + s5.isParallel());
这将输出以下内容:
=== Creating stream s1 as 1,2,3,4
s1 is parallel? false
=== s2 results of applying map to s1
s1 is parallel? false
s2 is parallel? false
=== s3 results of applying parallel to s2
s1 is parallel? true
s2 is parallel? true
s3 is parallel? true
=== s4 results of applying map to s3
s1 is parallel? true
s2 is parallel? true
s3 is parallel? true
s4 is parallel? true
=== s5 results of applying sequential to s4
s1 is parallel? false
s2 is parallel? false
s3 is parallel? false
s4 is parallel? false
s5 is parallel? false
现在,当您调用诸如forEach或collect之类的终端操作符时,它将在处理过程中只考虑顺序流,即使在中间在中间。如文档所述,最新应用的模式用于整个管道。
这有什么用?
你可能会问。可以通过与终端操作员“打破”管道来更改管道中间的行为。例如,以您的示例为例,如果我们在第一个映射
之后立即应用 collect
,则第一个映射
将按顺序执行,然后并行将仅应用于后续运算符,但实际上,这现在是一个不同的管道,因为所有内容都是在列表的中间收集的。
java prettyprint-override">List.of(1, 2, 3, 4).stream()
.map(integer -> {
System.out.println("Before stream : " + Thread.currentThread().getName() + " : " + integer);
return integer * 2;
})
.collect(Collectors.toList())
.stream()
.parallel()
.map(integer -> {
System.out.println("After parallel stream : " + Thread.currentThread().getName() + " : " + integer);
return integer * 2;
})
.forEach(integer -> System.out.println("For Each : " + Thread.currentThread().getName() + " : " + integer));
这将输出如下内容:
Before stream : main : 1
Before stream : main : 2
Before stream : main : 3
Before stream : main : 4
After parallel stream : main : 6
After parallel stream : ForkJoinPool.commonPool-worker-23 : 2
After parallel stream : ForkJoinPool.commonPool-worker-5 : 4
After parallel stream : ForkJoinPool.commonPool-worker-19 : 8
For Each : ForkJoinPool.commonPool-worker-19 : 16
For Each : ForkJoinPool.commonPool-worker-5 : 8
For Each : ForkJoinPool.commonPool-worker-23 : 4
For Each : main : 12
注意第一个< code>map是如何顺序执行的,而其余的操作符是并行执行的。
诸如RxJava之类的可观察流实现与观测
运算符有不同的用法,但它们也是一种完全不同的做事方式。
我一直在尝试将更多的函数式编程融入到我所做的事情中,因为我编写的代码具有无副作用的性质,并且在并发代码中具有实用性。我遇到了需要过滤掉java流的连续元素的情况,并且没有比简单的旧命令方法更好的函数方法了。假设我有一个记录参数的程序,我想过滤掉两个连续的元素。例如,。我在日志中想要的是。 我提出了几种方法,但没有一种方法比使用for循环更容易理解,该循环索引了我需要过滤掉的内容。 这似乎是一件很平
问题内容: 我想动态更新元素的文本: 我是jQuery的新手,因此对我来说,这项任务似乎非常具有挑战性。有人可以指出我要使用的功能/选择器吗? 如果可能的话,我想在不为需要更改的文本添加新容器的情况下执行此操作。 问题答案: 在Javascript中,该属性为您提供元素的所有子节点,包括文本节点。 因此,如果您知道要更改的文本始终是元素中的第一件事,那么请给出以下HTML: 您可以这样做: 当然,
在里面https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html#parallel-数据流,有一个描述 操作符子任务的数量是该特定操作符的并行度。流的并行性始终是其生成操作符的并行性。同一程序的不同操作符可能具有不同的并行级别。 我不明白什么是流的并行性总是它的生成操作符的并
我在rxjava/rxscala中使用.防抖运算符来捕获一些发生在彼此给定时间段内的事件,但希望使给定时间段可控。时间段。防抖用途作为参数给出。 理想情况下,我希望能够通过考试。去抖动操作符使用的最新值确定去抖动使用的时间段的可观测值。差不多吧。去抖动(timePeriodController,TimeUnit.Seconds)。 我看到了。debounce可以选择DebounceSelector
问题内容: 我想使用这项技术并更改svg颜色,但到目前为止我还没有做到这一点。我把它放在css中,但是无论如何我的图像总是黑色的。我的代码: 问题答案: 您不能以这种方式更改图像的颜色。如果将SVG加载为图像,则无法在浏览器中使用CSS或Javascript更改其显示方式。 如果你想改变你的SVG图像,你必须用它来装载,或使用在线。 如果要使用页面中的技术,则需要Modernizr库,您可以在其中
有没有一个“最佳实践”来改变流中的元素?我特别指的是流管道内的元素,而不是流管道外的元素。 例如,考虑这样的情况:我希望获取用户列表,为null属性设置默认值并将其打印到控制台。 编辑:官方的流java文档声明了少量的流操作,如forEach()和peek(),只能通过副作用来操作;这些应该小心使用。“鉴于这将是一个不干涉的行动,有什么特别使它危险呢?我所看到的例子涉及到管道之外,这显然是粗略的。