package info;
import java.util.concurrent.*;
class Blocker {
private boolean stop = false;
synchronized void waitingCall() {
try {
while (!stop) {
System.out.println(Thread.currentThread() + "begin to wait");
wait();
System.out.println(Thread.currentThread() + " waiting");
}
}
catch(InterruptedException e) {
System.out.println(Thread.currentThread() + "catch");
}
System.out.println(Thread.currentThread() + "over");
}
public void setStop() {
stop = true;
}
synchronized void prod(){ notify(); }
synchronized void prodAll() {notifyAll(); }
}
class Task implements Runnable {
static Blocker block = new Blocker();
public void run() {
block.waitingCall();
}
}
public class Factory {
public static void main(String[] args) {
ExecutorService exe = Executors.newCachedThreadPool();
for (int i = 0; i < 5; ++i)
exe.execute(new Task());
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
}
Task.block.setStop();
//Task.block.prod();//只会执行完一个线程
Task.block.prodAll();//会检测每一个waiting的线程,如果满足条件,就唤醒并执行完
}
}
prodAll:
Thread[pool-1-thread-1,5,main]begin to wait
Thread[pool-1-thread-2,5,main]begin to wait
Thread[pool-1-thread-3,5,main]begin to wait
Thread[pool-1-thread-5,5,main]begin to wait
Thread[pool-1-thread-4,5,main]begin to wait
Thread[pool-1-thread-4,5,main] waiting
Thread[pool-1-thread-4,5,main]over
Thread[pool-1-thread-5,5,main] waiting
Thread[pool-1-thread-5,5,main]over
Thread[pool-1-thread-3,5,main] waiting
Thread[pool-1-thread-3,5,main]over
Thread[pool-1-thread-2,5,main] waiting
Thread[pool-1-thread-2,5,main]over
Thread[pool-1-thread-1,5,main] waiting
Thread[pool-1-thread-1,5,main]over
prod:
Thread[pool-1-thread-1,5,main]begin to wait
Thread[pool-1-thread-3,5,main]begin to wait
Thread[pool-1-thread-2,5,main]begin to wait
Thread[pool-1-thread-4,5,main]begin to wait
Thread[pool-1-thread-5,5,main]begin to wait
Thread[pool-1-thread-1,5,main] waiting
Thread[pool-1-thread-1,5,main]over
//
public class Account {
public static final int BOUND = 10000;
private int balance;
public Account(int balance) {
this.balance = balance;
}
synchronized public boolean withdraw(int amount) throws InterruptedException{
while(balance<amount)
wait();// no money, wait
balance -= amount;
notifyAll();// not full, notify
return true;
}
synchronized public void deposit(int amount) throws InterruptedException{
while(balance+amount >BOUND)
wait();//full, wait
balance +=amount;
notifyAll();// has money, notify
}
}
不是挺好吗?恩,没错,是可以。但是,仍然存在性能上的缺陷:每次都有多个线程被唤醒,而实际只有一个会运行,频繁的上下文切换和锁请求是件很废的事情。我们能不能不要notifyAll,而每次只用notify(只唤醒一个)呢?不好意思,想要“多路复用”,就必须notifyAll(有时,我们只希望唤醒一部分的堵塞线程,而不是全部的堵塞线程),否则会有丢失信号之虞(不解释了)。只有满足下面两个条件,才能使用notify:
一,只有一个条件谓词与条件队列相关,每个线程从wait返回执行相同的逻辑。
二,一进一出:一个对条件变量的通知,语义上至多只激活一个线程。
package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -817911632652898426L;
/** 队列的项,底层是数组,final 修饰意味着 ArrayBlockingQueue 是固定大小的队列 */
private final E[] items;
/** 对数组items 下一个 take, poll or remove 操作的索引index */
private int takeIndex;
/** 对数组items 下一个 put, offer, or add 操作的索引index */
private int putIndex;
/**队列中数据项的数量 ,必须在lock当前锁的情况下才可以使用
注意:count 不需要volitile 修饰, tackIndex putIndex 都一样,因为使用它们的场景都有lock 锁,所以内存可见性和编译代码调整的优化 得到安全的保障!!
*/
private int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** 数据访问的锁*/
private final ReentrantLock lock;
/** 非空等待条件 */
private final Condition notEmpty;
/** 非满等待条件 */
private final Condition notFull;
/**
* 在数组的尾部放数据
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final E[] items = this.items;
final ReentrantLock lock = this.lock;//先加锁
lock.lockInterruptibly();
try {
try {
while (count == items.length) //如果已经满,那么等待
notFull.await();
} catch (InterruptedException ie) {
notFull.signal(); // 防止这个线程一直堵塞着,所以先要给自己signal下。或者,有这个极端情况:这个线程在堵塞时,抛出了中断异常,这时,却把notFull的信号发给了这个线程,
//如果不再把这个信号发出去,很容易造成所有线程的死锁。即使重复发了notFull信号,其他写线程还可以通过while循环来判断是不是真的不满
throw ie;
}
insert(e);
} finally {//
lock.unlock();
}
}
//如果用lock,则必须用finally来释放这个lock,防止有异常抛出,而导致永远不能释放锁
//如果用await,在catch的时候,最好要再发下这个信号
/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
当前的putIndex位置上放置x元素(只能在获取锁的情况下调用)
*/
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex); //插入数据后,下一个要插入的位置需要 +1
++count; //数量+1
notEmpty.signal(); //含义:一旦插入新数据,那么需要通知notEmpty.wait 条件下等待的线程(take数据的线程)
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (count == 0) //如果为空,那么notEmpty await 进入等待状态
notEmpty.await();
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
/**
* Extracts element at current take position, advances, and signals.
* 取一个元素 (只能在获取锁的情况下调用)
*/
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];// 获取当前takeIndex的元素
items[takeIndex] = null;
takeIndex = inc(takeIndex); // 调整下一个获取元素的的位置
--count; //数量要减一个
notFull.signal(); //取出后,通知添加数据的线程,items已经不满了。因为这里只取了一个值,所以只要从堵塞在put上的写线程上任选一个激活就行了
return x;
}
/**
* 情况缓冲
*/
public void clear() {
final E[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = takeIndex; //取元素的位置
int k = count; //总元素的数量
while (k-- > 0) {
items[i] = null;
i = inc(i); //让下一个元素的位置+1
}
count = 0;
putIndex = 0;
takeIndex = 0;
notFull.signalAll(); //非满的条件通知,这样就可以让所有的堵塞在put函数的线程都依次获得锁,判断是不是满,并插入了
} finally {
lock.unlock();
}
}
}