java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > LinkedBlockingQueue源码解析

Java中的LinkedBlockingQueue源码解析

作者:茫然背影

这篇文章主要介绍了Java中的LinkedBlockingQueue源码解析,LinkedBlockingQueue底层是一个链表(可以指定容量,默认是Integer.MAX_VALUE),维持了两把锁,一把锁用于入队,一把锁用于出队,并且使用一个AtomicInterger类型的变量保证线程安全,需要的朋友可以参考下

基本认识

LinkedBlockingQueue可以指定容量,内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。主要对象的定义如下:

    //容量,如果没有指定,该值为Integer.MAX_VALUE;
    private final int capacity;
    //当前队列中的元素
    private final AtomicInteger count = new AtomicInteger();
    //队列头节点,始终满足head.item==null
    transient Node<E> head;
    //队列的尾节点,始终满足last.next==null
    private transient Node<E> last;
    //用于出队的锁
    private final ReentrantLock takeLock = new ReentrantLock();
    //当队列为空时,保存执行出队的线程
    private final Condition notEmpty = takeLock.newCondition();
    //用于入队的锁
    private final ReentrantLock putLock = new ReentrantLock();
    //当队列满时,保存执行入队的线程
    private final Condition notFull = putLock.newCondition();

构造方法

LinkedBlockingQueue的构造方法有三个,分别如下:

从构造方法中可以得出3点结论: 

1. 当调用无参的构造方法时,容量是int的最大值 

2. 队列中至少包含一个节点,哪怕队列对外表现为空 

3. LinkedBlockingQueue不支持null元素

 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);//last和head在队列为空时都存在,所以队列中至少有一个节点
    }
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

put(E e)方法

首先获得入队的锁putLock,判断队列是否已满:count == capacity

public void put(E e) throws InterruptedException {
        //不允许元素为null
        if (e == null) throw new NullPointerException();
        int c = -1;
        //以当前元素新建一个节点
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获得入队的锁
        putLock.lockInterruptibly();
        try {
            //如果队列已满,那么将该线程加入到Condition的等待队列中
            while (count.get() == capacity) {
                notFull.await();
            }
            //将节点入队
            enqueue(node);
            //得到插入之前队列的元素个数
            c = count.getAndIncrement();
            //如果还可以插入元素,那么释放等待的入队线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //解锁
            putLock.unlock();
        }
        //通知出队线程队列非空
        if (c == 0)
            signalNotEmpty();
    }
 private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        //获取takeLock
        takeLock.lock();
        try {
            //释放notEmpty条件队列中的第一个等待线程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

E take()方法

首先获取takeLodck,判断队列是否可以消费:count.get() == 0

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //获取takeLock锁
        takeLock.lockInterruptibly();
        try {
            //如果队列为空,那么加入到notEmpty条件的等待队列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            //得到队头元素
            x = dequeue();
            //得到取走一个元素之前队列的元素个数
            c = count.getAndDecrement();
            //如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //如果队列中的元素从满到非满,通知put线程
        if (c == capacity)
            signalNotFull();
        return x;
    }
private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

remove()方法

remove(Object)操作会从队列的头遍历到尾,用到了队列的两端,所以需要对两端加锁,而对两端加锁就需要获取两把锁;

remove()是从头结点删除,所以这个方法只需要获取take锁。

 public boolean remove(Object o) {
        //因为队列不包含null元素,返回false
        if (o == null) return false;
        //获取两把锁
        fullyLock();
        try {
            //从头的下一个节点开始遍历
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                 //如果匹配,那么将节点从队列中移除,trail表示前驱节点
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            //释放两把锁
            fullyUnlock();
        }
    }

size()方法

由于count是一个AtomicInteger的变量,所以该方法是一个原子性的操作,是线程安全的。

public int size() {
        return count.get();
    }

LinkedBlockingDeque

从上面的字段,可以得到LinkedBlockingDeque内部只有一把锁以及该锁上关联的两个条件,同一时刻只有一个线程可以在队头或者队尾执行入队或出队操作。可以发现这点和LinkedBlockingQueue不同,LinkedBlockingQueue可以同时有两个线程在两端执行操作。  

LinkedBlockingQueue实现总结

LinkedBlockingQueue底层是一个链表(可以指定容量,默认是Integer.MAX_VALUE),维持了两把锁,一把锁用于入队,一把锁用于出队,并且使用一个AtomicInterger类型的变量保证线程安全,AtomicInterger:表示当前队列中含有的元素个数:

到此这篇关于Java中的LinkedBlockingQueue源码解析的文章就介绍到这了,更多相关LinkedBlockingQueue源码解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文