当前位置: 首页 > 知识库问答 >
问题:

并行无限Java流耗尽内存

赵浩邈
2023-03-14

我试图理解为什么下面的Java程序给出了< code>OutOfMemoryError,而对应的程序却没有< code >。parallel()没有。

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

我有两个问题:

>

  • 该程序的预期输出是什么?

    不带<代码>。parallel()看起来这只是输出< code>sum(1 2 3...)这意味着它只是“停留”在平面图中的第一个流,这是有意义的。

    对于并行,我不知道是否有预期的行为,但我的猜测是它以某种方式交错了第一个n左右的流,其中n是并行工作人员的数量。根据分块/缓冲行为,它也可能略有不同。

    是什么导致它运行内存溢出?我特别想了解这些流是如何在底层实现的。

    我猜是有什么东西阻塞了流,所以它永远不会结束,并且能够消除生成的值,但我不太清楚计算的顺序以及缓冲发生的位置。

    编辑:如果相关的话,我使用的是Java11。

    Editt 2:显然,即使对于简单的程序< code>IntStream.iterate(1,i-

  • 共有3个答案

    郜光明
    2023-03-14

    OOME不是因为流是无限的,而是因为它不是无限的。

    也就是说,如果你注释掉.limit(...),它永远不会耗尽内存 - 但当然,它也永远不会结束。

    一旦分割,如果元素在每个线程中累积,则流只能跟踪元素的数量(看起来实际的累加器是<code>Spliterators$ArraySpliterator#array)。

    看起来您可以在没有平面图的情况下复制它,只需使用-Xmx128m运行以下命令:

        System.out.println(Stream
                .iterate(1, i -> i + 1)
                .parallel()
          //    .flatMap(n -> Stream.iterate(n, i -> i+n))
                .mapToInt(Integer::intValue)
                .limit(100_000_000)
                .sum()
        );
    

    但是,在注释掉限制()之后,它应该运行良好,直到您决定备用笔记本电脑为止。

    除了实际的实施细节之外,我认为正在发生的事情如下:

    使用limit求和缩减器希望前X个元素求和,因此没有线程可以发出部分和。每个“切片”(线程)都需要累积元素并将其传递。没有限制,没有这样的约束,因此每个“切片”将只计算它得到的元素的部分和(永远),假设它最终将发出结果。

    段渊
    2023-03-14

    我最好的猜测是,添加parallel()会改变flatMap()的内部行为,而flatMap()之前已经懒洋洋地评估了问题。

    [JDK-8202307]在获取java.lang.OutOfMemoryError:java堆空间时调用Stream.iterator()时报告了您得到的OutOfmemoryErro错误。在平面图中使用无限/非常大的流的流上使用next()。如果您查看票据,它或多或少与您获得的堆栈跟踪相同。票证已关闭,因为无法修复,原因如下:

    迭代器()spterator()方法是“逃逸舱口”,用于无法使用其他操作时使用。它们有一些限制,因为它们将流实现的推送模型转换为拉模型。在某些情况下,这种转换需要缓冲,例如当一个元素被(平面)映射到两个或多个元素时。这将使流实现变得非常复杂,可能是以牺牲常见情况为代价的,以支持背压的概念来传达要通过元素生产的嵌套层提取多少元素。

    冀冯浩
    2023-03-14

    你说“但是我不太清楚事情是按照什么顺序被评估的,缓冲发生在哪里”,这正是并行流的意义所在。评估顺序未指定。

    示例的一个关键方面是限制(100_000_000)。这意味着实现不能只求和任意值,而必须求和前100000000个数字。请注意,在参考实现中,.unordered()。limit(100_000_000)不会改变结果,这表明对于无序情况没有特殊的实现,但这是一个实现细节。

    现在,当工作线程处理元素时,它们不能只是将它们相加,因为它们必须知道允许它们使用哪些元素,这取决于有多少元素在它们的特定工作负载之前。因为这个流不知道大小,这只能在前缀元素被处理后才知道,这对于无限流是永远不会发生的。因此工作线程暂时保持缓冲,这些信息变得可用。

    原则上,当工作线程知道它处理最左边的工作块时,它可以立即对元素求和、计数,并在达到极限时发出结束信号。因此流可以终止,但这取决于很多因素。

    在您的情况下,一个看似合理的场景是其他工作线程分配缓冲区的速度比最左边的作业计数的速度快。在这种情况下,计时的细微变化可能会使流偶尔返回一个值。

    当我们降低所有工作线程(除了处理最左边块的线程)的速度时,我们可以让流终止(至少在大多数运行中):

    System.out.println(IntStream
        .iterate(1, i -> i+1)
        .parallel()
        .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
        .flatMap(n -> IntStream.iterate(n, i -> i+n))
        .limit(100_000_000)
        .sum()
    );
    

    我遵循Stuart Marks的建议,在谈论相遇顺序而不是处理顺序时,使用从左到右的顺序。

     类似资料:
    • 我有业务托管在nkl.com,不幸的是我不能使用作曲家在那里了。支持写了更多,我已经达到了1.5 GB的限制,更多是不可能的。 现在我的问题是,有没有办法让Composer一块一块地更新或安装它,这样就可以释放中间的内存,或者有没有其他解决方案可以在服务器上运行我的Laravel应用程序? 目前,Composer JSON中包含以下包。还有一些计划。

    • 问题内容: 为什么下面的代码不输出任何输出,而如果我们删除parallel,则输出0、1? 尽管我知道理想情况下应该将限制放在不同的位置,但是我的问题与添加并行处理导致的差异更多有关。 问题答案: 真正的原因是 有序并行 是完整的屏障操作,如文档中所述: 保持并行管道的稳定性是相对昂贵的(要求操作充当一个完整的屏障,并具有大量缓冲开销),并且通常不需要稳定性。 “完全屏障操作”是指必须先执行所有上

    • 我有一个相当复杂的过程,需要几个层次的嵌套for循环。 只针对一组特定的条件执行操作。换句话说:

    • 问题内容: 在我的应用中,我经常调用一个返回json字符串的外部api。 但是在某些情况下 PHP致命错误:内存中已耗尽xxx字节(尝试分配32字节)的内存大小… 我无法控制外部API,当然可以增加php的内存,但这有一些缺点。 1-无论我设置的大小,仍然可能太小。2-如果将内存大小设置为“ infinite”(无限),则可能会导致服务器被杀。 理想情况下,我想在调用json_decode(…)之

    • 我目前在Azure中托管了几十个网站,最近开始在每个web应用的门户刀片中看到“内存资源耗尽”警告: 我在两个S3标准(大型)应用程序服务计划中托管我的网站,我在所有网站上都会收到警告,无论它们在哪个应用程序服务计划上。 有趣的是,当查看任一应用服务计划的内存使用率时,我总是低于40%,内存使用率实际上相当一致。我从未看到峰值或任何接近85%内存使用率的东西。 我的问题是,我是否误解了警告消息?是

    • 我发现JVM只有一个线程池用于并行处理流。我们在一个大的流上有一个I/O阻塞的函数,这导致了与不相关的并行流一起使用的不相关的或者快速的函数的活跃度问题。 stream上没有允许使用备用线程池的方法。 有没有一种简单的方法来避免这个问题,也许是以某种方式指定要使用哪个线程池?