当前位置: 首页 > 工具软件 > Notify.js > 使用案例 >

notify notifyAll

东门仲卿
2023-12-01
package info;

import  java.util.concurrent.*;

class Blocker {
	private boolean stop = false;
	synchronized void waitingCall() {
		try {
			while (!stop) {
				System.out.println(Thread.currentThread() + "begin to wait");
				wait();
				System.out.println(Thread.currentThread() + " waiting");
			}
		}
		catch(InterruptedException e) {
			System.out.println(Thread.currentThread() + "catch");
		}
		System.out.println(Thread.currentThread() + "over");
	}
	
	public void setStop() {
		stop = true;
	}
	
	synchronized void prod(){ notify(); }
	synchronized void prodAll() {notifyAll(); }
}

class Task implements Runnable {
	static Blocker block = new Blocker();
	public void run() {
		block.waitingCall();
	}
}

public class Factory {
	
	
	public static void main(String[] args) {
		ExecutorService exe = Executors.newCachedThreadPool();
		for (int i = 0; i < 5; ++i)
			exe.execute(new Task());
		try {
		Thread.sleep(2000);
		}
		catch (InterruptedException e) {
			
		}
		Task.block.setStop();
		//Task.block.prod();//只会执行完一个线程
		Task.block.prodAll();//会检测每一个waiting的线程,如果满足条件,就唤醒并执行完
		
	}

}


prodAll:

Thread[pool-1-thread-1,5,main]begin to wait
Thread[pool-1-thread-2,5,main]begin to wait
Thread[pool-1-thread-3,5,main]begin to wait
Thread[pool-1-thread-5,5,main]begin to wait
Thread[pool-1-thread-4,5,main]begin to wait
Thread[pool-1-thread-4,5,main] waiting
Thread[pool-1-thread-4,5,main]over
Thread[pool-1-thread-5,5,main] waiting
Thread[pool-1-thread-5,5,main]over
Thread[pool-1-thread-3,5,main] waiting
Thread[pool-1-thread-3,5,main]over
Thread[pool-1-thread-2,5,main] waiting
Thread[pool-1-thread-2,5,main]over
Thread[pool-1-thread-1,5,main] waiting
Thread[pool-1-thread-1,5,main]over


prod:

Thread[pool-1-thread-1,5,main]begin to wait
Thread[pool-1-thread-3,5,main]begin to wait
Thread[pool-1-thread-2,5,main]begin to wait
Thread[pool-1-thread-4,5,main]begin to wait
Thread[pool-1-thread-5,5,main]begin to wait
Thread[pool-1-thread-1,5,main] waiting
Thread[pool-1-thread-1,5,main]over


//

1. wait notify 必须在同步块中执行,或者获取锁之后执行. 
    注意wait,notify是基于对象的而不是线程
    wait的内部实现是:1.释放锁;2. 等待收到notify的信号,否则就一直堵塞(注意此时不占用cpu);3. 加锁
    notify的内部实现是:仅仅是发送信号,不会释放锁,由外部决定释放锁的操作

从中可以总结出,wait和notify必须要被同步块包围。

2. notify和notifyall都只会唤醒本对象锁定的线程,区别在于,notify只会任选一个本对象锁定的wait的线程,而这个线程的实现往往是 while(condition) wait, 如果这个线程又发现条件        condition还是不满足,还是进入wait阶段
 notifyall则会唤醒所有本对象锁定的wait线程, 但是处在wait的队列中的线程都必须依次获取到对象的锁,进行下部的执行


3. wait(timeout) 如果超过了timeout还是没有收到notify就会自动解除堵塞,执行下步操作


通过synchronized同步也叫做是 Monitor同步,其自带了简单的线程间同步的notify和wait,但是灵活性比较差,这是因为这只是一个信号,所以一般而言只能表示一个状态的变化,而在同步逻辑中多个状态的变化也比较多。这时,就需要用到 ReentrantLock和其生成的Condition进行多个状态之间的同步。

Java用wait/notify机制实际上默认给一个内部锁绑定了一个条件队列,但是,有时候,针对一个状态(锁),我们的程序需要两个或以上的条件队列,比如,刚才的Account例子,如果某个2B银行有这样的规定“一个账户存款不得多于10000元”,这个时候,我们的存钱需要满足“余额+要存的数目不大于10000,否则等待,直到满足这个限制”,取钱需要满足“余额足够,否则等待,直到有钱为止”,这里需要两个条件队列,一个等待“存款不溢出”,一个等待“存款足够”,这时,一个默认的条件队列够用么?你可能又说,够用,我们可以模仿network里的“多路复用”,一个队列就能当多个来使,像这样:

public class Account {
	public static final int BOUND = 10000;
	private int balance;
	
	public Account(int balance) {
		this.balance = balance;
	}
	
	synchronized public boolean withdraw(int amount) throws InterruptedException{
			while(balance<amount)
				wait();// no money, wait
			balance -= amount;
			notifyAll();// not full, notify
			return true;
	}
	
	synchronized public void deposit(int amount) throws InterruptedException{
			while(balance+amount >BOUND)
				wait();//full, wait
			balance +=amount;
			notifyAll();// has money, notify
	}
}

不是挺好吗?恩,没错,是可以。但是,仍然存在性能上的缺陷:每次都有多个线程被唤醒,而实际只有一个会运行,频繁的上下文切换和锁请求是件很废的事情。我们能不能不要notifyAll,而每次只用notify(只唤醒一个)呢?不好意思,想要“多路复用”,就必须notifyAll(有时,我们只希望唤醒一部分的堵塞线程,而不是全部的堵塞线程),否则会有丢失信号之虞(不解释了)。只有满足下面两个条件,才能使用notify:

一,只有一个条件谓词与条件队列相关,每个线程从wait返回执行相同的逻辑。

二,一进一出:一个对条件变量的通知,语义上至多只激活一个线程。


比如 ArrayBlockingQueue

需要有两个状态,一个是队列是不是满了,另一个是队列是不是空的,用monitor实现就很复杂,这时就要用condition实现


ArrayBlockingQueue 是jdk1.5 新提供的阻塞队列,实现了固定大小的队列。 

功能: 
1、阻塞的效果 。put时如果元素已经满,那么阻塞,get时 如果队列为空,那么阻塞。 
2、是实现生产者消费者模型的极好的备选工具。 

实现依赖: 
1、lock锁(内存的可见性、互斥访问、限制编译器的代码优化调整) 
2、Condition条件通知(线程间的协作)
 

注意点:代码中多次用的signal 而不是signalAll 有原因的: 
1、signalAll 唤醒所有等待的线程,事实上只能有一个通过获得锁,那么会增加锁竞争的几率。效率也低,如果用signal ,那么仅唤醒一个线程,这正是我们所需要的场景! 


package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;


public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    
    private static final long serialVersionUID = -817911632652898426L;

    /** 队列的项,底层是数组,final 修饰意味着 ArrayBlockingQueue 是固定大小的队列 */
    private final E[] items;
    /** 对数组items 下一个 take, poll or remove 操作的索引index */
    private int takeIndex;
    /** 对数组items 下一个 put, offer, or add 操作的索引index */
    private int putIndex;
    /**队列中数据项的数量 ,必须在lock当前锁的情况下才可以使用
     注意:count 不需要volitile 修饰, tackIndex putIndex 都一样,因为使用它们的场景都有lock 锁,所以内存可见性和编译代码调整的优化 得到安全的保障!!
     */
    private int count;

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /**  数据访问的锁*/
    private final ReentrantLock lock;
    /** 非空等待条件 */
    private final Condition notEmpty;
    /** 非满等待条件 */
    private final Condition notFull;




    /**
     * 在数组的尾部放数据
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;//先加锁
        lock.lockInterruptibly();
        try {
            try {
                while (count == items.length) //如果已经满,那么等待
                    notFull.await();
            } catch (InterruptedException ie) {
                notFull.signal(); // 防止这个线程一直堵塞着,所以先要给自己signal下。或者,有这个极端情况:这个线程在堵塞时,抛出了中断异常,这时,却把notFull的信号发给了这个线程,
				//如果不再把这个信号发出去,很容易造成所有线程的死锁。即使重复发了notFull信号,其他写线程还可以通过while循环来判断是不是真的不满
                throw ie;
            }
            insert(e);
        } finally {//
            lock.unlock();
        }
    }
	//如果用lock,则必须用finally来释放这个lock,防止有异常抛出,而导致永远不能释放锁
	//如果用await,在catch的时候,最好要再发下这个信号
	
	
    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
      当前的putIndex位置上放置x元素(只能在获取锁的情况下调用)
     */
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex); //插入数据后,下一个要插入的位置需要 +1
        ++count; //数量+1
        notEmpty.signal(); //含义:一旦插入新数据,那么需要通知notEmpty.wait 条件下等待的线程(take数据的线程)
    }



    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0) //如果为空,那么notEmpty  await 进入等待状态
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            E x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }
	
	    /**
     * Extracts element at current take position, advances, and signals.
     * 取一个元素 (只能在获取锁的情况下调用)
     */
    private E extract() {
        final E[] items = this.items;
        E x = items[takeIndex];// 获取当前takeIndex的元素
        items[takeIndex] = null;
        takeIndex = inc(takeIndex); // 调整下一个获取元素的的位置
        --count; //数量要减一个
        notFull.signal(); //取出后,通知添加数据的线程,items已经不满了。因为这里只取了一个值,所以只要从堵塞在put上的写线程上任选一个激活就行了
        return x;
    }


    /**
     * 情况缓冲
     */
    public void clear() {
        final E[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            int i = takeIndex; //取元素的位置
            int k = count; //总元素的数量
            while (k-- > 0) {
                items[i] = null;
                i = inc(i); //让下一个元素的位置+1
            }
            count = 0;
            putIndex = 0;
            takeIndex = 0;
            notFull.signalAll(); //非满的条件通知,这样就可以让所有的堵塞在put函数的线程都依次获得锁,判断是不是满,并插入了
        } finally {
            lock.unlock();
        }
    }

    
}






 类似资料:

相关阅读

相关文章

相关问答