Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数;
一.简单使用
同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行:
package com.example.demo.study; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class Study0217 { //创建一个信号量的实例,信号量初始值为0 static Semaphore semaphore = new Semaphore(0); public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(3); pool.submit(()->{ System.out.println("Thread1---start"); //信号量加一 semaphore.release(); }); pool.submit(()->{ System.out.println("Thread2---start"); //信号量加一 semaphore.release(); }); pool.submit(()->{ System.out.println("Thread3---start"); //信号量加一 semaphore.release(); }); //等待两个子线程执行完毕就放过,必须要信号量等于2才放过 semaphore.acquire(2); System.out.println("两个子线程执行完毕"); //关闭线程池,正在执行的任务继续执行 pool.shutdown(); } }
这个信号量也可以复用,类似CyclicBarrier:
package com.example.demo.study; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class Study0217 { //创建一个信号量的实例,信号量初始值为0 static Semaphore semaphore = new Semaphore(0); public static void main(String[] args) throws Exception { ExecutorService pool = Executors.newFixedThreadPool(3); pool.submit(()->{ System.out.println("Thread1---start"); //信号量加一 semaphore.release(); }); pool.submit(()->{ System.out.println("Thread2---start"); //信号量加一 semaphore.release(); }); //等待两个子线程执行完毕就放过,必须要信号量等于2才放过 semaphore.acquire(2); System.out.println("子线程1,2执行完毕"); pool.submit(()->{ System.out.println("Thread3---start"); //信号量加一 semaphore.release(); }); pool.submit(()->{ System.out.println("Thread4---start"); //信号量加一 semaphore.release(); }); semaphore.acquire(2); System.out.println("子线程3,4执行完毕"); //关闭线程池,正在执行的任务继续执行 pool.shutdown(); } }
二.信号量原理
看看下面这个图,可以知道信号量Semaphore还是根据AQS实现的,内部有个Sync工具类操作AQS,还分为公平策略和非公平策略;
构造器:
//默认是非公平策略 public Semaphore(int permits) { sync = new NonfairSync(permits); } //可以根据第二个参数选择是公平策略还是非公平策略 public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
acquire(int permits)方法:
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } //AQS中的方法 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //这里根据子类是公平策略还是非公平策略 if (tryAcquireShared(arg) < 0) //获取失败会进入这里,将线程放入阻塞队列,然后再尝试,还是失败的话就调用park方法挂起当前线程 doAcquireSharedInterruptibly(arg); } //非公平策略 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) { //一个无限循环,获取state剩余的信号量,因为每调用一次release()方法的话,信号量就会加一,这里将 //最新的信号量减去传进来的参数比较,比如有两个线程,其中一个线程已经调用了release方法,然后调用acquire(2)方法,那么 //这里remaining的值就是-1,返回-1,然后当前线程就会被丢到阻塞队列中去了;如果另外一个线程也调用了release方法, //那么此时的remaining==0,所以在这里的if中会调用CAS将0设置到state // for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } //公平策略 //和上面非公平差不多,只不过这里会查看阻塞队列中当前节点前面有没有前驱节点,有的话直接返回-1, //就会把当前线程丢到阻塞队列中阻塞去了,没有前驱节点的话,就跟非公平模式一样的了 protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 ||compareAndSetState(available, remaining)) return remaining; } }
再看看release(int permits)方法:
//这个方法的作用就是将信号量加一 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } //AQS中方法 public final boolean releaseShared(int arg) { //tryReleaseShared尝试释放资源 if (tryReleaseShared(arg)) { //释放资源成功就调用park方法唤醒唤醒AQS队列中最前面的节点中的线程 doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { //一个无限循环,获取state,然后加上传进去的参数,如果新的state的值小于旧的state,说明已经超过了state的最大值,溢出了 //没有溢出的话,就用CAS更新state的值 for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; //ws==Node.SIGNAL表示节点中线程需要被唤醒 if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //调用阻塞队列中线程的unpark方法唤醒线程 unparkSuccessor(h); } //ws == 0表示节点中线程是初始状态 else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
以最上面的例子简单说一下,其实不是很难,首先线程1和线程2分别去调用release方法,这个方法里面会将AQS中的state加一,但是在执行这个操作之前,主线程肯定会先到acquire(2),在这个方法里面,假如默认使用非公平策略,首先获取当前的信号量state(state的初始值是0),用当前信号量减去2,如果小于0,那么当前主线程就会丢到AQS队列中阻塞;
这个时候线程1的release方法执行了,于是就把信号量state加一(此时state==1),CAS更新state为一,成功的话,就调用doReleaseShared()方法唤醒AQS阻塞队列中最先挂起的线程(这里就是因为调用acquire方法而阻塞的主线程),主线程唤醒之后又会去获取最新的信号量,与2比较,发现还是小于0,于是又会阻塞;
线程2此时的release方法执行完成,重复线程一的操作,主线程唤醒之后(此时state==2),又去获取最新的信号量发现是2,减去acquire方法的参数2等于0,于是就用CAS更新state的值,然后acquire方法也就执行完毕,主线程继续执行后面的代码;
其实信号量还是很有意思的,记得在项目里,有人利用信号量实现了一个故障隔离,什么时候我可以把整理之后的代码贴出来分享一下,还是很有意思的,就跟springcloud的熔断机制差不多,场景是:比如你在service的一个方法调用第三方的接口,你不知道调不调得通,而且你不希望每次前端过来都会去调用,比如当调用失败的次数超过100次,那么五分钟之后才会再去实际调用这个第三方服务!这五分钟内前调用这个服务,就会触发我们这个故障隔离的机制,向前端返回一个特定的错误码和错误信息!
以上就是详解Java 信号量Semaphore的详细内容,更多关于Java 信号量Semaphore的资料请关注小牛知识库其它相关文章!
本文向大家介绍JAVA 多线程之信号量(Semaphore)实例详解,包括了JAVA 多线程之信号量(Semaphore)实例详解的使用技巧和注意事项,需要的朋友参考一下 java Semaphore 简介 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确、合理的使用公共资源。 一个计数信号量
本文向大家介绍Java并发编程Semaphore计数信号量详解,包括了Java并发编程Semaphore计数信号量详解的使用技巧和注意事项,需要的朋友参考一下 Semaphore 是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通
本文向大家介绍Java并发编程之Semaphore(信号量)详解及实例,包括了Java并发编程之Semaphore(信号量)详解及实例的使用技巧和注意事项,需要的朋友参考一下 Java并发编程之Semaphore(信号量)详解及实例 概述 通常情况下,可能有多个线程同时访问数目很少的资源,如客户端建立了若干个线程同时访问同一数据库,这势必会造成服务端资源被耗尽的地步,那么怎样能够有效的来控制不可预
主要内容:1 Semaphore的概述,2 Semaphore的原理,2.1 基本结构,2.2 可中断获取信号量,2.3 不可中断获取信号量,2.4 超时可中断获取信号量,2.5 尝试获取信号量,2.6 释放信号量,3 Semaphore的使用,4 Semaphore的总结详细介绍了Semaphore信号量的原理和应用,以及与CountDownLatch的对比! 1 Semaphore的概述 public class Semaphore extends Object implements Ser
我试图理解这个旧考试任务的答案,在这个任务中,学生应该使用JavasReentrantLock实现一个公平的二进制信号量。我不明白这些计数器的意义: 它在任务的描述中说,“你可以假设程序中使用信号量最多有20个线程。此外,最多1000万信号量操作将在程序的一次运行中执行。"在任务的解决方案中,它说:“每个试图获取信号量的线程必须在队列中注册自己,并且只有在之前的线程离开队列后才离开队列。每个线程使
本文向大家介绍详解Linux多线程使用信号量同步,包括了详解Linux多线程使用信号量同步的使用技巧和注意事项,需要的朋友参考一下 信号量、同步这些名词在进程间通信时就已经说过,在这里它们的意思是相同的,只不过是同步的对象不同而已。但是下面介绍的信号量的接口是用于线程的信号量,注意不要跟用于进程间通信的信号量混淆。 一、什么是信号量 线程的信号量与进程间通信中使用的信号量的概念是一样,它是一种特殊