当前位置: 首页 > 编程笔记 >

Java并发之条件阻塞Condition的应用代码示例

连乐
2023-03-14
本文向大家介绍Java并发之条件阻塞Condition的应用代码示例,包括了Java并发之条件阻塞Condition的应用代码示例的使用技巧和注意事项,需要的朋友参考一下

本文研究的主要是Java并发之条件阻塞Condition的应用示例代码,具体如下。

Condition将Object监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意Lock实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了synchronized方法和语句的使用,Condition替代了Object监视器方法的使用。

1. Condition的基本使用

  由于Condition可以用来替代wait、notify等方法,所以可以对比着之前写过的线程间通信的代码来看,再来看一下原来那个问题:

有两个线程,子线程先执行10次,然后主线程执行5次,然后再切换到子线程执行10,再主线程执行5次……如此往返执行50次。

  之前用wait和notify来实现的,现在用Condition来改写一下,代码如下:

public class ConditionCommunication {
	public static void main(String[] args) {
		Business bussiness = new Business();
		new Thread(new Runnable() {
			// 开启一个子线程
			@Override
			          public void run() {
				for (int i = 1; i <= 50; i++) {
					bussiness.sub(i);
				}
			}
		}
		).start();
		// main方法主线程
		for (int i = 1; i <= 50; i++) {
			bussiness.main(i);
		}
	}
}
class Business {
	Lock lock = new ReentrantLock();
	Condition condition = lock.newCondition();
	//Condition是在具体的lock之上的
	private Boolean bShouldSub = true;
	public void sub(int i) {
		lock.lock();
		try {
			while (!bShouldSub) {
				try {
					condition.await();
					//用condition来调用await方法
				}
				catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			for (int j = 1; j <= 10; j++) {
				System.out.println("sub thread sequence of " + j
				            + ", loop of " + i);
			}
			bShouldSub = false;
			condition.signal();
			//用condition来发出唤醒信号,唤醒某一个
		}
		finally {
			lock.unlock();
		}
	}
	public void main(int i) {
		lock.lock();
		try {
			while (bShouldSub) {
				try {
					condition.await();
					//用condition来调用await方法
				}
				catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			for (int j = 1; j <= 10; j++) {
				System.out.println("main thread sequence of " + j
				            + ", loop of " + i);
			}
			bShouldSub = true;
			condition.signal();
			//用condition来发出唤醒信号么,唤醒某一个
		}
		finally {
			lock.unlock();
		}
	}
}

从代码来看,Condition的使用时和Lock一起的,没有Lock就没法使用Condition,因为Condition是通过Lock来new出来的,这种用法很简单,只要掌握了synchronized和wait、notify的使用,完全可以掌握Lock和Condition的使用。

2. Condition的拔高

2.1 缓冲区的阻塞队列

  上面使用Lock和Condition来代替synchronized和Object监视器方法实现了两个线程之间的通信,现在再来写个稍微高级点应用:模拟缓冲区的阻塞队列。
什么叫缓冲区呢?举个例子,现在有很多人要发消息,我是中转站,我要帮别人把消息发出去,那么现在我  就需要做两件事,一件事是接收用户发过来的消息,并按顺序放到缓冲区,另一件事是从缓冲区中按顺序取出用户发过来的消息,并发送出去。

  现在把这个实际的问题抽象一下:缓冲区即一个数组,我们可以向数组中写入数据,也可以从数组中把数据取走,我要做的两件事就是开启两个线程,一个存数据,一个取数据。但是问题来了,如果缓冲区满了,说明接收的消息太多了,即发送过来的消息太快了,我另一个线程还来不及发完,导致现在缓冲区没地方放了,那么此时就得阻塞存数据这个线程,让其等待;相反,如果我转发的太快,现在缓冲区所有内容都被我发完了,还没有用户发新的消息来,那么此时就得阻塞取数据这个线程。

  好了,分析完了这个缓冲区的阻塞队列,下面就用Condition技术来实现一下:

class Buffer {
	final Lock lock = new ReentrantLock();
	//定义一个锁
	final Condition notFull = lock.newCondition();
	//定义阻塞队列满了的Condition
	final Condition notEmpty = lock.newCondition();
	//定义阻塞队列空了的Condition
	final Object[] items = new Object[10];
	//为了下面模拟,设置阻塞队列的大小为10,不要设太大
	int putptr, takeptr, count;
	//数组下标,用来标定位置的
	//往队列中存数据
	public void put(Object x) throws InterruptedException {
		lock.lock();
		//上锁
		try {
			while (count == items.length) {
				System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法存数据!");
				notFull.await();
				//如果队列满了,那么阻塞存数据这个线程,等待被唤醒
			}
			//如果没满,按顺序往数组中存
			items[putptr] = x;
			if (++putptr == items.length) //这是到达数组末端的判断,如果到了,再回到始端
			putptr = 0;
			++count;
			//消息数量
			System.out.println(Thread.currentThread().getName() + " 存好了值: " + x);
			notEmpty.signal();
			//好了,现在队列中有数据了,唤醒队列空的那个线程,可以取数据啦
		}
		finally {
			lock.unlock();
			//放锁
		}
	}
	//从队列中取数据
	public Object take() throws InterruptedException {
		lock.lock();
		//上锁
		try {
			while (count == 0) {
				System.out.println(Thread.currentThread().getName() + " 被阻塞了,暂时无法取数据!");
				notEmpty.await();
				//如果队列是空,那么阻塞取数据这个线程,等待被唤醒
			}
			//如果没空,按顺序从数组中取
			Object x = items[takeptr];
			if (++takeptr == items.length) //判断是否到达末端,如果到了,再回到始端
			takeptr = 0;
			--count;
			//消息数量
			System.out.println(Thread.currentThread().getName() + " 取出了值: " + x);
			notFull.signal();
			//好了,现在队列中有位置了,唤醒队列满的那个线程,可以存数据啦
			return x;
		}
		finally {
			lock.unlock();
			//放锁
		}
	}
}

这个程序很经典,我从官方JDK文档中拿出来的,然后加了注释。程序中定义了两个Condition,分别针对两个线程,等待和唤醒分别用不同的Condition来执行,思路很清晰,程序也很健壮。可以考虑一个问题,为啥要用两个Codition呢?之所以这么设计肯定是有原因的,如果用一个Condition,现在假设队列满了,但是有2个线程A和B同时存数据,那么都进入了睡眠,好,现在另一个线程取走一个了,然后唤醒了其中一个线程A,那么A可以存了,存完后,A又唤醒一个线程,如果B被唤醒了,那就出问题了,因为此时队列是满的,B不能存的,B存的话就会覆盖原来还没被取走的值,就因为使用了一个Condition,存和取都用这个Condition来睡眠和唤醒,就乱了套。到这里,就能体会到这个Condition的用武之地了,现在来测试一下上面的阻塞队列的效果:

public class BoundedBuffer {
	public static void main(String[] args) {
		Buffer buffer = new Buffer();
		for (int i = 0; i < 5; i ++) {
			//开启5个线程往缓冲区存数据
			new Thread(new Runnable() {
				@Override
				        public void run() {
					try {
						buffer.put(new Random().nextint(1000));
						//随机存数据
					}
					catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
			).start();
		}
		for (int i = 0; i < 10; i ++) {
			//开启10个线程从缓冲区中取数据
			new Thread(new Runnable() {
				@Override
				        public void run() {
					try {
						buffer.take();
						//从缓冲区取数据
					}
					catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
			).start();
		}
	}
}

我故意只开启5个线程存数据,10个线程取数据,就是想让它出现取数据被阻塞的情况发生,看运行的结果:

Thread-5 被阻塞了,暂时无法取数据!
Thread-10 被阻塞了,暂时无法取数据!
Thread-1 存好了值: 755
Thread-0 存好了值: 206
Thread-2 存好了值: 741
Thread-3 存好了值: 381
Thread-14 取出了值: 755
Thread-4 存好了值: 783
Thread-6 取出了值: 206
Thread-7 取出了值: 741
Thread-8 取出了值: 381
Thread-9 取出了值: 783
Thread-5 被阻塞了,暂时无法取数据!
Thread-11 被阻塞了,暂时无法取数据!
Thread-12 被阻塞了,暂时无法取数据!
Thread-10 被阻塞了,暂时无法取数据!
Thread-13 被阻塞了,暂时无法取数据!

  从结果中可以看出,线程5和10抢先执行,发现队列中没有,于是就被阻塞了,睡在那了,直到队列中有新的值存入才可以取,但是它们两运气不好,存的数据又被其他线程给抢先取走了,哈哈……可以多运行几次。如果想要看到存数据被阻塞,可以将取数据的线程设置少一点,这里我就不设了。

2.2 两个以上线程之间的唤醒

  还是原来那个题目,现在让三个线程来执行,看一下题目:

有三个线程,子线程1先执行10次,然后子线程2执行10次,然后主线程执行5次,然后再切换到子线程1执行10次,子线程2执行10次,主线程执行5次……如此往返执行50次。

  如过不用Condition,还真不好弄,但是用Condition来做的话,就非常方便了,原理很简单,定义三个Condition,子线程1执行完唤醒子线程2,子线程2执行完唤醒主线程,主线程执行完唤醒子线程1。唤醒机制和上面那个缓冲区道理差不多,下面看看代码吧,很容易理解。

public class ThreeConditionCommunication {
	public static void main(String[] args) {
		Business bussiness = new Business();
		new Thread(new Runnable() {
			// 开启一个子线程
			@Override
			          public void run() {
				for (int i = 1; i <= 50; i++) {
					bussiness.sub1(i);
				}
			}
		}
		).start();
		new Thread(new Runnable() {
			// 开启另一个子线程
			@Override
			      public void run() {
				for (int i = 1; i <= 50; i++) {
					bussiness.sub2(i);
				}
			}
		}
		).start();
		// main方法主线程
		for (int i = 1; i <= 50; i++) {
			bussiness.main(i);
		}
	}
	static class Business {
		Lock lock = new ReentrantLock();
		Condition condition1 = lock.newCondition();
		//Condition是在具体的lock之上的
		Condition condition2 = lock.newCondition();
		Condition conditionMain = lock.newCondition();
		private int bShouldSub = 0;
		public void sub1(int i) {
			lock.lock();
			try {
				while (bShouldSub != 0) {
					try {
						condition1.await();
						//用condition来调用await方法
					}
					catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				for (int j = 1; j <= 10; j++) {
					System.out.println("sub1 thread sequence of " + j
					              + ", loop of " + i);
				}
				bShouldSub = 1;
				condition2.signal();
				//让线程2执行
			}
			finally {
				lock.unlock();
			}
		}
		public void sub2(int i) {
			lock.lock();
			try {
				while (bShouldSub != 1) {
					try {
						condition2.await();
						//用condition来调用await方法
					}
					catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				for (int j = 1; j <= 10; j++) {
					System.out.println("sub2 thread sequence of " + j
					              + ", loop of " + i);
				}
				bShouldSub = 2;
				conditionMain.signal();
				//让主线程执行
			}
			finally {
				lock.unlock();
			}
		}
		public void main(int i) {
			lock.lock();
			try {
				while (bShouldSub != 2) {
					try {
						conditionMain.await();
						//用condition来调用await方法
					}
					catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				for (int j = 1; j <= 5; j++) {
					System.out.println("main thread sequence of " + j
					              + ", loop of " + i);
				}
				bShouldSub = 0;
				condition1.signal();
				//让线程1执行
			}
			finally {
				lock.unlock();
			}
		}
	}
}

代码看似有点长,但是是假象,逻辑非常简单。关于线程中的Condition技术就总结这么多吧。

总结

以上就是本文关于Java并发之条件阻塞Condition的应用代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!

 类似资料:
  • 9.7. 示例: 并发的非阻塞缓存 本节中我们会做一个无阻塞的缓存,这种工具可以帮助我们来解决现实世界中并发程序出现但没有现成的库可以解决的问题。这个问题叫作缓存(memoizing)函数(译注:Memoization的定义: memoization 一词是Donald Michie 根据拉丁语memorandum杜撰的一个词。相应的动词、过去分词、ing形式有memoiz、memoized、me

  • 本文向大家介绍Java并发编程之阻塞队列详解,包括了Java并发编程之阻塞队列详解的使用技巧和注意事项,需要的朋友参考一下 1、什么是阻塞队列?   队列是一种数据结构,它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素。阻塞队里与普通的队列的区别在于,普通队列不会对当前线程产生阻塞,在面对类似消费者-生产者模型时,就必须额外的实现同步策略以及线程间唤醒策略。使用阻塞队列,就会对当前

  • 问题内容: 我有一个经典的问题,线程将事件推送到第二个线程的传入队列。仅这次,我对性能非常感兴趣。我要实现的是: 我想要并发访问队列,生产者推送,接收者弹出。 当队列为空时,我希望消费者阻止队列,等待生产者。 我的第一个想法是使用,但是我很快意识到它不是并发的,并且会降低性能。另一方面,我现在使用,但仍要为每个出版物支付/ 的费用。由于使用者在找到空队列时不会阻塞,因此我必须进行同步并处于锁定状态

  • 在完美的世界,将没有战争或饥饿,所有 Api 将使用异步写,阳光明媚,绿色的草地有跳来跳去的兔子和手牵手的小羊羔。 但是,现实世界并不是这样。(你看过新闻最近吗?) 事实是,大多数库,特别是在JVM的生态,Y有许多是同步API,许多的方法有可能阻塞。一个很好的例子是JDBC API - 这是本质上的同步,不管如何努力尝试,Vert.x 不能撒上魔法使之同步。 我们不打算在一夜之间把一切改写成异步,

  • 主要内容:Condition类的方法,实例接口提供一个线程挂起执行的能力,直到给定的条件为真。 对象必须绑定到,并使用方法获取对象。 Condition类的方法 以下是类中可用的重要方法的列表。 序号 方法名称 描述 1 使当前线程等待,直到发出信号或中断信号。 2 使当前线程等待直到发出信号或中断,或指定的等待时间过去。 3 使当前线程等待直到发出信号或中断,或指定的等待时间过去。 4 使当前线程等待直到发出信号。 5 使当前线程等待直

  • 本文向大家介绍java并发编程之同步器代码示例,包括了java并发编程之同步器代码示例的使用技巧和注意事项,需要的朋友参考一下 同步器是一些使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier和Exchanger 队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框