Java线程池队列PriorityBlockingQueue原理分析
作者:Brycen Liu
一、什么是PriorityBlockingQueue?
PriorityBlockingQueue队列是 JDK1.5 的时候出来的一个阻塞队列。但是该队列入队的时候是不会阻塞的,永远会加到队尾。下面我们介绍下它的几个特点:
- PriorityBlockingQueue 和 ArrayBlockingQueue 一样是基于数组实现的,但后者在初始化时需要指定长度,前者默认长度是 11。
- 该队列可以说是真正的无界队列,它在队列满的时候会进行扩容,而前面说的无界阻塞队列其实都有有界,只是界限太大可以忽略(最大值是 2147483647)
- 该队列属于权重队列,可以理解为它可以进行排序,但是排序不是从小到大排或从大到小排,是基于数组的堆结构(具体如何排下面会进行分析)
- 出队方式和前面的也不同,是根据权重来进行出队,和前面所说队列中那种先进先出或者先进后出方式不同。
- 其存入的元素必须实现Comparator,或者在创建队列的时候自定义Comparator
注意:
- 堆结构实际上是一种完全二叉树,建议学习前了解一下二叉树。
- 堆又分为大顶堆和小顶堆。大顶堆中第一个元素肯定是所有元素中最大的,小顶堆中第一个元素是所有元素中最小的。
二、PriorityBlockingQueue类图:
三、源码解析
1、字段讲解
从下面的字段我们可以知道,该队列可以排序,使用显示锁来保证操作的原子性,在空队列时,出队线程会堵塞等。
/** * 默认数组长度 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * 最大达容量,分配时超出可能会出现 OutOfMemoryError 异常 */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * 队列,存储我们的元素 */ private transient Object[] queue; /** * 队列长度 */ private transient int size; /** * 比较器,入队进行权重的比较 */ private transient Comparator<? super E> comparator; /** * 显示锁 */ private final ReentrantLock lock; /** * 空队列时进行线程阻塞的 Condition 对象 */ private final Condition notEmpty;
2、构造方法
/** * 默认构造,使用长度为 11 的数组,比较器为空 */ public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } /** * 自定义数据长度构造,比较器为空 */ public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } /** * 自定义数组长度,可以自定义比较器 */ public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
3、入队方法
3.1、put方法
入队方法,下面可以看到 put 方法最终会调用 offer 方法,所以我们只看 offer 方法即可。
public void put(E e) { offer(e); // never need to block } public boolean offer(E e) { //判断是否为空 if (e == null) throw new NullPointerException(); //显示锁 final ReentrantLock lock = this.lock; lock.lock(); //定义临时对象 int n, cap; Object[] array; //判断数组是否满了 while ((n = size) >= (cap = (array = queue).length)) //数组扩容 tryGrow(array, cap); try { //拿到比较器 Comparator<? super E> cmp = comparator; //判断是否有自定义比较器 if (cmp == null) //堆上浮 siftUpComparable(n, e, array); else //使用自定义比较器进行堆上浮 siftUpUsingComparator(n, e, array, cmp); //队列长度 +1 size = n + 1; //唤醒休眠的出队线程 notEmpty.signal(); } finally { //释放锁 lock.unlock(); } return true; }
3.2、上浮调整比较器方法的实现
private static <T> void siftUpComparable(int k, T x, Object[] array) { Comparable<? super T> key = (Comparable<? super T>) x; while (k > 0) { //无符号向左移,目的是找到放入位置的父节点 int parent = (k - 1) >>> 1; //拿到父节点的值 Object e = array[parent]; //比较是否大于该元素,不大于就没比较交换 if (key.compareTo((T) e) >= 0) break; //以下都是元素位置交换 array[k] = e; k = parent; } array[k] = key; }
根据上面的代码,可以看出这是完全二叉树在进行上浮调整。调整入队的元素,找出最小的,将元素排列有序化。简单理解就是:父节点元素值一定要比它的子节点得小,如果父节点大于子节点了,那就两者位置进行交换。
3.3、入队图解
说的可能很模糊,我们先写个 demo,根据 demo 来进行图解分析:
/** * @Auther: Gentle * @Date: 2019/4/14 15:11 * @Description: PriorityBlockingQueue 简单演示 demo */ public class TestPriorityBlockingQueue { public static void main(String[] args) throws InterruptedException { PriorityBlockingQueue<Integer> concurrentLinkedQueue = new PriorityBlockingQueue<Integer>(); concurrentLinkedQueue.offer(10); concurrentLinkedQueue.offer(20); concurrentLinkedQueue.offer(5); concurrentLinkedQueue.offer(1); concurrentLinkedQueue.offer(25); concurrentLinkedQueue.offer(30); //输出元素排列 concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" ")); //取出元素 Integer take = concurrentLinkedQueue.take(); System.out.println(); concurrentLinkedQueue.stream().forEach(e-> System.out.print(e+" ")); } }
上面可以看出,我们要入队的元素是 [10,20,5,1,21,30],接下来我们用图来演示一步步入队情况。
队列初始化时:
这时,我们开始将元素 元素 10 入队,并用二叉树辅助理解:
我们在将元素 20 入队:
将元素 5 入队后发现父节点大于子节点,这时需要进行上浮调整
开始进行上浮调整,将元素 10 和元素 5进行位置调换,结果如下:
接着将元素 1 入队后发现父节点大于子节点,继续进行调整:
第一次调整将元素 20 和元素 1 进行位置交换,交换完毕后结果如下:
交换完毕后,我们发现父节点的元素值还是大于子节点,说明还需要进行一次交换,最后交换结果如下:
接下来将元素 25 和 30 入队,结果如下:
注: 最小堆的的顶端一定是元素值最小的那个。
4、出队方法
4.1、take方法
出队方法,该方法会阻塞
public E take() throws InterruptedException { //显示锁 final ReentrantLock lock = this.lock; //可中断锁 lock.lockInterruptibly(); //结果接受对象 E result; try { //判读队列是否为空 while ( (result = dequeue()) == null) //线程阻塞 notEmpty.await(); } finally { lock.unlock(); } return result; }
4.2、dequeue方法
具体出队方法的实现
private E dequeue() { //长度减少 1 int n = size - 1; //判断队列中是否又元素 if (n < 0) return null; else { //队列对象 Object[] array = queue; //取出第一个元素 E result = (E) array[0]; //拿出最后一个元素 E x = (E) array[n]; //置空 array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) //下沉调整 siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); //成功则减少队列中的元素数量 size = n; return result; } }
总体就是找到父节点与两个子节点中最小的一个节点,然后进行交换位置,不断重复,由上而下的交换。
4.3、下沉调整比较器方法的实现
private static <T> void siftDownComparable(int k, T x, Object[] array, int n) { //判断队列长度 if (n > 0) { Comparable<? super T> key = (Comparable<? super T>)x; //找到队列最后一个元素的父节点的索引。 //如下图最大元素是30 父节点是 10,对于索引是 2 int half = n >>> 1; // loop while a non-leaf while (k < half) { //拿到 k 节点下的左子节点 int child = (k << 1) + 1; // assume left child is least //取得子节点对应的值 Object c = array[child]; //取得 k 右子节点的索引 int right = child + 1; //比较右节点的索引是否小于队列长度和左右子节点的值进行比较 if (right < n && ((Comparable<? super T>) c).compareTo((T) array[right]) > 0) c = array[child = right]; //比较父节点值是否大于子节点 if (key.compareTo((T) c) <= 0) break; //下面都是元素替换 array[k] = c; k = child; } array[k] = key; } }
4.4、出队图解
这时,我们需要从队列中取出第一个元素 1,元素 1 取出时会与队列中最后一个元素进行交换,并将最后一个元素置空。(实际上源码不是这么做的,源代码中是用变量来保存索引,直到全部下沉调整完成才进行替换)
替换后,结果就如下图显示一样。我们发现父节点大于子节点了,所以还需要再一次进行替换操作。 再一次替换后,将元素 30 下沉到下一个左边子节点,子节点上浮到原父节点位置。这就完成了下沉调整了。
四、总结
PriorityBlockingQueue 真的是个神奇的队列,可以实现优先出队。最特别的是它只有一个锁,入队操作永远成功,而出队只有在空队列的时候才会进行线程阻塞。可以说有一定的应用场景吧,比如:有任务要执行,可以对任务加一个优先级的权重,这样队列会识别出来,对该任务优先进行出队。
到此这篇关于Java线程池队列PriorityBlockingQueue原理分析的文章就介绍到这了,更多相关Java的PriorityBlockingQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!