java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring定时任务@Scheduled

Spring的定时任务@Scheduled源码详解

作者:HengTian_real

这篇文章主要介绍了Spring的定时任务@Scheduled源码详解,@Scheduled注解是包org.springframework.scheduling.annotation中的一个注解,主要是用来开启定时任务,本文提供了部分实现代码与思路,需要的朋友可以参考下

Spring的定时任务@Scheduled源码详解

EnableScheduling

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({SchedulingConfiguration.class})	//@Import注解将SchedulingConfiguration导入到IOC中
@Documented
public @interface EnableScheduling {
}

SchedulingConfiguration

将ScheduledAnnotationBeanPostProcessor加入到Spring容器中 @Configuration //当结合@Bean注解时,@Bean注解的类可以类比在spring.xml中定义

@Role(2)
public class SchedulingConfiguration {
    public SchedulingConfiguration() {
    }
    @Bean(
        name = {"org.springframework.context.annotation.internalScheduledAnnotationProcessor"}
    )
    @Role(2)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
}

ScheduledAnnotationBeanPostProcessor

1、先看下ScheduledAnnotationBeanPostProcessor有哪些属性

//string属性解析器,用来解析${}对应的配置文件的属性,aware接口注入
@Nullable
private StringValueResolver embeddedValueResolver;
@Nullable
private String beanName;
@Nullable
private BeanFactory beanFactory;	//aware接口注入
@Nullable
private ApplicationContext applicationContext;	//aware接口注入
@Nullable
//定时任务线程池,如果不为空使用这个scheduler当作ScheduledTaskRegistrar的线程池
private Object scheduler;
//定时任务的注册器,通过这个类将定时任务委托给定时任务线程池
private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
//已检测的没有scheduled注解的类的集合
private final Set<Class<?>> nonAnnotatedClasses = Collections.newSetFromMap(new ConcurrentHashMap(64));
//保存class与scheduled方法的映射
private final Map<Object, Set<ScheduledTask>> scheduledTasks = new IdentityHashMap(16);

2、根据上面的定时任务流程,在每个Bean的属性填充完之后,调用postProcessAfterInitialization方法,将带有@Scheduled 注解的方法,在拿到@Scheduled注解的方法后,调用processScheduled

public Object postProcessAfterInitialization(Object bean, String beanName) {
    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass)) {
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (method) -> {
            Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);
            return !scheduledMethods.isEmpty() ? scheduledMethods : null;
        });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
            }
        } else {
            annotatedMethods.forEach((method, scheduledMethods) -> {
                scheduledMethods.forEach((scheduled) -> {
                    //这里调用
                    this.processScheduled(scheduled, method, bean);
                });
            });
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName + "': " + annotatedMethods);
            }
        }
    }
    return bean;
}

3、要了解processScheduled方法做了什么,可以先看下@Scheduled 注解的定义

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
    String cron() default "";
    String zone() default "";
    long fixedDelay() default -1L;
    String fixedDelayString() default "";
    long fixedRate() default -1L;
    String fixedRateString() default "";
    long initialDelay() default -1L;
    String initialDelayString() default "";
}

可以看到,processScheduled方法主要是使用embeddedValueResolver对带String后缀的属性进行从配置文件读取的操作,根据每个方法上使用的注解判断定时任务的类型是CronTask还是FixedRateTask,将这些任务添加到ScheduledTaskRegistrar中的unresolvedTasks

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
        Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
        Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
        boolean processedSchedule = false;
        String errorMessage = "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
        Set<ScheduledTask> tasks = new LinkedHashSet(4);
        long initialDelay = scheduled.initialDelay();
        String initialDelayString = scheduled.initialDelayString();
        if (StringUtils.hasText(initialDelayString)) {
            Assert.isTrue(initialDelay < 0L, "Specify 'initialDelay' or 'initialDelayString', not both");
            if (this.embeddedValueResolver != null) {
                initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
            }
            if (StringUtils.hasLength(initialDelayString)) {
                try {
                    initialDelay = parseDelayAsLong(initialDelayString);
                } catch (RuntimeException var25) {
                    throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                }
            }
        }
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }
            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay == -1L, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                TimeZone timeZone;
                if (StringUtils.hasText(zone)) {
                    timeZone = StringUtils.parseTimeZoneString(zone);
                } else {
                    timeZone = TimeZone.getDefault();
                }
                tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
            }
        }
        //省略了一部分解析过程,和解析cron是一样的
        ...
            ...
            Assert.isTrue(processedSchedule, errorMessage);
        synchronized(this.scheduledTasks) {
            Set<ScheduledTask> registeredTasks = (Set)this.scheduledTasks.get(bean);
            if (registeredTasks == null) {
                registeredTasks = new LinkedHashSet(4);
                this.scheduledTasks.put(bean, registeredTasks);
            }
            ((Set)registeredTasks).addAll(tasks);
        }
    } catch (IllegalArgumentException var26) {
        throw new IllegalStateException("Encountered invalid @Scheduled method '" + method.getName() + "': " + var26.getMessage());
    }
}

对于3中,比较关键的代码

tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));

跟踪到this.registrar.scheduleCronTask(),这里跳转到ScheduledTaskRegistrar类的scheduleCronTask()

public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = (ScheduledTask)this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        //创建ScheduledTask
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    //可以看到ScheduledTaskRegistrar的初始化方法中没有对taskScheduler赋值
    //所以此时this.taskScheduler = null
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    } else {
        //进入这里
        this.addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return newTask ? scheduledTask : null;
}

4、在所有单例的Bean实例化完成后,调用afterSingletonsInstantiated() ,在Spring容器初始化完成后,触发ContextRefreshedEvent 事件,调用onApplicationEvent方法,执行finishRegistration()

private void finishRegistration() {
    	//对应a
       if (this.scheduler != null) {
           this.registrar.setScheduler(this.scheduler);
       }
   	//对应b
       if (this.beanFactory instanceof ListableBeanFactory) {
           Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory)this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
           List<SchedulingConfigurer> configurers = new ArrayList(beans.values());
           AnnotationAwareOrderComparator.sort(configurers);
           Iterator var3 = configurers.iterator();
           while(var3.hasNext()) {
               SchedulingConfigurer configurer = (SchedulingConfigurer)var3.next();
               configurer.configureTasks(this.registrar);
           }
       }
   	//对应c
       if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
           Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
           try {
               this.registrar.setTaskScheduler((TaskScheduler)this.resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
           } catch (NoUniqueBeanDefinitionException var9) {
               this.logger.debug("Could not find unique TaskScheduler bean", var9);
               try {
                   this.registrar.setTaskScheduler((TaskScheduler)this.resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
               } catch (NoSuchBeanDefinitionException var8) {
                   if (this.logger.isInfoEnabled()) {
                       this.logger.info("More than one TaskScheduler bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + var9.getBeanNamesFound());
                   }
               }
           } catch (NoSuchBeanDefinitionException var10) {
               this.logger.debug("Could not find default TaskScheduler bean", var10);
               try {
                   this.registrar.setScheduler(this.resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
               } catch (NoUniqueBeanDefinitionException var6) {
                   this.logger.debug("Could not find unique ScheduledExecutorService bean", var6);
                   try {
                       this.registrar.setScheduler(this.resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                   } catch (NoSuchBeanDefinitionException var5) {
                       if (this.logger.isInfoEnabled()) {
                           this.logger.info("More than one ScheduledExecutorService bean exists within the context, and none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' (possibly as an alias); or implement the SchedulingConfigurer interface and call ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + var6.getBeanNamesFound());
                       }
                   }
               } catch (NoSuchBeanDefinitionException var7) {
                   this.logger.debug("Could not find default ScheduledExecutorService bean", var7);
                   this.logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
               }
           }
       }
       this.registrar.afterPropertiesSet();
   }

这个方法主要实现的内容是:

public interface SchedulingConfigurer {
void configureTasks(ScheduledTaskRegistrar var1);
}
this.registrar.afterPropertiesSet();

所以在容器中注入TaskScheduler或ScheduledExecutorService的类或者实现SchedulingConfigurer接口都可以配置定时任务的线程池

5、afterPropertiesSet

public void afterPropertiesSet() {
    this.scheduleTasks();
}
protected void scheduleTasks() {
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    //省略了内容
    ...
        ...
        if (this.cronTasks != null) {
            var1 = this.cronTasks.iterator();
            while(var1.hasNext()) {
                CronTask task = (CronTask)var1.next();
                this.addScheduledTask(this.scheduleCronTask(task));
            }
        }
    //省略了内容
    ...
    ...
}

又进入了熟悉的方法scheduleCronTask,在这里将任务提交给taskScheduler

public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = (ScheduledTask)this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    if (this.taskScheduler != null) {
        //走到了这里
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    } else {
        this.addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return newTask ? scheduledTask : null;
}

6、如果想看taskScheduler是怎么执行定时任务的,可以看taskScheduler的一个默认实现ConcurrentTaskScheduler,大体是有一个任务队列WorkerQueue,这个队列是按小顶堆排序的,排序规则是任务执行的时间,每次取出任务时,将任务提交给线程池执行,在执行任务的时候,计算下一次执行的时间,提交队列…

到此这篇关于Spring的定时任务@Scheduled源码详解的文章就介绍到这了,更多相关Spring定时任务@Scheduled内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文