通过使用 CyclicBarrier 类解决了 CountDownLatch 类的种种缺点,但不可否认的是,CyclicBarrier 类还是有 些自身上的缺陷,比如不可以动态添加 parties计数,调用await()方法仅仅占用 parties 计数,所以在 JDKl.7 中新增加了 个名称为 Phaser 的类来解决这样的问题。
单词 Phaser ,中文翻译为移相器 移相器这个术语是在电子专业中使用的,但在 Java 语言中,该类是在 JDKl.7 版本中新增的。
1.方法 arriveAndAwaitAdvance()的作用与 CountDownLatch 类中的 await()方法大体一样,通过从方法的名称解释来看, arrive 是到达的意思, wait 是等待的意思,而 advance 是前进、促进的意思,所以执行这个方法的作用就是当前线程已经到达屏障,在此等待一段时间,等条件满足后继续向下一个屏障继续执行
2.通过前面的解释可以发现,类 Phaser 具有设置多屏障的功能,有些类似于体育竞赛中“赛段”的作用,运动员第一赛段结束后,开始休整准备,然后集体到达第二赛段的起跑点,等待比赛开始后,运动员们又继续比赛了,说明 Phaser类与 CyclicBarrier 类在功能上有重叠。
使用 Phaser 来实现一个比赛过程中的“多赛段”问题
3.当计数不足时,线程 阻塞状态,不继续向下运行
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"第一次开始"+System.currentTimeMillis());
//模拟线程1需要处理业务,2S后才会到达屏障
if(Thread.currentThread().getName().equals("线程1")){
Thread.sleep(2000);
}
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第一次结束"+System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"第二次开始"+System.currentTimeMillis());
//模拟线程1需要处理业务,2S后才会到达屏障
if(Thread.currentThread().getName().equals("线程1")){
Thread.sleep(2000);
}
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第二次结束"+System.currentTimeMillis());
System.out.println(Thread.currentThread().getName()+"第三次开始"+System.currentTimeMillis());
//模拟线程1需要处理业务,2S后才会到达屏障
if(Thread.currentThread().getName().equals("线程1")){
Thread.sleep(2000);
}
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第三次结束"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
MyThread[] myThreads = new MyThread[3];
for (int i = 0; i < myThreads.length; i++) {
myThreads[i] = new MyThread("线程"+(i+1),phaser);
myThreads[i].start();
}
}
}
线程1第一次开始1594654146099
线程2第一次开始1594654146099
线程3第一次开始1594654146099
线程1第一次结束1594654148099
线程1第二次开始1594654148099
线程3第一次结束1594654148099
线程3第二次开始1594654148099
线程2第一次结束1594654148099
线程2第二次开始1594654148100
线程1第二次结束1594654150099
线程1第三次开始1594654150099
线程2第二次结束1594654150099
线程3第二次结束1594654150099
线程3第三次开始1594654150100
线程2第三次开始1594654150100
线程1第三次结束1594654152101
线程2第三次结束1594654152101
线程3第三次结束1594654152101
方法 arriveAndDeregister()的作用是使当前线程(运动员)退出比赛,并且使 parties值减1
下面代码初始化屏障等待线程数3,线程3执行arriveAndDeregister方法将屏障等待线程数置为了2,其实线程3并未退出,会继续执行下面代码,输出了“第一次抵达…”。
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第一次抵达屏障,当前线程设置的屏障线程数:"+phaser.getRegisteredParties());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第二次抵达屏障,当前线程设置的屏障线程数:"+phaser.getRegisteredParties());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第三次抵达屏障,当前线程设置的屏障线程数:"+phaser.getRegisteredParties());
} catch (Exception e) {
e.printStackTrace();
}
}
}
//业务线程
public static class MyThreadB extends Thread{
private Phaser phaser;
public MyThreadB(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
//线程本次通过屏障之后,将会移除且将设置的屏障线程数-1
phaser.arriveAndDeregister();
System.out.println(Thread.currentThread().getName()+"第一次抵达屏障,当前线程设置的屏障线程数:"+phaser.getRegisteredParties());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
System.out.println(Thread.currentThread().getName()+"phaser对象初始化设置的屏障线程数:"+phaser.getRegisteredParties());
MyThread[] myThreads = new MyThread[3];
//两个MyThread线程执行arriveAndAwaitAdvance方法
for (int i = 0; i < myThreads.length-1; i++) {
myThreads[i] = new MyThread("线程"+(i+1),phaser);
myThreads[i].start();
}
//一个MyThread线程执行arriveAndDeregister方法
MyThreadB myThreadB = new MyThreadB("线程3",phaser);
myThreadB.start();
}
}
mainphaser对象初始化设置的屏障线程数:3
线程3第一次抵达屏障,当前线程设置的屏障线程数:2
线程2第一次抵达屏障,当前线程设置的屏障线程数:2
线程1第一次抵达屏障,当前线程设置的屏障线程数:2
线程1第二次抵达屏障,当前线程设置的屏障线程数:2
线程2第二次抵达屏障,当前线程设置的屏障线程数:2
线程1第三次抵达屏障,当前线程设置的屏障线程数:2
线程2第三次抵达屏障,当前线程设置的屏障线程数:2
方法 Phase()是 获取已经到达第几个屏障。
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第一次抵达屏障,通过屏障数:"+phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第二次抵达屏障,通过屏障数:"+phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第三次抵达屏障,通过屏障数:"+phaser.getPhase());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(3);
System.out.println(Thread.currentThread().getName()+"phaser对象初始化设置的屏障线程数:"+phaser.getRegisteredParties());
MyThread[] myThreads = new MyThread[3];
//两个MyThread线程执行arriveAndAwaitAdvance方法
for (int i = 0; i < myThreads.length; i++) {
myThreads[i] = new MyThread("线程"+(i+1),phaser);
myThreads[i].start();
}
}
}
mainphaser对象初始化设置的屏障线程数:3
线程1第一次抵达屏障,通过屏障数:1
线程3第一次抵达屏障,通过屏障数:1
线程2第一次抵达屏障,通过屏障数:1
线程1第二次抵达屏障,通过屏障数:2
线程3第二次抵达屏障,通过屏障数:2
线程2第二次抵达屏障,通过屏障数:2
线程1第三次抵达屏障,通过屏障数:3
线程2第三次抵达屏障,通过屏障数:3
线程3第三次抵达屏障,通过屏障数:3
1.onAdvance()方法是Phaser类中受保护的方法,返回boolean ,phase参数表示当前周期数,registeredParties表示当前已经注册的parties个数。
2.return registeredParties == 0;代码表示如果registeredParties已经注册的parties个数为0返回true表示当前屏障禁用。如果当前注册数不为0,返回false,屏障继续使用。
3.此方法可以被重写,此方法会在每次通过屏障时调用(如果屏障没禁用)。
onAdvance()源码
protected boolean onAdvance(int phase, int registeredParties) {
return registeredParties == 0;
}
示例1:返回true,取消屏障
线程3未执行onAdvance()方法时,所有线程在屏障处等待线程3。线程3执行完onAdvance()方法后,屏障禁用,所有线程略过屏障直接执行下面代码不等待任何线程,因为没有再经过屏障所以onAdvance()方法没有再次执行。
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
if(Thread.currentThread().getName().equals("线程3")){
//模拟线程3执行业务,其他线程屏障处等待
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName()+"第一次抵达屏障开始,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第一次抵达屏障结束,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
if(Thread.currentThread().getName().equals("线程3")){
//模拟线程3执行业务,其他线程屏障处等待
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName()+"第二次抵达屏障开始,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第二次抵达屏障结束,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
if(Thread.currentThread().getName().equals("线程1")){
//模拟线程3执行业务,其他线程屏障处等待
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName()+"第三次抵达屏障开始,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第三次抵达屏障结束,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(3){
//重新onAdvance返回true
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return true;
}
};
System.out.println(Thread.currentThread().getName()+"phaser对象初始化设置的屏障线程数:"+phaser.getRegisteredParties());
MyThread[] myThreads = new MyThread[3];
//两个MyThread线程执行arriveAndAwaitAdvance方法
for (int i = 0; i < myThreads.length; i++) {
myThreads[i] = new MyThread("线程"+(i+1),phaser);
myThreads[i].start();
}
}
}
mainphaser对象初始化设置的屏障线程数:3
线程1第一次抵达屏障开始,通过屏障数:0,时间:1594657633039
线程2第一次抵达屏障开始,通过屏障数:0,时间:1594657633039
线程3第一次抵达屏障开始,通过屏障数:0,时间:1594657635050
线程3执行了onAdvance()方法
线程3第一次抵达屏障结束,通过屏障数:-2147483647,时间:1594657635050
线程1第一次抵达屏障结束,通过屏障数:-2147483647,时间:1594657635050
线程2第一次抵达屏障结束,通过屏障数:-2147483647,时间:1594657635050
线程2第二次抵达屏障开始,通过屏障数:-2147483647,时间:1594657635050
线程1第二次抵达屏障开始,通过屏障数:-2147483647,时间:1594657635050
线程2第二次抵达屏障结束,通过屏障数:-2147483647,时间:1594657635050
线程2第三次抵达屏障开始,通过屏障数:-2147483647,时间:1594657635050
线程1第二次抵达屏障结束,通过屏障数:-2147483647,时间:1594657635050
线程2第三次抵达屏障结束,通过屏障数:-2147483647,时间:1594657635050
线程3第二次抵达屏障开始,通过屏障数:-2147483647,时间:1594657637050
线程1第三次抵达屏障开始,通过屏障数:-2147483647,时间:1594657637050
线程3第二次抵达屏障结束,通过屏障数:-2147483647,时间:1594657637050
线程1第三次抵达屏障结束,通过屏障数:-2147483647,时间:1594657637050
线程3第三次抵达屏障开始,通过屏障数:-2147483647,时间:1594657637050
线程3第三次抵达屏障结束,通过屏障数:-2147483647,时间:1594657637050
示例1:返回false,使用屏障
线程继续堵塞,继续等待线程3,屏障正常使用。
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
if(Thread.currentThread().getName().equals("线程3")){
//模拟线程3执行业务,其他线程屏障处等待
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName()+"第一次抵达屏障开始,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第一次抵达屏障结束,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
if(Thread.currentThread().getName().equals("线程3")){
//模拟线程3执行业务,其他线程屏障处等待
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName()+"第二次抵达屏障开始,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第二次抵达屏障结束,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
if(Thread.currentThread().getName().equals("线程1")){
//模拟线程3执行业务,其他线程屏障处等待
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName()+"第三次抵达屏障开始,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第三次抵达屏障结束,通过屏障数:"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(3){
//重新onAdvance返回false
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return false;
}
};
System.out.println(Thread.currentThread().getName()+"phaser对象初始化设置的屏障线程数:"+phaser.getRegisteredParties());
MyThread[] myThreads = new MyThread[3];
for (int i = 0; i < myThreads.length; i++) {
myThreads[i] = new MyThread("线程"+(i+1),phaser);
myThreads[i].start();
}
}
}
mainphaser对象初始化设置的屏障线程数:3
线程1第一次抵达屏障开始,通过屏障数:0,时间:1594657897199
线程2第一次抵达屏障开始,通过屏障数:0,时间:1594657897199
线程3第一次抵达屏障开始,通过屏障数:0,时间:1594657899210
线程3执行了onAdvance()方法
线程3第一次抵达屏障结束,通过屏障数:1,时间:1594657899210
线程2第一次抵达屏障结束,通过屏障数:1,时间:1594657899210
线程1第一次抵达屏障结束,通过屏障数:1,时间:1594657899210
线程2第二次抵达屏障开始,通过屏障数:1,时间:1594657899210
线程1第二次抵达屏障开始,通过屏障数:1,时间:1594657899210
线程3第二次抵达屏障开始,通过屏障数:1,时间:1594657901230
线程3执行了onAdvance()方法
线程3第二次抵达屏障结束,通过屏障数:2,时间:1594657901230
线程3第三次抵达屏障开始,通过屏障数:2,时间:1594657901230
线程2第二次抵达屏障结束,通过屏障数:2,时间:1594657901230
线程2第三次抵达屏障开始,通过屏障数:2,时间:1594657901230
线程1第二次抵达屏障结束,通过屏障数:2,时间:1594657901230
线程1第三次抵达屏障开始,通过屏障数:2,时间:1594657903240
线程1执行了onAdvance()方法
线程1第三次抵达屏障结束,通过屏障数:3,时间:1594657903240
线程2第三次抵达屏障结束,通过屏障数:3,时间:1594657903240
线程3第三次抵达屏障结束,通过屏障数:3,时间:1594657903240
方法 getRegisteredParties() 获得注册的 parties 数量。
每执行一次方法 register (),就动态添加一个 parties值。
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"初始化,当前需要注册的Parties数:"+phaser.getRegisteredParties()+",时间:"+System.currentTimeMillis());
/动态添加一个Parties
phaser.register();
System.out.println(Thread.currentThread().getName()+"执行一次register(),当前需要注册的Parties数:"+phaser.getRegisteredParties()+",时间:"+System.currentTimeMillis());
phaser.register();
System.out.println(Thread.currentThread().getName()+"执行两次register(),当前需要注册的Parties数:"+phaser.getRegisteredParties()+",时间:"+System.currentTimeMillis());
phaser.register();
System.out.println(Thread.currentThread().getName()+"执行两次register(),当前需要注册的Parties数:"+phaser.getRegisteredParties()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(3){
//重新onAdvance返回true
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return true;
}
};
MyThread myThread= new MyThread("线程",phaser);
myThread.start();
}
}
线程初始化,当前需要注册的Parties数:3,时间:1594692673660
线程执行一次register(),当前需要注册的Parties数:4,时间:1594692673662
线程执行两次register(),当前需要注册的Parties数:5,时间:1594692673662
线程执行两次register(),当前需要注册的Parties数:6,时间:1594692673662
方法 bulkRegister()可 以批量增加Parties。
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"初始化,当前需要注册的Parties数:"+phaser.getRegisteredParties()+",时间:"+System.currentTimeMillis());
//批量增加Parties数量
phaser.bulkRegister(3);
System.out.println(Thread.currentThread().getName()+"执行一次bulkRegister(3),当前需要注册的Parties数:"+phaser.getRegisteredParties()+",时间:"+System.currentTimeMillis());
phaser.bulkRegister(4);
System.out.println(Thread.currentThread().getName()+"执行一次bulkRegister(4),当前需要注册的Parties数:"+phaser.getRegisteredParties()+",时间:"+System.currentTimeMillis());
phaser.bulkRegister(5);
System.out.println(Thread.currentThread().getName()+"执行一次bulkRegister(5),当前需要注册的Parties数:"+phaser.getRegisteredParties()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(3){
//重新onAdvance返回true
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return true;
}
};
MyThread myThread= new MyThread("线程",phaser);
myThread.start();
}
}
线程初始化,当前需要注册的Parties数:3,时间:1594694287586
线程执行一次bulkRegister(3),当前需要注册的Parties数:6,时间:1594694287587
线程执行一次bulkRegister(4),当前需要注册的Parties数:10,时间:1594694287587
线程执行一次bulkRegister(5),当前需要注册的Parties数:15,时间:1594694287587
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
phaser.arriveAndAwaitAdvance();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(3){
//重新onAdvance返回true
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return true;
}
};
MyThread myThread= new MyThread("线程",phaser);
myThread.start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//已经到达屏障的线程
System.out.println("已经到达屏障的线程数" + phaser.getArrivedParties());
//屏障处未注册的线程
System.out.println("屏障处未到达的线程数" + phaser.getUnarrivedParties());
}
}
已经到达屏障的线程数1
屏障处未到达的线程数2
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
if(Thread.currentThread().getName().equals("线程2")){
//模拟线程2执行业务,其他线程屏障处等待
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName()+"第一次抵达屏障开始,通过屏障数:"+phaser.getPhase()+"phaser对象当前设置的屏障线程数:"+phaser.getRegisteredParties()+"phaser对象当前到达屏障的线程数:"+phaser.getArrivedParties()+",时间:"+System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第一次抵达屏障结束,通过屏障数:"+phaser.getPhase()+"phaser对象当前设置的屏障线程数:"+phaser.getRegisteredParties()+"phaser对象当前到达屏障的线程数:"+phaser.getArrivedParties()+",时间:"+System.currentTimeMillis());
if(Thread.currentThread().getName().equals("线程2")){
//模拟线程2执行业务,其他线程屏障处等待
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName()+"第二次抵达屏障开始,通过屏障数:"+phaser.getPhase()+"phaser对象当前设置的屏障线程数:"+phaser.getRegisteredParties()+"phaser对象当前到达屏障的线程数:"+phaser.getArrivedParties()+",时间:"+System.currentTimeMillis());
//执行arrive方法
phaser.arrive();
System.out.println(Thread.currentThread().getName()+"第二次抵达屏障结束,通过屏障数:"+phaser.getPhase()+"phaser对象当前设置的屏障线程数:"+phaser.getRegisteredParties()+"phaser对象当前到达屏障的线程数:"+phaser.getArrivedParties()+",时间:"+System.currentTimeMillis());
if(Thread.currentThread().getName().equals("线程2")){
//模拟线程2执行业务,其他线程屏障处等待
Thread.sleep(2000);
}
System.out.println(Thread.currentThread().getName()+"第三次抵达屏障开始,通过屏障数:"+phaser.getPhase()+"phaser对象当前设置的屏障线程数:"+phaser.getRegisteredParties()+"phaser对象当前到达屏障的线程数:"+phaser.getArrivedParties()+",时间:"+System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"第三次抵达屏障结束,通过屏障数:"+phaser.getPhase()+"phaser对象当前设置的屏障线程数:"+phaser.getRegisteredParties()+"phaser对象当前到达屏障的线程数:"+phaser.getArrivedParties()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(2){
//重新onAdvance返回false
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return false;
}
};
System.out.println(Thread.currentThread().getName()+"phaser对象初始化设置的屏障线程数:"+phaser.getRegisteredParties());
MyThread[] myThreads = new MyThread[2];
for (int i = 0; i < myThreads.length; i++) {
myThreads[i] = new MyThread("线程"+(i+1),phaser);
myThreads[i].start();
}
}
}
mainphaser对象初始化设置的屏障线程数:2
线程1第一次抵达屏障开始,通过屏障数:0phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:0,时间:1594710020640
线程2第一次抵达屏障开始,通过屏障数:0phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:1,时间:1594710022638
//线程2休眠2s抵达屏障,执行onAdvance方法
线程2执行了onAdvance()方法
线程2第一次抵达屏障结束,通过屏障数:1phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:0,时间:1594710022638
线程1第一次抵达屏障结束,通过屏障数:1phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:0,时间:1594710022638
线程1第二次抵达屏障开始,通过屏障数:1phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:0,时间:1594710022638
//线程1执行arrive()方法,parties加1,则当前到达屏障的线程数:1,此时线程2在休眠
//线程1并未等待线程2直接运行,注意此时当前到达屏障的线程数:1
线程1第二次抵达屏障结束,通过屏障数:1phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:1,时间:1594710022638
线程1第三次抵达屏障开始,通过屏障数:1phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:1,时间:1594710022638
//线程1执行了arriveAndAwaitAdvance()方法,自动执行onAdvance()方法
线程1执行了onAdvance()方法
//之前到达屏障的线程数:1,本次线程1到达之后线程数为2,屏障释放重置为0
线程1第三次抵达屏障结束,通过屏障数:2phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:0,时间:1594710022638
线程2第二次抵达屏障开始,通过屏障数:2phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:0,时间:1594710024639
//线程2休眠结束执行arrive()方法,不等待,注意此时当前到达屏障的线程数:1
线程2第二次抵达屏障结束,通过屏障数:2phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:1,时间:1594710024640
线程2第三次抵达屏障开始,通过屏障数:2phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:1,时间:1594710026644
线程2执行了onAdvance()方法
线程2第三次抵达屏障结束,通过屏障数:3phaser对象当前设置的屏障线程数:2phaser对象当前到达屏障的线程数:0,时间:1594710026644
方法 waitAdvance(int Phase的作用 :如果传入参数 phase 值和当前 getPhase() 方法返回值一样,则在屏障处等待,否则继续向下面运行,有些类似于旁观者的作用,当观察的条件满足了就等待(旁观),如果条件不满足,则程序向下继续运行。
方法 awaitAdvance(int Phase )并不参与 parties 计数的操作,仅仅具有判断的功能
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"抵达屏障开始,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"抵达屏障结束,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static class MyThreadB extends Thread{
private Phaser phaser;
public MyThreadB(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"抵达屏障开始,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
//判断当前屏障处等待线程是否为1,为1继续运行,否则等待
System.out.println(phaser.getPhase());
phaser.awaitAdvance(1);
System.out.println(Thread.currentThread().getName()+"抵达屏障结束,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Phaser phaser = new Phaser(2){
//重新onAdvance返回false
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return false;
}
};
//正常执行arriveAndAwaitAdvance的线程
MyThread myThreadA = new MyThread("线程A",phaser);
myThreadA.start();
//执行awaitAdvance,此时myThreadA已经在屏障处等待,当前通过屏障数0,不满足awaitAdvance(1),程序继续执行
MyThreadB myThreadC = new MyThreadB("线程C",phaser);
myThreadC.start();
//myThreadB延时2s执行,让myThreadA在屏障处持续等待,当前通过屏障数0
Thread.sleep(2000);
//正常执行arriveAndAwaitAdvance的线程,此时屏障处线程为2开启屏障,重置parties为0,当前通过屏障数1
MyThread myThreadB = new MyThread("线程B",phaser);
myThreadB.start();
Thread.sleep(2000);
//执行awaitAdvance,当前通过屏障数1,满足awaitAdvance(1),awaitAdvance()方法会一直等待
MyThreadB myThreadD = new MyThreadB("线程D",phaser);
myThreadD.start();
}
}
线程A抵达屏障开始,当前通过屏障数0,时间:1594740567757
线程C抵达屏障开始,当前通过屏障数0,时间:1594740567758
0
线程C抵达屏障结束,当前通过屏障数0,时间:1594740567758
线程B抵达屏障开始,当前通过屏障数0,时间:1594740569758
线程B执行了onAdvance()方法
线程B抵达屏障结束,当前通过屏障数1,时间:1594740569758
线程A抵达屏障结束,当前通过屏障数1,时间:1594740569758
线程D抵达屏障开始,当前通过屏障数1,时间:1594740571758
1
1.方法 awaitAdvanceInterruptibly( int phase)的作用是当线程通过的屏障数不符合指定的参数phase值时,则继续执行下面的代码,同awaitAdvance(int phase)方法
2.方法 awaitAdvanceInterruptibly( int phase)的作用是当线程通过的屏障数等于指定的参数phase值时,则等待不执行下面代码,同awaitAdvance(int phase)方法
3.不同点:awaitAdvanceInterruptibly()方法对interrupt()线程中断抛异常,awaitAdvance()方法不处理interrupt中断
示例1.awaitAdvance(int phase)对interrupt()方法不处理,忽略
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"抵达屏障开始,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
//如果0等于当前通过屏障数,则不执行下面代码,一直等待
phaser.awaitAdvance(0);
System.out.println(Thread.currentThread().getName()+"抵达屏障结束,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Phaser phaser = new Phaser(2){
//重新onAdvance返回false
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return false;
}
};
MyThread myThreadA = new MyThread("线程A",phaser);
myThreadA.start();
Thread.sleep(1000);
//线程终止,不影响awaitAdvance()等待
myThreadA.interrupt();
}
}
线程A抵达屏障开始,当前通过屏障数0,时间:1594742170057
线程myThreadA 依旧继续等待,不处理interrupt()中断。
示例2:awaitAdvanceInterruptibly(int phase) 对interrupt()操作抛异常
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"抵达屏障开始,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
//如果0等于当前通过屏障数,则不执行下面代码,一直等待
phaser.awaitAdvanceInterruptibly(0);
System.out.println(Thread.currentThread().getName()+"抵达屏障结束,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Phaser phaser = new Phaser(2){
//重新onAdvance返回false
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return false;
}
};
MyThread myThreadA = new MyThread("线程A",phaser);
myThreadA.start();
Thread.sleep(1000);
//线程终止,不影响awaitAdvance()等待
myThreadA.interrupt();
}
}
线程A抵达屏障开始,当前通过屏障数0,时间:1594742254318
java.lang.InterruptedException
at java.base/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:749)
at com.example.demo.PhaserDemo$MyThread.run(PhaserDemo.java:20)
当执行myThreadA.interrupt();方法后,myThreadA抛异常。
方法 awa tAdvancelnterruptib ly( int,long, Time Unit)的作用是在指定的栏数等待最大的单位时间,如果在指定的时间内,栏数未变,也就是通过的屏障数没变,则出现异常,否则继续向下运行。
对于 interrupt()方法依然会抛异常。
示例1:2s内通过屏障数如果依然是0,抛异常TimeoutException
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"抵达屏障开始,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
//如果0等于当前通过屏障数,则不执行下面代码,一直等待
phaser.awaitAdvanceInterruptibly(0,2, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+"抵达屏障结束,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Phaser phaser = new Phaser(2){
//重新onAdvance返回false
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return false;
}
};
MyThread myThreadA = new MyThread("线程A",phaser);
myThreadA.start();
}
}
线程A抵达屏障开始,当前通过屏障数0,时间:1594742440475
java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:789)
at com.example.demo.PhaserDemo$MyThread.run(PhaserDemo.java:21)
示例2:2s内通过屏障数变为1,正常向下执行
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"抵达屏障开始,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
//如果0等于当前通过屏障数,则不执行下面代码,一直等待
phaser.awaitAdvanceInterruptibly(0,2, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+"抵达屏障结束,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Phaser phaser = new Phaser(2){
//重新onAdvance返回false
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return false;
}
};
MyThread myThreadA = new MyThread("线程A",phaser);
myThreadA.start();
Thread.sleep(500);
//不等待其他线程,直接将parties加1
phaser.arrive();
System.out.println(Thread.currentThread().getName()+"执行arrive(),当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
//此时parties为2,执行通过屏障方法
phaser.arrive();
System.out.println(Thread.currentThread().getName()+"执行arrive(),当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
}
}
线程A抵达屏障开始,当前通过屏障数0,时间:1594742684443
main执行arrive(),当前通过屏障数0,时间:1594742684944
main执行了onAdvance()方法
main执行arrive(),当前通过屏障数1,时间:1594742684945
线程A抵达屏障结束,当前通过屏障数1,时间:1594742684945
示例3:2s内执行interrupt()方法,抛异常
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"抵达屏障开始,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
//如果0等于当前通过屏障数,则不执行下面代码,一直等待
phaser.awaitAdvanceInterruptibly(0,2, TimeUnit.SECONDS);
System.out.println(Thread.currentThread().getName()+"抵达屏障结束,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Phaser phaser = new Phaser(2){
//重新onAdvance返回false
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return false;
}
};
MyThread myThreadA = new MyThread("线程A",phaser);
myThreadA.start();
Thread.sleep(500);
myThreadA.interrupt();
}
}
线程A抵达屏障开始,当前通过屏障数0,时间:1594743195764
java.lang.InterruptedException
at java.base/java.util.concurrent.Phaser.awaitAdvanceInterruptibly(Phaser.java:787)
at com.example.demo.PhaserDemo$MyThread.run(PhaserDemo.java:21)
方法forceTermination()是使 Phaser屏障功能失效,而方法isTerminated()判断Phaser 对象是否已经呈销毁。
类 Phaser 执行forceTermination()方法时仅仅将屏障取消,线程继续执行后面的代码,并不出现异常,而CyclicBarrier 类的 reset()方法执行时,屏障处线程却出现异常。
public class PhaserDemo {
//业务线程
public static class MyThread extends Thread{
private Phaser phaser;
public MyThread(String name, Phaser phaser) {
super(name);
this.phaser = phaser;
}
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getName()+"抵达屏障开始,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
//等待其他线程一起过屏障
phaser.arriveAndAwaitAdvance();
System.out.println(Thread.currentThread().getName()+"抵达屏障结束,当前通过屏障数"+phaser.getPhase()+",时间:"+System.currentTimeMillis());
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Phaser phaser = new Phaser(3){
//重新onAdvance返回false
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(Thread.currentThread().getName()+"执行了onAdvance()方法");
return false;
}
};
MyThread myThreadA = new MyThread("线程A",phaser);
myThreadA.start();
MyThread myThreadB = new MyThread("线程B",phaser);
myThreadB.start();
//上面两个线程都会在屏障处等待,直到parties=3
Thread.sleep(500);
//强制取消屏障,并不影响myThreadA,myThreadB,没有屏障之后两线程会向下继续执行
phaser.forceTermination();
System.out.println("当前屏障被取消");
//屏障是否被取消
System.out.println("当前屏障是否被取消:"+phaser.isTerminated());
}
}
线程B抵达屏障开始,当前通过屏障数0,时间:1594743561946
线程A抵达屏障开始,当前通过屏障数0,时间:1594743561946
当前屏障被取消
线程B抵达屏障结束,当前通过屏障数-2147483648,时间:1594743562446
线程A抵达屏障结束,当前通过屏障数-2147483648,时间:1594743562446
当前屏障是否被取消:true
类Phaser 提供 了动态增减 parties 计数,这点比 CyclicBarrier 类操作 parties 更加方便,通过若干个方法来控制多个线程之间同步运行的效果,还可以实现针对某一个线程取消同步运行的效果,而且支持在指定屏障处等待,在等待时还支持中断或非中断等功能,使用 Java并发类对线程进行分组同步控制时, Phaser比CyclicBarrier 类功能更加强大,建议使用。