当前位置: 首页 > 知识库问答 >
问题:

Java,顺序流在哪个线程中执行?

詹正浩
2023-03-14

在阅读关于流的留档时,我遇到了以下句子:

>

  • 。。。试图从行为参数访问可变状态会给您带来错误的选择。。。如果您不同步对该状态的访问,您将面临数据竞争,因此您的代码将被破坏。。。[1]

    如果行为参数确实有副作用。。。[没有]保证在同一个线程中执行同一流管道中“相同”元素上的不同操作。[2]

    对于任何给定的元素,可以在库选择的任何时间和线程中执行该操作。[3]

    这些句子没有区分顺序流和平行流。因此,我的问题是:

    1. 顺序流的管道在哪个线程中执行?它总是调用线程还是一个实现可以自由选择任何线程?
    2. 如果流是顺序的,则在哪个线程中执行的每个终端操作的动作参数
    3. 当使用顺序流时,我必须使用任何同步吗?
    • [1 2]https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html
    • [3]https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#forEach-java.util.function.Consumer-
  • 共有2个答案

    宿楚青
    2023-03-14
    1. Stream的终端操作是阻塞操作。如果没有并行执行,执行终端操作的线程将运行管道中的所有操作

    定义1.1。管道是一对链式方法。

    定义1.2。中间操作将位于流中的任何位置,末端除外。它们返回一个流对象,不执行管道中的任何操作。

    定义1.3。终端操作将仅位于流的末尾。他们执行管道。它们不返回流对象,因此不能在它们之后添加其他中间操作或终端操作。

    Java8向我们介绍了Spliterator接口。它具有迭代的功能,但也有一组操作来帮助并行执行和拆分任务。

    在顺序执行中从原语流调用forEach时,调用线程将调用拆分器。Foreach剩余方法:

    @Override
    public void forEach(IntConsumer action) {
       if (!isParallel()) {
            adapt(sourceStageSpliterator()).forEachRemaining(action);
        }
        else {
            super.forEach(action);
        }
    }
    

    您可以在我的教程:第6部分-拆分器中阅读有关拆分器的更多信息

    诸如reduce之类的流操作使用累加器组合器函数来执行并行流。根据定义,streams库禁止变异。你应该避免。

    并发和并行编程中有很多定义。我将介绍一套最适合我们的定义。

    定义8.1.并发编程是使用其他同步算法解决任务的能力。

    定义8.2。并行编程是在不使用其他同步算法的情况下解决任务的能力。

    您可以在我的教程:第7部分-并行流中阅读更多关于它的内容。

    王俊哲
    2023-03-14

    这一切都归结为基于规范的保证,以及当前实现可能有超出保证的额外行为的事实。

    Java语言架构师Brian Goetz在一个相关问题中就规范提出了一个相关的观点:

    规范的存在是为了描述调用者可以依赖的最小保证,而不是描述实现的功能。

    [...]

    当规范规定“不保留属性X”时,并不意味着可能永远不会遵守属性X;这意味着实现没有义务保留它。[...] (HashSet不保证迭代其元素时保留插入顺序,但这并不意味着这不会意外发生——你不能指望它。)

    这一切都意味着,即使当前实现碰巧具有某些行为特征,也不应依赖它们,也不应假设它们不会在库的新版本中更改。

    在哪个线程中执行顺序流的管道?它始终是调用线程还是实现可以自由选择任何线程?

    当前流实现可能使用也可能不使用调用线程,并且可能使用一个或多个线程。由于这些都不是API指定的,因此不应依赖此行为。

    如果流是顺序的,则在哪个线程中执行的每个终端操作的操作参数?

    虽然当前实现使用现有线程,但这不能依赖,因为留档声明线程的选择取决于实现。事实上,不能保证元素不被不同的线程处理,尽管当前流实现也不是这样做的。

    根据API:

    对于任何给定的元素,该操作可以在库选择的任何时间和任何线程中执行。

    请注意,虽然API在讨论遭遇顺序时特别调用了并行流,但Brian Goetz对此进行了澄清,以澄清行为的动机,而不是任何行为都特定于并行流:

    在这里明确提出平行案例的目的是教学[…]。然而,对于一个不知道并行性的读者来说,几乎不可能不假设forEach会保持遭遇顺序,因此添加这句话是为了帮助澄清动机。

    当使用顺序流时,我必须使用任何同步吗?

    当前的实现可能会工作,因为它们使用单个线程来执行顺序流的foreach方法。但是,由于它不受流规范的保证,因此不应该依赖它。因此,同步应该像多个线程可以调用方法一样使用。

    也就是说,流文档特别建议不要使用需要同步的副作用,并建议使用还原操作而不是可变累加器:

    许多可能尝试使用副作用的计算可以更安全、更有效地表达,而不会产生副作用,例如使用约化而不是可变累加器。[...] 少量流操作,如forEach()和peek(),只能通过副作用进行操作;使用时应小心。

    下面的代码将搜索字符串流,寻找与给定正则表达式匹配的字符串,并将匹配的字符串放在列表中。

         ArrayList<String> results = new ArrayList<>();
         stream.filter(s -> pattern.matcher(s).matches())
               .forEach(s -> results.add(s));  // Unnecessary use of side-effects!
    

    这段代码不必要地使用了副作用。如果并行执行,ArrayList的非线程安全性将导致不正确的结果,添加所需的同步将导致争用,破坏并行性的好处。此外,在这里使用副作用是完全没有必要的;forEach()可以简单地替换为更安全、更高效、更易于并行化的简化操作:

         List<String>results =
             stream.filter(s -> pattern.matcher(s).matches())
                   .collect(Collectors.toList());  // No side-effects!
    
     类似资料:
    • 问题内容: 我尝试从一本书(Paul Hyde,Java Thread Programming)中运行示例。它说线程的顺序将互换。但是我总是得到:之后打印10个“主线程”,然后打印10个“新线程”。更有趣的是:如果我将使用tt.run而不是tt.start,那么结果将相反。也许这本书太老了,示例基于JDK 1.2的原因???代码如下: 问题答案: JVM决定何时将控制权从主线程转移到第二个线程。由

    • 问题内容: 我正在使用rub redis宝石。想知道我是否例如: 这样的执行顺序得到保证吗? 问题答案: 当然可以保证顺序,否则流水线将毫无用处。您可以随时查看代码。例如,此测试明确假定命令是按顺序执行的:https : //github.com/redis/redis- rb/blob/master/test/pipelining_commands_test.rb#L32

    • 问题内容: 我对CompletableFuture方法有疑问: 事情是JavaDoc这么说的: 返回一个新的CompletionStage,当此阶段正常完成时,将使用该阶段的结果作为所提供函数的参数来执行该阶段。有关涵盖异常完成的规则​​,请参见CompletionStage文档。 那线程呢?这将在哪个线程中执行?如果将来由线程池完成怎么办? 问题答案: 文档中指定的策略可以帮助您更好地理解: 对

    • 我有一个关于CompletableFuture方法的问题: 问题是JavaDoc只说了这么一句话: 返回一个新的CompletionStage,当此阶段正常完成时,将以此阶段的结果作为所提供函数的参数执行该CompletionStage。有关例外完成的规则,请参阅CompletionStage文档。 穿线呢?这将在哪个线程中执行?如果未来是由一个线程池来完成的呢?

    • 因为在过滤2之后,我们还得再找到一个元素来分层极限(2),操作,那么为什么输出不像我解释的那样呢?

    • 问题内容: 您将如何依次执行三个线程?例如。线程1,线程2,线程3。不可能将一个线程的引用传递给另一个线程并从run()方法调用。 因此代码应如下所示: 并应该把 这可以通过使用ThreadPoolExecutor并使用阻塞队列来实现,但即使那样也不是可以接受的答案。 问题答案: 在java.util.concurrent包中使用ExecutorService。更精确地使用