第5章 多线程 - 同步工具类

优质
小牛编辑
136浏览
2023-12-01

这里主要介绍了java5中线程锁技术以外的其他同步工具,首先介绍Semaphore:一个计数信号量。用于控制同时访问资源的线程个数,CyclicBarrier同步辅助类:从字面意思看是路障,这里用于线程之间的相互等待,到达某点后,继续向下执行。CountDownLatch同步辅助类:在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。犹如倒计时计数器,然后是Exchanger:实现两个对象之间数据交换,可阻塞队列:ArrayBlockingQueue,通过阻塞队列间的通信来演示其作用,最后介绍了几个同步集合。

1. Semaphore实现信号灯

Semaphore可以维护当前访问自身的线程个数,并提供了同步机制,使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。Semaphore 只对可用许可的号码进行计数,并采取相应的行动。

Semaphore实现的功能就像:银行办理业务,一共有5个窗口,但一共有10个客户,一次性最多有5个客户可以进行办理,其他的人必须等候,当5个客户中的任何一个离开后,在等待的客户中有一个人可以进行业务办理。

Semaphore提供了两种规则:

  • 一种是公平的:获得资源的先后,按照排队的先后。在构造函数中设置true实现
  • 一种是野蛮的:谁有本事抢到资源,谁就可以获得资源的使用权。

与传统的互斥锁的异同:

单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁“,再由另外一个线程释放”锁“,这可以应用于死锁恢复的一些场合。

应用场景:共享资源的争夺,例如游戏中选手进入房间的情况。

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. import java.util.concurrent.Semaphore;
  4. public class SemaphoreTest {
  5. public static void main(String[] args) {
  6.   //创建一个可根据需要创建新线程的线程池
  7.   ExecutorService service = Executors.newCachedThreadPool();
  8. final Semaphore sp = new Semaphore(3);
  9.   //创建10个线程
  10.   for(int i=0;i<10;i++){
  11. Runnable runnable = new Runnable(){
  12. public void run(){
  13. try {
  14. sp.acquire(); //获取灯,即许可权
  15. } catch (InterruptedException e1) {
  16. e1.printStackTrace();
  17. }
  18. System.out.println("线程" + Thread.currentThread().getName() +
  19. "进入,当前已有" + (3-sp.availablePermits()) + "个并发");
  20. try {
  21. Thread.sleep((long)(Math.random()*10000));
  22. } catch (InterruptedException e) {
  23. e.printStackTrace();
  24. }
  25. System.out.println("线程" + Thread.currentThread().getName() +
  26. "即将离开");
  27. sp.release(); // 释放一个许可,将其返回给信号量
  28. //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
  29. System.out.println("线程" + Thread.currentThread().getName() +
  30. "已离开,当前已有" + (3-sp.availablePermits()) + "个并发");
  31. }
  32. };
  33. service.execute(runnable);
  34. }
  35. }
  36. }

输出结果

  1. 线程pool-1-thread-3进入,当前已有3个并发
  2. 线程pool-1-thread-2进入,当前已有3个并发
  3. 线程pool-1-thread-1进入,当前已有3个并发
  4. 线程pool-1-thread-2即将离开
  5. 线程pool-1-thread-2已离开,当前已有2个并发
  6. 线程pool-1-thread-5进入,当前已有3个并发
  7. 线程pool-1-thread-1即将离开
  8. 线程pool-1-thread-1已离开,当前已有2个并发
  9. 线程pool-1-thread-4进入,当前已有3个并发
  10. 线程pool-1-thread-4即将离开
  11. 线程pool-1-thread-4已离开,当前已有2个并发
  12. 线程pool-1-thread-8进入,当前已有3个并发
  13. 线程pool-1-thread-3即将离开
  14. 线程pool-1-thread-7进入,当前已有3个并发
  15. 线程pool-1-thread-3已离开,当前已有3个并发
  16. 线程pool-1-thread-8即将离开
  17. 线程pool-1-thread-8已离开,当前已有2个并发
  18. 线程pool-1-thread-9进入,当前已有3个并发
  19. 线程pool-1-thread-7即将离开
  20. 线程pool-1-thread-7已离开,当前已有2个并发
  21. 线程pool-1-thread-6进入,当前已有3个并发
  22. 线程pool-1-thread-9即将离开
  23. 线程pool-1-thread-9已离开,当前已有2个并发
  24. 线程pool-1-thread-10进入,当前已有3个并发
  25. 线程pool-1-thread-5即将离开
  26. 线程pool-1-thread-5已离开,当前已有2个并发
  27. 线程pool-1-thread-6即将离开
  28. 线程pool-1-thread-6已离开,当前已有1个并发
  29. 线程pool-1-thread-10即将离开
  30. 线程pool-1-thread-10已离开,当前已有0个并发

控制一个方法的并发量,比如同时只能有3个线程进来

  1. public class ThreadPoolTest {
  2. //信号量
  3. private static Semaphore semaphore = new Semaphore(3);//允许个数,相当于放了3把锁
  4. public static void main(String[] args) {
  5. for(int i=0;i<10;i++){
  6. new Thread(new Runnable() {
  7. @Override
  8. public void run() {
  9. try {
  10. method();
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }).start();
  16. }
  17. }
  18. //同时最多只允许3个线程过来
  19. public static void method() throws InterruptedException{
  20. semaphore.acquire();//获取一把锁
  21. System.out.println("ThreadName="+Thread.currentThread().getName()+"过来了");
  22. try {
  23. Thread.sleep(1000);
  24. } catch (InterruptedException e) {
  25. e.printStackTrace();
  26. }
  27. System.out.println("ThreadName="+Thread.currentThread().getName()+"出去了");
  28. semaphore.release();//释放一把锁
  29. }
  30. }

输出结果

  1. ThreadName=Thread-1过来了
  2. ThreadName=Thread-4过来了
  3. ThreadName=Thread-0过来了
  4. ThreadName=Thread-1出去了
  5. ThreadName=Thread-4出去了
  6. ThreadName=Thread-2过来了
  7. ThreadName=Thread-3过来了
  8. ThreadName=Thread-0出去了
  9. ThreadName=Thread-5过来了
  10. ThreadName=Thread-3出去了
  11. ThreadName=Thread-2出去了
  12. ThreadName=Thread-6过来了
  13. ThreadName=Thread-7过来了
  14. ThreadName=Thread-5出去了
  15. ThreadName=Thread-9过来了
  16. ThreadName=Thread-7出去了
  17. ThreadName=Thread-6出去了
  18. ThreadName=Thread-8过来了
  19. ThreadName=Thread-9出去了
  20. ThreadName=Thread-8出去了

三个线程a、b、c 并发运行,b,c 需要a 线程的数据怎么实现

根据问题的描述,我将问题用以下代码演示,ThreadA、ThreadB、ThreadC,ThreadA 用于初始化数据num,
只有当num 初始化完成之后再让ThreadB 和ThreadC 获取到初始化后的变量num。

分析过程如下:

考虑到多线程的不确定性,因此我们不能确保ThreadA 就一定先于ThreadB 和ThreadC 前执行,就算ThreadA先执行了,我们也无法保证ThreadA 什么时候才能将变量num 给初始化完成。因此我们必须让ThreadB 和ThreadC去等待ThreadA 完成任何后发出的消息。

现在需要解决两个难题,一是让ThreadB 和ThreadC 等待ThreadA 先执行完,二是ThreadA 执行完之后给
ThreadB 和ThreadC 发送消息。

解决上面的难题我能想到的两种方案,一是使用纯Java API 的Semaphore 类来控制线程的等待和释放,二是使用Android 提供的Handler 消息机制

  1. public class ThreadCommunication {
  2. private static int num;//定义一个变量作为数据
  3. public static void main(String[] args) {
  4. Thread threadA = new Thread(new Runnable() {
  5. @Override
  6. public void run() {
  7. try {
  8. //模拟耗时操作之后初始化变量num
  9. Thread.sleep(1000);
  10. num = 1;
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. });
  16. Thread threadB = new Thread(new Runnable() {
  17. @Override
  18. public void run() {
  19. System.out.println(Thread.currentThread().getName()
  20. +"获取到num 的值为:"+num);
  21. }
  22. });
  23. Thread threadC = new Thread(new Runnable() {
  24. @Override
  25. public void run() {
  26. System.out.println(Thread.currentThread().getName()
  27. +"获取到num 的值为:"+num);
  28. }
  29. });
  30. //同时开启3 个线程
  31. threadA.start();
  32. threadB.start();
  33. threadC.start();
  34. }
  35. }
  1. public class ThreadCommunication {
  2. private static int num;
  3. /**
  4. * 定义一个信号量,该类内部维持了多个线程锁,可以阻塞多个线程,释放多个线程,
  5. * 线程的阻塞和释放是通过permit 概念来实现的线程通过semaphore.acquire()方法获取permit,
  6. * 如果当前semaphore 有permit 则分配给该线程,如果没有则阻塞该线程直到semaphore
  7. * 调用release()方法释放permit。构造函数中参数:permit(允许) 个数
  8. */
  9. private static Semaphore semaphore = new Semaphore(0);
  10. public static void main(String[] args) {
  11. Thread threadA = new Thread(new Runnable() {
  12. @Override
  13. public void run() {
  14. try {
  15. //模拟耗时操作之后初始化变量num
  16. Thread.sleep(1000);
  17. num = 1;
  18. //初始化完参数后释放两个permit
  19. semaphore.release(2);
  20. } catch (InterruptedException e) {
  21. e.printStackTrace();
  22. }
  23. }
  24. });
  25. Thread threadB = new Thread(new Runnable() {
  26. @Override
  27. public void run() {
  28. try {
  29. //获取permit,如果semaphore 没有可用的permit 则等待
  30. // 如果有则消耗一个
  31. semaphore.acquire();
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. System.out.println(Thread.currentThread().getName()
  36. +"获取到num 的值为:"+num);
  37. }
  38. });
  39. Thread threadC = new Thread(new Runnable() {
  40. @Override
  41. public void run() {
  42. try {
  43. //获取permit,如果semaphore 没有可用的permit 则等待
  44. // 如果有则消耗一个
  45. semaphore.acquire();
  46. } catch (InterruptedException e) {
  47. e.printStackTrace();
  48. }
  49. System.out.println(Thread.currentThread().getName()
  50. +"获取到num 的值为:"+num);
  51. }
  52. });
  53. //同时开启3 个线程
  54. threadA.start();
  55. threadB.start();
  56. threadC.start();
  57. }
  58. }

2. CyclicBarrier

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

3个线程到达某个集合点后再向下执行,使用await方法实现

  1. import java.util.concurrent.CyclicBarrier;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. public class CyclicBarrierTest {
  5. public static void main(String[] args) {
  6. ExecutorService service = Executors.newCachedThreadPool();
  7. final CyclicBarrier cb = new CyclicBarrier(3);
  8. for(int i=0;i<3;i++){
  9. Runnable runnable = new Runnable(){
  10. public void run(){
  11. try {
  12. Thread.sleep((long)(Math.random()*10000));
  13. System.out.println("线程" + Thread.currentThread().getName() +
  14. "即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
  15. cb.await();//在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
  16. Thread.sleep((long)(Math.random()*10000));
  17. System.out.println("线程" + Thread.currentThread().getName() +
  18. "即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
  19. cb.await();
  20. Thread.sleep((long)(Math.random()*10000));
  21. System.out.println("线程" + Thread.currentThread().getName() +
  22. "即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
  23. cb.await();
  24. } catch (Exception e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. };
  29. service.execute(runnable);
  30. }
  31. service.shutdown();
  32. }
  33. }

输出结果

  1. 线程pool-1-thread-3即将到达集合地点1,当前已有1个已经到达,正在等候
  2. 线程pool-1-thread-1即将到达集合地点1,当前已有2个已经到达,正在等候
  3. 线程pool-1-thread-2即将到达集合地点1,当前已有3个已经到达,都到齐了,继续走啊
  4. 线程pool-1-thread-1即将到达集合地点2,当前已有1个已经到达,正在等候
  5. 线程pool-1-thread-2即将到达集合地点2,当前已有2个已经到达,正在等候
  6. 线程pool-1-thread-3即将到达集合地点2,当前已有3个已经到达,都到齐了,继续走啊
  7. 线程pool-1-thread-3即将到达集合地点3,当前已有1个已经到达,正在等候
  8. 线程pool-1-thread-1即将到达集合地点3,当前已有2个已经到达,正在等候
  9. 线程pool-1-thread-2即将到达集合地点3,当前已有3个已经到达,都到齐了,继续走啊

3. CountDownLatch

一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数到达0时,则所有等待者或单个等待者开始执行。

可以实现一个人(也可以是多个人)等待其他所有人都来通知他,也可以实现一个人通知多个人的效果,类似裁判一声口令,运动员开始奔跑(一对多),或者所有运送员都跑到终点后裁判才可以公布结果(多对一)。

用指定的计数 初始化 CountDownLatch。在调用 countDown() 方法之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。

实现运动员比赛的效果

  1. import java.util.concurrent.CountDownLatch;
  2. import java.util.concurrent.CyclicBarrier;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. public class CountdownLatchTest {
  6. public static void main(String[] args) {
  7.   ExecutorService service = Executors.newCachedThreadPool();
  8.   
  9.   //构造一个用给定计数初始化的 CountDownLatch,相当于裁判的口哨
  10.   final CountDownLatch cdOrder = new CountDownLatch(1);
  11.   
  12.    //相当于定义3个运行员
  13. final CountDownLatch cdAnswer = new CountDownLatch(3);
  14. for (int i = 0; i < 3; i++) {
  15. Runnable runnable = new Runnable() {
  16. public void run() {
  17. try {
  18. System.out.println("线程"
  19. + Thread.currentThread().getName() + "正准备接受命令");
  20. // 等待发令枪
  21. cdOrder.await();//使当前线程在锁存器倒计数至零之前一直等待
  22. System.out.println("线程"
  23. + Thread.currentThread().getName() + "已接受命令");
  24. Thread.sleep((long) (Math.random() * 10000));
  25. System.out
  26. .println("线程"
  27. + Thread.currentThread().getName()
  28. + "回应命令处理结果");
  29. // 各个运动员完报告成绩之后,通知裁判
  30. cdAnswer.countDown();//递减锁存器的计数,如果计数到达零,则释放所有等待的线程
  31. } catch (Exception e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. };
  36. service.execute(runnable);
  37. }
  38. try {
  39. Thread.sleep((long) (Math.random() * 10000));
  40. System.out.println("线程" + Thread.currentThread().getName()
  41. + "即将发布命令");
  42. // 发令枪打响,比赛开始
  43. cdOrder.countDown();
  44. System.out.println("线程" + Thread.currentThread().getName()
  45. + "已发送命令,正在等待结果");
  46. // 裁判等待各个运动员的结果
  47. cdAnswer.await();
  48. // 裁判公布获得所有运动员的成绩
  49. System.out.println("线程" + Thread.currentThread().getName()
  50. + "已收到所有响应结果");
  51. } catch (Exception e) {
  52. e.printStackTrace();
  53. }
  54. service.shutdown();
  55. }
  56. }

输出结果

  1. 线程pool-1-thread-2正准备接受命令
  2. 线程pool-1-thread-3正准备接受命令
  3. 线程pool-1-thread-1正准备接受命令
  4. 线程main即将发布命令
  5. 线程main已发送命令,正在等待结果
  6. 线程pool-1-thread-1已接受命令
  7. 线程pool-1-thread-2已接受命令
  8. 线程pool-1-thread-3已接受命令
  9. 线程pool-1-thread-1回应命令处理结果
  10. 线程pool-1-thread-3回应命令处理结果
  11. 线程pool-1-thread-2回应命令处理结果
  12. 线程main已收到所有响应结果

4. Exchanger

用于实现两个对象之间的数据交换,每个对象在完成一定的事务后想与对方交换数据,第一个先拿出数据的对象将一直等待第二个对象拿着数据到来时,彼此才能交换数据。

方法:exchange(V x)

等待另一个线程到达此交换点(除非当前线程被中断),然后将给定的对象传送给该线程,并接收该线程的对象。

应用:使用 Exchanger 在线程间交换缓冲区

示例:模拟毒品交易情景

  1. import java.util.concurrent.Exchanger;
  2. import java.util.concurrent.ExecutorService;
  3. import java.util.concurrent.Executors;
  4. public class ExchangerTest {
  5. public static void main(String[] args) {
  6. ExecutorService service = Executors.newCachedThreadPool();
  7. final Exchanger exchanger = new Exchanger();
  8. service.execute(new Runnable(){
  9. public void run() {
  10. try {
  11. String data1 = "毒品";
  12. System.out.println("线程" + Thread.currentThread().getName() +
  13. "正在把: " + data1 +" 交易出去");
  14. Thread.sleep((long)(Math.random()*10000));
  15. String data2 = (String)exchanger.exchange(data1);
  16. System.out.println("线程" + Thread.currentThread().getName() +
  17. "换得了: " + data2);
  18. }catch(Exception e){
  19. }
  20. }
  21. });
  22. service.execute(new Runnable(){
  23. public void run() {
  24. try {
  25. String data1 = "美金";
  26. System.out.println("线程" + Thread.currentThread().getName() +
  27. "正在把: " + data1 +" 交易出去");
  28. Thread.sleep((long)(Math.random()*10000));
  29. String data2 = (String)exchanger.exchange(data1);
  30. System.out.println("线程" + Thread.currentThread().getName() +
  31. "换得了: " + data2);
  32. }catch(Exception e){
  33. }
  34. }
  35. });
  36. }
  37. }

运行结果

  1. 线程pool-1-thread-1正在把: 毒品 交易出去
  2. 线程pool-1-thread-2正在把: 美金 交易出去
  3. 线程pool-1-thread-1换得了: 美金
  4. 线程pool-1-thread-2换得了: 毒品

5. ArrayBlockingQueue

一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列包含固定长度的队列和不固定长度的队列。

这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。

通俗的讲:当指定队列大小,如果已经放满,其他存入数据的线程就阻塞,等着该队列中有空位,才能放进去。当取的比较快,队列中没有数据,取数据的线程阻塞,等队列中放入了数据,才可以取。

ArrayBlockingQueue中只有put和take方法才具有阻塞功能。方法类型如下

功能抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
检查element()peek()不可用不可用

示例:用3个空间的队列来演示向阻塞队列中存取数据的效果。

  1. package cn.xushuai.thread;
  2. import java.util.concurrent.ArrayBlockingQueue;
  3. import java.util.concurrent.BlockingQueue;
  4. public class BlockingQueueTest {
  5. public static void main(String[] args) {
  6. final BlockingQueue queue = new ArrayBlockingQueue(3);
  7. for(int i=0;i<2;i++){
  8. new Thread(){
  9. public void run(){
  10. while(true){
  11. try {
  12. Thread.sleep((long)(Math.random()*1000));
  13. System.out.println(Thread.currentThread().getName() + "准备放数据!");
  14. queue.put(1); //放进去后,可能立即执行“准备取数据”
  15. System.out.println(Thread.currentThread().getName() + "已经放了数据," + "队列目前有" + queue.size() + "个数据");
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }
  20. }
  21. }.start();
  22. }
  23. new Thread(){
  24. public void run(){
  25. while(true){
  26. try {
  27. //将此处的睡眠时间分别改为100和1000,观察运行结果
  28. Thread.sleep(1000);
  29. System.out.println(Thread.currentThread().getName() + "准备取数据!");
  30. queue.take(); //取出后可能來不及执行下面的打印语句,就跑到了“准备放数据”,
  31. System.out.println(Thread.currentThread().getName() + "已经取走数据," + "队列目前有" + queue.size() + "个数据");
  32. } catch (InterruptedException e) {
  33. e.printStackTrace();
  34. }
  35. }
  36. }
  37. }.start();
  38. }
  39. }

输出结果

  1. Thread-0准备放数据!
  2. Thread-0已经放了数据,队列目前有1个数据
  3. Thread-0准备放数据!
  4. Thread-0已经放了数据,队列目前有2个数据
  5. Thread-1准备放数据!
  6. Thread-1已经放了数据,队列目前有3个数据
  7. Thread-2准备取数据!
  8. Thread-2已经取走数据,队列目前有2个数据
  9. Thread-0准备放数据!
  10. Thread-0已经放了数据,队列目前有3个数据
  11. Thread-0准备放数据!
  12. Thread-1准备放数据!
  13. Thread-2准备取数据!
  14. Thread-2已经取走数据,队列目前有2个数据
  15. Thread-0已经放了数据,队列目前有3个数据
  16. Thread-0准备放数据!
  17. Thread-2准备取数据!
  18. Thread-2已经取走数据,队列目前有2个数据
  19. Thread-1已经放了数据,队列目前有3个数据
  20. Thread-1准备放数据!
  21. Thread-2准备取数据!
  22. Thread-2已经取走数据,队列目前有2个数据
  23. Thread-0已经放了数据,队列目前有3个数据
  24. Thread-0准备放数据!
  25. Thread-2准备取数据!
  26. ...

6. 阻塞队列间的通信

A队列向空间中存数据,B从空间里取数据,A存入后,通知B去取,B取过之后,通知A去放,依次循环

示例:子线程先循环10次,接着主线程循环100次,接着又回到子线程,循环10次,再回到主线程又循环100,如此循环50次。

说明:这里通过使 用两个具有1个空间的队列来实现同步通知的功能(实现了锁和condition的功能),以便实现队列间的通信,其中使用到了构造代码块为主队列先存入一个数据,以使其先阻塞,子队列先执行。

使用构造代码块的原因:

成员变量在创建类的实例对象时,才分配空间,才能有值,所以创建一个构造方法来给main_quene赋值,这里不可以使用静态代码块,因为静态在还没创建对象就存在, 而sub_quene和main_quene是对象创建以后的成员变量,所以这里用匿名构造方法,它的运行时期在任何构造方法之前,创建几个对象就执行几次

  1. import java.util.concurrent.ArrayBlockingQueue;
  2. import java.util.concurrent.BlockingQueue;
  3. public class BlockingQueueCommunication {
  4. public static void main(String[] args){
  5. final Business business = new Business();
  6. new Thread(new Runnable(){
  7. @Override
  8. public void run() {
  9. for(int i=1;i<=50;i++){
  10. business.sub(i);
  11. }
  12. }
  13. }).start();
  14. //主线程外部循环
  15. for(int i=1;i<=50;i++){
  16. business.main(i);
  17. }
  18. }
  19. //业务类
  20. static class Business{
  21. BlockingQueue<Integer> sub_quene = new ArrayBlockingQueue<Integer>(1);
  22. BlockingQueue<Integer> main_quene = new ArrayBlockingQueue<Integer>(1);
  23. {
  24.    //为了让子队列先走,所以在一开始就往主队列中存入一个对象,使其阻塞。
  25. try {
  26. main_quene.put(1);
  27. } catch (InterruptedException e) {
  28. e.printStackTrace();
  29. }
  30. }
  31. //子队列先走
  32. public void sub(int i){
  33. try {
  34. sub_quene.put(1); //子队列第一次存入,可以执行,但由于只有1个空间,已经存满,所以只有在执行后要等到take之后才能继续下次执行
  35. } catch (InterruptedException e) {
  36. e.printStackTrace();
  37. }
  38. //子队列循环执行
  39. for(int j=1;j<=10;j++){
  40. System.out.println("sub thread sequence of"+i+",loop of "+j);
  41. }
  42. try {
  43. main_quene.take(); //让主队列从已经填满的队列中取出数据,使其开始第一次执行
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. public void main(int i){
  49. try {
  50. main_quene.put(1); //主队列先前放过1个空间,现在处于阻塞状态,等待子队列通知,即子线程中的main_quene.take();
  51. } catch (InterruptedException e) {
  52. e.printStackTrace();
  53. }
  54.   //主队列循环执行
  55. for(int j=1;j<=100;j++){
  56. System.out.println("main thread sequence of"+i+", loop of "+j);
  57. }
  58. try {
  59. sub_quene.take(); //让子队列从已经填满的队列中取出数据,使其执行
  60. } catch (InterruptedException e) {
  61. e.printStackTrace();
  62. }
  63. }
  64. }
  65. }

7. 同步集合类

Collections

  • Collections.synchronizedList()
  • Collections.synchronizedSet()
  • Collections.synchronizedMap()

同步Map集合

  • ConcurrentMap
  • ConcurrentHashMap
  • ConcurrentNavigableMap
  • ConcurrentSkipListMap

ConcurrentHashMap

使用锁分段技术

同步的HashMap,支持获取的完全并发和更新的所期望可调整并发的哈希表。此类遵守与 Hashtable 相同的功能规范,并且包括对应于 Hashtable 的每个方法的方法版本。

不过,尽管所有操作都是线程安全的,但获取操作不 必锁定,并且不 支持以某种防止所有访问的方式锁定整个表。此类可以通过程序完全与 Hashtable 进行互操作,这取决于其线程安全,而与其同步细节无关。

内部原理:

其实内部使用了代理模式,你给我一个HashMap,我就给你一个同步的HashMap。同步的HashMap在调用方法时,是去分配给原始的HashMap只是在去调用方法的同时加上了Synchronized,以此实现同步效果

ConcurrentHashMap是线程安全的HashMap的实现,默认构造同样有initialCapacity和loadFactor属性,不过还多了一个concurrencyLevel属性,三属性默认值分别为16、0.75及16。其内部使用锁分段技术,维持这锁Segment的数组,在Segment数组中又存放着Entity[]数组,内部hash算法将数据较均匀分布在不同锁中。

  • put(key , value)

并没有在此方法上加上synchronized,首先对key.hashcode进行hash操作,得到key的hash值。hash操作的算法和map也不同,根据此hash值计算并获取其对应的数组中的Segment对象(继承自ReentrantLock),接着调用此Segment对象的put方法来完成当前操作。

ConcurrentHashMap基于concurrencyLevel划分出了多个Segment来对key-value进行存储,从而避免每次put操作都得锁住整个数组。在默认的情况下,最佳情况下可允许16个线程并发无阻塞的操作集合对象,尽可能地减少并发时的阻塞现象。

  • get(key)

首先对key.hashCode进行hash操作,基于其值找到对应的Segment对象,调用其get方法完成当前操作。而Segment的get操作首先通过hash值和对象数组大小减1的值进行按位与操作来获取数组上对应位置的HashEntry。在这个步骤中,可能会因为对象数组大小的改变,以及数组上对应位置的HashEntry产生不一致性,那么ConcurrentHashMap是如何保证的?

对象数组大小的改变只有在put操作时有可能发生,由于HashEntry对象数组对应的变量是volatile类型的,因此可以保证如HashEntry对象数组大小发生改变,读操作可看到最新的对象数组大小。

在获取到了HashEntry对象后,怎么能保证它及其next属性构成的链表上的对象不会改变呢?这点ConcurrentHashMap采用了一个简单的方式,即HashEntry对象中的hash、key、next属性都是final的,这也就意味着没办法插入一个HashEntry对象到基于next属性构成的链表中间或末尾。这样就可以保证当获取到HashEntry对象后,其基于next属性构建的链表是不会发生变化的。

ConcurrentHashMap默认情况下采用将数据分为16个段进行存储,并且16个段分别持有各自不同的锁Segment,锁仅用于put和remove等改变集合对象的操作,基于volatile及HashEntry链表的不变性实现了读取的不加锁。这些方式使得ConcurrentHashMap能够保持极好的并发支持,尤其是对于读远比插入和删除频繁的Map而言,而它采用的这些方法也可谓是对于Java内存模型、并发机制深刻掌握的体现。

ConcurrentNavigableMap

java.util.concurrent.ConcurrentNavigableMap 是一个支持并发访问的 java.util.NavigableMap,它还能让它的子 map 具备并发访问的能力。所谓的 “子 map” 指的是诸如 headMap(),subMap(),tailMap() 之类的方法返回的 map。

NavigableMap 中的方法不再赘述,本小节我们来看一下 ConcurrentNavigableMap 添加的方法。

headMap()

headMap(T toKey) 方法返回一个包含了小于给定 toKey 的 key 的子 map。
如果你对原始 map 里的元素做了改动,这些改动将影响到子 map 中的元素(译者注:map 集合持有的其实只是对象的引用)。

以下示例演示了对 headMap() 方法的使用:

  1. ConcurrentNavigableMap map = new ConcurrentSkipListMap();
  2. map.put("1", "one");
  3. map.put("2", "two");
  4. map.put("3", "three");
  5. ConcurrentNavigableMap headMap = map.headMap("2");

headMap 将指向一个只含有键 “1” 的 ConcurrentNavigableMap,因为只有这一个键小于 “2”。关于这个方法及其重载版本具体是怎么工作的细节请参考 Java 文档。

tailMap()

tailMap(T fromKey) 方法返回一个包含了不小于给定 fromKey 的 key 的子 map。
如果你对原始 map 里的元素做了改动,这些改动将影响到子 map 中的元素(译者注:map 集合持有的其实只是对象的引用)。

以下示例演示了对 tailMap() 方法的使用:

  1. ConcurrentNavigableMap map = new ConcurrentSkipListMap();
  2. map.put("1", "one");
  3. map.put("2", "two");
  4. map.put("3", "three");
  5. ConcurrentNavigableMap tailMap = map.tailMap("2");

tailMap 将拥有键 “2” 和 “3”,因为它们不小于给定键 “2”。关于这个方法及其重载版本具体是怎么工作的细节请参考 Java 文档。

subMap()

subMap() 方法返回原始 map 中,键介于 from(包含) 和 to (不包含) 之间的子 map。示例如下:

  1. ConcurrentNavigableMap map = new ConcurrentSkipListMap();
  2. map.put("1", "one");
  3. map.put("2", "two");
  4. map.put("3", "three");
  5. ConcurrentNavigableMap subMap = map.subMap("2", "3");

返回的 submap 只包含键 “2”,因为只有它满足不小于 “2”,比 “3” 小。

更多方法

ConcurrentNavigableMap 接口还有其他一些方法可供使用,比如:

  • descendingKeySet()
  • descendingMap()
  • navigableKeySet()

关于这些方法更多信息参考官方 Java 文档。

同步List集合

  • ConcurrentSkipListSet
  • CopyOnWriteArraySet
  • CopyOnWriteArrayList

ConcurrentSkipListSet

一个基于 ConcurrentSkipListMap 的可缩放并发 NavigableSet 实现。类似于TreeSet,set 的元素可以根据它们的自然顺序进行排序,也可以根据创建 set 时所提供的Comparator 进行排序,具体取决于使用的构造方法。

CopyOnWriteArrayList

ArrayList 的一个线程安全的变体,可解决线程安全问题,在遍历的时候,同时进行添加操作。其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的。

CopyOnWriteArrayList是一个线程安全、并且在读操作时无锁的ArrayList,其具体实现方法如下。

  • CopyOnWriteArrayList()

和ArrayList不同,此步的做法为创建一个大小为0的数组。

  • add(E)

add方法并没有加上synchronized关键字,它通过使用ReentrantLock来保证线程安全。此处和ArrayList的不同是每次都会创建一个新的Object数组,此数组的大小为当前数组大小加1,将之前数组中的内容复制到新的数组中,并将新增加的对象放入数组末尾,最后做引用切换将新创建的数组对象赋值给全局的数组对象。

  • remove(E)

和add方法一样,此方法也通过ReentrantLock来保证其线程安全,但它和ArrayList删除元素采用的方式并不一样。

首先创建一个比当前数组小1的数组,遍历新数组,如找到equals或均为null的元素,则将之后的元素全部赋值给新的数组对象,并做引用切换,返回true;如未找到,则将当前的元素赋值给新的数组对象,最后特殊处理数组中的最后一个元素,如最后一个元素等于要删除的元素,即将当前数组对象赋值为新创建的数组对象,完成删除操作,如最后一个元素也不等于要删除的元素,那么返回false。

此方法和ArrayList除了锁不同外,最大的不同在于其复制过程并没有调用System的arrayCopy来完成,理论上来说会导致性能有一定下降。

  • get(int)

此方法非常简单,直接获取当前数组对应位置的元素,这种方法是没有加锁保护的,因此可能会出现读到脏数据的现象。但相对而言,性能会非常高,对于写少读多且脏数据影响不大的场景而言是不错的选择。

  • iterator()

调用iterator方法后创建一个新的COWIterator对象实例,并保存了一个当前数组的快照,在调用next遍历时则仅对此快照数组进行遍历,因此遍历此list时不会抛出ConcurrentModificatiedException。

与ArrayList的性能对比,在读多写少的并发场景中,较之ArrayList是更好的选择,单线程以及多线程下增加元素及删除元素的性能不比ArrayList好

CopyOnWriteArraySet

对其所有操作使用内部 CopyOnWriteArrayList 的 Set。因此,它共享以下相同的基本属性:

  • 它最适合于 set 大小通常保持很小、只读操作远多于可变操作以及需要在遍历期间防止线程间冲突的应用程序。
  • 它是线程安全的。
  • 因为通常需要复制整个基础数组,所以可变操作(添加、设置、移除,等等)的开销巨大。
  • 迭代器不支持可变移除操作。
  • 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。

CopyOnWriteArraySet基于CopyOnWriteArrayList实现,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法。保证了无重复元素,但在add时每次都要进行数组的遍历,因此性能会略低于上个。

ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链接节点的、无界的、线程安全的队列。此队列按照 FIFO(先进先出)原则对元素进行排序,队列的头部 是队列中时间最长的元素。队列的尾部 是队列中时间最短的元素。新的元素插入到队列的尾部,队列检索操作从队列头部获得元素。当许多线程共享访问一个公共 collection 时,ConcurrentLinkedQueue 是一个恰当的选择,此队列不允许 null 元素。

ConcurrentLinkedDeque

一个基于链接节点的、无界的、线程安全的双端队列