使用Java实现一个简单的定时器
作者:JWASX
1. 概要
说到定时任务,大家都不会模式,前几天在看到 Java 的 Timer 定时(延时)任务,就顺着 Timer - ScheduledThreadPoolExecutor - RocketMQ - Netty 这些框架看了下定时任务的一些核心逻辑实现,现在就写一个系列的文章来记录下这些源码的实现过程,如果有错误的地方欢迎指出。
定时任务在我们平时的项目中都会多多少少有接触,在很多场景下都非常有用,比如:
- 每天批量处理数据、生成报告等
- 会议提醒、邮件发送等
- 订单支付超时退款、清理临时文件、日志文件…
下面在介绍切入正题之前我们先自己实现一个最简单的定时/延时器
2. 注意问题
要实现一个定时任务,首先我们要考虑以下几点问题
- 如何让任务按照执行的时间排序(什么样的集合)
- 定时任务到期之后如何执行
- 如何实现任务执行多次
带着这三个问题,我们来考虑定时器的实现
首先是第一个问题,什么样的集合能满足我们的要求,需要满足按照固定顺序自动排序调整任务,用 JDK 的 PriorityQueue 优先队列就可以了,只需要设置按照任务的执行时间排序,当我们把任务添加到队列里面,队列就会按照任务的执行时间自动排序,这部分代码就不用我们考虑了
第二个问题,我们可以额外启动一个任务处理线程专门去循环队列处理里面的到期任务,如果任务没到期就阻塞等待,如果到期了就唤醒执行任务
第三个问题,添加任务的时候可以设置一个 Count 表示任务需要执行的次数,添加任务的时候就可以 for 循环去遍历添加
3. 实现
3.1 任务定义
首先定义一个任务 bean,里面包括任务的执行时间和执行的任务:
class Task { // 执行时间 private long executeTime; // 任务 Runnable runnable; public Task(long executeTime, Runnable runnable) { this.executeTime = executeTime; this.runnable = runnable; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Runnable getRunnable() { return runnable; } public void setRunnable(Runnable runnable) { this.runnable = runnable; } }
3.2 添加任务的方法
public boolean add(long addTime, Runnable runnable, int count) { long now = System.currentTimeMillis(); synchronized (queue) { for (int i = 0; i < count; i++) { // 计算任务的执行时间 long newTime = now + addTime * (i + 1); // 如果小于 now,说明溢出了 if (newTime < now) { throw new RuntimeException(Thread.currentThread().getName() + ": overflow long limit"); } Task min = getMin(); if (min == null || newTime < min.executeTime) { // 加入任务队列 queue.offer(new Task(newTime, runnable)); // 唤醒任务线程 queue.notifyAll(); continue; } // 加入任务队列 queue.offer(new Task(newTime, runnable)); } } return true; }
首先计算任务的执行时间,判断一下是否会溢出,如果溢出就抛出异常
再获取队首任务,如果添加的新任务执行时间比队首任务更块,调用 queue.notifyAll() 唤醒线程去执行任务
将任务加入队列中
3.3 任务线程
Thread thread = new Thread(() -> { System.out.println(Thread.currentThread().getName() + ": 启动 execute-job-thread 线程"); while (true) { try { synchronized (queue) { if (queue.isEmpty()) { this.queue.wait(); } long now = System.currentTimeMillis(); while (!queue.isEmpty() && now >= queue.peek().getExecuteTime()) { Task task = queue.poll(); // 线程池执行任务 executor.execute(task.runnable); } if (queue.isEmpty()) { this.queue.wait(); } else { long div = queue.peek().executeTime - now; this.queue.wait(div); } } } catch (Exception e) { throw new RuntimeException(e); } } }, "execute-job-thread");
线程在一个死循环里面不断遍历队列看有没有任务值
- 如果没有任务,就阻塞等待
- 被唤醒后代表有任务,这时候获取当前时间,获取所有的过期的任务依次执行
- 执行完之后根据队列中是否还有任务来判断要阻塞多长时间
3.4 整体代码
PQueueService 代码如下:
public class PQueueService { private static final int MAX_COUNT = Integer.MAX_VALUE; private final PriorityQueue<Task> queue = new PriorityQueue<>((o1, o2) -> { return Long.compare(o1.executeTime, o2.executeTime); }); static final ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 32, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(16)); Thread thread = new Thread(() -> { System.out.println(Thread.currentThread().getName() + ": 启动 execute-job-thread 线程"); while (true) { try { synchronized (queue) { if (queue.isEmpty()) { this.queue.wait(); } long now = System.currentTimeMillis(); while (!queue.isEmpty() && now >= queue.peek().getExecuteTime()) { Task task = queue.poll(); // 线程池执行任务 executor.execute(task.runnable); } if (queue.isEmpty()) { this.queue.wait(); } else { long div = queue.peek().executeTime - now; this.queue.wait(div); } } } catch (Exception e) { throw new RuntimeException(e); } } }, "execute-job-thread"); public void init() { thread.start(); } public Task getMin() { if (queue.isEmpty()) { return null; } synchronized (queue) { return queue.peek(); } } public Task pollMin() { if (queue.isEmpty()) { return null; } synchronized (queue) { return queue.poll(); } } public boolean add(long addTime, Runnable runnable, int count) { long now = System.currentTimeMillis(); synchronized (queue) { for (int i = 0; i < count; i++) { long newTime = now + addTime * (i + 1); if (newTime < now) { throw new RuntimeException(Thread.currentThread().getName() + ": overflow long limit"); } Task min = getMin(); if (min == null || newTime < min.executeTime) { // 加入任务队列 queue.offer(new Task(newTime, runnable)); // 唤醒任务线程 queue.notifyAll(); continue; } // 加入任务队列 queue.offer(new Task(newTime, runnable)); } } return true; } class Task { private long executeTime; Runnable runnable; public Task(long executeTime, Runnable runnable) { this.executeTime = executeTime; this.runnable = runnable; } public long getExecuteTime() { return executeTime; } public void setExecuteTime(long executeTime) { this.executeTime = executeTime; } public Runnable getRunnable() { return runnable; } public void setRunnable(Runnable runnable) { this.runnable = runnable; } } }
Timer 类代码如下:
public class Timer { public static final Logger logger = LoggerFactory.getLogger(Timer.class); PQueueService pQueueService; public Timer() { pQueueService = new PQueueService(); pQueueService.init(); } /** * 执行任务 */ public void fixExecuteOne(int seconds, TimeUnit timeUnit, Runnable runnable, int count) { if (timeUnit.ordinal() != TimeUnit.SECONDS.ordinal()) { throw new RuntimeException("unknown unit"); } long time = seconds * 1000L; pQueueService.add(time, runnable, count); } public static final SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public static void main(String[] args) { Timer timer = new Timer(); // 1.添加一个10s后执行的任务 timer.fixExecuteOne(10, TimeUnit.SECONDS, () -> { System.out.println("10s任务 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date())); }, 2); // 2.添加一个3s后执行的任务 timer.fixExecuteOne(3, TimeUnit.SECONDS, () -> { System.out.println("3s任务 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date())); }, 2); // 3.添加一个20s后执行的任务 timer.fixExecuteOne(20, TimeUnit.SECONDS, () -> { System.out.println("20s任务 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date())); }, 2); // 4.添加一个1s后执行的任务 timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> { System.out.println("1s任务1 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date())); }, 2); // 5.添加一个1s后执行的任务 timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> { System.out.println("1s任务2 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date())); }, 2); // 6.添加一个1s后执行的任务 timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> { System.out.println("1s任务3 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date())); }, 2); try { Thread.sleep(30000); // 7.添加一个1s后执行的任务 timer.fixExecuteOne(1, TimeUnit.SECONDS, () -> { System.out.println("1s任务3 ===> " + Thread.currentThread().getName() + ": " + "执行了!!!!!, 当前时间:" + simpleDateFormat.format(new Date())); }, 1); Thread.sleep(30000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
执行结果:
execute-job-thread: 启动 execute-job-thread 线程
1s任务2 ===> pool-1-thread-2: 执行了!!!!!, 当前时间:2024-11-23 21:17:21
1s任务1 ===> pool-1-thread-1: 执行了!!!!!, 当前时间:2024-11-23 21:17:21
1s任务3 ===> pool-1-thread-3: 执行了!!!!!, 当前时间:2024-11-23 21:17:21
1s任务1 ===> pool-1-thread-4: 执行了!!!!!, 当前时间:2024-11-23 21:17:22
1s任务2 ===> pool-1-thread-5: 执行了!!!!!, 当前时间:2024-11-23 21:17:22
1s任务3 ===> pool-1-thread-6: 执行了!!!!!, 当前时间:2024-11-23 21:17:22
3s任务 ===> pool-1-thread-7: 执行了!!!!!, 当前时间:2024-11-23 21:17:23
3s任务 ===> pool-1-thread-8: 执行了!!!!!, 当前时间:2024-11-23 21:17:26
10s任务 ===> pool-1-thread-9: 执行了!!!!!, 当前时间:2024-11-23 21:17:30
10s任务 ===> pool-1-thread-10: 执行了!!!!!, 当前时间:2024-11-23 21:17:40
20s任务 ===> pool-1-thread-11: 执行了!!!!!, 当前时间:2024-11-23 21:17:40
30s后添加的1s任务 ===> pool-1-thread-12: 执行了!!!!!, 当前时间:2024-11-23 21:17:51
20s任务 ===> pool-1-thread-13: 执行了!!!!!, 当前时间:2024-11-23 21:18:00
4. 存在问题
上面实现了一个简单的定时器,但是这个定时器还是存在下面一些问题:
- 功能比较简单,没有其他的 remove 等操作
- 任务线程拿到任务之后会丢到另外一个线程池中执行,如果线程池中任务比较多,就会阻塞导致执行时间不准
- 代码中对 queue 进行了加锁,但是一个任务可以添加到多个 queue,这里没有做并发处理
- …
总之,这就是一个简单的 Timer Demo,主要了解下定时器/延时器的执行过程, 但是其实 JDK 里面的定时器/延时器实现流程都大差不差,下一篇文章,我们就来看看 Timer 的用法和源码
到此这篇关于使用Java实现一个简单的定时器的文章就介绍到这了,更多相关Java定时器内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!