当前位置: 首页 > 面试题库 >

内部更改限制和无序流

曾嘉福
2023-03-14
问题内容

基本上,这是在尝试回答另一个问题时出现的。假设此代码:

AtomicInteger i = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
IntStream.generate(() -> i.incrementAndGet())
        .parallel()
        .peek(x -> count.incrementAndGet())
        .limit(5)
        .forEach(System.out::println);

System.out.println("count = " + count);

我了解以下事实:这IntStream#generate无序的无限流
,要使其完成,必须进行短路操作(limit在这种情况下)。我也了解,在Supplier达到该限制之前,可以随意调用Stream实现多次。

在java-8下运行此命令,将count始终打印512(可能不总是打印,但在我的机器上是这样)。

相反,在java-10下运行this很少超过5。所以我的问题是内部发生了什么变化,从而使短路发生得更好了(我试图通过拥有信号源并尝试做一些区分来自己回答这个问题……)


问题答案:

更改发生在Java 9 beta 103和Java 9 beta
120(JDK‑8154387)之间。

负责的班级是StreamSpliterators.UnorderedSliceSpliterator.OfInt,分别。它的超一流StreamSpliterators.UnorderedSliceSpliterator

该类的旧版本看起来像

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
    }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of CHUNK_SIZE
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

如我们所见,它尝试CHUNK_SIZE = 1 << 7在每个分隔器中最多缓冲元素,最终可能以“ CPU核心数”×128个元素结束。

相反,新版本看起来像

abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
    static final int CHUNK_SIZE = 1 << 7;

    // The spliterator to slice
    protected final T_SPLITR s;
    protected final boolean unlimited;
    protected final int chunkSize;
    private final long skipThreshold;
    private final AtomicLong permits;

    UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
        this.s = s;
        this.unlimited = limit < 0;
        this.skipThreshold = limit >= 0 ? limit : 0;
        this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
            ((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
        this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
    }

    UnorderedSliceSpliterator(T_SPLITR s,
                              UnorderedSliceSpliterator<T, T_SPLITR> parent) {
        this.s = s;
        this.unlimited = parent.unlimited;
        this.permits = parent.permits;
        this.skipThreshold = parent.skipThreshold;
        this.chunkSize = parent.chunkSize;
    }

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Objects.requireNonNull(action);

            ArrayBuffer.OfRef<T> sb = null;
            PermitStatus permitStatus;
            while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
                if (permitStatus == PermitStatus.MAYBE_MORE) {
                    // Optimistically traverse elements up to a threshold of chunkSize
                    if (sb == null)
                        sb = new ArrayBuffer.OfRef<>(chunkSize);
                    else
                        sb.reset();
                    long permitsRequested = 0;
                    do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
                    if (permitsRequested == 0)
                        return;
                    sb.forEach(action, acquirePermits(permitsRequested));
                }
                else {
                    // Must be UNLIMITED; let 'er rip
                    s.forEachRemaining(action);
                    return;
                }
            }
        }

所以现在有一个实例字段chunkSize。当存在定义的限制并且表达式的((skip + limit) / AbstractTask.LEAF_TARGET) + 1计算结果小于时CHUNK_SIZE,将使用该较小的值。因此,当限制chunkSize较小时,该值会小得多。在您的限制为的情况下5,块大小将始终为1



 类似资料:
  • 我正在研究一个涉及for循环和if-else语句的java函数。我需要根据多次迭代中的条件更改标志变量的值。我声明了一个名为flag的变量,并希望根据每次迭代中的条件进行更改。我需要在每次迭代结束时打印flag变量的值。但是当我打印变量时,它显示了一个错误,变量没有初始化。如果我给它一个初始值,它会一直打印初始值,而不是在If-else语句中处理的值。我不能根据自己的要求初始化for循环中的fla

  • 然而看起来chrome headless消耗了太多的内存和cpu,有人知道我们如何限制chrome headless的cpu/内存使用吗?或者有什么变通办法。 提前道谢。

  • spark-defaults.conf中没有任何内容,以编程方式初始化spark上下文的代码是: 在所有这些之后,Spark UI的Environment选项卡的Spark.driver.maxResultSize为10G,Spark.driver.memory为20G,但是驱动程序的存储内存的executors选项卡显示为0.0B/4.3GB。 (请注意:我以前的Spark.Driver.Mem

  • 我需要在我的主机上运行composer update,因此我使用ssh登录,并尝试在/www文件夹中运行以下命令,我在该文件夹中安装了Laravel和composer: 我得到这个错误: 我正在与我的主机提供商联系,他们告诉我运行命令: 我运行了这个命令,但得到:“无法打开文件:composer” 怎么办?这里的解决方案是什么?

  • 我有一个具有以下模式的待售项目集合: 我继承了一个聚合查询,它返回匹配类别的项目,按商家分组,组按组中的最大评级排序: 在此之后,代码继续按评级对每组中的项目进行排序,并删除除每组中排名前2位之外的所有项目。 作为聚合函数的一部分,是否可以在组内执行此排序和限制,以便聚合只返回每组中评级最高的两个项目?

  • 问题内容: 我有一些Resque工作者使用Rails应用程序。看来我最多只能同时运行2个工人(应用程序在生产Apache的EC2上运行)。有什么办法可以提高这个限制? 编辑: 我在redis.conf中有maxclients 0 我可以通过rake来启动10个工作程序,但是当他们实际上正在排队时,我在浏览器中得到“ ERR最大到达客户端数”。 编辑:更新的错误(在原始文件中是正确的) 编辑:实际上