注意:我已经在另一篇文章中解决了这个问题,所以在嵌套的Java8并行流操作中使用信号量可能会造成死锁。这是窃听器吗?-,但这篇文章的标题暗示问题与一个信号量的使用有关--这多少分散了讨论的注意力。我创建这个例子是为了强调嵌套循环可能存在性能问题--尽管这两个问题可能有一个共同的原因(也许是因为我花了很多时间来解决这个问题)。(我不认为它是一个重复,因为它强调了另一个症状--但如果你真的这么做了,就把它删掉吧)。
问题:如果嵌套两个Java8stream.parallel().foreach循环,并且所有任务都是独立的、无状态的等等--除了提交到公共的FJ池之外--那么在并行循环中嵌套并行循环的性能要比在并行循环中嵌套顺序循环的性能差得多。更糟糕的是:如果包含内部循环的操作是同步的,您将会得到死锁。
绩效问题的论证
没有“synchronized”,您仍然可以观察到性能问题。您可以在http://svn.finmath.net/finmath%20experiments/trunk/src/net/finmath/experiments/concurrency/nestedParallelForeachTest.java找到一个演示代码(更详细的描述请参阅此处的JavaDoc)。
我们在这里的设置如下:我们有一个嵌套的stream.parallel().foreach()。
现在:将24个外循环任务提交到并行度为8的池中,我们最多期望24/8*11=33秒(在8核或更好的机器上)。
问题:你能证实这一行为吗?这是一个可以从框架中得到的东西吗?(我现在更小心了一点,声称这是一个bug,但我个人认为这是由于ForkJointAsk实现中的一个bug造成的。备注:我已经将此发布到了concurrency-interest(参见http://cs.oswego.edu/pipermail/concurrency-interest/2014-may/012652.html),但到目前为止我还没有从那里得到证实)。
僵局演示
以下代码将死锁
// Outer loop
IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
doWork();
synchronized(this) {
// Inner loop
IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
doWork();
});
}
});
其中NumberOfTaskSinouterLoop=24
,NumberOfTaskSininnerLoop=240
,OuterLoopOverHeadFactor=10000
和DoWork
是某个无状态CPU刻录机。
您可以在http://svn.finmath.net/finMath%20experiments/trunk/src/net/finMath/experiments/concurrency/nestedParallelForeachandsynchronization.java找到完整的演示代码(更详细的描述请参阅此处的JavaDoc)。
这种行为在意料之中吗?请注意,关于Java并行流的文档没有提到嵌套或同步的任何问题。另外,没有提到两者都使用一个公共的fork-join-pool。
更新
看起来这个问题和信号量更严重的死锁在Java8 U40中已经解决了。
问题是,您配置的相当有限的并行性被外部流处理吃掉了:如果您说您需要8个线程,并使用parallel()
处理一个超过8个项目的流,它将创建8个工作线程,并让它们处理项目。
然后,在您的使用者中,您正在使用parallel()
处理另一个流,但是没有工作线程了。由于工作线程在等待内部流处理结束时被阻塞,因此forkjoinpool
必须创建新的工作线程,这违反了您配置的并行性。在我看来,它并没有回收这些扩展线程,而是让它们在处理完后立即死亡。因此,在您的内部处理中,创建并释放新线程,这是一个昂贵的操作。
您可能会将其视为一个缺陷,即启动线程不对并行流处理的计算做出贡献,而只是等待结果,但即使修复了该问题,您仍然有一个很难修复的一般性问题:
每当工作线程数量与外部流项之间的比率较低时,实现就会将它们全部用于外部流,因为它不知道该流是外部流。因此并行执行内部流请求的工作线程比可用线程多。使用调用方线程对计算做出贡献可以使计算的性能与串行计算相等,但在这里获得并行执行的优势与固定数量的工作线程的概念不能很好地工作。
请注意,您在这里只是触及了问题的表面,因为您对项目的处理时间相当平衡。如果两者的处理,内部项目和外部项目,发散(与同一级别的项目相比),问题将会更糟。
更新:通过分析和查看代码,forkjoinpool
似乎尝试使用等待线程进行“工作窃取”,但根据thread
是工作线程还是其他线程使用不同的代码。因此,一个工作线程实际上是等待大约80%的时间,并做很少或没有工作,而其他线程真正贡献了计算…
更新2:为了完整起见,这里使用注释中描述的简单并行执行方法。由于它将每个项目都排入队列,当单个项目的执行时间相当小时,它将会有很大的开销。所以这不是一个复杂的解决方案,而是一个演示,它可以处理长时间运行的任务,而不需要太多的魔法…
import java.lang.reflect.UndeclaredThrowableException;
import java.util.concurrent.*;
import java.util.function.IntConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class NestedParallelForEachTest1 {
static final boolean isInnerStreamParallel = true;
// Setup: Inner loop task 0.01 sec in worse case. Outer loop task: 10 sec + inner loop. This setup: (100 * 0.01 sec + 10 sec) * 24/8 = 33 sec.
static final int numberOfTasksInOuterLoop = 24; // In real applications this can be a large number (e.g. > 1000).
static final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
static final int concurrentExecutionsLimitForStreams = 8;
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println(System.getProperty("java.version")+" "+System.getProperty("java.home"));
new NestedParallelForEachTest1().testNestedLoops();
E.shutdown();
}
final static ThreadPoolExecutor E = new ThreadPoolExecutor(
concurrentExecutionsLimitForStreams, concurrentExecutionsLimitForStreams,
2, TimeUnit.MINUTES, new SynchronousQueue<>(), (r,e)->r.run() );
public static void parallelForEach(IntStream s, IntConsumer c) {
s.mapToObj(i->E.submit(()->c.accept(i))).collect(Collectors.toList())
.forEach(NestedParallelForEachTest1::waitOrHelp);
}
static void waitOrHelp(Future f) {
while(!f.isDone()) {
Runnable r=E.getQueue().poll();
if(r!=null) r.run();
}
try { f.get(); }
catch(InterruptedException ex) { throw new RuntimeException(ex); }
catch(ExecutionException eex) {
Throwable t=eex.getCause();
if(t instanceof RuntimeException) throw (RuntimeException)t;
if(t instanceof Error) throw (Error)t;
throw new UndeclaredThrowableException(t);
}
}
public void testNestedLoops(NestedParallelForEachTest1 this) {
long start = System.nanoTime();
// Outer loop
parallelForEach(IntStream.range(0,numberOfTasksInOuterLoop), i -> {
if(i < 10) sleep(10 * 1000);
if(isInnerStreamParallel) {
// Inner loop as parallel: worst case (sequential) it takes 10 * numberOfTasksInInnerLoop millis
parallelForEach(IntStream.range(0,numberOfTasksInInnerLoop), j -> sleep(10));
}
else {
// Inner loop as sequential
IntStream.range(0,numberOfTasksInInnerLoop).sequential().forEach(j -> sleep(10));
}
if(i >= 10) sleep(10 * 1000);
});
long end = System.nanoTime();
System.out.println("Done in "+TimeUnit.NANOSECONDS.toSeconds(end-start)+" sec.");
}
static void sleep(int milli) {
try {
Thread.sleep(milli);
} catch (InterruptedException ex) {
throw new AssertionError(ex);
}
}
}
这是我的代码。我遇到的问题是,我希望将HP在我的PHP代码中的数字转换为我的HP HTML代码,以及与Cylinder相同的内容。我已经想好了其他的东西,但说到这一部分我就卡住了
我有一个这样的数组 我想做的是前面的模型,为其数量绘制徽标,因此三星=3,索尼=7,以此类推,将绘制3个索尼徽标和7个三星徽标。 我想出了这样的办法 但是当然,所有这些都是为了每个数组条目,呼应出名称,所以我最终打印了5个三星,打印了5个索尼,等等。 如何使其使用 qty 数组的值而不是条目数?
我的代码有一个问题,它有两个嵌套的foreach循环: 所以我在“消息”中有2个条目,在“评论”中有4个条目,第一个条目为“消息”,第二个条目为2。我想要: 消息1-评论1-评论2 消息2-注释3-注释4 这就是我所拥有的: 消息1-评论1-评论2 消息2-注释1-注释2-注释3-注释4 我搜索了两个小时,在mysqli中没有找到解决方案:((当我找到时 mysqli_data_seek($com
用python编写了这个函数,可以转换矩阵: 在这个过程中,我意识到我并不完全理解单行嵌套for循环是如何执行的。请回答以下问题,帮助我理解: 这个for循环的执行顺序是什么 鉴于 对象必须是什么类型才能将其用于循环结构? i和j分配给对象元素的顺序是什么? 可以用不同的for循环结构来模拟吗? 这个for循环可以嵌套在一个类似或不同的for循环结构中吗?它看起来怎么样? 如需更多信息,我们也将不
我想在一个并行外部循环中运行一个包含for循环(应该并行运行)的函数。因此看起来如下所示: 给定上面的代码,我希望在函数中为循环创建5个并行线程,并且希望这5个线程中的每个线程创建另一个线程来运行自己的并行for循环。
我正在寻找一种用函数式编程方法替换嵌套foreach循环的方法。情况是这样的: 目前我的代码是这样的: 这将生成如下所示的输出: 有谁知道如何用函数式编程替代方案替换foreach代码块?