在上一章我们讲解了ArrayBlockingQueue,用数组形式实现的阻塞队列。
数组的长度在创建时就必须确定,如果数组长度小了,那么ArrayBlockingQueue队列很容易就被阻塞,如果数组长度大了,就容易浪费内存。
而队列这个数据结构天然适合用链表这个形式,而LinkedBlockingQueue就是使用链表方式实现的阻塞队列。
一. 链表实现
1.1 Node内部类
/** * 链表的节点,同时也是通过它来实现一个单向链表 */ static class Node<E> { E item; // 指向链表的下一个节点 Node<E> next; Node(E x) { item = x; } }
有一个变量e储存数据,有一个变量next指向下一个节点的引用。它可以实现最简单地单向列表。
1.2 怎样实现链表
/** * 它的next指向队列头,这个节点不存储数据 */ transient Node<E> head; /** * 队列尾节点 */ private transient Node<E> last;
要实现链表,必须有两个变量,一个表示链表头head,一个表示链表尾last。head和last都会在LinkedBlockingQueue对象创建的时候被初始化。
last = head = new Node<E>(null);
注意这里用了一个小技巧,链表头head节点并没有存放数据,它指向的下一个节点,才真正存储了链表中第一个数据。而链表尾last的确储存了链表最后一个数据。
1.3 插入和删除节点
/** * 向队列尾插入节点 */ private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // 当前线程肯定获取了putLock锁 // 将原队列尾节点的next引用指向新节点node,然后再将node节点设置成队列尾节点last // 这样就实现了向队列尾插入节点 last = last.next = node; }
在链表尾插入节点很简单,将原队列尾last的下一个节点next指向新节点node,再将新节点node赋值给队列尾last节点。这样就实现了插入一个新节点。
// 移除队列头节点,并返回被删除的节点数据 private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // 当前线程肯定获取了takeLock锁 // assert head.item == null; Node<E> h = head; // first节点中才存储了队列中第一个元素的数据 Node<E> first = h.next; h.next = h; // help GC // 设置新的head值,相当于删除了first节点。因为head节点本身不储存数据 head = first; // 队列头的数据 E x = first.item; // 移除原先的数据 first.item = null; return x; }
要注意head并不是链表头,它的next才是指向链表头,所以删除链表头也很简单,就是将head.next赋值给head,然后返回原先head.next节点的数据。
删除的时候,就要注意链表为空的情况。head.next的值使用enqueue方法添加的。当head.next==last的时候,表示已经删除到最后一个元素了,当head.next==null的时候,就不能删除了,因为链表已经为空了。这里没有做判断,是因为在调用dequeue方法的地方已经做过判断了。
二. 同步锁ReentrantLock和条件Condition
因为阻塞队列在队列为空和队列已满的情况下,都必须阻塞等待,那么就天然需要两个条件。而为了保证多线程并发安全,又需要一个同步锁。这个在ArrayBlockingQueue中已经说过了。
这里我们来说说LinkedBlockingQueue不一样的地方。
/** 独占锁,用于处理插入队列操作的并发问题,即put与offer操作 */ private final ReentrantLock putLock = new ReentrantLock(); /** 队列不满的条件Condition,它是由putLock锁生成的 */ private final Condition notFull = putLock.newCondition(); /** 独占锁,用于处理删除队列头操作的并发问题,即take与poll操作 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 队列不为空的条件Condition, 它使用takeLock锁生成的 */ private final Condition notEmpty = takeLock.newCondition();
2.1 putLock与takeLock
我们发现使用了两把锁:
按照上面的说法,可能会出现同时插入元素和删除元素的操作,那么就不会出现问题么?
我们来具体分析一个下,对于队列来说操作分为三种:
因此使用putLock锁来保持last变量的安全,使用takeLock锁来保持head变量的安全。
对于都涉及了队列中元素个数count变量,所以使用AtomicInteger来保证并发安全问题。
/** 队列中元素的个数,这里使用AtomicInteger变量,保证并发安全问题 */ private final AtomicInteger count = new AtomicInteger();
2.2 notFull与notEmpty
2.3 控制流程
当插入元素时:
当删除元素时:
还要注意一下,Condition的signal和await方法必须在获取锁的情况下调用。因此就有了signalNotEmpty和signalNotFull方法:
/** * 唤醒在notEmpty条件下等待的线程,即移除队列头时,发现队列为空而被迫等待的线程。 * 注意,因为要调用Condition的signal方法,必须获取对应的锁,所以这里调用了takeLock.lock()方法。 * 当队列中插入元素(即put或offer操作),那么队列肯定不为空,就会调用这个方法。 */ private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 唤醒在notFull条件下等待的线程,即队列尾添加元素时,发现队列已满而被迫等待的线程。 * 注意,因为要调用Condition的signal方法,必须获取对应的锁,所以这里调用了putLock.lock()方法 * 当队列中删除元素(即take或poll操作),那么队列肯定不满,就会调用这个方法。 */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
三. 插入元素方法
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // 记录插入操作前元素的个数 int c = -1; // 创建新节点node Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { //表示队列已满,那么就要调用notFull.await方法,让当前线程等待 while (count.get() == capacity) { notFull.await(); } // 向队列尾插入新元素 enqueue(node); // 将当前队列元素个数加1,并返回加1之前的元素个数。 c = count.getAndIncrement(); // c + 1 < capacity表示队列未满,就唤醒可能等待插入操作的线程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } // c == 0表示插入之前,队列是空的。队列从空到放入一个元素时, // 才唤醒等待删除的线程 // 防止频繁获取takeLock锁,消耗性能 if (c == 0) signalNotEmpty(); }
以put方法为例,大体流程与我们前面介绍一样,这里有一个非常怪异的代码,当插入完元素时,如果发现队列未满,那么调用notFull.signal()唤醒等待插入的线程。
大家就很疑惑了,一般来说,这个方法应该放在删除元素(take系列的方法里),因为当我们删除一个元素,那么队列肯定是未满的,那么调用notFull.signal()方法,唤醒等待插入的线程。
这里这么做主要是因为调用signal方法,必须先获取对应的锁,而在take系列的方法里使用的锁是takeLock,那么想调用notFull.signal()方法,必须先获取putLock锁,这样的话会性能就会下降,所以用了另一种方式。
四. 删除队列头元素
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //表示队列为空,那么就要调用notEmpty.await方法,让当前线程等待 while (count.get() == 0) { notEmpty.await(); } // 删除队列头元素,并返回它 x = dequeue(); // 返回当前队列个数,然后将队列个数减一 c = count.getAndDecrement(); // c > 1表示队列不为空,就唤醒可能等待删除操作的线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } /** * c == capacity表示删除操作之前,队列是满的。只有从满队列中删除一个元素时, * 才唤醒等待插入的线程 * 防止频繁获取putLock锁,消耗性能 */ if (c == capacity) signalNotFull(); return x; }
为什么调用notEmpty.signal()方法原因,对比一下我们在插入元素方法中的解释。
五. 查看队列头元素
// 查看队列头元素 public E peek() { // 队列为空,返回null if (count.get() == 0) return null; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { // 获取队列头节点first Node<E> first = head.next; // first == null表示队列为空,返回null if (first == null) return null; else // 返回队列头元素 return first.item; } finally { takeLock.unlock(); } }
查看队列头元素,涉及到head节点,所以必须使用takeLock锁。
六. 其他重要方法
6.1 remove(Object o)方法
// 从队列中删除指定元素o public boolean remove(Object o) { if (o == null) return false; // 因为不是删除列表头元素,所以就涉及到head和last两个变量, // putLock与takeLock都要加锁 fullyLock(); try { // 遍历整个队列,p表示当前节点,trail表示当前节点的前一个节点 // 因为是单向链表,所以需要记录两个节点 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { // 如果找到了指定元素,那么删除节点p if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { fullyUnlock(); } }
从列表中删除指定元素,因为删除的元素不一定在列表头,所以可能会head和last两个变量,所以必须同时使用putLock与takeLock两把锁。因为是单向链表,需要一个辅助变量trail来记录前一个节点,这样才能删除当前节点p。
6.2 unlink(Node<E> p, Node<E> trail) 方法
// 删除当前节点p,trail代表p的前一个节点 void unlink(Node<E> p, Node<E> trail) { // 将当前节点的数据设置为null p.item = null; // 这样就在链表中删除了节点p trail.next = p.next; // 如果节点p是队列尾last,而它被删除了,那么就要将trail设置为last if (last == p) last = trail; // 将元素个数减一,如果原队列是满的,那么就调用notFull.signal()方法 // 其实这个不用判断直接调用的,因为这里肯定获取了putLock锁 if (count.getAndDecrement() == capacity) notFull.signal(); }
要在链表中删除一个节点p,只需要将p的前一个节点trail的next指向节点p的下一个节点next。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持小牛知识库。
本文向大家介绍详细分析Java并发集合ArrayBlockingQueue的用法,包括了详细分析Java并发集合ArrayBlockingQueue的用法的使用技巧和注意事项,需要的朋友参考一下 在上一章中,我们介绍了阻塞队列BlockingQueue,下面我们介绍它的常用实现类ArrayBlockingQueue。 一. 用数组来实现队列 因为队列这种数据结构的特殊要求,所以它天然适合用链表的方
前言 在前面的文章ArrayBlockingQueue中,已经对JDK中的BlockingQueue中的做了一个回顾,同时对ArrayBlockingQueue中的核心方法作了说明,而LinkedBlockingQueue作为JDK中BlockingQueue家族系列中一员,由于其作为固定大小线程池(Executors.newFixedThreadPool())底层所使用的阻塞队列,分析它的目的主
主要内容:1 LinkedBlockingQueue的概述,2 LinkedBlockingQueue的原理,2.1 主要属性,2.2 构造器,2.3 入队操作,2.4 出队操作,2.5 检查操作,2.6 size操作,2.7 迭代操作,3 LinkedBlockingQueue的总结基于JDK1.8详细介绍了LinkedBlockingQueue的底层源码实现,包括锁分离的原理,以及入队列、出队列等操作源码。实际上LinkedBlockingQueue的源码还是非常简单的! 1 LinkedB
本文向大家介绍Java并发编程:volatile关键字详细解析,包括了Java并发编程:volatile关键字详细解析的使用技巧和注意事项,需要的朋友参考一下 volatile这个关键字可能很多朋友都听说过,或许也都用过。在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果。在Java 5之后,volatile关键字才得以重获生机。 volatile关键字虽然从
本文向大家介绍详细分析android的MessageQueue.IdleHandler,包括了详细分析android的MessageQueue.IdleHandler的使用技巧和注意事项,需要的朋友参考一下 我们知道android是基于Looper消息循环的系统,我们通过Handler向Looper包含的MessageQueue投递Message, 不过我们常见的用法是这样吧? 一般我们比较少接触
本文向大家介绍详细分析java 动态代理,包括了详细分析java 动态代理的使用技巧和注意事项,需要的朋友参考一下 1、动态代理的特点: 字节码随用随创建,随用随加载 2、作用: 不修改源码的基础上对源码进行加强 3、分类: (1)基于接口的动态代理: 涉及到的类:Proxy,由JDK官方提供,使用Proxy类中的newProxyInstance方法创建对