当前位置: 首页 > 工具软件 > Phaser > 使用案例 >

Phaser详解

鲍俊杰
2023-12-01

参考:Java多线程进阶(二二)—— J.U.C之synchronizer框架:Phaser

JDK版本:AdoptOpenJDK 11.0.10+9

1 概念

Phaser(阶段器,发音:非泽尔),是1.7引入的,用于分阶段执行任务的场景。

Phaser中定义了一些概念:

  • phase(阶段)

类似于CyclicBarrierPhaser也有栅栏的概念。在Phaser中,栅栏叫做phase(阶段),在任意的时间点,Phaser只处于某一个phase(阶段),初始为0,最大达到Integer.MAX_VALUE,然后重新归零。当所有的parties(参与者)都到达之后,phase将会递增。

  • parties(参与者)

parties(参与者)就是参与活动的线程。在Phaser中,既可以在初始构造的时候指定parties(参与者)的数量,也可以在中途通过registerbulkRegisterarriveAndDeregister等方法注册/注销参与者。

  • arrive(到达)、advance(进阶)

Phaser注册完parties(参与者)之后,参与者的初始状态是unarrived(未到达的)。当参与者到达当前阶段之后,参与者的状态将会变成arrived(到达的)。当到达该阶段的参与者的数量等于注册的数量的时候,该阶段就会发生advance(进阶),也就是phase的值+1

  • Termination(终止)

表示当前的Phaser到达终止状态。

  • Tiering(分层)

Phaser支持Tiering(分层)—— 一种树形结构。当一个Phaser有大量参与者的时候,内部同步操作会使性能急剧下降,引入分层可以有效的降低竞争,提高性能。

2 方法

Phaser提供了一些方法,主要方法如下:

  1. 构造函数
方法说明
Phaser()创建一个Phaser,没有注册任何参与者,没有父节点,初始化phase(阶段)的值为0
Phaser(int parties)创建一个Phaser,指定注册的参与者数量,没有父节点,初始化phase(阶段)的值为0
Phaser(Phaser parent)创建一个Phaser,没有注册任何参与者,指定父节点,初始化phase(阶段)的值为0
Phaser(Phaser parent, int parties)创建一个Phaser,指定注册的参与者数量,指定父节点,初始化phase(阶段)的值为0
  1. 常用方法
方法说明
register()向这个Phaser注册一个新的unarrived状态的party(参与者)。
bulkRegister(int parties)批量注册一批参与者。
arrive()到达这个phase(阶段),不等待其他参与者到达。
arriveAndDeregister()到达这个phase(阶段),并且注销一个参与者,使得参与者的数量-1
arriveAndAwaitAdvance()到达这个phase(阶段),并且等待进阶。
onAdvance(int phase, int registeredParties)这是一个可以被重载的方法。当每个阶段完成将会自动调用这个方法,参数phase表示当前阶段的phase值,registeredParties表示注册的参与者数量。该方法的返回值是boolean类型,返回false表示进入下一阶段,返回true表示终止。

3 例子

下面的例子,模拟4个工人完成三个阶段的工作。只有当4个工人全部都完成了当前阶段的工作的时候,所有工人才会进入下一阶段的工作。

package com.example.demo.util;

import java.util.concurrent.Phaser;

public class PhaserTest {

	public static void main(String[] args) {
		WorkPhaser workPhaser = new WorkPhaser();
		// 4个工人参与工作
		for (int i = 0; i < 4; i++) {
			// 注册
			workPhaser.register();
			// 启动线程
			new Thread(new Worker(workPhaser), "work-" + (i + 1)).start();
		}
	}
}

/**
 * 工作阶段器
 */
class WorkPhaser extends Phaser {
	@Override
	protected boolean onAdvance(int phase, int registeredParties) {
		switch (phase) {
		case 0:
			System.out.println("第一阶段工作完成,进入第二阶段>>>");
			return false;
		case 1:
			System.out.println("第二阶段工作完成,进入第三阶段>>>");
			return false;
		case 2:
			System.out.println("第三阶段工作完成,退出.");
			return true;
		default:
			return true;
		}
	}
}

/**
 * 工人
 */
class Worker implements Runnable {

	private WorkPhaser workPhaser;

	public Worker(WorkPhaser workPhaser) {
		this.workPhaser = workPhaser;
	}

	@Override
	public void run() {
		String playerName = Thread.currentThread().getName();
		System.out.println(playerName + " 工人完成了第一阶段的工作.");
		// 到达阶段,等待进阶
		workPhaser.arriveAndAwaitAdvance();

		System.out.println(playerName + " 工人完成了第二阶段的工作.");
		// 到达阶段,等待进阶
		workPhaser.arriveAndAwaitAdvance();

		System.out.println(playerName + " 工人完成了第三阶段的工作.");
		// 到达阶段,等待进阶
		workPhaser.arriveAndAwaitAdvance();
	}

}

输入:

work-1 工人完成了第一阶段的工作.
work-4 工人完成了第一阶段的工作.
work-3 工人完成了第一阶段的工作.
work-2 工人完成了第一阶段的工作.
第一阶段工作完成,进入第二阶段>>>
work-2 工人完成了第二阶段的工作.
work-1 工人完成了第二阶段的工作.
work-4 工人完成了第二阶段的工作.
work-3 工人完成了第二阶段的工作.
第二阶段工作完成,进入第三阶段>>>
work-3 工人完成了第三阶段的工作.
work-1 工人完成了第三阶段的工作.
work-2 工人完成了第三阶段的工作.
work-4 工人完成了第三阶段的工作.
第三阶段工作完成,退出.

Phaser分层的例子暂无。

4 源码解析

Phaser的源码比较复杂,参见Java多线程进阶(二二)—— J.U.C之synchronizer框架:Phaser

 类似资料: