Spring定时任务关于@EnableScheduling的用法解析
作者:daliucheng
一切的开始(@EnableScheduling)
1.先放上示例代码
@Configuration @EnableScheduling public class MainApplicationBootStrap { @Bean public Bride bride(){ return new Bride(); } public static void main(String[] args) throws IOException { AnnotationConfigApplicationContext annotationConfigApplicationContext = new AnnotationConfigApplicationContext("com.lc.spring"); Bride bride = annotationConfigApplicationContext.getBean(Bride.class); System.out.println(bride); System.in.read(); } } //Bride类 public class Bride { private String name; private int count; private SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd : HH mm ss"); public void setName(String name) { this.name = name; } public String getName() { return name; } //每五秒执行一次。 @Scheduled(cron = "0/5 * * * * ?") public void sayHello2(){ System.out.println(simpleDateFormat.format(new Date()) + ":" + Thread.currentThread().getName() + ": "+ Bride.class.getName() + ": say hello2 " + count++ ); } }
首先看看@EnableScheduling注解里面有什么,再找个类上面spring已经很明确得告知,这个注解得作用和相关得拓展方式了,有兴趣可以下载看看。这里就不写了。
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Import(SchedulingConfiguration.class)// @Documented public @interface EnableScheduling { }
注意@Import注解导入得SchedulingConfiguration。@Import注解是@Configuration一块使用。这里就不分析在Spring里面怎么解析配置类得了,springboot自动装配原理注解@EnableAutoConfiguration启动得关键就在于这里。这部分得内容之后再写。
继续看,看看SchedulingConfiguration是什么,里面干了什么事情。
@Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) //spring里面自己用得bean,和用户自定没有关系 public class SchedulingConfiguration { @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() { return new ScheduledAnnotationBeanPostProcessor(); } }
在Spring里面bean有三种角色,
- ROLE_APPLICATION 用户自定义得bean
- ROLE_SUPPORT 辅助角色
- ROLE_INFRASTRUCTURE 表示完全和用户得bean没有关系,表示一个在容器里面自己使用得。
到这里就很肯定了,ScheduledAnnotationBeanPostProcessor
就是ScheduleTask
实现的重点。
破案了,总结一下
配置类上标注@EnableScheduling注解,@EnableScheduling里面聚合了@Import,@Import最终会导入一个ScheduledAnnotationBeanPostProcessor。
ScheduledAnnotationBeanPostProcessor(ScheduleTask实现的重点)
1. ScheduledAnnotationBeanPostProcessor类图
下面对ScheduledAnnotationBeanPostProcessor实现的接口逐一说明
- MergedBeanDefinitionPostProcessor是
BeanPostProcessor
,在BeanPostProcessor的基础上增加了postProcessMergedBeanDefinition,这个接口的主要的实现类如下,其中最重要的就是AutowiredAnnotationBeanPostProcessor
用于处理Autowired。
//在实例化出来之后,在调用postProcessAfterInitialization之前会调用postProcessMergedBeanDefinition。 void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName); //初始化之前 @Nullable default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } //重点是这个方法。等会看看在ScheduledAnnotationBeanPostProcessor里面干了什么事情。 @Nullable default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; }
DestructionAwareBeanPostProcessor
继承于BeanPostProcessor,在之前的基础上面,增加了两个方法,用于判断是否需要销毁,和用于销毁bean的之前调用。
void postProcessBeforeDestruction(Object bean, String beanName) throws BeansException; default boolean requiresDestruction(Object bean) { return true; }
- 实现了很多的rAware接口,Aware接口没有什么要说的。
SmartInitializingSingleton
在spring中bean创建完成之后的回调.
//在所有的bean实例化完成之后,如果bean实现了SmartInitializingSingleton接口,就会调用afterSingletonsInstantiated方法。 void afterSingletonsInstantiated()
ApplicationListener<ContextRefreshedEvent>
时间监听,范型里面的ContextRefreshedEvent表示关心的具体事件。- 在整个spring容器创建好,对象也创建好,SmartInitializingSingleton调用之后,一直在最后的最后,会发布ContextRefreshedEvent事件。
------------------------refresh方法 // Last step: publish corresponding event. finishRefresh(); ------------------------下面是finishRefresh具体的内容。 /** * Finish the refresh of this context, invoking the LifecycleProcessor's * onRefresh() method and publishing the * {@link org.springframework.context.event.ContextRefreshedEvent}. */ protected void finishRefresh() { // Clear context-level resource caches (such as ASM metadata from scanning). clearResourceCaches(); // Initialize lifecycle processor for this context. 初始化 LifecycleProcessor initLifecycleProcessor(); // Propagate refresh to lifecycle processor first. getLifecycleProcessor().onRefresh(); // Publish the final event. 重点就是这个。发布ContextRefreshedEvent事件,表示活都干完了。 publishEvent(new ContextRefreshedEvent(this)); // Participate in LiveBeansView MBean, if active. LiveBeansView.registerApplicationContext(this); }
DisposableBean
在bean销毁的时候调用,bean销毁的时候的生命周期是,先调用DestructionAwareBeanPostProcessor#postProcessBeforeDestruction,接着是DisposableBean#destroy方法,后面才是用户自定义的destroy方法。
2. 针对上面接口几个重点方法说明
题外话,定时任务大体的实现是什么?
- 想尽方法拿到被@Schedule修饰的方法。
- 将这些方法上的@Schedule标注的解析,保存映射关系。
- 按照触发的条件来调度定时任务。
下面会根据这种逻辑来解析Spring中的定时任务。
1. 想尽方法拿到被@Schedule修饰的方法。
相关方法 postProcessAfterInitialization
首先要知道,postProcessAfterInitialization在SpringBean的生命周期中在那一个环节,要解视这个问题,要先知道SpringBean的生命周期是什么?那么这个就繁琐了,生命周期的文档多的是,找一个看看就可以。简单的说,就是Spring在初始化完成的最后一步会调用postProcessAfterInitialization。当然,代理对象的创建也是在这里。
那么,下面就对源码分析分析,源码不难,看的懂,并且我添加了注释
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean) { // Ignore AOP infrastructure such as scoped proxies. return bean; } //得到没有被装饰的最原始的类,就是cglib增强之前的原始的类,并且这里也能说明CGLib是通过继承来实现增强的 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass)) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,//找处了所有的被Scheduled标注的方法 (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { // Non-empty set of methods annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isDebugEnabled()) { logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
2. 将这些方法上的@Schedule标注的解析,保存映射关系(processScheduled方法解析)
protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { //断言,判单方法是否有参数,如果有参数就报错 Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");//这里会判断 getParameterCount==0,如果不是0的话,就报错。这也是spring里面定时任务比较鸡肋的方法,但是这个我觉得并没有啥子问题,谁在定时任务执行的时候需要传递参数 //判断这个方法是否是静态,私有的, Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass()); //将需要执行的方法封装成ScheduledMethodRunnable,这个类实现很简单,就俩属性, //target表示bean //method表示需要执行的方法 Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod); //标志位,开始都是false,只要找到@Scheduled注解,并且解析到参数,就是false,后面还会对他进行判断。 boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; //存放组装好的ScheduledTask, Set<ScheduledTask> tasks = new LinkedHashSet<>(4); //判断initialDelay long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); //这里的initialDelayString就是后面的这个样子 "PT20.345S" -- parses as "20.345 seconds" "PT15M" -- parses as "15 minutes" (where a minute is 60 if (this.embeddedValueResolver != null) { //这个意味着,这里的initialDelayString是可以写SPEL表达式的。embeddedValueResolver处理器很常见,会从环境中替换值 initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try {//最后还是得解析成initialDelay。 initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } //检查cron表达式 String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) {//这里也能处理 //事实就是这样,利用embeddedValueResolver来处理值,很巧妙。 cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); //解析到cron之后,标志位变为true processedSchedule = true; TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } //将cron表达式变为CronTrigger, //将runnable(ScheduledMethodRunnable)变为CronTask //将CronTask变为scheduleCronTask,并且将CronTask添加到 registrar的cronTasks属性去,并还维护了CronTask和scheduleCronTask的映射关系。 tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } // At this point we don't need to differentiate between initial delay set or not anymore if (initialDelay < 0) { initialDelay = 0; } .........省略部分代码.......这些代码和上面的cron都是一样的,不同的是task的类型不同。其余都是一样的。 // Check whether we had any attribute set 检查一下,检查是@schedule里面必须的参数是否都有 Assert.isTrue(processedSchedule, errorMessage); //在解析完成之后,将bean和@schedule保存在map里面,map的key是bean,value是set,set存放的是这个bean里面被@schedule标注方法的集合, // 也就是ScheduledTask集合 //但是这里的加锁操作,我没有看懂,不知道这个是干嘛的? //这里会有并发的问题吗?首先他是在postProcessAfterInitialization方法里面起作用的,这个方法在spring解析bean的时候起作用的 //并且spring调用beanPostProcess都是顺序调用。不存在并发问题。 // 所以这里的锁,我没有看懂。 synchronized (this.scheduledTasks) { Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }
问题:
上面代码中加锁操作我没有看懂,有大佬能告知我一下。
说明:
对于上面代码中几个重点的类说明
1.ScheduledMethodRunnable
封装了bean和@Schedule标注的方法,将他俩封装为一个Runnable。
2.CronTrigger
Trigger表示触发器,在@Schedule注解里面每一个类型都对应不同的Trigger。
Trigger里面最核心的方法是`Date nextExecutionTime(TriggerContext triggerContext);` 下一次执行的时间,那么对于Cron或者PeriodTigger都是计算下一次执行的时间。
3.CronTask
Task表示任务,task不是接口,是一个类。
task中最核心的方法是getRunnable,TiggerTask在它的基础上增加了getTrigger,CronTask在之前的基础上增加了getExpression()
4.ScheduledTask
首先,他是final的
有两个属性值
private final Task task;//任务 //保留的是ScheduledFuture的引用,ScheduledFuture是scheduledExecutorService提交任务之后的引用对象。 //ScheduledFuture<?> scheduledFuture = scheduledExecutorService.scheduleWithFixedDelay(() -> { // }, 0, 0, TimeUnit.SECONDS); @Nullable volatile ScheduledFuture<?> future;
总的来说,ScheduledTask保留了task提交给scheduledExecutorServic之后的引用对象和task任务。
5.ScheduledTaskRegistrar
- InitializingBean 在调用自定义init方法之前调用
- DisposableBean 上面说了,在调用自定义destroy方法之前调用
- ScheduledTaskHolder,通过这个接口能拿到所有的ScheduledTask
通过ScheduledTaskRegistrar可以注册ScheduledTask,也能拿到ScheduledTask,所以,之后的重点就看看ScheduledTaskRegistrar
的逻辑
到这里,已经完成了从bean中检索Scheduled修饰的方法,并且解析ScheduledTask注解的属性,转换为对应的Bean,通过ScheduledTaskRegistrar注册到ScheduledTaskRegistrar里面。
3. 按照触发的条件来调度定时任务。(onApplicationEvent)
前面说过,ScheduledAnnotationBeanPostProcessor
实现了ApplicationListener<ContextRefreshedEvent>
接口,Spring会在所有的活都干完之后,发布一个ContextRefreshedEvent事件。重点就在于它。下面看看它里面干了什么事情
@Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext() == this.applicationContext) { // Running in an ApplicationContext -> register tasks this late... // giving other ContextRefreshedEvent listeners a chance to perform // their work at the same time (e.g. Spring Batch's job registration). finishRegistration(); } }
finishRegistration
重点是它。继续冲
protected void scheduleTasks() { //如果说在spring中没有 TaskScheduler的实现类,也没有ScheduledExecutorService的实现类,那就自己默认来一个, // Executors.newSingleThreadScheduledExecutor(); 核心线程是1,但是最大线程数是Max。 // 通过ConcurrentTaskScheduler包装。 if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } //下面就是挨个从之前注册的这些task里面从 ScheduledTaskRegistrar里面维护的双向映射关系中获取scheduledTask,通过 // ScheduledTaskRegistrar中的taskScheduled提交任务,将返回的Future对象保存在ScheduledTask引用里面。 if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }
scheduleCronTask
看看他里面的的代码逻辑,别的都大同小异。差别就在于,task的种类可能不一样,并且提交个taskSchedule的方法可能不一样
//首先这个方法是ScheduledTaskRegistrar的 @Nullable public ScheduledTask scheduleCronTask(CronTask task) { //之前保存在解析schedule的时候保存的CronTask和ScheduledTask之前的引用关系。 ScheduledTask scheduledTask = this.unresolvedTasks.remove(task); boolean newTask = false; //这里肯定是有的,所以,肯定不会从这里走,所以newTask肯定是fasle,那这个方法的返回值肯定是null if (scheduledTask == null) { scheduledTask = new ScheduledTask(task); newTask = true; } //提交任务 if (this.taskScheduler != null) { //提交任务,接下来就看看taskScheduler相关的就好了 scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); } else { addCronTask(task); this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null); }
上面的逻辑是提交的整个流程,下面,在来看看TaskScheduler的相关东西。最后的任务肯定都是通过他来执行的。重中之重
4. TaskScheduler(重中之重)分析
注意:这个接口默认实现是ThreadPoolTaskScheduler,下面就针对ThreadPoolTaskScheduler
和cronTask
来做详细的说明
TaskScheduler 任务的接口里面的详细的方法,在这里就不展示了,因为太多了,这里就用ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
来做详细的说明
**提示:**默认在Spring里面是不会自动创建的,需要手动的声明@Bean,这里要注意,他InitializingBean和DisposableBean。这里面肯定有初始化操作和销毁操作
继续看,接着上面看 schedule方法详情
@Override @Nullable public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { //得到执行器 //关于这个执行器的参数的设置可以看ExecutorConfigurationSupport, ScheduledExecutorService executor = getScheduledExecutor(); try { //错误处理的handle。有两种handle,一种是出错了之后打印日志,一种是抛异常 // LoggingErrorHandler // PropagatingErrorHandler ErrorHandler errorHandler = this.errorHandler; if (errorHandler == null) { errorHandler = TaskUtils.getDefaultErrorHandler(true); } //将执行器,task,trigger包装成ReschedulingRunnable,schedule方法通过将这个任务提交给executor,但是这里写的很巧妙 return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule(); } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex); } }
补充:
1.创建执行器线程的相关逻辑要看ExecutorConfigurationSupport
这里列举线程池配置的参数
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
线程的优先级是 5
线程名字
return getThreadNamePrefix() + this.threadCount.incrementAndGet();
默认的前缀是
return ClassUtils.getShortName(getClass()) + "-";
corePoolSize=1 maximumPoolSize = MAX_VALUE ttl=0 queue=new DelayedWorkQueue(), //这是xecutor默认的队列
2.Executor执行失败之后,异常处理默认策略有两种(主要实现ErrorHandler接口,并且通过set方法也可以设置)
- LoggingErrorHandler(默认)
- PropagatingErrorHandler(打日志,之后报错)
到这里,还差最后一步,就是怎么按照cron表达式来运行定时任务,并且,到现在为止,没看到CronTrigger的nextExecutionTime
5. 怎么做调度(ReschedulingRunnable的具体实现)
首先他是一个ScheduledFuture。在这里主要看两个方法run
和schedule
,这里的实现很巧妙。
- schedule方法
@Nullable public ScheduledFuture<?> schedule() { synchronized (this.triggerContextMonitor) { //获取下次任务执行的事件 this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext); if (this.scheduledExecutionTime == null) { return null; } //算出需要延迟启动的事件 long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis(); //延迟启动,提交给executor的schedule this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); return this; } }
- run方法
@Override public void run() { Date actualExecutionTime = new Date(); //开始跑任务,这里是通过反射调用的, //要知道这里的runnable是ScheduledMethodRunnable对象,反射调用就在ScheduledMethodRunnable里面的run方法 //并且这里也有执行失败之后的异常处理 super.run(); Date completionTime = new Date(); synchronized (this.triggerContextMonitor) { //scheduledExecutionTime这个不是null,因为在调用的时候是先调用schedule方法的,在这个方法里面设置了scheduledExecutionTime(下次执行的时间) Assert.state(this.scheduledExecutionTime != null, "No scheduled execution"); //更新triggerContext,triggerContext是在这个类里面直接new出来的 //上次执行时间,上次执行的耗费的真正时间,完成任务时间 this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime); if (!obtainCurrentFuture().isCancelled()) { //继续调用schedule,这就是精髓,今天的重点。继续注册,继续循环, schedule(); } } }
这两个方法涉及的确实很精妙,避免了间隔的问题,达到时间绝对的标准。
定时任务还有有一种设计方法
1.将注解里面的cron解析出来,然后维护在一个地方(内存或者redis)
2.有两个组件,调度器和执行器,
- 调度器
- 每间隔多少秒去遍历所有的任务,看看下次执行的时间是否比当前时间小,如果小,那就说明改执行了,就将方法传递给执行器,执行器来执行任务,调度器还和之前一样,间隔遍历所有任务
- 执行器
- 执行任务
但是这样存在一个问题,比如间隔10秒,但是任务是需要4秒执行一次,这就出现问题了
但是,Spring的这种方式是觉不会出现这样的问题,并且这种方式还支持取消任务。因为维护了Future引用。
重点
ReschedulingRunnable中的schedule和run方法之前的关系,如果有一个任务每间隔5秒要执行一次。
- 开始的时候是调用schedule方法,这个方法里面计算出下次执行的时间,并且算出当前时间和下次执行时间的差值,然后通过延迟启动,就能获取精确的启动时间。
- 提交之后就会运行run方法,run方法会调用ScheduledMethodRunnable的run方法,ScheduledMethodRunnable里面会通过反射调用方法。
- 运行完成之后,会更新Context,保留这次执行的相关信息,然后判断future是否取消,没有取消就继续调用schedule方法,本次已经执行完成,下次的任务就继续开始了。
到这里,关于Spring中定时任务实现的已经结束了。不禁感叹,这种实现方式真的很精妙。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。