Java中的Timer与TimerTask源码及使用解析
作者:夜聆离殇
一. Timer
在Java中,经常使用Timer来定时调度任务。
Timer 调度任务有一次性调度和循环调度,循环调度有分为固定速率调度(fixRate)和固定时延调度(fixDelay)。固定速率就好比你今天加班到很晚,但是到了第二天还必须准点到公司上班,如果你一不小心加班到了第二天早上 9 点,你就连休息的时间都没有了。而固定时延的意思是你必须睡够 8 个小时再过来上班,如果你加班到凌晨 6 点,那就可以下午过来上班了。固定速率强调准点,固定时延强调间隔。
Timer timer = new Timer(); TimerTask timerTask = new TimerTask() { @Override public void run() { // TODO Auto-generated method stub } }; //延迟1秒后开始调度任务 timer.schedule(timerTask, 1000); //延迟1秒,固定延迟1秒周期性调度 timer.schedule(timerTask, 1000, 1000); //延迟1秒,固定速率1秒周期性调度 timer.scheduleAtFixedRate(timerTask, 1000, 1000);
如果你有一个任务必须每天准点调度,那就应该使用固定速率调度,并且要确保每个任务执行时间不要太长,千万别超过了第二天这个点。如果你有一个任务需要每隔几分钟跑一次,那就使用固定时延调度,它不是很在乎你的单个任务要跑多长时间。
二. 内部源码
Timer 类里包含一个任务队列和一个异步轮训线程。任务队列里容纳了所有待执行的任务,所有的任务将会在这一个异步线程里执行,切记任务的执行代码不可以抛出异常,否则会导致 Timer 线程挂掉,所有的任务都没得执行了。单个任务也不易执行时间太长,否则会影响任务调度在时间上的精准性。比如你一个任务跑了太久,其它等着调度的任务就一直处于饥饿状态得不到调度。所有任务的执行都是这单一的 TimerThread 线程。
/** * The timer task queue. This data structure is shared with the timer * thread. The timer produces tasks, via its various schedule calls, * and the timer thread consumes, executing timer tasks as appropriate, * and removing them from the queue when they're obsolete. */ private final TaskQueue queue = new TaskQueue(); //调度队列 /** * The timer thread. */ private final TimerThread thread = new TimerThread(queue); //轮询线程
Timer 的任务队列 TaskQueue 是一个特殊的队列,它内部是一个数组。这个数组会按照待执行时间进行堆排序,堆顶元素总是待执行时间最小的任务。轮训线程会每次轮训出时间点最近的并且到点的任务来执行。数组会自动扩容,如果任务非常多。
private TimerTask[] queue = new TimerTask[128]; //增加调度任务时,如果长度超过队列长度,则扩容为原先2倍 void add(TimerTask task) { // Grow backing store if necessary if (size + 1 == queue.length) queue = Arrays.copyOf(queue, 2*queue.length); queue[++size] = task; fixUp(size); } private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { //任意线程均可修改调度队列,因此会导致现场不安全,所以这边用同步。 if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { //调度队列增加任务时,其他线程也可以修改该任务,导致线程不安全,所以这边用同步 if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); //增加到调度队列中 if (queue.getMin() == task) queue.notify(); } }
三. 任务状态
TimerTask 有 4 个状态,VIRGIN 是默认状态,刚刚实例化还没有被调度。SCHEDULED 表示已经将任务塞进 TaskQueue 等待被执行。EXECUTED 表示任务已经执行完成。CANCELLED 表示任务被取消了,还没来得及执行就被人为取消了。
/** * The state of this task, chosen from the constants below. */ int state = VIRGIN; //默认状态是VIRGIN /** * This task has not yet been scheduled. */ static final int VIRGIN = 0; /** * This task is scheduled for execution. If it is a non-repeating task, * it has not yet been executed. */ static final int SCHEDULED = 1; /** * This non-repeating task has already executed (or is currently * executing) and has not been cancelled. */ static final int EXECUTED = 2; /** * This task has been cancelled (with a call to TimerTask.cancel). */ static final int CANCELLED = 3; /** * Next execution time for this task in the format returned by * System.currentTimeMillis, assuming this task is scheduled for execution. * For repeating tasks, this field is updated prior to each task execution. */ long nextExecutionTime; //下次调度执行时间 /** * Period in milliseconds for repeating tasks. A positive value indicates * fixed-rate execution. A negative value indicates fixed-delay execution. * A value of 0 indicates a non-repeating task. */ long period = 0; //间隔
对于一个循环任务来说,它不存在 EXECUTED 状态,因为它每次刚刚执行完成,就被重新调度了。EXECUTED 状态仅仅存在于一次性任务,而且这个状态其实并不是表示任务已经执行完成,它是指已经从任务队列里摘出来了,马上就要执行。
任务间隔字段 period 比较特殊,当使用固定速率时,period 为正值,当使用固定间隔时,period 为负值,当任务是一次性时,period 为零。
public void schedule(TimerTask task, long delay) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); sched(task, System.currentTimeMillis()+delay, 0); } public void schedule(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, -period); } public void scheduleAtFixedRate(TimerTask task, long delay, long period) { if (delay < 0) throw new IllegalArgumentException("Negative delay."); if (period <= 0) throw new IllegalArgumentException("Non-positive period."); sched(task, System.currentTimeMillis()+delay, period); }
对于固定速率来说,如果任务执行时间太长超出了间隔,那么它可能会持续霸占任务队列,因为它的调度时间将总是低于 currentTime,排在堆顶,每次轮训取出来的都是它。运行完毕后,重新调度这个任务,它的时间依旧赶不上。持续下去你会看到这个任务的调度时间远远落后于当前时间,而其它任务可能会彻底饿死。这就是为什么一定要特别注意固定速率的循环任务运行时间不宜过长。
四. 任务锁
Timer 的任务支持取消操作,取消任务的线程和执行任务的线程极有可能不是一个线程。有可能任务正在执行中,结果另一个线程表示要取消任务。这时候 Timer 是如何处理的呢?在 TimerTask 类里看到了一把锁。当任务属性需要修改的时候,都会加锁。
public void cancel() { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.clear(); queue.notify(); // In case queue was already empty. } } private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); if (queue.getMin() == task) queue.notify(); } } private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } }
在任务运行之前会检查任务是不是已经被取消了,如果取消了,就从队列中移除。一旦任务开始运行 run(),对于单次任务来说它就无法被取消了,而循环任务将不会继续下次调度。如果任务没有机会得到执行(时间设置的太长),那么即使这个任务被取消了,它也会一直持续躺在任务队列中。设想如果你调度了一系列久远的任务,然后都取消了,这可能会成为一个内存泄露点。所以 Timer 还单独提供了一个 purge() 方法可以一次性清空所有的已取消的任务。
public int purge() { int result = 0; synchronized(queue) { for (int i = queue.size(); i > 0; i--) { if (queue.get(i).state == TimerTask.CANCELLED) { queue.quickRemove(i); result++; } } if (result != 0) queue.heapify(); } return result; }
五. 任务队列为空
任务队列里没有任务了,调度线程必须按一定的策略进行睡眠。它需要睡眠一直到最先执行的任务到点时立即醒来,所以睡眠截止时间就是第一个任务将要执行的时间。同时在睡觉的时候,有可能会有新的任务被添加进来,它的调度时间可能会更加提前,所以当有新的任务到来时需要可以唤醒正在睡眠的线程。
private void mainLoop() { while (true) { try { TimerTask task; boolean taskFired; synchronized(queue) { // Wait for queue to become non-empty while (queue.isEmpty() && newTasksMayBeScheduled) queue.wait(); if (queue.isEmpty()) break; // Queue is empty and will forever remain; die // Queue nonempty; look at first evt and do the right thing long currentTime, executionTime; task = queue.getMin(); synchronized(task.lock) { if (task.state == TimerTask.CANCELLED) { queue.removeMin(); continue; // No action required, poll queue again } currentTime = System.currentTimeMillis(); executionTime = task.nextExecutionTime; if (taskFired = (executionTime<=currentTime)) { if (task.period == 0) { // Non-repeating, remove queue.removeMin(); task.state = TimerTask.EXECUTED; } else { // Repeating task, reschedule queue.rescheduleMin( task.period<0 ? currentTime - task.period : executionTime + task.period); } } } if (!taskFired) // Task hasn't yet fired; wait queue.wait(executionTime - currentTime); } if (taskFired) // Task fired; run it, holding no locks task.run(); } catch(InterruptedException e) { } } } private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); if (queue.getMin() == task) queue.notify(); } }
代码中的 wait() 方法就是调用了 Object.wait() 来进行睡眠。当有新任务进来了,发现这个新任务的运行时间是最早的,那就调用 notify() 方法唤醒轮训线程。
六. Timer终止
Timer 提供了 cancel() 方法清空队列,停止调度器,不允许有任何新任务进来。它会将 newTasksMayBeScheduled 字段设置为 false 表示 Timer 即将终止。
public void cancel() { synchronized(queue) { thread.newTasksMayBeScheduled = false; queue.clear(); queue.notify(); // In case queue was already empty. } } private void sched(TimerTask task, long time, long period) { if (time < 0) throw new IllegalArgumentException("Illegal execution time."); // Constrain value of period sufficiently to prevent numeric // overflow while still being effectively infinitely large. if (Math.abs(period) > (Long.MAX_VALUE >> 1)) period >>= 1; synchronized(queue) { if (!thread.newTasksMayBeScheduled) //调度器已经停止则抛出异常 throw new IllegalStateException("Timer already cancelled."); synchronized(task.lock) { if (task.state != TimerTask.VIRGIN) throw new IllegalStateException( "Task already scheduled or cancelled"); task.nextExecutionTime = time; task.period = period; task.state = TimerTask.SCHEDULED; } queue.add(task); if (queue.getMin() == task) queue.notify(); } }
我们还注意到 Timer.cancel() 方法会唤醒轮训线程,为的是可以立即停止轮训。不过如果任务正在执行中,这之后 cancel() 就必须等到任务执行完毕才可以停止。
到此这篇关于Java中的Timer与TimerTask源码及使用解析的文章就介绍到这了,更多相关Java中的Timer与TimerTask内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!