Java中的延迟队列DelayQueue详细解析
作者:菜鸟-小胖
这篇文章主要介绍了Java中的延迟队列DelayQueue详细解析,JDK自身支持延迟队列的数据结构,其实类:java.util.concurrent.DelayQueue,<BR>我们通过阅读源码的方式理解该延迟队列类的实现过程,需要的朋友可以参考下
前言
JDK自身支持延迟队列的数据结构,其实类:java.util.concurrent.DelayQueue。
我们通过阅读源码的方式理解该延迟队列类的实现过程。
1.定义
DelayQueue:是一种支持延时获取元素的无界阻塞队列。
特性:
- 线程安全(多生产者,多消费者)(单机,如果想实现分布式,可以结合redis 消息分发,如果需要较高数据可靠性可以考虑结合消息中间件等);
- 内部元素有“延迟”特性:只有延迟到期的元素才允许被获取;
- 具有优先级特性的无界队列,优先级以元素延迟时间为标准,最先过期的元素优先级最高(队首);
- 入队操作不会被阻塞,获取元素在特定情况会阻塞(队列为空,队首元素延迟未到期等);
根据其源码分析为何如此定义以及其特性的由来。
DelayQueue继承关系:
类图分析:
其核心继承/实现:
1.BlockingQueue:说明其具有阻塞队列的特性;
2.元素必实现接口Delayed,而Delayed继承了接口Comparable。因此所有元素必须实现两个方法:
compareTo方法用于元素比较; getDelay方法用于获取元素剩余延时时间。
public interface Delayed extends Comparable<Delayed> { /** * 返回关联对象的剩余延迟时间(可指定时间单位) */ long getDelay(TimeUnit unit); }
2.源码
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { /** * 可重入锁,用于保证线程安全 */ private final transient ReentrantLock lock = new ReentrantLock(); /** * 优先队列(容器),实际存储元素的地方 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** * 等待取元素线程的领导(leader)线程,有且仅有一个leader。 * 具有最高优先级,第一个尝试获取元素的线程。 * leader取完元素后,会唤醒新的等待线程成为新的leader。 */ private Thread leader = null; /** * 触发条件,表示是否可以从队列中读取元素. * 用于等待(await())/通知(signal())其他线程 */ private final Condition available = lock.newCondition(); /** * 构造函数 */ public DelayQueue() { } /** * 构造函数: 调用addAll()方法:将集合c 存入队列中 * */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); } /*--------------------------添加元素(非阻塞)-------------------------------*/ /** * 插入新元素. * 核心内容见:public boolean offer(E e) */ public boolean add(E e) { return offer(e); } /** * 插入新元素. * 核心内容见:public boolean offer(E e) */ public void put(E e) { offer(e); } /** * 插入新元素. * 核心内容见:public boolean offer(E e) * @param e 元素 * @param timeout 此参数将被忽略,因为该方法从不阻塞(废弃) * @param unit 此参数将被忽略,因为该方法从不阻塞(废弃) * @return {@code true} * @throws NullPointerException {@inheritDoc} */ public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } /** * 插入新元素.(线程安全 lock) * 逻辑: * 1.入队; * 2.如果入队元素为队首元素(原队列为空),唤醒一个等待的线程,通知获取数据。 * * @param e 元素 * @return {@code true} * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { // 入队 q.offer(e); // 若该元素为队列头部元素(说明原队列为空),可以唤醒等待的线程取元素数据 if (q.peek() == e) { // 如果队首元素是刚插入的元素,则设置leader为null(腾位置) leader = null; // 唤醒一个等待的线程 available.signal(); } return true; } finally { lock.unlock(); } } /*--------------------------取出(返回并删除)元素-------------------------------*/ /** * 取出延迟到期元素(非阻塞的).(线程安全 lock) * poll() 方法是非阻塞的,即调用之后无论元素是否存在/延迟到期都会立即返回。 * 逻辑: * 1.查询队首元素; * 2.元素延迟到期返回,否则返回null */ public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { // 查询队首元素 E first = q.peek(); // 队首元素为空或者延时未到期 返回null if (first == null || first.getDelay(NANOSECONDS) > 0) { return null; } else { // 如果到期,取出并删除队首元素 return q.poll(); } } finally { lock.unlock(); } } /** * 取出延迟到期元素(带有超时时间,阻塞).(线程安全 lock) * 如果队首元素未到期或者为null,等待:直到队首元素延迟到期或者超出指定等待时间(timeout) * 逻辑(无限循环等待获取): * 宗旨:在不超出timeout的时间内,循环去取出延迟到期的队首元素(前提无其他线程正在取数--互斥). * 1.查询队首元素; * 2.1.队列空:等待timeout一段时间,直到等待超时(即timeout被重置小于等于0); * 2.2.队列不为空: * 2.2.1. 队首元素延迟到期,取出队首元素(poll()); * 2.2.2. 队首元素延迟未到期: * 2.2.3 等待超时 ,返回null; * 2.2.4 等待未超时,等待时间<延迟时间或者有其他线程正在取数据,继续等待到超时到期 * 2.2.5 等待为超时,等待时间>=延迟时间并且无其他线程正在取数据,该线程设置为leader等待到延迟到期(最后清空leader) * 3. 循环后,如果leader=null(无正在取数线程)并且队列还有数据,唤醒一个等待线程最终成为leader. */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 以可中断方式获取锁 lock.lockInterruptibly(); try { for (; ; ) { // 获取队首元素 E first = q.peek(); if (first == null) { // 若队首元素为空(即队列为空,这时就需要关注,当前取值请求是否需要阻塞等待 // 等待时间小于等于0 ,不阻塞等待,直接返回null) if (nanos <= 0) { return null; } else { // 等待相应的时间 nanos = available.awaitNanos(nanos); } } else { // 若队列元素非空,获取队首元素剩余延迟时间 long delay = first.getDelay(NANOSECONDS); // 延时过期 返回元素 if (delay <= 0) { return q.poll(); } // 延时未过期 等待时间超时 ,不等待,直接返回null if (nanos <= 0) { return null; } first = null; // 延时和等待都未到期且等待时间<延迟时间 或者 有其他线程在取数据,当前请求继续等待 if (nanos < delay || leader != null) { nanos = available.awaitNanos(nanos); } else { // 没有其他线程等待,将当前线程设置为 leader,类似于“独占”操作 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待直到延迟到期 long timeLeft = available.awaitNanos(delay); // 计算超时时间 nanos -= delay - timeLeft; } finally { // 该线程操作完毕,把 leader 置空 if (leader == thisThread) { leader = null; } } } } } } finally { // 如果leader线程为空 并且 queue非空,则唤醒其他等待线程 if (leader == null && q.peek() != null) { available.signal(); } lock.unlock(); } } /** * 取出延迟到期元素(无超时时间限制,阻塞).(线程安全 lock) * 逻辑(无限循环等待获取): * 其逻辑参考poll(long timeout, TimeUnit unit). * 其区别在于:不受超时时间限制(timeout) */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 以可中断方式获取锁 lock.lockInterruptibly(); try { // 无限循环 for (; ; ) { // 获取队首元素 E first = q.peek(); if (first == null) { // 若队首元素为空(队列为空),则等待 available.await(); } else { // 若队列元素非空,获取队首元素剩余延迟时间 long delay = first.getDelay(NANOSECONDS); // 延迟到期,获取队首元素 if (delay <= 0) { return q.poll(); } // 延时未过期 first = null; // leader 不为空表示有其他线程在读取数据,当前线程等待 if (leader != null) { available.await(); } else { // 没有其他线程等待,将当前线程设置为 leader Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 等待延迟时间过期 available.awaitNanos(delay); } finally { if (leader == thisThread) { leader = null; } } } } } } finally { // 如果leader线程为空 并且 queue非空,则唤醒其他等待线程 if (leader == null && q.peek() != null) { available.signal(); } lock.unlock(); } } /*--------------------------读取队首元素-------------------------------*/ /** * 读取队首元素.(线程安全 lock) */ public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.peek(); } finally { lock.unlock(); } } /*--------------------------读取队列长度-------------------------------*/ /** * 获取队列数据的长度.(线程安全 lock) */ public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } } /*--------------------------获取延迟到期元素集合-------------------------------*/ /** * 将队列中延迟到期数据 收集到集合C中.(线程安全 lock) * * @return 返回延迟到期元素数量 */ public int drainTo(Collection<? super E> c) { if (c == null) { throw new NullPointerException(); } if (c == this) { throw new IllegalArgumentException(); } final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // peekExpired() 判断队首元素是否延迟到期 for (E e; (e = peekExpired()) != null; ) { c.add(e); q.poll(); ++n; } return n; } finally { lock.unlock(); } } /** * 将队列中延迟到期数据 收集到集合C中(C集合总数有限制小于maxElements).(线程安全 lock) * @return 返回延迟到期元素数量 */ public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) { throw new NullPointerException(); } if (c == this) { throw new IllegalArgumentException(); } if (maxElements <= 0) { return 0; } final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; // peekExpired() 判断队首元素是否延迟到期。并且到期元素总数不允许超过maxElements for (E e; n < maxElements && (e = peekExpired()) != null; ) { c.add(e); q.poll(); ++n; } return n; } finally { lock.unlock(); } } /** * 读取队首元素(已延迟到期).(私有方法) */ private E peekExpired() { // 获取队首元素 E first = q.peek(); // 队首元素存在并且延迟到期,否则返回null return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : first; } /*--------------------------删除元素-------------------------------*/ /** * 清除队列中所有元素(线程安全 lock)--暴力清除 */ public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { q.clear(); } finally { lock.unlock(); } } /** * 删除指定元素O.(线程安全 lock) */ public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } } /** * 删除指定元素O.(这里指的是相同的对象引用/内存地址)(线程安全 lock) */ void removeEQ(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { for (Iterator<E> it = q.iterator(); it.hasNext(); ) { // 使用了对象引用/内存地址相等比较 if (o == it.next()) { it.remove(); break; } } } finally { lock.unlock(); } } /*--------------------------队列转数组-------------------------------*/ /** * 将队列元素都复制到数组中(无序).(线程安全 lock) */ public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(); } finally { lock.unlock(); } } /** * 将队列元素都复制到数组a中(无序). */ public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(a); } finally { lock.unlock(); } } /*--------------------------私有内部类--迭代器-------------------------------*/ /** * 返回此队列中所有元素(已过期和未过期)的迭代器。迭代器不按任何特定顺序返回元素。 */ public Iterator<E> iterator() { return new Itr(toArray()); } /** * 快照迭代器,用于处理底层 队列/数组的副本。 */ private class Itr implements Iterator<E> { final Object[] array; // Array of all elements int cursor; // index of next element to return int lastRet; // index of last element, or -1 if no such Itr(Object[] array) { lastRet = -1; this.array = array; } public boolean hasNext() { return cursor < array.length; } @SuppressWarnings("unchecked") public E next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return (E) array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); removeEQ(array[lastRet]); lastRet = -1; } } }
3.使用demo
使用DelayQueue实现延迟队列:
优点:实现简单。
缺点:可扩展性较差,内存限制、无持久化机制等。
@SneakyThrows public static void main(String[] args) { DelayQueue<TestTask> testTaskDelayQueue = new DelayQueue<>(); long time = System.currentTimeMillis(); testTaskDelayQueue.offer(TestTask.builder().name("test_1").endTime(time + 10 * 1000).build()); testTaskDelayQueue.offer(TestTask.builder().name("test_2").endTime(time + 4 * 1000).build()); testTaskDelayQueue.offer(TestTask.builder().name("test_3").endTime(time + 16 * 1000).build()); for(;;){ System.out.println(testTaskDelayQueue.take()); TimeUnit.SECONDS.sleep(2); } } @Data @AllArgsConstructor @NoArgsConstructor @Builder private static class TestTask implements Delayed { private String name; private Long endTime; @Override public long getDelay(TimeUnit unit) { return unit.convert(endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); } }
到此这篇关于Java中的延迟队列DelayQueue详细解析的文章就介绍到这了,更多相关延迟队列DelayQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!