当前位置: 首页 > 工具软件 > Phaser > 使用案例 >

Java高并发编程中Phaser的使用及详细介绍-刘宇

黄浩涆
2023-12-01

作者:刘宇
CSDN博客地址:https://blog.csdn.net/liuyu973971883
有部分资料参考,如有侵权,请联系删除。如有不正确的地方,烦请指正,谢谢。

一、什么是Phaser?

Phaser又称“阶段器”,用来解决控制多个线程分阶段共同完成任务的情景问题。它与CountDownLatch和CyclicBarrier类似,都是等待一组线程完成工作后再执行下一步,协调线程的工作。但在CountDownLatch和CyclicBarrier中我们都不可以动态的配置parties,而Phaser可以动态注册需要协调的线程,相比CountDownLatch和CyclicBarrier就会变得更加灵活。

二、Phaser的常用方法

1、register方法

动态添加一个parties

int register()

2、bulkRegister方法

动态添加多个parties

  • parties:需要添加的个数
int bulkRegister(int parties)

3、getRegisteredParties方法

获取当前的parties数

int getRegisteredParties()

4、arriveAndAwaitAdvance方法

到达并等待其他线程到达

int arriveAndAwaitAdvance()

5、arriveAndDeregister方法

到达并注销该parties,这个方法不会使线程阻塞

int arriveAndDeregister()

6、arrive方法

到达,但不会使线程阻塞

int arrive()

7、awaitAdvance方法

等待前行,可阻塞也可不阻塞,判断条件为传入的phase是否为当前phaser的phase。如果相等则阻塞,反之不进行阻塞

  • phase:阶段数值
int awaitAdvance(int phase)

8、awaitAdvanceInterruptibly方法

该方法与awaitAdvance类似,唯一不一样的就是它可以进行打断。

  • phase:阶段数值
  • timeout:超时时间
  • unit:时间单位
int awaitAdvanceInterruptibly(int phase)
int awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)

9、getArrivedParties方法

获取当前到达的parties数

int getArrivedParties()

10、getUnarrivedParties方法

获取当前未到达的parties数

int getUnarrivedParties()

11、getPhase方法

获取当前属于第几阶段,默认从0开始,最大为integer的最大值

int getPhase()

12、isTerminated方法

判断当前phaser是否关闭

boolean isTerminated()

13、forceTermination方法

强制关闭当前phaser

void forceTermination()

二、案例

1、使用Phaser动态注册parties

package com.test.part3.lock;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserExample {
    private static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) {
        Phaser phaser = new Phaser();
        //创建5个任务
        for (int i=0;i<5;i++){
            new Task(phaser).start();
        }
        //动态注册
        phaser.register();
        //等待其他线程完成工作
        phaser.arriveAndAwaitAdvance();
        System.out.println("All of worker finished the task");
    }

    private static class Task extends Thread{
        private Phaser phaser;

        public Task(Phaser phaser) {
            this.phaser = phaser;
            //动态注册任务
            this.phaser.register();
        }

        @Override
        public void run() {
            try {
                System.out.println("The thread ["+getName()+"] is working");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("The thread ["+getName()+"] work finished");
            //等待其他线程完成工作
            phaser.arriveAndAwaitAdvance();
        }
    }
}

运行结果:

The thread [Thread-0] is working
The thread [Thread-1] is working
The thread [Thread-2] is working
The thread [Thread-3] is working
The thread [Thread-4] is working
The thread [Thread-2] work finished
The thread [Thread-4] work finished
The thread [Thread-0] work finished
The thread [Thread-1] work finished
The thread [Thread-3] work finished
All of worker finished the task

Process finished with exit code 0

2、使用Phaser设置多个阶段

  • 这边使用的案例是运动员,模拟多个运动员参加多个项目。
package com.test.part3.lock;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserExample2 {
    private static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) {
        //初始化5个parties
        Phaser phaser = new Phaser(5);
        for (int i=1;i<6;i++){
            new Athlete(phaser,i).start();
        }
    }
    //创建运动员类
    private static class Athlete extends Thread{
        private Phaser phaser;
        private int no;//运动员编号

        public Athlete(Phaser phaser,int no) {
            this.phaser = phaser;
            this.no = no;
        }

        @Override
        public void run() {
            try {
                System.out.println(no+": 当前处于第:"+phaser.getPhase()+"阶段");
                System.out.println(no+": start running");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(no+": end running");
                //等待其他运动员完成跑步
                phaser.arriveAndAwaitAdvance();

                System.out.println(no+": 当前处于第:"+phaser.getPhase()+"阶段");
                System.out.println(no+": start bicycle");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(no+": end bicycle");
                //等待其他运动员完成骑行
                phaser.arriveAndAwaitAdvance();

                System.out.println(no+": 当前处于第:"+phaser.getPhase()+"阶段");
                System.out.println(no+": start long jump");
                TimeUnit.SECONDS.sleep(random.nextInt(5));
                System.out.println(no+": end long jump");
                //等待其他运动员完成跳远
                phaser.arriveAndAwaitAdvance();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

1: 当前处于第:0阶段
3: 当前处于第:0阶段
3: start running
2: 当前处于第:0阶段
2: start running
1: start running
4: 当前处于第:0阶段
4: start running
5: 当前处于第:0阶段
5: start running
5: end running
2: end running
1: end running
4: end running
3: end running
3: 当前处于第:1阶段
5: 当前处于第:1阶段
5: start bicycle
1: 当前处于第:1阶段
1: start bicycle
2: 当前处于第:1阶段
2: start bicycle
3: start bicycle
4: 当前处于第:1阶段
4: start bicycle
1: end bicycle
3: end bicycle
4: end bicycle
5: end bicycle
2: end bicycle
3: 当前处于第:2阶段
1: 当前处于第:2阶段
1: start long jump
4: 当前处于第:2阶段
4: start long jump
5: 当前处于第:2阶段
5: start long jump
2: 当前处于第:2阶段
2: start long jump
3: start long jump
2: end long jump
4: end long jump
1: end long jump
5: end long jump
3: end long jump

Process finished with exit code 0

3、常用方法演示

package com.brycen.part3.lock;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserExample3 {
    private static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) throws InterruptedException {
        //初始化5个parties
        Phaser phaser = new Phaser(5);

        //只有当全部线程通过时才会进入下一阶段,从0开始
        System.out.println("当前阶段数:"+phaser.getPhase());

        //添加一个parties
        phaser.register();
        System.out.println("当前Parties数:"+phaser.getRegisteredParties());
        //添加多个parties
        phaser.bulkRegister(4);
        System.out.println("当前Parties数:"+phaser.getRegisteredParties());

        new Thread(new Runnable() {
            @Override
            public void run() {
                //到达并等待其他线程到达
                phaser.arriveAndAwaitAdvance();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                //到达后注销该parties,不等待其他线程
                phaser.arriveAndDeregister();
                System.out.println("go on");
            }
        }).start();
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("当前Parties数:"+phaser.getRegisteredParties());
        System.out.println("当前到达数:"+phaser.getArrivedParties());
        System.out.println("当前未达数:"+phaser.getUnarrivedParties());

        //何时会停止,只有当parties中的数量为0时或者调用forceTermination方法就会停止了,我们也可以重写phaser中的onAdvance,给他返回true就会使这个phaser停止了
        System.out.println("phaser是否结束:"+phaser.isTerminated());
        phaser.forceTermination();
        System.out.println("phaser是否结束:"+phaser.isTerminated());
    }

}

运行结果:

当前阶段数:0
当前Parties数:6
当前Parties数:10
go on
当前Parties数:9
当前到达数:1
当前未达数:8
phaser是否结束:false
phaser是否结束:true

4、利用arrive只监听线程完成第一部分任务

package com.brycen.part3.lock;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class PhaserExample4 {
    private static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) throws InterruptedException {
        //初始化6个parties
        Phaser phaser = new Phaser(6);
        //创建5个任务
        IntStream.rangeClosed(1,5).forEach(i->new ArrayTask(i,phaser).start());
        //等待5个任务的第一部分完成
        phaser.arriveAndAwaitAdvance();
        System.out.println("all work finished");
    }

    private static class ArrayTask extends Thread{
        private Phaser phaser;

        public ArrayTask(int name,Phaser phaser) {
            super(String.valueOf(name));
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                //模拟第一部分工作
                System.out.println(getName()+" start working");
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                System.out.println(getName()+" end working");
                //该方法表示到达但不会使线程阻塞
                phaser.arrive();
                //模拟第二部分工作
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                System.out.println(getName()+" do other thing");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

1 start working
2 start working
3 start working
3 end working
4 start working
5 start working
1 end working
4 end working
2 end working
2 do other thing
5 end working
3 do other thing
all work finished
5 do other thing
1 do other thing
4 do other thing

5、awaitAdvance演示

package com.test.part3.lock;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class PhaserExample5 {
    private static Random random = new Random(System.currentTimeMillis());
    public static void main(String[] args) {
        //初始化6个parties
        Phaser phaser = new Phaser(5);
        //创建5个任务
        IntStream.rangeClosed(1,5).forEach(i->new ArrayTask(i,phaser).start());
        //当phaser中的当前阶段等于传入的阶段则该方法会阻塞,反之不会
        phaser.awaitAdvance(phaser.getPhase());
        System.out.println("all work finished");
    }

    private static class ArrayTask extends Thread{
        private Phaser phaser;

        public ArrayTask(int name,Phaser phaser) {
            super(String.valueOf(name));
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                System.out.println(getName()+" start working");
                TimeUnit.SECONDS.sleep(random.nextInt(3));
                System.out.println(getName()+" end working");
                phaser.arriveAndAwaitAdvance();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

运行结果:

1 start working
2 start working
3 start working
4 start working
5 start working
5 end working
1 end working
2 end working
3 end working
4 end working
all work finished
 类似资料: