前言
CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。
内部实现差异
前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。
public class { //Synchronization control For CountDownLatch. //Uses AQS state to represent count. private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } } private final Sync sync; ... ...// }
public class CyclicBarrier { /** * Each use of the barrier is represented as a generation instance. * The generation changes whenever the barrier is tripped, or * is reset. There can be many generations associated with threads * using the barrier - due to the non-deterministic way the lock * may be allocated to waiting threads - but only one of these * can be active at a time (the one to which {@code count} applies) * and all the rest are either broken or tripped. * There need not be an active generation if there has been a break * but no subsequent reset. */ private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count; /** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } /** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); } /** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } } ... ... // }
实战 - 展示各自的使用场景
/** *类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行 */ public class UseCountDownLatch { static CountDownLatch latch = new CountDownLatch(6); /*初始化线程*/ private static class InitThread implements Runnable{ public void run() { System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work......"); latch.countDown(); for(int i =0;i<2;i++) { System.out.println("Thread_"+Thread.currentThread().getId() +" ........continue do its work"); } } } /*业务线程等待latch的计数器为0完成*/ private static class BusiThread implements Runnable{ public void run() { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } for(int i =0;i<3;i++) { System.out.println("BusiThread_"+Thread.currentThread().getId() +" do business-----"); } } } public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { public void run() { SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 1st......"); latch.countDown(); System.out.println("begin step 2nd......."); SleepTools.ms(1); System.out.println("Thread_"+Thread.currentThread().getId() +" ready init work step 2nd......"); latch.countDown(); } }).start(); new Thread(new BusiThread()).start(); for(int i=0;i<=3;i++){ Thread thread = new Thread(new InitThread()); thread.start(); } latch.await(); System.out.println("Main do ites work........"); } }
/** *类说明:共4个子线程,他们全部完成工作后,交出自己结果, *再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串 */ class UseCyclicBarrier { private static CyclicBarrier barrier = new CyclicBarrier(4,new CollectThread()); //存放子线程工作结果的容器 private static ConcurrentHashMap<String,Long> resultMap = new ConcurrentHashMap<String,Long>(); public static void main(String[] args) { for(int i=0;i<4;i++){ Thread thread = new Thread(new SubThread()); thread.start(); } } /*汇总的任务*/ private static class CollectThread implements Runnable{ @Override public void run() { StringBuilder result = new StringBuilder(); for(Map.Entry<String,Long> workResult:resultMap.entrySet()){ result.append("["+workResult.getValue()+"]"); } System.out.println(" the result = "+ result); System.out.println("do other business........"); } } /*相互等待的子线程*/ private static class SubThread implements Runnable{ @Override public void run() { long id = Thread.currentThread().getId(); resultMap.put(Thread.currentThread().getId()+"",id); try { Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do something "); barrier.await(); Thread.sleep(1000+id); System.out.println("Thread_"+id+" ....do its business "); barrier.await(); } catch (Exception e) { e.printStackTrace(); } } } }
两者总结
1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;
2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;
3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;
4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;
5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;
6. 就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;
到此这篇关于详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别的文章就介绍到这了,更多相关java CountDownLatch和CyclicBarrier区别内容请搜索小牛知识库以前的文章或继续浏览下面的相关文章希望大家以后多多支持小牛知识库!
本文向大家介绍xgboost和lightgbm的区别和适用场景?相关面试题,主要包含被问及xgboost和lightgbm的区别和适用场景?时的应答技巧和注意事项,需要的朋友参考一下 参考回答: (1)xgboost采用的是level-wise的分裂策略,而lightGBM采用了leaf-wise的策略,区别是xgboost对每一层所有节点做无差别分裂,可能有些节点的增益非常小,对结果影响不大,但
我该如何使用和部署 Akka? Akka 可以有几种使用方式: 作为一个库: 以普通jar包的形式放在classpath上,或放到web应用中的 WEB-INF/lib位置 作为一个独立的应用程序,使用微内核(Scala) / 微内核(Java) ,自己使用一个main类来初始化Actor系统 将Akka作为一个库 当编写web应用的时候,你很可能要使用这种方式。通过添加更多的模块,可以有多种使用
本文向大家介绍详解Angular 中 ngOnInit 和 constructor 使用场景,包括了详解Angular 中 ngOnInit 和 constructor 使用场景的使用技巧和注意事项,需要的朋友参考一下 1. constructor constructor应该是ES6中明确使用constructor来表示构造函数的,构造函数使用在class中,用来做初始化操作。当包含constru
本文向大家介绍TCP和UDP的区别和各自适用的场景相关面试题,主要包含被问及TCP和UDP的区别和各自适用的场景时的应答技巧和注意事项,需要的朋友参考一下 参考回答: 1)TCP和UDP区别 1) 连接 TCP是面向连接的传输层协议,即传输数据之前必须先建立好连接。 UDP无连接。 2) 服务对象 TCP是点对点的两点间服务,即一条TCP连接只能有两个端点; UDP支持一对一,一对多,多对一,多对
本文向大家介绍python中dict和list的区别,dict的内部实现?相关面试题,主要包含被问及python中dict和list的区别,dict的内部实现?时的应答技巧和注意事项,需要的朋友参考一下 参考回答: dict查找速度快,占用的内存较大,list查找速度慢,占用内存较小,dict不能用来存储有序集合。Dict用{}表示,list用[]表示。 dict是通过hash表实现的,dict为
本文向大家介绍Yii2中的场景(scenario)和验证规则(rule)详解,包括了Yii2中的场景(scenario)和验证规则(rule)详解的使用技巧和注意事项,需要的朋友参考一下 前言 场景,顾名思义,就是一个情景,一种场面。在yii2中也有场景,这个场景跟你所理解的场景含义差不多。 和用户有交互的系统必不可少的功能包括收集用户数据、校验和处理。实际业务中,往往还需要将数据进行持久化存储。