Spring中的@Scheduled源码解析
作者:木棉软糖
@Scheduled源码解析
解析部分
定时任务调度的基础是ScheduledAnnotationBeanPostProcessor类,这是一个实现了BeanPostProcessor接口的后置处理器。
关于BeanPostProcessor,最主要就是看postProcessBeforeInitialization方法和postProcessAfterInitialization方法做了什么逻辑。 postProcessBeforeInitialization方法没有实现逻辑,所以看postProcessAfterInitialization方法的逻辑。
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { /** * 上面省略部分代码,看下面的关键代码 */ if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { /** * 这里一长串代码是为了获取被@Scheduled和@Schedules注解的方法 */ Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledMethods.isEmpty() ? scheduledMethods : null); }); //如果没有被@Scheduled和@Schedules注解的方法,当前bean加入到nonAnnotatedClasses集合中,不进行处理 if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); if (logger.isTraceEnabled()) { logger.trace("No @Scheduled annotations found on bean class: " + targetClass); } } else { //如果存在被@Scheduled和@Schedules注解的方法,针对每个方法调用processScheduled方法 annotatedMethods.forEach((method, scheduledMethods) -> scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean))); if (logger.isTraceEnabled()) { logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods); } } } return bean; }
根据以上代码,总结出ScheduledAnnotationBeanPostProcessor 类做的事情:
(1)获取被@Scheduled和@Schedules注解标记的方法,若没有,将此Bean加入到nonAnnotatedClasses集合中。
(2)存在被@Scheduled和@Schedules注解的方法,针对每个方法调用processScheduled方法
所以,接下来就是分析关键在于processScheduled方法做的逻辑
protected void processScheduled(Scheduled scheduled, Method method, Object bean) { try { //将被注解的方法封装为ScheduledMethodRunnable Runnable runnable = createRunnable(bean, method); boolean processedSchedule = false; String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required"; Set<ScheduledTask> tasks = new LinkedHashSet<>(4); // 解析initialDelay 的值,字符串和整型值不能同时配置 long initialDelay = scheduled.initialDelay(); String initialDelayString = scheduled.initialDelayString(); if (StringUtils.hasText(initialDelayString)) { Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both"); if (this.embeddedValueResolver != null) { initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString); } if (StringUtils.hasLength(initialDelayString)) { try { initialDelay = parseDelayAsLong(initialDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long"); } } } // Check cron expression String cron = scheduled.cron(); if (StringUtils.hasText(cron)) { String zone = scheduled.zone(); if (this.embeddedValueResolver != null) { cron = this.embeddedValueResolver.resolveStringValue(cron); zone = this.embeddedValueResolver.resolveStringValue(zone); } if (StringUtils.hasLength(cron)) { Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers"); processedSchedule = true; if (!Scheduled.CRON_DISABLED.equals(cron)) { TimeZone timeZone; if (StringUtils.hasText(zone)) { timeZone = StringUtils.parseTimeZoneString(zone); } else { timeZone = TimeZone.getDefault(); } tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))); } } } // At this point we don't need to differentiate between initial delay set or not anymore if (initialDelay < 0) { initialDelay = 0; } // Check fixed delay long fixedDelay = scheduled.fixedDelay(); if (fixedDelay >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } String fixedDelayString = scheduled.fixedDelayString(); if (StringUtils.hasText(fixedDelayString)) { if (this.embeddedValueResolver != null) { fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString); } if (StringUtils.hasLength(fixedDelayString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedDelay = parseDelayAsLong(fixedDelayString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))); } } // Check fixed rate long fixedRate = scheduled.fixedRate(); if (fixedRate >= 0) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } String fixedRateString = scheduled.fixedRateString(); if (StringUtils.hasText(fixedRateString)) { if (this.embeddedValueResolver != null) { fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString); } if (StringUtils.hasLength(fixedRateString)) { Assert.isTrue(!processedSchedule, errorMessage); processedSchedule = true; try { fixedRate = parseDelayAsLong(fixedRateString); } catch (RuntimeException ex) { throw new IllegalArgumentException( "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long"); } tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))); } } // Check whether we had any attribute set Assert.isTrue(processedSchedule, errorMessage); // Finally register the scheduled tasks synchronized (this.scheduledTasks) { Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4)); regTasks.addAll(tasks); } } catch (IllegalArgumentException ex) { throw new IllegalStateException( "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage()); } }
根据以上代码,大致逻辑如下:
(1)解析initialDelay的值
(2)根据@Scheduled注解的属性配置,分别将此bean的被注解//方法封装为CronTask,FixedDelayTask,FixedRateTask
(3)this.registrar 根据封装的任务类型使用对应的调度方法scheduleXXX 若此时taskScheduler局部变量还没有初始化完成,那么将会加入到一个临时的集合存起来,不进行调度,这个taskScheduler可以看做是一个调度任务专用的线程池
(4)调度方法返回的结果加入到tasks集合中
(5)然后按照bean分类,放入scheduledTasks集合(以bean为key的Map集合)
其中@Scheduled注解 的限制如下:
(1)cron表达式不能与initialDelay,fixedDelay,fixedRate一起配置
(2)fixedDelay不能与cron同时设置
(3)fixedRate不能与cron 同时配置
(4)fixedDelay 和fixedRate不能同时配置
至此postProcessAfterInitialization方法执行完成。 粗略总结一下,这个方法就是把@Scheduled注解的方法解析出来,然后转化为ScheduledTask,这大概是代表了一个定时任务的对象,然后再按bean分组存放到一个Map集合中。
执行部分
经过验证,其实上面在执行postProcessAfterInitialization方法,taskScheduler还是为null的,也就是说,各个定时任务实际上还是没办法开始调度执行。 举个例子:
@Nullable public ScheduledTask scheduleFixedRateTask(FixedRateTask task) { ScheduledTask scheduledTask = this.unresolvedTasks.remove(task); boolean newTask = false; if (scheduledTask == null) { scheduledTask = new ScheduledTask(task); newTask = true; } if (this.taskScheduler != null) { if (task.getInitialDelay() > 0) { Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay()); scheduledTask.future = this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), startTime, task.getInterval()); } else { scheduledTask.future = this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval()); } } else { addFixedRateTask(task); this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null); }
此时由于taskScheduler 为null,因此没有执行this.taskScheduler.scheduleAtFixedRate方法,而是调用了addFixedRateTask(task)。(经过测试,就算自定义了taskScheduler,也不会在这时候赋值的) 那上面的this.taskScheduler.scheduleAtFixedRate方法 在什么执行?带着这个疑问,调试打点,最终发现在onApplicationEvent方法中它才会执行调度,此时taskScheduler不为空。
ScheduledAnnotationBeanPostProcessor 类实现了ApplicationListener接口,监听ContextRefreshedEvent 事件。根据以前学习的Spirng加载流程,ContextRefreshedEvent 事件是Spring容器加载完成之后,执行finishRefesh方法时发布的。 在监听方法里面主要执行了finishRegistration()方法
private void finishRegistration() { //片段1 if (this.beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this.registrar); } } //省略若干代码.... this.registrar.afterPropertiesSet(); }
有一个地方,个人觉得值得了解的: 片段1:自定义调度线程池时实现了SchedulingConfigurer接口 的configureTasks方法,这个方法就是在片段1执行的。
然后之后比较重要的。主要看this.registrar.afterPropertiesSet方法 this.registrar.afterPropertiesSet方法里面调用了scheduleTasks()方法
protected void scheduleTasks() { //片段1 if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } //片段2 if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }
片段1:this.taskScheduler 如果为null,则使用Executors.newSingleThreadScheduledExecutor()。
如果是自定义线程池,则不会执行,因为此时已经赋值了。
片段2:根据不同的定时任务类型,分别调用不同的调度API
这里的this.cronTasks,this.fixedRateTasks,this.fixedDelayTasks 就是上面执行processScheduled方法时,因为this.taskScheduler 为null而把定时任务临时存放的地方。 因为现在已经有this.taskScheduler ,因此正式将它们加入调度,并放入scheduledTasks 集合中(已经参与调度的不会重复加入)。
小结
(1)从这个源码分析,可以知道通过实现SchedulingConfigurer接口自定义调度线程池的配置
(2)@Scheduled注解 的限制,不能同时配置多种任务类型
到此这篇关于Spring中的@Scheduled源码解析的文章就介绍到这了,更多相关@Scheduled源码解析内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!