java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > scheduledThreadPool的方法

Java多线程之scheduledThreadPool的方法解析

作者:竹下星空

这篇文章主要介绍了Java多线程之scheduledThreadPool的方法解析,queue是DelayedWorkQueue,但通过后面的分析可以知道,最大线程数是不起作用的,最多会起核心线程数的数量,需要的朋友可以参考下

scheduledThreadPool

我们对java中定时任务实现可能会有以下疑问:

怎样做到每个任务延迟指定时间执行?

内部使用了什么数据结构保存延迟任务?

延迟任务放入scheduledThreadPool时机并不固定,怎么保证按延迟时间顺序执行?

构造器

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
  new DelayedWorkQueue());
}

corePoolSize就是我们传过了的参数,maximumPoolSize是Integer.MAX_VALUE,所以最大线程是无穷大,非核心线程成活时间是0,所以非核心线程执行完firstTask之后如果poll任务没拿到任务则会直接销毁。queue是DelayedWorkQueue。但通过后面的分析可以知道,最大线程数是不起作用的,最多会起核心线程数的数量

schedule(Runnable command,long delay, TimeUnit unit)方法

public ScheduledFuture<?>schedule(Runnable command,
long delay,
   TimeUnit unit){
if(command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t =decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
 

delayedExecute方法

private voiddelayedExecute(RunnableScheduledFuture<?> task){
if(isShutdown())
reject(task);
else{
super.getQueue().add(task);
if(isShutdown()&&
!canRunInCurrentRunState(task.isPeriodic())&&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
 
  1. 使用queue.add方法把task放入queue
  2. 执行ensurePrestart方法

offer方法

public boolean offer(Runnable x){
if(x == null)
throw new NullPointerException();
    RunnableScheduledFuture<?> e =(RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
int i = size;
if(i >= queue.length)
grow();
size = i +1;
if(i ==0){
    queue[0]= e;
setIndex(e,0);
}else{
siftUp(i, e);
}
if(queue[0]== e){
    leader = null;
    available.signal();
}
} finally {
lock.unlock();
}
return true;
}
 
  1. DelayedWorkQueue底层使用的是RunnableScheduledFuture的数组,初始化容量是16,之后扩容是以1.5倍进行。
  2. 在offer元素整个过程中使用ReentrantLock进行加锁,所以DelayedWorkQueue是一个线程安全的队列。然后使用了condition来实现阻塞的功能,当poll没有元素时会使用await进行等待,当offer的是数组的第一个元素时会signal,这个signal的设计是排序的点睛之笔,设计的非常巧妙,这块需要offer和take方法一起来看,在take方法时会拿第一个元素来判断delay的时间,如果时间没到会使用await休眠delay时间,但此时如果有delay时间更短的任务放入queue中,此时需要take的任务就不是之前的那个任务了,就要重新执行逻辑获取这个最新delay的任务,这样才能做到任务的正确执行。
  3. 在offer元素时会使用siftUp方法来保证数组中元素是按delay时间从小到大排列,但要注意的是数组前半部分肯定都是排了delay最小的任务,但后半部分不一定是有序的

ensurePrestart()方法

voidensurePrestart(){
int wc =workerCountOf(ctl.get());
if(wc < corePoolSize)
addWorker(null, true);
elseif(wc ==0)
addWorker(null, false);
}
 

这个比较简单,addWorker方法之前我们也分析过了,需要注意的是这里的firstTask默认是空的,所以工作线程会直接从queue中拿任务。这有个比较奇怪的else if,感觉应该永远不用执行,因为wc==0肯定已经被if条件拦截了,也就是只能起核心线程数。最大线程数永远不会起作用

poll方法

public RunnableScheduledFuture<?>poll(long timeout, TimeUnit unit)
    throws InterruptedException {
long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
加锁
    lock.lockInterruptibly();
    try {
自旋
for(;;){
拿到queue中的第一个元素,如果是空则awaitNanos时间,等待时间过后如果queue中还是没有元素则返回null。
    RunnableScheduledFuture<?> first = queue[0];
if(first == null){
if(nanos <=0)
return null;
else
    nanos = available.awaitNanos(nanos);
}else{
拿到第一个任务的delay时间,如果到了delay时间则返回finishPoll方法的结果
long delay = first.getDelay(NANOSECONDS);
if(delay <=0)
returnfinishPoll(first);
如果传入的nanos小于等于0则返回null
if(nanos <=0)
return null;
first = null;// don't retain ref while waiting
如果等待时间还不够或前一个需要执行的任务还在执行,则当前线程直接等待
if(nanos < delay || leader != null)
    nanos = available.awaitNanos(nanos);
else{否则当前线程可以执行(leader线程),但需要awaitNanos delay的时间才能执行
    Thread thisThread = Thread.currentThread();
    leader = thisThread;
    try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
当等待时间到了之后就  leader = null说明此时可以返回finishPoll方法的结果
if(leader == thisThread)
    leader = null;
}
}
}
}
} finally {
if(leader == null && queue[0]!= null)
    available.signal();
lock.unlock();
}
}
  1. DelayedWorkQueue的poll方法也是使用reentrantLock来保证线程安全,然后使用condition.awaitNanos来达到等待特定时间的效果,这里使用leader线程保证了排在第一位的任务只有一个工作线程获取到,其他工作线程进行排队等待,在获取到第一个任务的工作线程delay时间到了之后会take到这个任务并signal排队的第一个工作线程继续获取下一个任务,周而复始。
  2. 在使用finishPoll方法返回delay时间到了的任务时会用siftDown对queue后半部分的任务进行排序,因为之前offer时使用siftUp方法只对queue前半部分进行了排序
  3. 回到ScheduledThreadPool线程池,keepAliveTime是0,所以当first任务的delay时间还没有到时会直接返回null,然后非核心工作线程就会直接销毁,之后的代码都不会执行,而核心线程则执行的take方法,take方法才会进入下面这段逻辑
if (leader != null)
      available.await();
  else {
    Thread thisThread = Thread.currentThread();
   leader = thisThread;
   try {
  available.awaitNanos(delay);
   } finally {
  if (leader == thisThread)
       leader = null;
       }
 }

scheduleAtFixedRate方法

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
  long initialDelay,
  long period,
  TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
  null,
  triggerTime(initialDelay, unit),
  unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
 
  1. 这个方法跟之前schedule方法差不多了,都是使用了ScheduledFutureTask,这边多了个period变量保存执行周期值,outerTask引用了自身的对象,然后也是使用delayExecute方法把任务放入了queue中,此时任务的delay是initialDelay,所以会在initialDelay时间之后出队然后执行
  2. 由于现在工作线程中的task是ScheduledFutureTask,所以工作线程调用的task.run方法是ScheduledFutureTask.run方法

ScheduledFutureTask.run方法

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
cancel(false);
    else if (!periodic)
ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
    }
}
 

1.判断是不是周期执行的任务,之前的schedule方法的period是0,所以会执行super.run();然后执行传入的runnable中的run方法,而scheduleAtFixedRate方法的period不是0,则会执行super.runAndReset();方法,执行传入的runnable中的run方法之后执行setNextRunTime();

重新设置delay时间(initialDelay+period),然后把任务又放入queue中

scheduleWithFixedDelay方法

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
 long initialDelay,
 long delay,
 TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
  null,
  triggerTime(initialDelay, unit),
  unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
 

这个方法几乎跟scheduleAtFixedRate方法一模一样,区别在于period是个负数,通过之前我们对scheduleAtFixedRate方法的分析,period这个参数在算周期执行间隔时会用到,也就是setNextRunTime方法

setNextRunTime方法

private void setNextRunTime() {
    long p = period;
    if (p > 0)
time += p;
    else
time = triggerTime(-p);
}
 

当period大于0时,也就是scheduleAtFixedRate执行时,是直接在之前的time加上了period,而scheduleWithFixedDelay方法执行时,是用triggerTime方法在当前时间加上了periode,不同的计算方式的区别在于,scheduleAtFixedRate不会管任务的执行时间,我只要保证任务固定频率执行就好了,所以他是几乎精确的period时间执行,而scheduleWithFixedDelay是在任务之后的时间+period时间来确定下一次任务执行的时间,所以任务执行的频率相对来说不固定

到此这篇关于Java多线程之scheduledThreadPool的方法解析的文章就介绍到这了,更多相关scheduledThreadPool的方法内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文