Java无界阻塞队列DelayQueue详细解析
作者:努力的小强
概述
DelayQueue是一个支持时延获取元素的无界阻塞队列。
队列使用PriorityQueue来实现。
队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。
只有在延迟期满时才能从队列中提取元素。
DelayQueue可以运用在以下两个应用场景:
缓存系统的设计:使用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取元素时,就表示有缓存到期了。
定时任务调度:使用DelayQueue保存当天要执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,比如Tiner就是使用DelayQueue实现的。
用法实例
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @author admin */ public class Message implements Delayed { /** *触发时间 */ private long time; /** *名称 */ String name; public Message(String name,long time,TimeUnit unit){ this.name = name; this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0); } @Override public long getDelay(TimeUnit unit) { return time - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { Message item = (Message) o; long diff = this.time - item.time; if (diff <= 0){ return -1; }else{ return 1; } } @Override public String toString() { return DelayQueueDemo.printDate() + "Message{" + "time=" + time + ", name=" + name + "/" + "}"; } }
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; /** * @author admin */ public class DelayQueueDemo { public static void main(String[] args) throws InterruptedException { Message item1 = new Message("消息1",5, TimeUnit.SECONDS); Message item2 = new Message("消息2",10, TimeUnit.SECONDS); Message item3 = new Message("消息3",15, TimeUnit.SECONDS); DelayQueue<Message> queue = new DelayQueue<Message>(); queue.add(item1); queue.add(item2); queue.add(item3); int queueLengh = queue.size(); System.out.println(printDate() + "开始!"); for (int i = 0; i < queueLengh; i++) { Message take = queue.take(); System.out.format(printDate() + " 消息出队,属性name=%s%n",take.name); } System.out.println(printDate() + "结束!"); } static String printDate(){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.format(new Date()); } }
DelayQueue声明
DelayQueue声明如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
从DelayQueue声明可以看出,DelayQueue中的元素必须是Delayed接口的子类。
Delayed声明如下:
public interface Delayed extends Comparable<Delayed> { /** *以给定的时间单位返回与此对象关联的剩余延迟 */ long getDelay(TimeUnit unit); }
DelayQueue属性
/** *可重入锁 */ private final transient ReentrantLock lock = new ReentrantLock(); /** *缓存元素的优先级队列 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** *特定的用于等待队列头中元素的线程 *Leader-Follower模式的变体形式 *用于最小化不必要的定时等待 */ private Thread leader = null; /** *当更新的元素在队列的开头变得可用时 *或在新线程可能需要成为领导者时,会发出条件信号 */ private final Condition available = lock.newCondition();
以上可以看出,延时队列主要使用优先级队列来实现,并辅以重入锁和条件来控制并发安全。
DelayQueue构造器
/** *默认构造器 */ public DelayQueue() {} /** *添加集合c中所有元素到队列中 */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
DelayQueue入队
/* *将指定元素插入此延时队列 */ public boolean add(E e) { return offer(e); } /* *将指定元素插入此延时队列 *由于队列是无界的,因此该方法将永远不会被阻塞 */ public void put(E e) { offer(e); } /* *将指定元素插入此延时队列 *由于队列是无界的,因此该方法将永远不会被阻塞 */ public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); }
以上几个方法都会调用offer()方法。
public boolean offer(E e) { //获取可重入锁 final ReentrantLock lock = this.lock; //可重入锁加锁 lock.lock(); try { //调用优先级队列的offer()方法入队 q.offer(e); //如果入队元素在队首,则唤醒一个出队线程 if (q.peek() == e) { leader = null; available.signal(); } //返回入队成功 return true; } finally { //解锁 lock.unlock(); } }
leader是等待获取队列头元素的线程,应用主从式设计减少不必要的等待。如果leader不为空,表示已经有线程在等待获取队列的头元素。
所以,通过await()方法让出当前线程等待信号。
如果leader为空,则把当前线程设置为leader,当一个线程为leader,它使用awaitNanos()方法让当前线程等待接收信号或等待delay时间。
DelayQueue出队
poll()方法
/* *检索并删除次队列的头 *如果此队列没有延迟过期的元素,则返回null */ public E poll() { //获取可重入锁 final ReentrantLock lock = this.lock; //可重入锁加锁 lock.lock(); try { //检索但不删除队列头部元素 E first = q.peek(); //如果first为null或者返回与此对象关联的剩余延迟时间大于0 //返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else //否则通过优先队列poll()方法出队 return q.poll(); } finally { //可重入锁解锁 lock.unlock(); } }
take()方法
/* *检索并除去此队列的头 *等待直到该队列上具有过期延迟的元素可用 */ public E take() throws InterruptedException { //获取可重入锁 final ReentrantLock lock = this.lock; //可重入锁加锁 lock.lockInterruptibly(); try { for (;;) { //检索但不删除队列头部元素 E first = q.peek(); //如果first为空 if (first == null) //在available条件上等待 available.await(); else { //如果first非空 //获取first的剩余延迟时间 long delay = first.getDelay(NANOSECONDS); //如果delay小于等于0 if (delay <= 0) //延迟时间到期,获取并删除头部元素 return q.poll(); //如果delay大于0,即延迟时间未到期 //将first置为null first = null; //如果leader线程非空 if (leader != null) //当前线程无限期阻塞 //等待leader线程唤醒 available.await(); else { //如果leader线程为空 //获取当前线程 Thread thisThread = Thread.currentThread(); //是当前线程成为leader线程 leader = thisThread; try { //当前线程等待剩余延迟时间 available.awaitNanos(delay); } finally { //如果当前线程是leader线程 //释放leader线程 if (leader == thisThread) leader = null; } } } } } finally { //如果leader线程为null并且队列不为空 //说明没有其他线程在等待,那就通知条件队列 if (leader == null && q.peek() != null) //通过signal()方法唤醒一个出队线程 available.signal(); //解锁 lock.unlock(); } }
take()方法总结:
- 获取锁。
- 取出优先级队列q的首元素。
- 如果元素q的队首为空则阻塞。
- 如果元素q的队首(first)不为空;获取这个元素的delay时间值,如果first的延迟delay时间小于等于0,说明元素已经到了可以使用的时间,调用poll()方法弹出该元素,跳出方法。
- 如果first的延迟delay时间大于0,释放元素first的引用,避免内存泄漏。
- 如果延迟delay时间大于0,leader非空,当前线程等待。
- 如果延迟delay时间大于0,leader为空,将当前线程设置为leader线程,等待剩余时间。
- 自旋,循环以上操作,直到return。
重载poll()方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException { //获取等待时间 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); //如果first为空 if (first == null) { //如果nanos小于等于0 if (nanos <= 0) //返回null return null; else //如果nanos大于0 //等待nanos时间 nanos = available.awaitNanos(nanos); } else { //如果队首非空 //获取first的剩余延迟时间 long delay = first.getDelay(NANOSECONDS); //如果delay小于等于0 if (delay <= 0) //延迟时间到期,获取并删除头部元素 return q.poll(); //如果delay大于0 //如果nanos小于等于0 if (nanos <= 0) //返回null return null; //如果delay大于0且nanos大于0 //first置为null first = null; //如果nanos小于delay或者leader非空 if (nanos < delay || leader != null) //等待delay时间 nanos = available.awaitNanos(nanos); else { //如果nanos大于等于delay或者leader为空 //获取当前线程 Thread thisThread = Thread.currentThread(); //设置当前线程为leader leader = thisThread; try { //等待delay时间 long timeLeft = available.awaitNanos(delay); //修改nanos nanos -= delay - timeLeft; } finally { //如果当前线程为leader线程 //释放leader线程 if (leader == thisThread) leader = null; } } } } } finally { //如果leader为null并且队列不为空 //说明没有其他线程在等待,那就通知条件队列 if (leader == null && q.peek() != null) //通过singnal()方法唤醒一个出队线程 available.signal(); //解锁 lock.unlock(); } }
到此这篇关于Java无界阻塞队列DelayQueue详细解析的文章就介绍到这了,更多相关Java无界阻塞队列DelayQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!