当前位置: 首页 > 知识库问答 >
问题:

这是文件中的错误吗。或者我对平行流有什么误解?

缪远
2023-03-14

环境:Ubuntu x86_64(14.10),Oracle JDK 1.8u25

我尝试使用并行流的Files.lines(),但我想. skip()的第一行(它是一个CSV文件的头)。因此我试着这样做:

try (
    final Stream<String> stream = Files.lines(thePath, StandardCharsets.UTF_8)
        .skip(1L).parallel();
) {
    // etc
}

但有一列无法解析为int。。。

所以我尝试了一些简单的代码。这个问题非常简单:

$ cat info.csv 
startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes
1422758875023;34;54;151;4375;4375;27486
$

代码同样简单:

public static void main(final String... args)
{
    final Path path = Paths.get("/home/fge/tmp/dd/info.csv");
    Files.lines(path, StandardCharsets.UTF_8).skip(1L).parallel()
        .forEach(System.out::println);
}

我系统地得到了以下结果(好的,我只运行了大约20次):

startDate;treeDepth;nrMatchers;nrLines;nrChars;nrCodePoints;nrNodes

我错过了什么?

编辑似乎问题或误解的根源远不止于此(下面的两个例子是FreeNode的##java上的一位同事捏造的):

public static void main(final String... args)
{
    new BufferedReader(new StringReader("Hello\nWorld")).lines()
        .skip(1L).parallel()
        .forEach(System.out::println);

    final Iterator<String> iter
        = Arrays.asList("Hello", "World").iterator();
    final Spliterator<String> spliterator
        = Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED);
    final Stream<String> s
        = StreamSupport.stream(spliterator, true);

    s.skip(1L).forEach(System.out::println);
}

这张照片是:

Hello
Hello

UH

@霍尔格建议,对于任何有序且不大小的流,都会发生这种情况

Stream.of("Hello", "World")
    .filter(x -> true)
    .parallel()
    .skip(1L)
    .forEach(System.out::println);

此外,它源于已经发生的所有讨论,即问题(如果是一个问题?)正在使用。forEach()(正如@SotiriosDelimanolis首先指出的)。

共有3个答案

刘狐若
2023-03-14

问题是,您将并行流与forEach一起使用,并且您希望跳过操作依赖于正确的元素顺序,而这里的情况并非如此。forEach文档节选:

对于并行流管道,此操作不能保证尊重流的相遇顺序,因为这样做会牺牲并行性的好处。

我猜基本上是跳过操作首先在第二行执行,而不是第一行。如果将流设置为顺序或使用forEachOrdered,则可以看到它会产生预期的结果。另一种方法是使用收集器。

长孙知
2023-03-14

这个答案已经过时了——请阅读这个答案!

快速回答问题:观察到的行为是有意的!没有bug,所有这些都是根据文档进行的。但可以说,这种行为应该得到更好的记录和沟通。应该更清楚地看到,forEach是如何忽略排序的。

我将首先介绍允许观察到的行为的概念。这为剖析问题中给出的一个例子提供了背景。我会在高水平上做这件事,然后再在非常低的水平上做。

[TL;DR:单独阅读,高层解释会给出一个粗略的答案。]

与其讨论Streams,也就是与流相关的方法所操作或返回的类型,不如讨论流操作和流管道。方法调用lineskipparallel是流操作,它们构建了一个流管道[1],正如其他人所指出的,当调用终端操作forEach时,管道作为一个整体进行处理[2]。

管道可以被认为是一系列操作,这些操作一个接一个地在整个流上执行(例如过滤所有元素,将剩余元素映射到数字,求和所有数字)。但这是误导!一个更好的比喻是,终端操作通过每个操作[3]拉动单个元素(例如,获取下一个未过滤的元素,映射它,将其添加到总和,请求下一个元素)。一些中间操作可能需要遍历几个(例如skip)或甚至所有(例如排序)元素,然后才能返回请求的下一个元素,这是操作中状态的来源之一。

每个操作都通过以下StreamOpFlag表示其特性:

  • DISTINCT
  • 排序
  • 订购
  • 大小
  • 短路

它们在流源、中间操作和终端操作之间结合,构成管道的特征(作为一个整体),然后用于优化[4]。类似地,管道是否并行执行是整个管道的属性[5]。

因此,每当您对这些特性做出假设时,您都必须仔细查看构建管道的所有操作,无论它们的应用顺序如何,以及它们做出了什么保证。这样做时,请记住终端操作是如何通过管道拉动每个单独的元件的。

让我们看看这个特殊情况:

BufferedReader fooBarReader = new BufferedReader(new StringReader("Foo\nBar"));
fooBarReader.lines()
        .skip(1L)
        .parallel()
        .forEach(System.out::println);

无论您的流源是否已排序(它是),通过调用forEach(而不是forEachOrdered)您可以声明顺序对您来说并不重要[6],这有效地减少了skip从“跳过前n个元素”到“跳过任意n个元素”[7](因为没有顺序,前者变得毫无意义)。

因此,如果订单能带来提速,你就赋予管道忽略订单的权利。对于并行执行,它显然是这么认为的,这就是为什么会得到观察到的输出。因此,你观察到的是预期的行为,没有错误。

请注意,这与有状态的skip不冲突!如上所述,有状态并不意味着它以某种方式缓存整个流(减去跳过的元素),随后的所有内容都在这些元素上执行。它只是意味着操作有一些状态——即跳过的元素数量(嗯,实际上没那么容易,但我对正在发生的事情了解有限,我会说这是一个公平的简化)。

让我们更详细地看一下:

  • BufferedReader。行创建,我们称之为\u行
  • 调用参考ipeline.skip
  • 它构造了一个切片操作(跳过的泛化

让我们看看管道是如何执行的:

  • 呼叫\u跳过。forEach创建一个ForEachOp(我们称之为\u forEach)并将其交给\u skip。评估,它可以做两件事:
    1. 调用sourceSpliterator围绕此管道阶段的源创建拆分器:
      • 调用本身的OpeValueParallellazy(事实证明)
      • 这将确定流是无序的,并创建一个无序切片拆分器(我们称之为\u切片拆分器),其中跳过=1,没有限制
      1. 它将一大块(在本例中是全部)行放入缓冲区,并对它们进行计数

      所以无序切片拆分器。OfRef。forEachRemaining是最终真正忽略订单的地方。我没有将其与订购的变体进行比较,但以下是我的假设:

      • 在并行化下,将拆分器的元素铲入缓冲区可能会与执行相同操作的其他任务交错
      • 这会让追踪他们的订单变得异常困难
      • 这样做或防止交织会降低性能,如果顺序不相关,则毫无意义
      • 如果顺序丢失,除了处理前n个允许的元素之外,没有什么可做的了

      有问题吗?;)抱歉这么久了。也许我应该省去细节,写一篇博文。。。。

      [1] java。util。流——流操作和管道:

      河流作业分为中间作业和终端作业,并结合起来形成河流管道。

      [2] java。util。流——流操作和管道:

      在执行管道的终端操作之前,管道源的遍历不会开始。

      [3]这个比喻代表了我对溪流的理解。除了代码之外,主要的来源是引用java.util.stream-流操作和管道(突出显示我的):

      惰性地处理流可以显著提高效率;在上面的filter-map-sum示例这样的管道中,过滤、映射和求和可以融合到数据的单次传递中,中间状态最少。懒惰还可以避免在不必要时检查所有数据;对于“查找长度超过1000个字符的第一个字符串”之类的操作,只需检查足够多的字符串,以找到一个具有所需特征的字符串,而无需检查源中可用的所有字符串。

      [4] java。util。流动StreamOpFlag

      在管道的每个阶段,都可以计算组合的流和操作标志[…关于如何跨源、中间和终端操作组合标志的jadda、jadda、jadda…]生成从管道输出的标志。然后可以使用这些标志应用优化。

      在代码中,您可以在AbstractPipeline中看到这一点。combinedFlags,通过组合上一个操作和新操作的标志,在构造期间(以及在其他一些情况下)设置。

      [5] java。util。stream-并行(我无法直接链接到它-向下滚动一点):

      当终端操作启动时,流管道将根据调用它的流的方向顺序或并行执行。

      在代码中,您可以在AbstractPipeline中看到这一点。sequentialparallelisParallel,它们在流源上设置/检查布尔标志,使其在构造流时调用setter时不相关。

      [6] 爪哇。util。流动流动forEach:

      对此流的每个元素执行操作。[...]此操作的行为显式地是非确定性的。

      将其与java进行对比。util。流动流动预告:

      如果流具有定义的遭遇顺序,则按照流的遭遇顺序对此流的每个元素执行操作。

      [7] 这也没有明确的文档记录,但我在流中对这条评论的解释。跳过(被我大大缩短):

      因为skip(n)不仅要跳过任意n个元素,还要跳过遇到顺序中的前n个元素,所以在有序的并行管道上,skip()[]可能非常昂贵。[...][R]移动排序约束[...]可能会导致并行管道中跳过()的显著加速

谷梁宏恺
2023-03-14

由于问题的当前状态与前面的陈述完全相反,应该注意的是,Brian Goetz现在有一个明确的陈述,关于无序特征的反向传播,一个跳过操作被认为是一个错误。还指出,现在认为它根本没有终端操作有序性的反向传播。

还有一个相关的错误报告,JDK-8129120,其状态为“在Java中修复”

我用jdk1做了一些测试。8.0_60而且现在的实现似乎确实展示了更直观的行为。

 类似资料:
  • 问题内容: 环境:Ubuntu x86_64(14.10),Oracle JDK 1.8u25 我尝试使用的并行流,但我想要第一行(这是带有标头的CSV文件)。因此,我尝试这样做: 但是随后一列无法解析为一个整数… 所以我尝试了一些简单的代码。文件问题很简单: 代码也同样简单: 我 系统地 得到了以下结果(好的,我只运行了大约20次): 我在这里想念什么? 编辑 似乎问题或误解根源远不止于此(以下

  • 我试图解决这个问题:第三个最大数量 但我犯了这个错误 第4行:Char 37:运行时错误:有符号整数溢出:-9223372036854775808-10不能在类型“long long”(solution.cpp)摘要中表示:UndefinedBehaviorSanitizer:undefined behavior prog_joined。cpp:13:37 这是我的代码 有人能告诉我这个错误到底意

  • 问题内容: 我想在我的一个golang控制器中指定一个html模板。我的目录结构是这样的 我想为请求/ new加载first.html。我已经将NewHandler用于url / new,并且当/ new请求到达并且在controller.go中时,NewHandler函数正在执行。这是我的代码 但是我遇到一个错误 请帮助我删除此错误。提前致谢 问题答案: 我已经通过提供html的绝对路径解决了这

  • 问题内容: 我正在使用Ubuntu 14.04。我有以下代码: 但我不断收到以下错误: python程序和图像都在同一位置。可能是什么问题? 问题答案: 您需要安装:

  • **Jun29, 2021 9:45:18PMorg.hibernate.版本logVersion INFO: HHH000412: HiberNate Core{5.2.8。最终}Jun29, 2021 9:45:19PMorg.hibernate.cfg.环境INFO: HHH000206:hibernate.properties未找到Jun29, 2021 9:45:19PMorg.hibe

  • 我正试图在文件系统上创建一个文件,但仍然得到这个异常: 即使拍完照片后得到了链接Camare: :/storage/emulated/0/sample-take-image/image162064003789420210510_160517.jpg 所以有人能告诉我为什么我不能创建文件在这种情况下。我目前在Android11上 **CompilesDK版本30