参考:Java多线程进阶(二二)—— J.U.C之synchronizer框架:Phaser
JDK
版本:AdoptOpenJDK 11.0.10+9
Phaser
(阶段器,发音:非泽尔
),是1.7
引入的,用于分阶段执行任务的场景。
Phaser
中定义了一些概念:
phase
(阶段)类似于CyclicBarrier
,Phaser
也有栅栏的概念。在Phaser
中,栅栏叫做phase
(阶段),在任意的时间点,Phaser
只处于某一个phase
(阶段),初始为0
,最大达到Integer.MAX_VALUE
,然后重新归零。当所有的parties
(参与者)都到达之后,phase
将会递增。
parties
(参与者)parties
(参与者)就是参与活动的线程。在Phaser
中,既可以在初始构造的时候指定parties
(参与者)的数量,也可以在中途通过register
、bulkRegister
、arriveAndDeregister
等方法注册/注销参与者。
arrive
(到达)、advance
(进阶)Phaser
注册完parties
(参与者)之后,参与者的初始状态是unarrived
(未到达的)。当参与者到达当前阶段之后,参与者的状态将会变成arrived
(到达的)。当到达该阶段的参与者的数量等于注册的数量的时候,该阶段就会发生advance
(进阶),也就是phase
的值+1
。
Termination
(终止)表示当前的Phaser
到达终止状态。
Tiering
(分层)Phaser
支持Tiering
(分层)—— 一种树形结构。当一个Phaser
有大量参与者的时候,内部同步操作会使性能急剧下降,引入分层可以有效的降低竞争,提高性能。
Phaser
提供了一些方法,主要方法如下:
方法 | 说明 |
---|---|
Phaser() | 创建一个Phaser ,没有注册任何参与者,没有父节点,初始化phase (阶段)的值为0 。 |
Phaser(int parties) | 创建一个Phaser ,指定注册的参与者数量,没有父节点,初始化phase (阶段)的值为0 。 |
Phaser(Phaser parent) | 创建一个Phaser ,没有注册任何参与者,指定父节点,初始化phase (阶段)的值为0 。 |
Phaser(Phaser parent, int parties) | 创建一个Phaser ,指定注册的参与者数量,指定父节点,初始化phase (阶段)的值为0 。 |
方法 | 说明 |
---|---|
register() | 向这个Phaser 注册一个新的unarrived 状态的party (参与者)。 |
bulkRegister(int parties) | 批量注册一批参与者。 |
arrive() | 到达这个phase (阶段),不等待其他参与者到达。 |
arriveAndDeregister() | 到达这个phase (阶段),并且注销一个参与者,使得参与者的数量-1 。 |
arriveAndAwaitAdvance() | 到达这个phase (阶段),并且等待进阶。 |
onAdvance(int phase, int registeredParties) | 这是一个可以被重载的方法。当每个阶段完成将会自动调用这个方法,参数phase 表示当前阶段的phase 值,registeredParties 表示注册的参与者数量。该方法的返回值是boolean 类型,返回false 表示进入下一阶段,返回true 表示终止。 |
下面的例子,模拟4个工人完成三个阶段的工作。只有当4个工人全部都完成了当前阶段的工作的时候,所有工人才会进入下一阶段的工作。
package com.example.demo.util;
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) {
WorkPhaser workPhaser = new WorkPhaser();
// 4个工人参与工作
for (int i = 0; i < 4; i++) {
// 注册
workPhaser.register();
// 启动线程
new Thread(new Worker(workPhaser), "work-" + (i + 1)).start();
}
}
}
/**
* 工作阶段器
*/
class WorkPhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一阶段工作完成,进入第二阶段>>>");
return false;
case 1:
System.out.println("第二阶段工作完成,进入第三阶段>>>");
return false;
case 2:
System.out.println("第三阶段工作完成,退出.");
return true;
default:
return true;
}
}
}
/**
* 工人
*/
class Worker implements Runnable {
private WorkPhaser workPhaser;
public Worker(WorkPhaser workPhaser) {
this.workPhaser = workPhaser;
}
@Override
public void run() {
String playerName = Thread.currentThread().getName();
System.out.println(playerName + " 工人完成了第一阶段的工作.");
// 到达阶段,等待进阶
workPhaser.arriveAndAwaitAdvance();
System.out.println(playerName + " 工人完成了第二阶段的工作.");
// 到达阶段,等待进阶
workPhaser.arriveAndAwaitAdvance();
System.out.println(playerName + " 工人完成了第三阶段的工作.");
// 到达阶段,等待进阶
workPhaser.arriveAndAwaitAdvance();
}
}
输入:
work-1 工人完成了第一阶段的工作.
work-4 工人完成了第一阶段的工作.
work-3 工人完成了第一阶段的工作.
work-2 工人完成了第一阶段的工作.
第一阶段工作完成,进入第二阶段>>>
work-2 工人完成了第二阶段的工作.
work-1 工人完成了第二阶段的工作.
work-4 工人完成了第二阶段的工作.
work-3 工人完成了第二阶段的工作.
第二阶段工作完成,进入第三阶段>>>
work-3 工人完成了第三阶段的工作.
work-1 工人完成了第三阶段的工作.
work-2 工人完成了第三阶段的工作.
work-4 工人完成了第三阶段的工作.
第三阶段工作完成,退出.
Phaser
分层的例子暂无。
Phaser
的源码比较复杂,参见Java多线程进阶(二二)—— J.U.C之synchronizer框架:Phaser