java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring Scheduling本地任务调度设计与实现

Spring Scheduling本地任务调度设计与实现方式

作者:肥肥技术宅

这篇文章主要介绍了Spring Scheduling本地任务调度设计与实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教

一、Spring Boot 集成 Scheduling

对于不涉及分布式计算又关于时间的任务处理,也就是本地任务调度(Task Schduling)既是 Spring Framework 的集成功能,也是 Spring Boot 的重要特性。

在 Spring Boot 中,任务调度的使用得到了极大简化。

1、简单任务调度

从官方文档介绍中,在任务调度类上声明 EnableScheduling 和 @Configuration,并在调度方法添加 @Scheduled 就可以完成任务调度。

@Configuration
@EnableScheduling
public class SayHelloTask {
 
    @Scheduled(cron = "${hello.schedule.cron:*/10 * * * * *}")
    public void sayHello() {
        System.out.print("Hello,Schedule");
    }
}

@Scheduled 支持 cron 表达式,它的使用在这篇文章中做了介绍。

Spring Boot 做了增强,可以添加默认值,优先从配置文件中读取 hello.schedule.cron,如果为空则使用后面默认的值,也就是每十秒钟执行一次 sayHello 方法。

线程池默认使用一个线程,可使用 spring.task.scheduling 命名空间进行如下微调:

properties复制代码spring.task.scheduling.thread-name-prefix=scheduling-
spring.task.scheduling.pool.size=2

2、自定义任务调度

如果需要扩展任务调度,可以实现 SchedulingConfigurer 来完成。

@Component
public class SayHiSchedulingConfigurer implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(4);
        threadPoolTaskScheduler.setThreadNamePrefix("helloschedules");
        threadPoolTaskScheduler.initialize();
 
        taskRegistrar.setScheduler(threadPoolTaskScheduler);
    }
}

这样,调整了任务调度的线程池大小,也修改了线程日志名称,便于日志分析定位。

二、Spring Scheduling 设计说明

Spring Boot 对 TaskExecution and Scheduling 做了简要使用说明,深入了解本地任务调度可以参考 Spring Framework 对 TaskExecution and Scheduling 的介绍。

下面就以 Spring Boot 3.1.2 以及 Spring Framework 6.0.11 版本为例,重点分析 Spring 对本地任务调度的实现方法。

首先,打开 @EnableScheduling 注解:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
 
}

这个注解写了大段的注释用于解释任务调度的用法,从注解中我们可以了解到本地调度的设计说明,这是后面分析源码、扩展实现、最佳实践的基础:

三、Spring Scheduling 关键类初探

1、SchedulingConfiguration

使用 @EnableScheduling 就会自动引入这个注解,它只做了一件事就是注入了一个 bean ScheduledAnnotationBeanPostProcessor。

@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
 
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
 
}

2、ScheduledAnnotationBeanPostProcessor

ScheduledAnnotationBeanPostProcessor 是任务调度的核心类,由 @EnableScheduling 注解自动注册,作为 bean 的后置处理器实现了大量接口。核心功能是提供 cron、fixedRate 和 fixedDelay 三种调度模式,识别 @Scheduled 标记的方法,并转换成 TaskScheduler 可调度的任务。以及,识别所有 SchedulingConfigurer 的实例,允许自定义使用调度任务或对任务进行细粒度的控制。

(1)postProcessAfterInitialization

忽略掉细枝末节,方法的核心功能是扫描带有 @Scheduled 注解的方法,并处理成标准化任务。

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    // ...
    AnnotationUtils.isCandidateClass(targetClass, List.of(Scheduled.class, Schedules.class))) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
        Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                method, Scheduled.class, Schedules.class);
        return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
    });
    
    // ...
    annotatedMethods.forEach((method, scheduledAnnotations) ->
            scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
 
    // ...
    return bean;
}

(2)processScheduled

将扫描到的所有调度配置借助 ScheduledTaskRegistrar 转换成标准化调度任务,直接被异步执行,返回结果交给 ScheduledTask.future。

protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
    try {
        Runnable runnable = createRunnable(bean, method);
        boolean processedSchedule = false;
        String errorMessage =
                "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
 
        Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
 
        // ...
 
        // Check cron expression
        String cron = scheduled.cron();
        if (StringUtils.hasText(cron)) {
            String zone = scheduled.zone();
            if (this.embeddedValueResolver != null) {
                cron = this.embeddedValueResolver.resolveStringValue(cron);
                zone = this.embeddedValueResolver.resolveStringValue(zone);
            }
            if (StringUtils.hasLength(cron)) {
                Assert.isTrue(initialDelay.isNegative(), "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                if (!Scheduled.CRON_DISABLED.equals(cron)) {
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
            }
        }
 
        // ...
 
        // Finally register the scheduled tasks
        synchronized (this.scheduledTasks) {
            Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
            regTasks.addAll(tasks);
        }
 
        // ...
}

这里分析一下 fixedDelay 与 fixedRate 的区别:

fixedDelay 属性可确保在执行任务的结束时间与下一次执行任务的开始时间之间有 n 毫秒的延迟。当我们需要确保只有一个任务实例一直在运行时,该属性特别有用。

而 fixedRate 属性是每 n 毫秒运行一次计划任务。它不会检查任务之前的执行情况。如果任务的所有执行都是独立的,这一点就很有用。如果我们不希望超出内存和线程池的大小,那么 fixedRate 就会非常方便。不过,如果进入的任务不能快速完成,就有可能出现内存不足异常。

(3)finishRegistration

这个方法完成任务调度器注册。提供自定义调度任务扩展点的 SchedulingConfigurer 接口,并被全部扫描进来。

最后,查找 TaskScheduler,添加到 ScheduledTaskRegistrar 中。

private void finishRegistration() {
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }
 
    if (this.beanFactory instanceof ListableBeanFactory lbf) {
        Map<String, SchedulingConfigurer> beans = lbf.getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }
    // ...
 
    this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true /false));
    
    // ...
 
    this.registrar.afterPropertiesSet();
}

3、ScheduledTaskRegistrar

ScheduledAnnotationBeanPostProcessor 被 SchedulingConfiguration 创建的时候二话没说,初始化就创建了一个 ScheduledTaskRegistrar。

ScheduledTaskRegistrar 用于辅助任务注册到 TaskScheduler 中,尤其是结合 @EnableAsync 注解和 SchedulingConfigurer 的回调方法时。这个 bean 在创建时会先判断是否存在 TaskScheduler,没有就会创建一个单线程任务调度池,然后逐一把调度任务添加到任务队列中。

(1)scheduleTasks

在被 ScheduledAnnotationBeanPostProcessor#finishRegistration 调用时,就是完成任务标准化,afterPropertiesSet 只完成了一个方法完成,也就是 scheduleTasks。

protected void scheduleTasks() {
    if (this.taskScheduler == null) {
       this.localExecutor = Executors.newSingleThreadScheduledExecutor();
       this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    
    // ...
    
    if (this.cronTasks != null) {
       for (CronTask task : this.cronTasks) {
          addScheduledTask(scheduleCronTask(task));
       }
    }
    
    // ...
}

(2)scheduleCronTask

使用适配器模式将各个 Task 转换成 ScheduledTask,成为标准化任务执行。

其他几个方法 scheduleTriggerTask,scheduleFixedRateTask,scheduleFixedDelayTask 都是类似的。

public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
       scheduledTask = new ScheduledTask(task);
       newTask = true;
    }
    if (this.taskScheduler != null) {
       scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    }
    else {
       addCronTask(task);
       this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}

四、Spring Scheduling 调用时序

@EnableScheduling 的引入使得 Spring 容器开启了对本地调度任务扫描,并自动装配了 SchedulingConfiguration,实际上是引入了 ScheduledAnnotationBeanPostProcessor。

ScheduledAnnotationBeanPostProcessor 实现了 postProcessAfterInitialization 首先被执行,在 bean 初始化完成后对 @Scheduled 注解进行扫描并转换成标准化本地调度任务 ScheduledTask。

任务转换完成后就会异步执行任务,但是需要等待主线程对线程池的初始化。ScheduledAnnotationBeanPostProcessor 实现了 onApplicationEvent 完成对任务注册的初始化,包括自定义调度任务配置

SchedulingConfigurer 的所有实现的扫描,以及对调度任务执行者 TaskScheduler 的初始化,如果都没有注入的话会创建一个默认的单线程任务调度器。

最后,准备工作完成后,由 ScheduledTaskRegistrar 的各类型调度任务分别调用 ScheduledFuture 的 schedule 方法完成任务。

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

您可能感兴趣的文章:
阅读全文