springboot使用TaskScheduler实现动态增删启停定时任务方式
作者:?abc!
TaskScheduler
概述
TaskScheduler是spring 3.0版本后,自带了一个定时任务工具,不用配置文件,可以动态改变执行状态。也可以使用cron表达式设置定时任务。
被执行的类要实现Runnable接口
TaskScheduler是一个接口,它定义了6个方法
接口的6种方法
public interface TaskScheduler { /** * 提交任务调度请求 * @param task 待执行任务 * @param trigger 使用Trigger指定任务调度规则 * @return */ ScheduledFuture schedule(Runnable task, Trigger trigger); /** * 提交任务调度请求 * 注意任务只执行一次,使用startTime指定其启动时间 * @param task 待执行任务 * @param startTime 任务启动时间 * @return */ ScheduledFuture schedule(Runnable task, Date startTime); /** * 使用fixedRate的方式提交任务调度请求 * 任务首次启动时间由传入参数指定 * @param task 待执行的任务 * @param startTime 任务启动时间 * @param period 两次任务启动时间之间的间隔时间,默认单位是毫秒 * @return */ ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); /** * 使用fixedRate的方式提交任务调度请求 * 任务首次启动时间未设置,任务池将会尽可能早的启动任务 * @param task 待执行任务 * @param period 两次任务启动时间之间的间隔时间,默认单位是毫秒 * @return */ ScheduledFuture scheduleAtFixedRate(Runnable task, long period); /** * 使用fixedDelay的方式提交任务调度请求 * 任务首次启动时间由传入参数指定 * @param task 待执行任务 * @param startTime 任务启动时间 * @param delay 上一次任务结束时间与下一次任务开始时间的间隔时间,单位默认是毫秒 * @return */ ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); /** * 使用fixedDelay的方式提交任务调度请求 * 任务首次启动时间未设置,任务池将会尽可能早的启动任务 * @param task 待执行任务 * @param delay 上一次任务结束时间与下一次任务开始时间的间隔时间,单位默认是毫秒 * @return */ ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay); }
0、ThreadPoolTaskScheduler
在 ThreadPoolTaskSchedulerConfig 中定义 ThreadPoolTaskScheduler bean
@Configuration public class ThreadPoolTaskSchedulerConfig { @Bean public ThreadPoolTaskScheduler threadPoolTaskScheduler(){ ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler(); threadPoolTaskScheduler.setPoolSize(5); threadPoolTaskScheduler.setThreadNamePrefix( "ThreadPoolTaskScheduler"); return threadPoolTaskScheduler; } }
配置的 bean threadPoolTaskScheduler 可以根据配置的池大小 5 异步执行任务。
请注意,所有与 ThreadPoolTaskScheduler 相关的线程名称都将以ThreadPoolTaskScheduler 为前缀。
让我们实现一个简单的任务,然后我们可以安排:
class RunnableTask implements Runnable{ private String message; public RunnableTask(String message){ this.message = message; } @Override public void run() { System.out.println(new Date()+" Runnable Task with "+message +" on thread "+Thread.currentThread().getName()); } }
1、schedule(Runnable task, Trigger trigger)
指定一个触发器执行定时任务。可以使用CronTrigger来指定Cron表达式,执行定时任务
如下:使用CronTrigger 来根据 cron 表达式调度任务,可以使用提供的触发器按照某个指定的节奏或时间表运行任务,在这种情况下,RunnableTask 将在每分钟的第 10 秒执行。
taskScheduler.schedule(new RunnableTask("Cron Trigger"), cronTrigger);
2、schedule(Runnable task, Date startTime);
指定一个具体时间点执行定时任务,可以动态的指定时间,开启任务,只执行一次
如下:配置一个任务在 1000 毫秒的固定延迟后运行,RunnableTask 将始终在一次执行完成和下一次执行开始之间运行 1000 毫秒。
taskScheduler.schedule( new Runnabletask("Specific time, 3 Seconds from now"), new Date(System.currentTimeMillis + 3000) );
3、scheduleAtFixedRate(Runnable task, long period);
立即执行,循环任务,指定一个执行周期(毫秒计时)
PS:不管上一个周期是否执行完,到时间下个周期就开始执行
如下:安排一个任务以固定的毫秒速率运行,下一个 RunnableTask 将始终在 2000 毫秒后运行,而不管上次执行的状态如何,它可能仍在运行。
taskScheduler.scheduleAtFixedRate( new RunnableTask("Fixed Rate of 2 seconds") , 2000);
4、scheduleAtFixedRate(Runnable task, Date startTime, long period);
指定时间开始执行,循环任务,指定一个间隔周期(毫秒计时)
PS:不管上一个周期是否执行完,到时间下个周期就开始执行
如下:使用CronTrigger 来根据 cron 表达式调度任务,可以使用提供的触发器按照某个指定的节奏或时间表运行任务,在这种情况下,RunnableTask 将在每分钟的第 10 秒执行。
taskScheduler.scheduleAtFixedRate(new RunnableTask( "Fixed Rate of 2 seconds"), new Date(), 3000);
5、scheduleWithFixedDelay(Runnable task, long delay);
立即执行,循环任务,指定一个间隔周期(毫秒计时)
PS:上一个周期执行完,等待delay时间,下个周期开始执行
如下:配置一个任务在 1000 毫秒的固定延迟后运行,RunnableTask 将始终在一次执行完成和下一次执行开始之间运行 1000 毫秒。
taskScheduler.scheduleWithFixedDelay( new RunnableTask("Fixed 1 second Delay"), 1000);
6、scheduleWithFixedDelay(Runnable task, Date startTime, long delay);
指定时间开始执行,循环任务,指定一个间隔周期(毫秒计时)
PS:上一个周期执行完,等待delay时间,下个周期开始执行
如下:将任务配置为在给定开始时间的固定延迟后运行,RunnableTask 将在指定的执行时间被调用,其中包括 @PostConstruct 方法开始的时间,随后延迟 1000 毫秒。
taskScheduler.scheduleWithFixedDelay( new RunnableTask("Current Date Fixed 1 second Delay"), new Date(), 1000);
接口5个实现类
1、ConcurrentTaskScheduler
以当前线程执行任务,如果任务简单,可以直接使用这个类来执行,快捷方便
- 单线程运行
public class LocTest implements Runnable { private ConcurrentTaskScheduler concurrentTaskScheduler = new ConcurrentTaskScheduler(); private void start() { concurrentTaskScheduler.schedule(this, new Date()); } public void run() { Thread thread = Thread.currentThread(); System.out.println("current id:" + thread.getId()); System.out.println("current name:" + thread.getName()); } public static void main(String[] args) { new LocTest().start(); } }
2、DefaultManagedTaskScheduler
以当前线程执行任务,是ConcurrentTaskScheduler的子类,添加了JNDI的支持。
和ConcurrentTaskScheduler一样的用法,需要使用JNDI可以单独设置
3、ThreadPoolTaskScheduler
TaskScheduler接口的默认实现类,多线程定时任务执行。可以设置执行线程池数(默认一个线程)
- 使用前必须得先调用
initialize()
【初始化方法】 - 有
shutDown()方法
,执行完后可以关闭线程
除实现了TaskScheduler接口中的方法外,它还包含了一些对ScheduledThreadPoolExecutor进行操作的接口,其常用方法如下:
setPoolSize
:设置线程池大小,最小为1,默认情况下也为1;setErrorHandler
:设置异常处理器。getScheduledThreadPoolExecutor
:获取ScheduledExecutor,默认ScheduledThreadPoolExecutor类型。getActiveCount
:获取当前活动的线程数execute
: 提交执行一次的任务submit
\submitListenable
:提交执行一次的任务,并且返回一个Future对象供判断任务状态使用
public class LocTest implements Runnable { private ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); private void start() { taskScheduler.setPoolSize(10); //必须得先初始化,才能使用 taskScheduler.initialize(); taskScheduler.schedule(this, new Date()); } public void run() { Thread ct = Thread.currentThread(); System.out.println("current id:"+ct.getId()); System.out.println("current name:"+ct.getName()); } public static void main(String[] args) { new LocTest().start(); } }
4、TimerManagerTaskScheduler
用于包装CommonJ中的TimerManager接口。
在使用CommonJ进行调度时使用
spring boot使用TaskScheduler实现动态增删启停定时任务
SchedulingConfig:添加执行定时任务的线程池配置类
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; @Configuration public class SchedulingConfig { @Bean public TaskScheduler taskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); // 定时任务执行线程池核心线程数 taskScheduler.setPoolSize(4); taskScheduler.setRemoveOnCancelPolicy(true); taskScheduler.setThreadNamePrefix("TaskSchedulerThreadPool-"); return taskScheduler; } }
ScheduledTask:添加ScheduledFuture的包装类
ScheduledFuture是ScheduledExecutorService定时任务线程池的执行结果。
import java.util.concurrent.ScheduledFuture; public final class ScheduledTask { volatile ScheduledFuture<?> future; /** * 取消定时任务 */ public void cancel() { ScheduledFuture<?> future = this.future; if (future != null) { future.cancel(true); } } }
SchedulingRunnable:添加Runnable接口实现类
添加Runnable接口实现类,被定时任务线程池调用,用来执行指定bean里面的方法
import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.ReflectionUtils; import java.lang.reflect.Method; import java.util.Objects; public class SchedulingRunnable implements Runnable { private static final Logger logger = LoggerFactory.getLogger(SchedulingRunnable.class); private final String beanName; private final String methodName; private final String params; public SchedulingRunnable(String beanName, String methodName) { this(beanName, methodName, null); } public SchedulingRunnable(String beanName, String methodName, String params) { this.beanName = beanName; this.methodName = methodName; this.params = params; } @Override public void run() { logger.info("定时任务开始执行 - bean:{},方法:{},参数:{}", beanName, methodName, params); long startTime = System.currentTimeMillis(); try { Object target = SpringContextUtils.getBean(beanName); Method method = null; if (StringUtils.isNotEmpty(params)) { method = target.getClass().getDeclaredMethod(methodName, String.class); } else { method = target.getClass().getDeclaredMethod(methodName); } ReflectionUtils.makeAccessible(method); if (StringUtils.isNotEmpty(params)) { method.invoke(target, params); } else { method.invoke(target); } } catch (Exception ex) { logger.error(String.format("定时任务执行异常 - bean:%s,方法:%s,参数:%s ", beanName, methodName, params), ex); } long times = System.currentTimeMillis() - startTime; logger.info("定时任务执行结束 - bean:{},方法:{},参数:{},耗时:{} 毫秒", beanName, methodName, params, times); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SchedulingRunnable that = (SchedulingRunnable) o; if (params == null) { return beanName.equals(that.beanName) && methodName.equals(that.methodName) && that.params == null; } return beanName.equals(that.beanName) && methodName.equals(that.methodName) && params.equals(that.params); } @Override public int hashCode() { if (params == null) { return Objects.hash(beanName, methodName); } return Objects.hash(beanName, methodName, params); } }
CronTaskRegistrar:添加定时任务注册类,用来增加、删除定时任务
import com.example.testspringboot.cron.ScheduleResult; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.config.CronTask; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * 添加定时任务注册类,用来增加、删除定时任务。 */ @Component public class CronTaskRegistrar implements DisposableBean { private final Map<Runnable, ScheduledTask> scheduledTasks = new ConcurrentHashMap<>(16); private final Map<Integer, ScheduleResult> schedulerJob = new HashMap<>(); @Autowired private TaskScheduler taskScheduler; public TaskScheduler getScheduler() { return this.taskScheduler; } public void addCronTask(ScheduleResult scheduleResult) { SchedulingRunnable task = new SchedulingRunnable(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams()); String cronExpression = scheduleResult.getCronExpression(); CronTask cronTask = new CronTask(task, cronExpression); // 如果当前包含这个任务,则移除 if (this.scheduledTasks.containsKey(task)) { removeCronTask(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams()); } schedulerJob.put(scheduleResult.getJobId(), scheduleResult); this.scheduledTasks.put(task, scheduleCronTask(cronTask)); } public void removeCronTask(String beanName, String methodName, String methodParams) { SchedulingRunnable task = new SchedulingRunnable(beanName, methodName, methodParams); ScheduledTask scheduledTask = this.scheduledTasks.remove(task); if (scheduledTask != null) { scheduledTask.cancel(); } } public void removeCronTask(ScheduleResult scheduleResult) { schedulerJob.put(scheduleResult.getJobId(), scheduleResult); removeCronTask(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams()); } public ScheduledTask scheduleCronTask(CronTask cronTask) { ScheduledTask scheduledTask = new ScheduledTask(); scheduledTask.future = this.taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger()); return scheduledTask; } public Map<Runnable, ScheduledTask> getScheduledTasks() { return scheduledTasks; } public Map<Integer, ScheduleResult> getSchedulerJob() { return schedulerJob; } @Override public void destroy() { for (ScheduledTask task : this.scheduledTasks.values()) { task.cancel(); } this.scheduledTasks.clear(); } public ScheduleResult getSchedulerByJobId(Integer jobId) { for (ScheduleResult job : findAllTask()) { if (jobId.equals(job.getJobId())) { return job; } } return null; } public List<ScheduleResult> findAllTask() { List<ScheduleResult> ScheduleResults = new ArrayList<>(); Set<Map.Entry<Integer, ScheduleResult>> entries = schedulerJob.entrySet(); for (Map.Entry<Integer, ScheduleResult> en : entries) { ScheduleResults.add(en.getValue()); } return ScheduleResults; } }
CronUtils:校验Cron表达式的有效性
import org.springframework.scheduling.support.CronExpression; public class CronUtils { /** * 返回一个布尔值代表一个给定的Cron表达式的有效性 * * @param cronExpression Cron表达式 * @return boolean 表达式是否有效 */ public static boolean isValid(String cronExpression) { return CronExpression.isValidExpression(cronExpression); } }
ScheduleResult:添加定时任务实体类
import lombok.Data; @Data public class ScheduleResult { /** * 任务ID */ private Integer jobId; /** * bean名称 */ private String beanName; /** * 方法名称 */ private String methodName; /** * 方法参数: 执行service里面的哪一种方法 */ private String methodParams; /** * cron表达式 */ private String cronExpression; /** * 状态(1正常 0暂停) */ private Integer jobStatus; /** * 备注 */ private String remark; /** * 创建时间 */ private String createTime; /** * 更新时间 */ private String updateTime; }
ScheduleJobStatus:任务状态枚举类型
public enum ScheduleJobStatus { /** * 暂停 */ PAUSE, /** * 正常 */ NORMAL; }
SpringContextUtils类:从spring容器里获取bean
import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class SpringContextUtils implements ApplicationContextAware { private static ApplicationContext applicationContext = null; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (SpringContextUtils.applicationContext == null) { SpringContextUtils.applicationContext = applicationContext; } } public static ApplicationContext getApplicationContext() { return applicationContext; } // 通过name获取 Bean. public static Object getBean(String name) { return getApplicationContext().getBean(name); } // 通过class获取Bean. public static <T> T getBean(Class<T> clazz) { return getApplicationContext().getBean(clazz); } // 通过name,以及Clazz返回指定的Bean public static <T> T getBean(String name, Class<T> clazz) { return getApplicationContext().getBean(name, clazz); } public static boolean containsBean(String name) { return getApplicationContext().containsBean(name); } public static boolean isSingleton(String name) { return getApplicationContext().isSingleton(name); } public static Class<? extends Object> getType(String name) { return getApplicationContext().getType(name); } }
ScheduleJobService:增删启停service方法
@Service @Slf4j public class ScheduleJobService { @Autowired private CronTaskRegistrar cronTaskRegistrar; public void addScheduleJob(ScheduleResult scheduleResult) { long currentTimeMillis = System.currentTimeMillis(); scheduleResult.setCreateTime(formatTimeYMD_HMS_SSS(currentTimeMillis)); scheduleResult.setUpdateTime(formatTimeYMD_HMS_SSS(currentTimeMillis)); scheduleResult.setJobId(findAllTask().size() + 1); if (scheduleResult.getJobStatus().equals(ScheduleJobStatus.NORMAL.ordinal())) { log.info("Stop or pause: is now on"); cronTaskRegistrar.addCronTask(scheduleResult); return; } cronTaskRegistrar.getSchedulerJob().put(scheduleResult.getJobId(), scheduleResult); } public void editScheduleJob(ScheduleResult currentSchedule) { //先移除 cronTaskRegistrar.removeCronTask(currentSchedule.getBeanName(), currentSchedule.getMethodName(), currentSchedule.getMethodParams()); ScheduleResult pastScheduleJob = cronTaskRegistrar.getSchedulerByJobId(currentSchedule.getJobId()); if (pastScheduleJob == null) { System.out.println("没有这个任务"); return; } //然后判断是否开启, 如果开启的话,现在立即执行 startOrStopSchedulerJob(currentSchedule, true); } public void deleteScheduleJob(ScheduleResult scheduleResult) { // 清除这个任务 cronTaskRegistrar.removeCronTask(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams()); // 清除这个任务的数据 cronTaskRegistrar.getSchedulerJob().remove(scheduleResult.getJobId()); } public void startOrStopScheduler(ScheduleResult scheduleResult) { cronTaskRegistrar.getSchedulerJob().get(scheduleResult.getJobId()).setJobStatus(scheduleResult.getJobStatus()); startOrStopSchedulerJob(scheduleResult, false); } private void startOrStopSchedulerJob(ScheduleResult scheduleResult, boolean update) { // 更新时间 scheduleResult.setUpdateTime(formatTimeYMD_HMS_SSS(System.currentTimeMillis())); if (scheduleResult.getJobStatus().equals(ScheduleJobStatus.NORMAL.ordinal())) { System.out.println("停止或暂停:现在是开启"); cronTaskRegistrar.addCronTask(scheduleResult); return; } System.out.println("停止或暂停:现在是暂停"); if (update){ cronTaskRegistrar.removeCronTask(scheduleResult); return; } cronTaskRegistrar.removeCronTask(scheduleResult.getBeanName(), scheduleResult.getMethodName(), scheduleResult.getMethodParams()); } public List<ScheduleResult> findAllTask() { return cronTaskRegistrar.findAllTask(); } // 转换为年-月-日 时:分:秒 private String formatTimeYMD_HMS_SSS(long time) { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(time); } }
cronController:访问接口
import com.example.testspringboot.cron.ScheduleResult; import com.example.testspringboot.cron.ScheduleJobService; import com.example.testspringboot.cron.utils.CronUtils; import com.google.gson.Gson; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.util.List; @RestController public class cronController { @Autowired private ScheduleJobService scheduleJobService; /** * 测试上传的用例文件, 获取详细执行结果 */ @PostMapping("/add") void executeTestOneFile(@RequestBody ScheduleResult scheduleResult) { boolean valid = CronUtils.isValid(scheduleResult.getCronExpression()); if (valid){ System.out.println("校验成功, 添加任务"); scheduleResult.setMethodParams(scheduleResult.getBranch()+scheduleResult.getCaseDir()); scheduleJobService.addScheduleJob(scheduleResult); }else { System.out.println("校验失败"); } } @PostMapping("/stop") void end(@RequestBody ScheduleResult scheduleResult) { Gson gson = new Gson(); System.out.println("================"); System.out.println(scheduleResult); System.out.println("================="); scheduleResult.setJobStatus(0); scheduleJobService.startOrStopScheduler(scheduleResult); } @PostMapping("/start") void start(@RequestBody ScheduleResult scheduleResult) { System.out.println("================"); System.out.println(scheduleResult); System.out.println("================="); scheduleResult.setJobStatus(1); scheduleJobService.startOrStopScheduler(scheduleResult); } @PostMapping("/edit") void edit(@RequestBody ScheduleResult scheduleResult) { System.out.println("=======edit========="); System.out.println(scheduleResult); System.out.println("================="); scheduleJobService.editScheduleJob(scheduleResult); } @PostMapping("/delete") void delete(@RequestBody ScheduleResult scheduleResult) { System.out.println("=======delete========="); System.out.println(scheduleResult); System.out.println("================="); scheduleJobService.deleteScheduleJob(scheduleResult); } @GetMapping("/tasks") List<ScheduleResult> get() throws Exception { List<ScheduleResult> allTask = scheduleJobService.findAllTask(); System.out.println("现在的定时任务数量 = " + allTask.size()); System.out.println("现在的定时任务 = " + allTask); return allTask; } }
c1:测试bean
import org.springframework.stereotype.Component; @Component public class c1 { public void test1(String y){ System.out.println("这个是test1的bean : " + y); } public void test2(){ System.out.println("这个是test1的bean中test2方法"); } }
init:项目启动后的定时任务
import com.example.testspringboot.cron.ScheduleJobService; import com.example.testspringboot.cron.ScheduleResult; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; @Component public class init implements CommandLineRunner { @Autowired private ScheduleJobService scheduleJobService; @Override public void run(String... args) throws Exception { System.out.println("开始珍惜"); ScheduleResult scheduleResult = new ScheduleResult(); scheduleResult.setBeanName("c1"); scheduleResult.setMethodName("test1"); scheduleResult.setCronExpression("0/25 * * * * *"); scheduleResult.setJobStatus(1); scheduleResult.setMethodParams("test1"); scheduleJobService.addScheduleJob(scheduleResult); scheduleJobService.findAllTask(); } }
后续的操作,基本上就是复制粘贴,运行
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。