java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Java的等待队列DelayQueue

Java多线程之等待队列DelayQueue详解

作者:爱喝咖啡的程序员

这篇文章主要介绍了Java多线程之等待队列DelayQueue详解,    DelayQueue被称作"等待队列"或"JDK延迟队列",存放着实现了Delayed接口的对象,对象需要设置到期时间,当且仅当对象到期,才能够从队列中被取走(并非一定被取走),需要的朋友可以参考下

一. 概念

DelayQueue被称作"等待队列"或"JDK延迟队列",存放着实现了Delayed接口的对象。对象需要设置到期时间,当且仅当对象到期,才能够从队列中被取走(并非一定被取走)。DelayQueue的内部使用了PriorityQueue来存放元素,需要元素实现Comparable接口,优先级队列会根据对象的到期时间实现有序排序。

二. 案例

本案例参考了《Java编程思想》第21章P726页的例子。

1. 定义延迟任务对象

class DelayedTask implements Runnable, Delayed { // Delayed接口必须实现,Runnable接口可以不实现
    private static int counter = 0;
    private final int id = counter++;
    /**
     * 延迟的时间(单位: 毫秒)
     */
    private final int delta;
    /**
     * 任务准备执行的时间点(单位: 纳秒)
     */
    private final long trigger;
    protected static List<DelayedTask> sequence = new ArrayList<>();
    public DelayedTask(int delayInMilliseconds) {
        delta = delayInMilliseconds;
        trigger = System.nanoTime() + NANOSECONDS.convert(delta, MILLISECONDS);
        sequence.add(this);
    }
    @Override
    public long getDelay(@NotNull TimeUnit unit) {
        // 过期时间
        return unit.convert(trigger - System.nanoTime(), NANOSECONDS);
    }
    /**
     * 用于在过期任务队列中,任务之间执行顺序的排序
     */
    @Override
    public int compareTo(@NotNull Delayed arg) {
        DelayedTask that = (DelayedTask)arg;
        if(trigger < that.trigger) return -1;
        if(trigger > that.trigger) return 1;
        return 0;
    }
    @Override
    public void run() {
        System.out.println(this + " ");
    }
    @Override
    public String toString() {
        // 语法规则: %[argument_index$][flags][width][.precision]conversion
        // %后面的1$指的是第一个参数,也就是delta
        // $后面的-4d指的是如果数据总长度不够4位,则由右向左补足空格
        return String.format("[%1$-4d]", delta) + " Task " + id;
    }
    public String summary() {
        return "(" + id + ":" + delta + ")";
    }
    public static class EndSentinel extends DelayedTask {
        private ExecutorService exec;
        public EndSentinel(int delay, ExecutorService e) {
            super(delay);
            exec = e;
        }
        @Override
        public void run() {
            for(DelayedTask pt : sequence) {
                System.out.println(pt.summary() + " ");
            }
            System.out.println();
            System.out.println(this + " Calling shutdownNow()");
            exec.shutdownNow();
        }
    }
}

2. 定义队列的消费者

class DelayedTaskConsumer implements Runnable {
    private DelayQueue<DelayedTask> q;
    public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {
        this.q = q;
    }
    @Override
    public void run() {
        try {
            while(!Thread.interrupted()) {
                // DelayQueue take()
                // 取出队列中的head元素,若队列中没有任何延迟到期的元素存在,则该方法将会被阻塞
                // 此时有两种可能: 1. 队列中没有元素 2. 队列中有元素,但都尚未过期
                // 直到队列中有延迟到期的元素为止
                // 即便是不让DelayedTask实现Runnable接口,本例的执行结果也不会发生改变。
                // 因为此处直接调用了run()方法,并没有分配其他线程去驱动DelayedTask
                q.take().run();
            }
        } catch (InterruptedException e) {
            // Acceptable way to exit
            System.out.println("Acceptable way to exit");
        }
        System.out.println("Finished DelayedTaskConsumer");
    }
}

3. main方法

public class DelayQueueDemo {
    public static void main(String[] args) {
        test2();
    }
    public static void test1() {
        Random random = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        // Fill with tasks that have random delays:
        for(int i = 0; i < 5; i++) {
            queue.put(new DelayedTask(random.nextInt(5000)));
        }
        //使用DelayQueue take()的方式获取任务
        queue.add(new DelayedTask.EndSentinel(5000, exec));
        exec.execute(new DelayedTaskConsumer(queue));
    }
    public static void test2() {
        Random random = new Random(47);
        ExecutorService exec = Executors.newCachedThreadPool();
        DelayQueue<DelayedTask> queue = new DelayQueue<>();
        // Fill with tasks that have random delays:
        for(int i = 0; i < 5; i++) {
            queue.put(new DelayedTask(random.nextInt(5000)));
        }
        //使用DelayQueue poll()方式获取任务
        while(queue.size() != 0) {
            // 每执行一次DelayQueue poll()且返回的元素不是null,则DelayQueue等待队列的元素个数会减一
            DelayedTask task = queue.poll();
            if(task != null) {
                System.out.println(LocalDateTime.now(ZoneId.of("+08:00")));
            }
        }
    }
}

由于案例中给Random设置了种子,因此过期时间的值是可以预测的(每次执行都保持一致)。test1()的执行结果如下:

 * [555 ] Task 1
 * [961 ] Task 4
 * [1693] Task 2
 * [1861] Task 3
 * [4258] Task 0
 * (0:4258)
 * (1:555)
 * (2:1693)
 * (3:1861)
 * (4:961)
 * (5:5000)

其中,[]方括号代表任务的执行顺序,()代表任务的创建顺序。显而易见,任务的创建顺序与执行顺序没有任何关系,任务严格按照延迟时间的长短运行。

三. 总结

1. 延迟队列中的对象只有到期后才能够从队列中被取走。(若没有到期或队头元素为null,则DelayQueue会陷入阻塞,说白了就是循环等待,可以参考DelayQueue的take()方法,写了一个for(;;) )

2. 对象并非到期后就会被立刻取走,每次取出(poll())的仅仅是到期元素中队头元素。

3. 任务的创建顺序与任务的执行顺序没有任何关系,延迟队列中任务的排序顺序与过期任务对Comparable接口compareTo()方法的具体实现有关。

四. 疑问

还是以下图为例

若在插入c之前,时间过去了100纳秒,c仍然应该排列在b之前吗?照此推论,只要把过期时长控制在500纳秒以内,所有插入的任务都应该在b之前执行(因为过期时长比b短),这是不是非常不合理?既然时间过去了100纳秒,为什么不将a的过期时长改变成900纳秒,b的过期时长改变成400纳秒,最后让b在c之前执行呢?

遗憾的是,在DelayQueue的源码中,并没有看到对容器PriorityQueue内部元素有做任何定时器,试图改变任务过期时长的代码。

到此这篇关于Java多线程之等待队列DelayQueue详解的文章就介绍到这了,更多相关Java的等待队列DelayQueue内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文