Java并发包中的PriorityBlockingQueue深度解析
作者:lang20150928
PriorityBlockingQueue是Java并发包中的一种线程安全、无界、优先级队列,支持多线程并发访问,它的核心思想是每次取出的元素是当前队列中优先级最高的元素,本文给大家介绍Java并发包中的PriorityBlockingQueue,感兴趣的朋友跟随小编一起看看吧
PriorityBlockingQueue<E> 是 Java 并发包(java.util.concurrent)中提供的一个线程安全的、无界、优先级队列。它的核心思想是:
每次取出的元素,都是当前队列中“优先级最高”的那个元素(即最小值,依据自然排序或自定义比较器)。
一、关键特性总结
| 特性 | 说明 |
|---|---|
| 线程安全 | 所有公共操作都通过 ReentrantLock 加锁,支持多线程并发访问。 |
| 无界(逻辑上) | 理论上可以无限添加元素(但受 JVM 内存限制,可能抛 OutOfMemoryError)。 |
| 不允许 null 元素 | 插入 null 会抛 NullPointerException。 |
| 基于堆(Heap)实现 | 底层使用数组表示的二叉堆(最小堆),保证 queue[0] 是优先级最高的元素。 |
| 阻塞式取操作 | 提供 take()、poll(timeout) 等方法,在队列为空时可阻塞等待。 |
| 不保证迭代顺序 | iterator() 不按优先级顺序遍历!如需有序,必须用 Arrays.sort(toArray())。 |
| 插入/删除时间复杂度 | O(log n),因为要维护堆结构。 |
二、核心机制解析
1.底层数据结构:二叉最小堆
- 使用
Object[] queue存储元素。 - 对于任意节点
i:- 左孩子:
2*i + 1 - 右孩子:
2*i + 2 - 父节点:
(i - 1) / 2
- 左孩子:
- 堆性质:父节点 ≤ 子节点 → 根节点(
queue[0])是最小值(最高优先级)。
2.扩容机制(tryGrow)
- 当数组满时,自动扩容:
- 小容量(<64):增长较快(+ oldCap + 2)
- 大容量:增长 50%(oldCap >> 1)
- 特殊设计:扩容时不持有主锁(
lock),而是用 CAS 自旋锁(allocationSpinLock)避免阻塞消费者。- 目的:防止生产者扩容时长时间持有锁,导致消费者“饿死”。
3.堆调整操作
- siftUp:插入新元素后,从底部向上调整(冒泡到合适位置)。
- siftDown:删除根节点后,把最后一个元素放到根,再向下调整。
- 分为两种版本:
siftUpComparable/siftDownComparable:使用元素自身的compareTo()siftUpUsingComparator/siftDownUsingComparator:使用外部Comparator
4.构造函数逻辑
- 如果传入的是
SortedSet或PriorityQueue,直接复用其排序规则(无需重新建堆)。 - 否则,对传入集合调用
heapify()从底向上建堆(时间复杂度 O(n))。
5.阻塞与非阻塞操作
| 方法 | 行为 |
|---|---|
offer(e) | 立即插入,返回 true(永不阻塞,因无界) |
put(e) | 同 offer,语义上“可能阻塞”,但实际不会 |
take() | 队列空时阻塞,直到有元素 |
poll() | 队列空时立即返回 null |
poll(timeout, unit) | 队列空时最多等待 timeout 时间 |
6.关于迭代器(重要!)
Iterator<E> it = pq.iterator(); // ❌ 不保证按优先级顺序遍历!
- 原因:堆的数组存储不是排序数组,只是满足堆性质。
- 正确做法:如需有序遍历,必须:
Object[] arr = pq.toArray(); Arrays.sort(arr); // 或使用 Comparator
三、FIFOEntry 示例:解决“优先级相同时的公平性”
当多个元素优先级相同(compareTo == 0),默认不保证谁先出队。
解决方案:引入“插入顺序”作为第二排序键。
class FIFOEntry<E extends Comparable<? super E>>
implements Comparable<FIFOEntry<E>> {
static final AtomicLong seq = new AtomicLong(0);
final long seqNum; // 插入序号
final E entry;
public int compareTo(FIFOEntry<E> other) {
int res = entry.compareTo(other.entry);
if (res == 0)
res = Long.compare(seqNum, other.seqNum); // 先插入的先出
return res;
}
}使用时:
pq.offer(new FIFOEntry(myElement));
四、典型使用场景
- 任务调度系统:高优先级任务先执行。
- 事件处理:紧急事件优先处理。
- 合并多个有序流:如多路归并(配合
take()阻塞特性)。
五、注意事项
- 不要依赖
iterator()的顺序! - 避免在比较器中抛异常:会导致队列状态不一致。
- 内存风险:虽然是“无界”,但大量积压会导致 OOM。
- 性能:高并发下,所有操作串行化(单锁),吞吐量不如
ConcurrentLinkedQueue,但语义不同。
总结
PriorityBlockingQueue = 线程安全的 PriorityQueue + BlockingQueue 接口
它适合需要按优先级消费、且允许多线程协作的场景,但要注意其无界性和迭代无序性。
如果你理解了二叉堆、CAS 自旋锁、以及阻塞条件(Condition notEmpty),就掌握了它的精髓。
到此这篇关于Java并发包中的PriorityBlockingQueue解析的文章就介绍到这了,更多相关java并发包PriorityBlockingQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
