关于ScheduledThreadPoolExecutor不执行的原因分析
作者:Redick01
ScheduledThreadPoolExecutor不执行原因分析
最近在调试一个监控应用指标的时候发现定时器在服务启动执行一次之后就不执行了,这里用的定时器是Java的调度线程池 ScheduledThreadPoolExecutor ,后来经过排查发现 ScheduledThreadPoolExecutor 线程池处理任务如果抛出异常,会导致线程池不调度;下面就通过一个例子简单分析下为什么异常会导致 ScheduledThreadPoolExecutor 不执行。
ScheduledThreadPoolExecutor不调度分析
示例程序
在示例程序可以看到当计数器中的计数达到5的时候就会主动抛出一个异常,抛出异常后 ScheduledThreadPoolExecutor 就不调度了。
public class ScheduledTask {
private static final AtomicInteger count = new AtomicInteger(0);
private static final ScheduledThreadPoolExecutor SCHEDULED_TASK = new ScheduledThreadPoolExecutor(
1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "sc-task");
t.setDaemon(true);
return t;
}
});
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
SCHEDULED_TASK.scheduleWithFixedDelay(() -> {
System.out.println(111);
if (count.get() == 5) {
throw new IllegalArgumentException("my exception");
}
count.incrementAndGet();
}, 0, 5, TimeUnit.SECONDS);
latch.await();
}
}源码分析
- ScheduledThreadPoolExecutor#run
run方法内部首先判断任务是不是周期性的任务,如果不是周期性任务通过 ScheduledFutureTask.super.run(); 执行任务;如果状态是运行中或shutdown,取消任务执行;如果是周期性的任务,通过 ScheduledFutureTask.super.runAndReset() 执行任务并且重新设置状态,成功了就会执行 setNextRunTime 设置下次调度的时间,问题就是出现在 ScheduledFutureTask.super.runAndReset() ,这里执行任务出现了异常,导致结果为false,就不进行下次调度时间设置等
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}- *FutureTask#runAndReset
在线程任务执行过程中抛出异常,然后 catch 到了异常,最终导致这个方法返回false,然后 ScheduledThreadPoolExecutor#run 就不设置下次执行时间了,代码 c.call(); 抛出异常,跳过 ran = true; 代码,最终 runAndReset 返回false。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}注意:
Java 的 ScheduledThreadPoolExecutor 定时任务线程池所调度的任务中如果抛出了异常,并且异常没有捕获直接抛到框架中,会导致 ScheduledThreadPoolExecutor 定时任务不调度了,具体是因为当异常抛到 ScheduledThreadPoolExecutor 框架中时不进行下次调度时间的设置,从而导致 ScheduledThreadPoolExecutor 定时任务不调度。
ScheduledThreadPoolExecutor线程池例子
ScheduledThreadPoolExecutor 使用
ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor,主要用来给定时间运行任务,或者定期执行任务。
在 ScheduledThreadPoolExecutor 的实现中,使用了 FutureTask 运行任务以及使用无界队列 DelayedWorkQueue 来保存任务。
1. 使用示例
- 提交任务
ScheduledThreadPoolExecutor 实现了 ScheduledExecutorService 接口,其中,接口中有四个需要实现的方法,其中 schedule() 的两个方法需要设置任务以及任务启动的延迟时间,scheduleAtFixedRate() 可以设置任务定时重复执行,scheduleWithFixedDelay() 则是设置两个任务之间的执行延迟时间。
ScheduledThreadPoolExecutor poolExecutor = new ScheduledThreadPoolExecutor(2);
poolExecutor.schedule(() -> {
// 提交的任务
}, 5, TimeUnit.HOURS);
poolExecutor.scheduleAtFixedRate(() -> {
// 提交的任务
}, 0, 5, TimeUnit.HOURS);
poolExecutor.scheduleWithFixedDelay(() -> {
// 提交的任务
}, 0, 5, TimeUnit.HOURS);- 简单例子
下面的例子中每 500 毫秒打印一次字符串,executor 会有 5 秒的时间来等待任务执行结束,也就是一共可以打印 10 次字符串。
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10);
executor.scheduleWithFixedDelay(() -> {
System.out.println("测试");
}, 0, 500, TimeUnit.MILLISECONDS);
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}ScheduledThreadPoolExecutor 原理
1. DelayedWorkQueue
ScheduledThreadPoolExecutor 的构造方法对 DelayedWorkQueue 进行了初始化,并且最大线程数量设置成了 Integer.MAX_VALUE。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}其中,DelayedWorkQueue 中的队列是 RunnableScheduledFuture 类型以及容量为 16 的数组。
private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock();
队列添加任务是以 DelayedWorkQueue 以堆作为数据结构存储任务,在添加元素的时候,会使用基于 Siftup 版本进行元素添加,并且会根据任务的执行时间的大小来排序。
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 当队列的容量不够,会扩充 50%
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}2. delayedExecute()
ScheduledExecutorService 接口的四个实现方法中都涉及到了 delayedExecute(),方法主要用来判断线程池的状态以及对线程进行初始化。
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果线程池关闭了,需要执行饱和策略
if (isShutdown())
reject(task);
else {
// 添加到队列中
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 判断等待队列中是否已经满了,会使用到 ThreadPoolExecutor
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。
