Java中的LinkedBlockingQueue源码解析
作者:茫然背影
基本认识
LinkedBlockingQueue可以指定容量,内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。主要对象的定义如下:
//容量,如果没有指定,该值为Integer.MAX_VALUE; private final int capacity; //当前队列中的元素 private final AtomicInteger count = new AtomicInteger(); //队列头节点,始终满足head.item==null transient Node<E> head; //队列的尾节点,始终满足last.next==null private transient Node<E> last; //用于出队的锁 private final ReentrantLock takeLock = new ReentrantLock(); //当队列为空时,保存执行出队的线程 private final Condition notEmpty = takeLock.newCondition(); //用于入队的锁 private final ReentrantLock putLock = new ReentrantLock(); //当队列满时,保存执行入队的线程 private final Condition notFull = putLock.newCondition();
构造方法
LinkedBlockingQueue的构造方法有三个,分别如下:
从构造方法中可以得出3点结论:
1. 当调用无参的构造方法时,容量是int的最大值
2. 队列中至少包含一个节点,哪怕队列对外表现为空
3. LinkedBlockingQueue不支持null元素
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null);//last和head在队列为空时都存在,所以队列中至少有一个节点 } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
put(E e)方法
首先获得入队的锁putLock,判断队列是否已满:count == capacity
- 未满:将节点链入尾部,元素数量+1,此时如果发现队列还没满还可以生产,就唤醒其他生产线程notFull.signal也进行生产,生产一个后,如果此时队列是有空变为非空的(证明此时所有的消费者都在阻塞notEmpty.await(),此时必须由生产者进行唤醒,不然无法向下进行,也就是从空到非空这个时候必须由生产者唤醒消费者,之后的就是消费者唤醒自己的兄弟姐妹们),就唤醒消费者队列notEmpty的头结点notEmpty.signal,通知消费这消费这个消费者消费后发现还有可以消费的元素,就通知notEmpty队列里的头结点,就这样notEmpty队列一次被唤醒了,notEmpty只在第一次是被生产者唤醒的。
- 已满:就调用notFull.await阻塞,释放锁,从AQS队列移除,将生产者加入到notFull条件队列尾部,等待着被唤醒后继续生产
public void put(E e) throws InterruptedException { //不允许元素为null if (e == null) throw new NullPointerException(); int c = -1; //以当前元素新建一个节点 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; //获得入队的锁 putLock.lockInterruptibly(); try { //如果队列已满,那么将该线程加入到Condition的等待队列中 while (count.get() == capacity) { notFull.await(); } //将节点入队 enqueue(node); //得到插入之前队列的元素个数 c = count.getAndIncrement(); //如果还可以插入元素,那么释放等待的入队线程 if (c + 1 < capacity) notFull.signal(); } finally { //解锁 putLock.unlock(); } //通知出队线程队列非空 if (c == 0) signalNotEmpty(); }
private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; //获取takeLock takeLock.lock(); try { //释放notEmpty条件队列中的第一个等待线程 notEmpty.signal(); } finally { takeLock.unlock(); } }
E take()方法
首先获取takeLodck,判断队列是否可以消费:count.get() == 0
- 可以消费:返回头节点的下一个节点(头结点不存数据,是永远存在的一个节点),并移除此节点,元素数量-1,如果此时发现队列中还有元素可以消费,就唤醒其他消费者notEmpty.signal,进行消费。如果此时队列是从满变为未满的(证明此时所有的生产者都在阻塞,此时必须有消费者唤醒生产者的第一个节点,之后就是生产者唤醒自己的兄弟姐妹了),
- 不可以消费:调用notEmpty.awati进行阻塞,释放锁,从AQS队列移除,进入NotEmpty队列尾部,等待被唤醒
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; //获取takeLock锁 takeLock.lockInterruptibly(); try { //如果队列为空,那么加入到notEmpty条件的等待队列中 while (count.get() == 0) { notEmpty.await(); } //得到队头元素 x = dequeue(); //得到取走一个元素之前队列的元素个数 c = count.getAndDecrement(); //如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } //如果队列中的元素从满到非满,通知put线程 if (c == capacity) signalNotFull(); return x; }
private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
remove()方法
remove(Object)操作会从队列的头遍历到尾,用到了队列的两端,所以需要对两端加锁,而对两端加锁就需要获取两把锁;
remove()是从头结点删除,所以这个方法只需要获取take锁。
public boolean remove(Object o) { //因为队列不包含null元素,返回false if (o == null) return false; //获取两把锁 fullyLock(); try { //从头的下一个节点开始遍历 for (Node<E> trail = head, p = trail.next; p != null; trail = p, p = p.next) { //如果匹配,那么将节点从队列中移除,trail表示前驱节点 if (o.equals(p.item)) { unlink(p, trail); return true; } } return false; } finally { //释放两把锁 fullyUnlock(); } }
size()方法
由于count是一个AtomicInteger的变量,所以该方法是一个原子性的操作,是线程安全的。
public int size() { return count.get(); }
LinkedBlockingDeque
从上面的字段,可以得到LinkedBlockingDeque内部只有一把锁以及该锁上关联的两个条件,同一时刻只有一个线程可以在队头或者队尾执行入队或出队操作。可以发现这点和LinkedBlockingQueue不同,LinkedBlockingQueue可以同时有两个线程在两端执行操作。
LinkedBlockingQueue实现总结
LinkedBlockingQueue底层是一个链表(可以指定容量,默认是Integer.MAX_VALUE),维持了两把锁,一把锁用于入队,一把锁用于出队,并且使用一个AtomicInterger类型的变量保证线程安全,AtomicInterger:表示当前队列中含有的元素个数:
- 生产者不断进行生产会向链表尾部不断链入元素,直到达到容量后,此时所有生产者依次进入notFull条件队列进行阻塞,此时如果任意一个消费者消费了一个元素,就会通知notFull队列第一个节点进行生产,notFull第一个节点生产完毕后发现还有位置可以生产就会唤醒notFull的第二个节点,notFull第二个节点生产后发现还有位置则唤醒notFull第三个节点,就这样就可以唤醒notFull里的所有生产者
- 消费从链表头部开始向后消费,只要还有元素就可以不断消费,消费完所有的元素后,此时所有消费者依次进入notEmpty条件队列进行阻塞, 这个时候一旦生产者生产了一个元素,就会唤醒notEmpty的第一个节点,而这个节点消费完后如果发现还有元素可以消费,就会唤醒自己的兄弟姐妹(notEmpty的第二个节点),notEmpty的第二个节点消费完后如果发现还有元素可以消费就会再唤醒notEmpty的第三个节点,就这样就唤醒了notEmpty里的所有的消费者
- 消费者一直在砍头,生产者一直在添尾
到此这篇关于Java中的LinkedBlockingQueue源码解析的文章就介绍到这了,更多相关LinkedBlockingQueue源码解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!