java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java并发LinkedBlockingQueue

Java并发LinkedBlockingQueue源码分析

作者:历河川

这篇文章主要为大家介绍了Java并发LinkedBlockingQueue源码分析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

简介

LinkedBlockingQueue是一个阻塞的有界队列,底层是通过一个个的Node节点形成的链表实现的,链表队列中的头节点是一个空的Node节点,在多线程下操作时会使用ReentrantLock锁来保证数据的安全性,并使用ReentrantLock下的Condition对象来阻塞以及唤醒线程。

常量

/**
 * 链表中的节点类
 */
static class Node<E> {
    //节点中的元素
    E item;
    //下一个节点
    Node<E> next;
    Node(E x) { item = x; }
}
/** 链表队列的容量大小,如果没有指定则使用Integer最大值 */
private final int capacity;
/** 记录链表中的节点的数量的原子类 */
private final AtomicInteger count = new AtomicInteger();
/**链表的头节点
 */
transient Node<E> head;
/**
 * 链表的尾节点
 */
private transient Node<E> last;
/** 从链表队列中获取节点时防止多个线程同时操作所产生数据安全问题时所加的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** 添加节点到链表队列中防止多个线程同时操作所产生数据安全问题时所加的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

构造方法

/**
 * 创建默认容量大小的链表队列
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}
/**
 * 创建指定容量大小的链表队列
 */
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //创建一个空节点,并将该节点设置为头尾节点
    last = head = new Node<E>(null);
}
/**
 * 根据指定集合中的元素创建一个默认容量大小的链表队列
 */
public LinkedBlockingQueue(Collection<? extends E> c) {
    //创建默认容量大小的链表队列
    this(Integer.MAX_VALUE);
    //获取添加元素节点的锁
    final ReentrantLock putLock = this.putLock;
    //加锁
    putLock.lock();
    try {
        //链表中节点的数量
        int n = 0;
        //遍历集合中的元素
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //为元素创建一个节点,并将节点添加到链表的尾部,并设置节点为尾节点
            enqueue(new Node<E>(e));
            //链表中节点的数量自增
            ++n;
        }
        //记录链表中节点的数量
        count.set(n);
    } finally {
        //释放锁
        putLock.unlock();
    }
}

第一个和第三个构造方法中都会调用第二个构造方法,而在第二个构造方法中会设置链表队列中容纳节点的数量以及创建一个空的头节点来填充,再看第三个构造方法中的代码,首先会获取putLock锁,代表当前是一个需要添加节点的线程,再将指定集合中的元素封装成一个Node节点,并依次将封装的节点追加到链表队列中的尾部,并使用AtomicInteger来记录链表队列中节点的数量。

put

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    //为指定元素创建节点
    Node<E> node = new Node<E>(e);
    //获取添加元素节点的锁
    final ReentrantLock putLock = this.putLock;
    //获取记录链表节点数量的原子类
    final AtomicInteger count = this.count;
    //加锁,如果加锁的线程被中断了则抛出异常
    putLock.lockInterruptibly();
    try {
        //校验链表中的节点数量是否到达了指定的容量
        //如果到达了指定的容量就进行阻塞等待
        //如果线程被唤醒了,但是链表中的节点数量还是未改变,则继续阻塞等待
        //只有当头节点出队,新的节点才能继续添加
        while (count.get() == capacity) {
            notFull.await();
        }
        //将新节点添加到链表的尾部并设置为尾节点
        enqueue(node);
        //获取没有添加当前节点时链表中的节点数量
        //并更新链表中的节点数量
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            //唤醒等待添加节点的线程
            //可能当前线程在等待队列中等待的时候
            //有新的线程要执行添加节点的操作
            //但是链表的容量已经到达最大,所以新的线程也会进行等待
            //当前线程被唤醒了并且链表的容量没有到达最大则尝试去唤醒等待的线程
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }
    if (c == 0)
        //c等于0说明添加当前节点的时候链表中没有节点
        //可能有线程在获取节点,但是链表中没有节点
        //从而一直进行等待,当添加了节点的时候就需要唤醒获取节点的线程
        signalNotEmpty();
}

LinkedBlockingQueue中的代码都比较简单,主要是ReentrantLock下的Condition中的方法比较复杂,我们先整体的了解一下put方法,首先通过new Node为将指定元素封装成一个节点,再获取putLock锁,当链表队列中的节点数量已经到达了capacity大小,那当前线程就需要调用Condition下的await方法进行等待将线程阻塞,直到有节点出队或者说有节点被删除或者当前线程被中断了,当前线程被中断了则会直接退出当前put方法并抛出异常,如果节点出队了或者节点被删除了,那当前线程被唤醒了则会继续执行添加节点的操作。

enqueue方法则会将封装的节点追加到链表队列中的尾部,通过getAndIncrement方法先获取没有添加当前节点时链表队列中节点的数量,然后更新添加了当前节点之后链表队列中节点的数量,c则是没有添加当前节点时链表队列中节点的数量,c+1则是添加当前节点后链表队列中节点的数量,如果说c+1小于capacity则说明线程在添加节点的时候,链表队列中的节点数量已经到达了最大值,后续添加节点的线程都需要进行阻塞,当有节点被删除或出队的时候,最开始阻塞的线程被唤醒,被唤醒的线程则会去执行添加节点的操作,当添加完节点之后链表队列中的节点数量没有到达最大值则会去唤醒后续被阻塞的线程执行添加节点的操作。

c等于0说明在添加当前节点之前,可能有线程在获取链表队列中的节点,但是链表队列中没有节点,导致获取节点的线程处于阻塞状态,当添加完节点之后,链表队列中有了节点,此时就需要唤醒阻塞的线程去获取节点。

添加元素的方法分为put和offer,区别在于阻塞与非阻塞,当链表队列中的节点数量已经到达最大值,put方法则会阻塞,而offer方法不会阻塞则是直接返回。

获取元素的方法分为take、poll、peek,take方法与put方法相似,只不过一个是入队,一个是出队,pollpeek都是非阻塞的,但是区别在于poll获取了节点之后,该节点会从链表队列中移除,而peek不会移除节点。

await

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        //线程被中断抛出异常
        throw new InterruptedException();
    //为当前线程创建一个等待模式的节点并入队,并将等待队列中已经取消等待的节点移除掉
    Node node = addConditionWaiter();
    //释放当前线程的锁,防止当前线程加了锁,导致其它在等待的线程被唤醒之后不能获取到锁从而导致一直阻塞
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //如果指定节点还在等待队列中等待则挂起
    //如果指定节点被中断了则会将指定节点添加到同步等待队列中
    //如果指定节点被唤醒了则会将指定节点添加到同步等待队列中
    while (!isOnSyncQueue(node)) {
        //节点在等待队列中则挂起
        LockSupport.park(this);
        //线程在等待队列中被中断则会添加到同步等待队列中
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //acquireQueued 指定节点中的线程被中断了或者被唤醒了则会尝试去获取锁
    //如果还未到指定节点中的线程获取锁的时候则会继续挂起
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        //指定节点的线程已经获取到了锁并且节点关联的下一个节点不为空
        //此时就需要将已经获取到锁的节点从等待队列中移除
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

首先通过addConditionWaiter方法将当前线程封装成一个等待模式的节点,并将节点添加到等待队列中以及会将等待队列中已经取消等待的线程节点从队列中移除,再通过fullyRelease方法释放掉当前线程加的所有的锁,之所以释放锁是防止其它线程获取不到锁从而一直阻塞,再看isOnSyncQueue方法,该方法是校验当前线程节点是否在等待队列中,如果在等待队列中那就将节点中的线程挂起等待。

isOnSyncQueue

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        //指定节点还在等待队列中此时就需要继续等待
        return false;
    if (node.next != null)
        //指定节点已经不在等待队列中了
        return true;
    //从等待队列中的尾节点开始向头节点遍历,校验指定的节点是否在其中
    return findNodeFromTail(node);
}

当节点的状态为CONDITION时,则说明该节点还在等待队列中,node.prev等于null为什么说也是在等待队列中呢?因为等待队列中的节点是没有prev指针和next指针的,如果prev指针和next指针指向的节点不为空,那就说明该节点是在同步等待队列中的,如果在同步等待队列中的话,那节点中的线程就可以尝试去获取锁并执行后续的操作。

当等待队列中的线程节点被唤醒和中断则会添加到同步等待队列中,如果是被中断的话则会通过checkInterruptWhileWaiting方法添加一个中断标识,再通过acquireQueued方法来获取锁,如果获取锁失败则继续等待,当获取锁成功之后则会该节点从等待队列中移除,如果说你是一个被中断的线程,最后会通过reportInterruptAfterWait方法抛出中断异常。

signal

public final void signal() {
    if (!isHeldExclusively())
        //加锁的线程不是当前线程则抛出异常
        throw new IllegalMonitorStateException();
    //头节点
    Node first = firstWaiter;
    if (first != null)
        //唤醒头节点
        doSignal(first);
}
/**
 * 唤醒等待队列中的头节点
 * 如果等待队列中的头节点被取消等待或已经被唤醒了
 * 此时就需要唤醒头节点的后续的一个节点
 * 直到成功的唤醒一个节点中的线程
 */
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
/**
* 将指定的节点添加到同步等待队列中
* 并根据前一个节点的等待状态来决定是否需要立刻唤醒指定节点
*/
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        //更改节点状态失败说明该节点已经被唤醒了
        return false;
    //将要唤醒的节点添加到同步等待队列中
    //并返回前一个节点
    Node p = enq(node);
    //前一个节点的等待状态
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //如果前一个节点的等待状态大于0则说明已经被取消加锁,此时就需要唤醒后续的节点,就是当前节点
        //前一个节点的等待状态不大于0但是更改前一个节点的等待状态时失败则说明前一个节点已经被唤醒了并更改了状态
        //此时就需要尝试将当前节点中的线程唤醒
        LockSupport.unpark(node.thread);
    return true;
}

唤醒线程节点的方法主要还是看transferForSignal方法,首先会通过cas操作将需要唤醒的节点的状态设置为0,如果更改节点状态失败则说明该节点已经被唤醒了,更新节点状态成功则会通过enq方法将节点添加到同步等待队列中,此时就需要根据前一个节点来决定是否需要立即唤醒当前节点中的线程。

从下面的图片中能看出来其实同步等待队列和等待队列中使用的节点是共用的节点,并不会创建新的节点,同步等待队列中的节点使用next指针和prev指针来关联节点,而等待队列中则是使用nextWaiter指针来关联节点的。

以上就是Java并发LinkedBlockingQueue源码分析的详细内容,更多关于Java并发LinkedBlockingQueue的资料请关注脚本之家其它相关文章!

您可能感兴趣的文章:
阅读全文