当前位置: 首页 > 知识库问答 >
问题:

并行运算符如何更改后续流元素上的流?

危飞跃
2023-03-14

在代码下面执行

List.of(1, 2, 3, 4).stream()
    .map(
        integer -> {
          System.out.println(
              "Before parallel operator : " + Thread.currentThread().getName() + " : " + integer);
          return integer * 2;
        })
    .parallel()
    .map(
        integer -> {
          System.out.println(
              " After parallel operator : " + Thread.currentThread().getName() + " : " + integer);
          return integer * 2;
        })
    .forEach(
            integer -> {
          System.out.println(" For Each : " + Thread.currentThread().getName() + " : " + integer);
        });

输出:

    Before parallel operator : main : 3
Before parallel operator : ForkJoinPool.commonPool-worker-19 : 2
Before parallel operator : ForkJoinPool.commonPool-worker-23 : 1
Before parallel operator : ForkJoinPool.commonPool-worker-5 : 4
 After parallel operator : main : 6
 After parallel operator : ForkJoinPool.commonPool-worker-23 : 2
 After parallel operator : ForkJoinPool.commonPool-worker-19 : 4
 After parallel operator : ForkJoinPool.commonPool-worker-5 : 8
 For Each : ForkJoinPool.commonPool-worker-19 : 8
 For Each : main : 12
 For Each : ForkJoinPool.commonPool-worker-23 : 4
 For Each : ForkJoinPool.commonPool-worker-5 : 16

除了3号元素,其他都是平行的吗?想了解并行运算符在后续调用中行为吗?

并行运算符在哪里开始,并行如何继续?

共有1个答案

岳炎彬
2023-03-14

在调用终端操作(例如 forEachcollect)之前,不会处理流,稍后会对此进行更多介绍。因此,回答您的问题,“并行运算符在哪里启动,并行性如何继续?

文档怎么说?

文件中明确说明了这一点:

可以使用基流修改流的模式。sequential()和基流。parallel()操作。最新的顺序或并行模式设置适用于整个流管道的执行

一个小演示

现在考虑以下代码(原谅我的System.out,这是出于演示目的)。如果我们在并行顺序之间切换,整个管道都会发生变化,而不仅仅是后续运算符。

System.out.println("=== Creating stream s1 as 1,2,3,4");
var s1 = List.of(1, 2, 3, 4).stream();
System.out.println("s1 is parallel? " + s1.isParallel());

System.out.println("=== s2 results of applying map to s1");
var s2 = s1.map(integer -> integer * 2);
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());

System.out.println("=== s3 results of applying parallel to s2");
var s3 = s2.parallel();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());

System.out.println("=== s4 results of applying map to s3");
var s4 = s3.map(integer -> integer * 2);
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("s4 is parallel? " + s4.isParallel());

System.out.println("=== s5 results of applying sequential to s4");
var s5 = s4.sequential();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("s4 is parallel? " + s4.isParallel());
System.out.println("s5 is parallel? " + s5.isParallel());

这将输出以下内容:

=== Creating stream s1 as 1,2,3,4
s1 is parallel? false
=== s2 results of applying map to s1
s1 is parallel? false
s2 is parallel? false
=== s3 results of applying parallel to s2
s1 is parallel? true
s2 is parallel? true
s3 is parallel? true
=== s4 results of applying map to s3
s1 is parallel? true
s2 is parallel? true
s3 is parallel? true
s4 is parallel? true
=== s5 results of applying sequential to s4
s1 is parallel? false
s2 is parallel? false
s3 is parallel? false
s4 is parallel? false
s5 is parallel? false

现在,当您调用诸如forEach或collect之类的终端操作符时,它将在处理过程中只考虑顺序流,即使在中间在中间。如文档所述,最新应用的模式用于整个管道。

这有什么用?

你可能会问。可以通过与终端操作员“打破”管道来更改管道中间的行为。例如,以您的示例为例,如果我们在第一个映射之后立即应用 collect,则第一个映射将按顺序执行,然后并行将仅应用于后续运算符,但实际上,这现在是一个不同的管道,因为所有内容都是在列表的中间收集的。

java prettyprint-override">List.of(1, 2, 3, 4).stream()
    .map(integer -> {
        System.out.println("Before stream : " + Thread.currentThread().getName() + " : " + integer);
        return integer * 2;
    })
    .collect(Collectors.toList())
    .stream()
    .parallel()
    .map(integer -> {
        System.out.println("After parallel stream : " + Thread.currentThread().getName() + " : " + integer);
        return integer * 2;
    })
    .forEach(integer -> System.out.println("For Each : " + Thread.currentThread().getName() + " : " + integer));

这将输出如下内容:

Before stream : main : 1
Before stream : main : 2
Before stream : main : 3
Before stream : main : 4
After parallel stream : main : 6
After parallel stream : ForkJoinPool.commonPool-worker-23 : 2
After parallel stream : ForkJoinPool.commonPool-worker-5 : 4
After parallel stream : ForkJoinPool.commonPool-worker-19 : 8
For Each : ForkJoinPool.commonPool-worker-19 : 16
For Each : ForkJoinPool.commonPool-worker-5 : 8
For Each : ForkJoinPool.commonPool-worker-23 : 4
For Each : main : 12

注意第一个< code>map是如何顺序执行的,而其余的操作符是并行执行的。

诸如RxJava之类的可观察流实现与观测运算符有不同的用法,但它们也是一种完全不同的做事方式。

 类似资料:
  • 我一直在尝试将更多的函数式编程融入到我所做的事情中,因为我编写的代码具有无副作用的性质,并且在并发代码中具有实用性。我遇到了需要过滤掉java流的连续元素的情况,并且没有比简单的旧命令方法更好的函数方法了。假设我有一个记录参数的程序,我想过滤掉两个连续的元素。例如,。我在日志中想要的是。 我提出了几种方法,但没有一种方法比使用for循环更容易理解,该循环索引了我需要过滤掉的内容。 这似乎是一件很平

  • 问题内容: 我想动态更新元素的文本: 我是jQuery的新手,因此对我来说,这项任务似乎非常具有挑战性。有人可以指出我要使用的功能/选择器吗? 如果可能的话,我想在不为需要更改的文本添加新容器的情况下执行此操作。 问题答案: 在Javascript中,该属性为您提供元素的所有子节点,包括文本节点。 因此,如果您知道要更改的文本始终是元素中的第一件事,那么请给出以下HTML: 您可以这样做: 当然,

  • 在里面https://ci.apache.org/projects/flink/flink-docs-release-1.8/concepts/programming-model.html#parallel-数据流,有一个描述 操作符子任务的数量是该特定操作符的并行度。流的并行性始终是其生成操作符的并行性。同一程序的不同操作符可能具有不同的并行级别。 我不明白什么是流的并行性总是它的生成操作符的并

  • 我在rxjava/rxscala中使用.防抖运算符来捕获一些发生在彼此给定时间段内的事件,但希望使给定时间段可控。时间段。防抖用途作为参数给出。 理想情况下,我希望能够通过考试。去抖动操作符使用的最新值确定去抖动使用的时间段的可观测值。差不多吧。去抖动(timePeriodController,TimeUnit.Seconds)。 我看到了。debounce可以选择DebounceSelector

  • 问题内容: 我想使用这项技术并更改svg颜色,但到目前为止我还没有做到这一点。我把它放在css中,但是无论如何我的图像总是黑色的。我的代码: 问题答案: 您不能以这种方式更改图像的颜色。如果将SVG加载为图像,则无法在浏览器中使用CSS或Javascript更改其显示方式。 如果你想改变你的SVG图像,你必须用它来装载,或使用在线。 如果要使用页面中的技术,则需要Modernizr库,您可以在其中

  • 有没有一个“最佳实践”来改变流中的元素?我特别指的是流管道内的元素,而不是流管道外的元素。 例如,考虑这样的情况:我希望获取用户列表,为null属性设置默认值并将其打印到控制台。 编辑:官方的流java文档声明了少量的流操作,如forEach()和peek(),只能通过副作用来操作;这些应该小心使用。“鉴于这将是一个不干涉的行动,有什么特别使它危险呢?我所看到的例子涉及到管道之外,这显然是粗略的。