在使用Java并行流时,当一些并行操作在静态初始化器块内完成时,我遇到了死锁。
import java.util.Arrays;
public class Example1 {
static {
// displays the numbers from 1 to 10 ordered => no thread issue
Arrays.asList(1,2,3,4,5,6,7,8,9,10)
.forEach(s->System.out.println(s));
}
public static final void main(String[] args) {}
}
当并行处理流时,所有的工作(数字不按顺序显示):
import java.util.Arrays;
public class Example2 {
static {
// displays the numbers from 1 to 10 unordered => no thread issue
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.forEach(s->System.out.println(s));
}
public static final void main(String[] args) {}
}
但是,当使用foreachOrdered()
处理流时,会出现死锁(我假设这与主线程和ForkJoinPool管理之间的交互有关):
import java.util.Arrays;
public class Example3 {
static {
// hangs forever (deadlock between the main thread which loads the class and the underlying ForkJoinPool which join several tasks)
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.forEachOrdered(s->System.out.println(s));
}
public static final void main(String[] args) {}
}
但是当在一个单独的线程中生成流处理时,一切都很顺利:
import java.util.Arrays;
public class Example4 {
static {
// displays the numbers from 1 to 10 ordered => no thread issue
new Thread(()->
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.forEachOrdered(s->System.out.println(s))
).start();
}
public static final void main(String[] args) {}
}
如果能理解为什么在某些情况下会出现僵局,而在其他情况下却不会出现这种情况,我将不胜感激。这显然不仅仅是因为使用了静态初始化器块、并行流和lambda,因为example2
、example3
和example4
使用了这三个概念,但只有example3
会导致死锁。
虽然这个问题看起来像是“为什么静态初始化器中带有lambda的并行流会导致死锁?”的重复,但事实并非如此。我的问题超出了链接的问题,因为它提供了example2
,我们有静态初始化器块、并行流和lambda,但没有死锁。这就是为什么问题标题中含有“可能导致僵局但不一定”的原因。
这种死锁行为有两个根本原因:
main
线程正在等待另一个线程(例如otherThread
)完成其工作(在示例3中,otherThread
是ForeachOrdered()
操作使用的ForkJoinPool
的线程之一)otherthread
使用了一个Lambda表达式,该表达式将由main
线程定义,但以后会定义(请记住:Lambda是在运行时创建的,而不是在编译时创建的)。在示例3中,此Lambda是.ForeachOrdered()
.让我们回顾一下这些示例,并解释它们为什么会产生死锁或不产生死锁。
只有一个线程(main
)执行以下操作:
由于只有一个线程,所以不会发生死锁。
为了更好地理解处理过程,我们可以将其改写为:
import java.util.Arrays;
public class Example2Instrumented {
static {
// displays the numbers from 1 to 10 unordered => no thread issue
System.out.println(Thread.currentThread().getName()+" : "+"static initializer");
Arrays.asList(1,2,3,4,5,6,7,8,9,10)
.parallelStream()
.forEach(s->System.out.println(Thread.currentThread().getName()+" : "+s));
}
public static final void main(String[] args) {}
}
这将产生以下结果:
main : static initializer
main : 7
main : 6
ForkJoinPool.commonPool-worker-2 : 9
ForkJoinPool.commonPool-worker-4 : 5
ForkJoinPool.commonPool-worker-9 : 3
ForkJoinPool.commonPool-worker-11 : 2
ForkJoinPool.commonPool-worker-2 : 10
ForkJoinPool.commonPool-worker-4 : 4
ForkJoinPool.commonPool-worker-9 : 1
ForkJoinPool.commonPool-worker-13 : 8
import java.util.Arrays;
import java.util.function.Consumer;
public class Example3NoDeadlock {
static {
// displays the numbers from 1 to 10 ordered => no thread issue anymore
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.forEachOrdered(
new Consumer<Integer>() {
@Override
public void accept(Integer t) {
System.out.println(t);
}});
}
public static final void main(String[] args) {}
}
为了更好地理解,我们可以编写以下代码:
import java.util.Arrays;
import java.util.function.Consumer;
public class Example3Instrumented {
static {
System.out.println("static initializer");
// hangs forever (deadlock between the main thread which loads the class and the underlying ForkJoinPool which join several tasks)
Arrays.asList(1,2,3,4,5,6,7,8,9,10).parallelStream()
.peek(new Consumer<Integer>() {
@Override
public void accept(Integer t) {
System.out.println(Thread.currentThread().getName()+" "+t);
}})
.forEachOrdered(s->System.out.println(s));
}
public static final void main(String[] args) {}
}
这将产生以下输出:
main : static initializer
ForkJoinPool.commonPool-worker-6 1
ForkJoinPool.commonPool-worker-9 3
main 7
ForkJoinPool.commonPool-worker-4 2
ForkJoinPool.commonPool-worker-13 6
ForkJoinPool.commonPool-worker-11 8
ForkJoinPool.commonPool-worker-15 5
ForkJoinPool.commonPool-worker-2 9
ForkJoinPool.commonPool-worker-4 10
ForkJoinPool.commonPool-worker-9 4
main
线程处理静态初始值设定项,然后通过为流中的每个元素创建一个任务来开始处理forEachOrdered(为了保持顺序,使用了一个复杂的基于树的算法,请参见Foreachops.ForeachOrderedTask
:创建任务,从代码中可以看出每个任务都在等待另一个任务完成才能运行)。所有任务都提交给forkjoinpool
。我认为死锁的发生是因为第一个任务是由forkjoinpool
的工作线程处理的,并且该线程等待main
线程构建lambda。并且main
线程已经开始处理其任务,正在等待另一个完成其任务的工作线程运行。因此出现了僵局。
在Example4中,我们生成了一个异步运行的新线程(即我们不等待结果)。这就是为什么main
线程没有被锁定,现在有时间在运行时构建Lambdas。
外卖的教训是:如果您混合了静态初始化器、线程和lambdas,那么您应该真正理解这些概念是如何实现的,否则您可能会出现死锁。
我遇到了一个奇怪的情况,在静态初始化器中使用带有lambda的并行流似乎永远不会占用CPU。代码如下: 这似乎是该行为的最小再现测试用例。如果我: null 我使用的是OpenJDK版本1.8.0_66-internal。
问题内容: 注意: 这是不是重复,请仔细阅读题目 сarefully报价: 真正的问题是为什么代码有时在不应该运行的情况下仍然有效。即使没有lambda,该问题也会重现。这使我认为可能存在JVM错误。 在http://codingdict.com/questions/122889的评论中,我试图找出原因,导致代码行为从一个起点到另一个起点有所不同,而该讨论的参与者为我提供了一个建议,以创建一个单独
死锁描述了另外两个线程因为永远等待对方而被阻塞的情况。当死锁发生时,程序永远挂起,你唯一能做的就是杀死程序。 为什么在下面给出的示例生产者-消费者问题中没有发生死锁: 我想知道为什么当同步对象正在等待其他线程释放锁时,在同步块中调用等待方法不会导致死锁?
问题内容: 以下代码导致死锁(在我的电脑上): 但是,如果我将reducelambda参数替换为匿名类,则不会导致死锁: 你能解释一下这种情况吗? P.S. 我发现该代码(与之前的代码有些不同): 工作不稳定。在大多数情况下,它挂起了,但是有时它成功完成了: 在此处输入图片说明 我真的不明白为什么这种行为不稳定。实际上,我重新测试了第一个代码段,并且行为相同。因此,最新的代码等于第一个。 为了了解
最近我在写一些复杂的基于RX的流程,发现它总是在特定情况下产生死锁。我花了几个小时才找出问题所在,似乎可以在这个简单的示例中重现: 此程序应打印以下值:11、21、22、31、32、33、。。。,通常,值可以表示为XY。每组X中的值的顺序可以是随机的,但组的顺序应该是升序。如果previous仍在计算,则不应发出新组(这是我的原始情况)。 问题是,如果您运行这段代码,您将只看到前几个元素的输出-我
我们正在创建一条从RDS中的Mysql到弹性搜索的数据管道,用于创建搜索索引,并为此使用debezium cdc及其Mysql源和弹性接收器连接器。 现在,由于mysql在rds中,我们必须为我们想要cdc的两个表授予mysql用户锁表权限,如文档中所述。 我们还有其他各种mysql用户执行事务,这可能需要这两个表中的任何一个。 当我们将mysql连接器连接到我们的生产数据库时,就产生了一个锁,我