基本上,这是在尝试回答另一个问题时出现的。假设此代码:
AtomicInteger i = new AtomicInteger(0);
AtomicInteger count = new AtomicInteger(0);
IntStream.generate(() -> i.incrementAndGet())
.parallel()
.peek(x -> count.incrementAndGet())
.limit(5)
.forEach(System.out::println);
System.out.println("count = " + count);
我了解以下事实:这IntStream#generate
是 无序的无限流
,要使其完成,必须进行短路操作(limit
在这种情况下)。我也了解,在Supplier
达到该限制之前,可以随意调用Stream实现多次。
在java-8下运行此命令,将count
始终打印512
(可能不总是打印,但在我的机器上是这样)。
相反,在java-10下运行this很少超过5
。所以我的问题是内部发生了什么变化,从而使短路发生得更好了(我试图通过拥有信号源并尝试做一些区分来自己回答这个问题……)
更改发生在Java 9 beta 103和Java 9 beta
120(JDK‑8154387)之间。
负责的班级是StreamSpliterators.UnorderedSliceSpliterator.OfInt
,分别。它的超一流StreamSpliterators.UnorderedSliceSpliterator
。
该类的旧版本看起来像
abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
static final int CHUNK_SIZE = 1 << 7;
// The spliterator to slice
protected final T_SPLITR s;
protected final boolean unlimited;
private final long skipThreshold;
private final AtomicLong permits;
UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
this.s = s;
this.unlimited = limit < 0;
this.skipThreshold = limit >= 0 ? limit : 0;
this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
}
UnorderedSliceSpliterator(T_SPLITR s,
UnorderedSliceSpliterator<T, T_SPLITR> parent) {
this.s = s;
this.unlimited = parent.unlimited;
this.permits = parent.permits;
this.skipThreshold = parent.skipThreshold;
}
…
@Override
public void forEachRemaining(Consumer<? super T> action) {
Objects.requireNonNull(action);
ArrayBuffer.OfRef<T> sb = null;
PermitStatus permitStatus;
while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
if (permitStatus == PermitStatus.MAYBE_MORE) {
// Optimistically traverse elements up to a threshold of CHUNK_SIZE
if (sb == null)
sb = new ArrayBuffer.OfRef<>(CHUNK_SIZE);
else
sb.reset();
long permitsRequested = 0;
do { } while (s.tryAdvance(sb) && ++permitsRequested < CHUNK_SIZE);
if (permitsRequested == 0)
return;
sb.forEach(action, acquirePermits(permitsRequested));
}
else {
// Must be UNLIMITED; let 'er rip
s.forEachRemaining(action);
return;
}
}
}
如我们所见,它尝试CHUNK_SIZE = 1 << 7
在每个分隔器中最多缓冲元素,最终可能以“ CPU核心数”×128个元素结束。
相反,新版本看起来像
abstract static class UnorderedSliceSpliterator<T, T_SPLITR extends Spliterator<T>> {
static final int CHUNK_SIZE = 1 << 7;
// The spliterator to slice
protected final T_SPLITR s;
protected final boolean unlimited;
protected final int chunkSize;
private final long skipThreshold;
private final AtomicLong permits;
UnorderedSliceSpliterator(T_SPLITR s, long skip, long limit) {
this.s = s;
this.unlimited = limit < 0;
this.skipThreshold = limit >= 0 ? limit : 0;
this.chunkSize = limit >= 0 ? (int)Math.min(CHUNK_SIZE,
((skip + limit) / AbstractTask.LEAF_TARGET) + 1) : CHUNK_SIZE;
this.permits = new AtomicLong(limit >= 0 ? skip + limit : skip);
}
UnorderedSliceSpliterator(T_SPLITR s,
UnorderedSliceSpliterator<T, T_SPLITR> parent) {
this.s = s;
this.unlimited = parent.unlimited;
this.permits = parent.permits;
this.skipThreshold = parent.skipThreshold;
this.chunkSize = parent.chunkSize;
}
…
@Override
public void forEachRemaining(Consumer<? super T> action) {
Objects.requireNonNull(action);
ArrayBuffer.OfRef<T> sb = null;
PermitStatus permitStatus;
while ((permitStatus = permitStatus()) != PermitStatus.NO_MORE) {
if (permitStatus == PermitStatus.MAYBE_MORE) {
// Optimistically traverse elements up to a threshold of chunkSize
if (sb == null)
sb = new ArrayBuffer.OfRef<>(chunkSize);
else
sb.reset();
long permitsRequested = 0;
do { } while (s.tryAdvance(sb) && ++permitsRequested < chunkSize);
if (permitsRequested == 0)
return;
sb.forEach(action, acquirePermits(permitsRequested));
}
else {
// Must be UNLIMITED; let 'er rip
s.forEachRemaining(action);
return;
}
}
}
所以现在有一个实例字段chunkSize
。当存在定义的限制并且表达式的((skip + limit) / AbstractTask.LEAF_TARGET) + 1
计算结果小于时CHUNK_SIZE
,将使用该较小的值。因此,当限制chunkSize
较小时,该值会小得多。在您的限制为的情况下5
,块大小将始终为1
。
我正在研究一个涉及for循环和if-else语句的java函数。我需要根据多次迭代中的条件更改标志变量的值。我声明了一个名为flag的变量,并希望根据每次迭代中的条件进行更改。我需要在每次迭代结束时打印flag变量的值。但是当我打印变量时,它显示了一个错误,变量没有初始化。如果我给它一个初始值,它会一直打印初始值,而不是在If-else语句中处理的值。我不能根据自己的要求初始化for循环中的fla
然而看起来chrome headless消耗了太多的内存和cpu,有人知道我们如何限制chrome headless的cpu/内存使用吗?或者有什么变通办法。 提前道谢。
spark-defaults.conf中没有任何内容,以编程方式初始化spark上下文的代码是: 在所有这些之后,Spark UI的Environment选项卡的Spark.driver.maxResultSize为10G,Spark.driver.memory为20G,但是驱动程序的存储内存的executors选项卡显示为0.0B/4.3GB。 (请注意:我以前的Spark.Driver.Mem
我需要在我的主机上运行composer update,因此我使用ssh登录,并尝试在/www文件夹中运行以下命令,我在该文件夹中安装了Laravel和composer: 我得到这个错误: 我正在与我的主机提供商联系,他们告诉我运行命令: 我运行了这个命令,但得到:“无法打开文件:composer” 怎么办?这里的解决方案是什么?
我有一个具有以下模式的待售项目集合: 我继承了一个聚合查询,它返回匹配类别的项目,按商家分组,组按组中的最大评级排序: 在此之后,代码继续按评级对每组中的项目进行排序,并删除除每组中排名前2位之外的所有项目。 作为聚合函数的一部分,是否可以在组内执行此排序和限制,以便聚合只返回每组中评级最高的两个项目?
问题内容: 我有一些Resque工作者使用Rails应用程序。看来我最多只能同时运行2个工人(应用程序在生产Apache的EC2上运行)。有什么办法可以提高这个限制? 编辑: 我在redis.conf中有maxclients 0 我可以通过rake来启动10个工作程序,但是当他们实际上正在排队时,我在浏览器中得到“ ERR最大到达客户端数”。 编辑:更新的错误(在原始文件中是正确的) 编辑:实际上