考虑以下情况:我们正在使用一个Java8并行流来执行一个并行的forEach循环,例如,
IntStream.range(0,20).parallel().forEach(i -> { /* work done here */})
现在假设我们想要限制一个特定工作的并行执行的数量--例如,因为该部分是内存密集型的,而内存约束意味着并行执行的限制。
限制并行执行的一个明显而优雅的方法是使用信号量(这里建议),例如,下面的pice代码将并行执行的数量限制为5:
final Semaphore concurrentExecutions = new Semaphore(5);
IntStream.range(0,20).parallel().forEach(i -> {
concurrentExecutions.acquireUninterruptibly();
try {
/* WORK DONE HERE */
}
finally {
concurrentExecutions.release();
}
});
这个很好用!
还要注意,如果存在内部并行foreach的话,它可能不是透明的。
问题:这个行为是否符合Java8规范(在这种情况下,它意味着禁止在并行流工作者内部使用信号量),还是一个bug?
为了方便起见:下面是一个完整的测试用例。这两个布尔函数的任何组合都起作用,但“true,true”除外,“true”会导致死锁。
澄清:为了明确这一点,让我强调一个方面:死锁不会发生在信号量的acquire
处。请注意,代码由以下内容组成
死锁发生在2。如果那段代码正在使用另一个并行流。则在另一个流中发生死锁。因此,似乎不允许同时使用嵌套的并行流和阻塞操作(如信号量)!
请注意,有文档说明并行流使用ForkJoinPool并且ForkJoinPool和信号量属于同一个包java.util.concurrent
(因此希望它们能够良好地互操作)。
/*
* (c) Copyright Christian P. Fries, Germany. All rights reserved. Contact: email@christian-fries.de.
*
* Created on 03.05.2014
*/
package net.finmath.experiments.concurrency;
import java.util.concurrent.Semaphore;
import java.util.stream.IntStream;
/**
* This is a test of Java 8 parallel streams.
*
* The idea behind this code is that the Semaphore concurrentExecutions
* should limit the parallel executions of the outer forEach (which is an
* <code>IntStream.range(0,numberOfTasks).parallel().forEach</code> (for example:
* the parallel executions of the outer forEach should be limited due to a
* memory constrain).
*
* Inside the execution block of the outer forEach we use another parallel stream
* to create an inner forEach. The number of concurrent
* executions of the inner forEach is not limited by us (it is however limited by a
* system property "java.util.concurrent.ForkJoinPool.common.parallelism").
*
* Problem: If the semaphore is used AND the inner forEach is active, then
* the execution will be DEADLOCKED.
*
* Note: A practical application is the implementation of the parallel
* LevenbergMarquardt optimizer in
* {@link http://finmath.net/java/finmath-lib/apidocs/net/finmath/optimizer/LevenbergMarquardt.html}
* In one application the number of tasks in the outer and inner loop is very large (>1000)
* and due to memory limitation the outer loop should be limited to a small (5) number
* of concurrent executions.
*
* @author Christian Fries
*/
public class ForkJoinPoolTest {
public static void main(String[] args) {
// Any combination of the booleans works, except (true,true)
final boolean isUseSemaphore = true;
final boolean isUseInnerStream = true;
final int numberOfTasksInOuterLoop = 20; // In real applications this can be a large number (e.g. > 1000).
final int numberOfTasksInInnerLoop = 100; // In real applications this can be a large number (e.g. > 1000).
final int concurrentExecusionsLimitInOuterLoop = 5;
final int concurrentExecutionsLimitForStreams = 10;
final Semaphore concurrentExecutions = new Semaphore(concurrentExecusionsLimitInOuterLoop);
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism",Integer.toString(concurrentExecutionsLimitForStreams));
System.out.println("java.util.concurrent.ForkJoinPool.common.parallelism = " + System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
IntStream.range(0,numberOfTasksInOuterLoop).parallel().forEach(i -> {
if(isUseSemaphore) {
concurrentExecutions.acquireUninterruptibly();
}
try {
System.out.println(i + "\t" + concurrentExecutions.availablePermits() + "\t" + Thread.currentThread());
if(isUseInnerStream) {
runCodeWhichUsesParallelStream(numberOfTasksInInnerLoop);
}
else {
try {
Thread.sleep(10*numberOfTasksInInnerLoop);
} catch (Exception e) {
}
}
}
finally {
if(isUseSemaphore) {
concurrentExecutions.release();
}
}
});
System.out.println("D O N E");
}
/**
* Runs code in a parallel forEach using streams.
*
* @param numberOfTasksInInnerLoop Number of tasks to execute.
*/
private static void runCodeWhichUsesParallelStream(int numberOfTasksInInnerLoop) {
IntStream.range(0,numberOfTasksInInnerLoop).parallel().forEach(j -> {
try {
Thread.sleep(10);
} catch (Exception e) {
}
});
}
}
当您将问题分解为任务时,如果这些任务可能被其他任务阻塞,并且尝试在有限线程池中执行它们,您就会面临池导致死锁的风险。请参阅实践中的Java并发8.1。
这无疑是代码中的一个bug。您正在用将要阻塞的任务填充FJ池,以等待同一池中其他任务的结果。有时你很幸运,事情没有死锁(就像不是所有的锁排序错误都会导致死锁一样),但基本上你在这里是在一个非常薄的冰上滑冰。
我有一个关于管理线程的简单问题。我有3个进程,它们与一个许可证共享相同的信号量。在正常情况下,第一道工序采用该许可证,第二道工序发放两个许可证。第二个过程版本3允许进行第三个过程。我举了一个例子来说明我的问题。 第一个: 第二道工序: } 最后一个: 问题是。当我运行这三个进程并确保进程3是第一个执行的进程时。我会死锁。进程2永远不会打印“Hello 3”,进程1永远不会打印“Hello 2”。为
最近我在写一些复杂的基于RX的流程,发现它总是在特定情况下产生死锁。我花了几个小时才找出问题所在,似乎可以在这个简单的示例中重现: 此程序应打印以下值:11、21、22、31、32、33、。。。,通常,值可以表示为XY。每组X中的值的顺序可以是随机的,但组的顺序应该是升序。如果previous仍在计算,则不应发出新组(这是我的原始情况)。 问题是,如果您运行这段代码,您将只看到前几个元素的输出-我
我有一个事务性grails服务,它正在进行一些(长时间运行的)处理。在处理过程中,我想更新一个“percentComplete”值(最终将用于在前端显示进度条)。显然,这必须立即写入(即不作为当前事务的一部分),否则它毫无价值。 因此,我使用Grails事务处理插件,特别是“withNewTransaction”方法来启动一个新事务,其中百分比完成属性被更新。 我的问题是,第二次使用“update
示例2: 如果流被设置为,就像在第二个示例中那样,我可以想象内部工作者在等待外部工作队列中的线程可用时会阻塞,因为外部工作队列线程必须在内部流完成时阻塞,而默认线程池只有有限数量的线程。但是,似乎不会出现死锁: 两个流共享相同的默认线程池,但它们生成不同的工作单元。每个外部工作单元只能在该外部工作单元的所有内部单元完成之后才能完成,因为在每个并行流的末端有一个完成屏障。 如何通过共享的工作线程池来
读者: 这看起来像是一个僵局,但有几件事让我怀疑: 我找不到另一个可能持有相同锁的线程 4秒后进行线程转储会得到相同的结果,但现在所有线程都报告,这与第一次转储中的不同。
注意:我已经在另一篇文章中解决了这个问题,所以在嵌套的Java8并行流操作中使用信号量可能会造成死锁。这是窃听器吗?-,但这篇文章的标题暗示问题与一个信号量的使用有关--这多少分散了讨论的注意力。我创建这个例子是为了强调嵌套循环可能存在性能问题--尽管这两个问题可能有一个共同的原因(也许是因为我花了很多时间来解决这个问题)。(我不认为它是一个重复,因为它强调了另一个症状--但如果你真的这么做了,就