springboot定时任务SchedulingConfigurer异步多线程实现方式
作者:wfreefish
这篇文章主要介绍了springboot定时任务SchedulingConfigurer异步多线程实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
1、设计定时任务数据库(mysql)
CREATE TABLE `sys_scheduled` ( `id` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT '主键', `job_name` varchar(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '定时任务名称', `class_name` varchar(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '类名', `method` varchar(250) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '方法名', `cron` varchar(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '定时任务表达式', `start_flag` tinyint(1) NULL DEFAULT NULL COMMENT '启用标记 1启用 0停用', `create_date` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '创建日期', `create_by` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '创建人', `update_by` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '更新人', `update_date` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '更新日期', `del_flag` varchar(32) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '删除标记', `temp` varchar(2500) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '备注', PRIMARY KEY (`id`) USING BTREE ) ENGINE = MyISAM AUTO_INCREMENT = 1 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci COMMENT = '定时任务配置表' ROW_FORMAT = Dynamic;
2、编写批量获取定时任务配置查询服务SysScheduledService
package com.jdsoft.springbootmybatisplusgenrator.service.crm.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.jdsoft.springbootmybatisplusgenrator.bean.scheduled.SysScheduled; import com.jdsoft.springbootmybatisplusgenrator.dao.crm.ScheduledMapper; import com.jdsoft.springbootmybatisplusgenrator.service.crm.SysScheduledService; import org.springframework.stereotype.Service; import java.util.List; /** * @author: wxf * Date: 2023/8/2 * Time: 17:16 * Description: */ @Service public class SysScheduledServiceImpl extends ServiceImpl<ScheduledMapper,SysScheduled> implements SysScheduledService { @Override public List<SysScheduled> getAllConfig() { return this.getBaseMapper().getAllConfig(); } @Override public SysScheduled getById(String id) { LambdaQueryWrapper<SysScheduled> lw = new LambdaQueryWrapper<>(); lw.eq(SysScheduled::getId,id); return this.getBaseMapper().selectOne(lw); } }
2.1 po SysScheduled
package com.jdsoft.springbootmybatisplusgenrator.bean.scheduled; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; import lombok.Data; import java.io.Serializable; @Data public class SysScheduled implements Serializable { @TableId(value = "id",type = IdType.ID_WORKER_STR) private String id; // 主键 @TableId(value = "job_name") //表里的字段名称映射到实体类 private String jobName; // 定时任务名称 @TableId(value = "class_name") private String className; // 类名 @TableId(value = "method") private String method; // 方法名 @TableId(value = "cron") private String cron; // 定时任务表达式 @TableId(value = "start_flag") private Boolean startFlag; // 启用标记 @TableId(value = "temp") private String temp;//备注 }
2.2 mybatisPlus业务mapper ScheduledMapper
package com.jdsoft.springbootmybatisplusgenrator.dao.crm; import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.jdsoft.springbootmybatisplusgenrator.bean.scheduled.SysScheduled; import org.apache.ibatis.annotations.Select; import java.util.List; /** * @author: wxf * Date: 2023/8/2 * Time: 17:14 * Description: 定时任务配置 mapper */ public interface ScheduledMapper extends BaseMapper<SysScheduled> { @Select("SELECT* FROM sys_scheduled u ") List<SysScheduled> getAllConfig(); }
3、在启动容器中添加@EnableScheduling注解
4、增加调度任务配置类实现
SchedulingConfigurer, AsyncConfigurer接口
说明:springboot项目启动会执行configureTasks实现方法代码
package com.scheduled; import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; import cn.hutool.core.collection.CollectionUtil; import com.alibaba.druid.util.StringUtils; import com.jdsoft.springbootmybatisplusgenrator.bean.scheduled.SysScheduled; import com.jdsoft.springbootmybatisplusgenrator.service.crm.SysScheduledService; import com.wxf.springdemo.redis.RedisUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.CronTask; import org.springframework.scheduling.config.ScheduledTaskRegistrar; import org.springframework.stereotype.Component; import org.springframework.util.ReflectionUtils; /** * @Package: com.scheduled * @ClassName: ScheduleSetting * @Author: wxf * @Description: 任务调度 * @Date: 2023/8/1 20:11 * @Version: 1.0 */ @Component public class ScheduleSetting implements SchedulingConfigurer, AsyncConfigurer { @Autowired private SysScheduledService sysScheduledService; private volatile ScheduledTaskRegistrar scheduledTaskRegistrar; //调度器map private Map<String, ScheduledFuture<?>> scheduledFutures = new HashMap<String, ScheduledFuture<?>>(); // 执行任务map private Map<String, CronTask> cronTasks = new HashMap<String, CronTask>(); @Autowired private RedisUtil redisUtil; private static List<SysScheduled> scheduleList; @Override public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) { //异步执行定时任务 TaskScheduler taskScheduler = taskScheduler(); scheduledTaskRegistrar.setTaskScheduler(taskScheduler); this.scheduledTaskRegistrar = scheduledTaskRegistrar; scheduleList = sysScheduledService.getAllConfig(); redisUtil.setSysScheduledList("scheduleList", scheduleList); refresh(scheduleList); } /** * 刷新任务列表 * * @param scheduleList 待执行的任务列表 */ public void refresh(List<SysScheduled> scheduleList) { //取消停用的定时策略 Set<String> keyIds = scheduledFutures.keySet(); CopyOnWriteArrayList<String> copyKeyIds = CollectionUtil.newCopyOnWriteArrayList(keyIds); for (String scheduledFutureId : copyKeyIds) { if (!taskExit(scheduleList, scheduledFutureId)) { //cancel 参数 false 表示如果调度器当前执行任务时 不进行打断 true表示直接打断进程取消策略 scheduledFutures.get(scheduledFutureId).cancel(false); scheduledFutures.remove(scheduledFutureId); cronTasks.remove(scheduledFutureId); } } if (CollectionUtil.isNotEmpty(scheduleList)) { for (SysScheduled sysScheduled : scheduleList) { //定时corn表达式 String cron = sysScheduled.getCron(); String keyId = sysScheduled.getId(); //任务状态 在新增任务时进行判断 Boolean startFlag = sysScheduled.getStartFlag(); // 如果启动状态是false的话 , taskExit已进行处理,不需要再进行操作,直接跳下一次循环 if (!startFlag) { continue; } if (StringUtils.isEmpty(cron)) { continue; } //定时任务存在的话,并且没发生改变的话不进行处理 if (scheduledFutures.containsKey(keyId) && cronTasks.get(keyId).getExpression().equalsIgnoreCase(cron)) { continue; } else if (scheduledFutures.containsKey(keyId)) { //任务调度时间发生改变,取消当前策略的任务 scheduledFutures.get(keyId).cancel(false); scheduledFutures.remove(keyId); cronTasks.remove(keyId); } //新建定时任务 CronTask task = new CronTask(getRunnable(sysScheduled), cron); //将定时任务分配到对应的调度器 ScheduledFuture<?> scheduledFuture = scheduledTaskRegistrar.getScheduler(). schedule(task.getRunnable(), task.getTrigger()); //保存缓存任务map cronTasks.put(keyId, task); //保存缓存调度器 map scheduledFutures.put(keyId, scheduledFuture); } } } /** * 判断调度器缓存map中是否存在查询到的配置调度任务 * * @param scheduleList 数据库查询的调度任务 * @param taskId 主键id * @return 判断结果 */ private boolean taskExit(List<SysScheduled> scheduleList, String taskId) { if (CollectionUtil.isEmpty(scheduleList) || StringUtils.isEmpty(taskId)) { return false; } for (SysScheduled sysScheduled : scheduleList) { //id存在 且任务状态是开启状态 判断任务存在 if (taskId.equalsIgnoreCase(sysScheduled.getId()) && sysScheduled.getStartFlag()) { return true; } } return false; } /** * 设置异步线程池 任务调度器 * * @return 返回任务调度器对象 */ public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.initialize(); //设置线程池数量 scheduler.setPoolSize(8); scheduler.setThreadNamePrefix("Mq-Listeners"); //等待终止时间 scheduler.setAwaitTerminationSeconds(60); //等待任务完成关闭 scheduler.setWaitForTasksToCompleteOnShutdown(true); return scheduler; } /** * 转换首字母小写 * * @param str * @return */ public static String lowerFirstCapse(String str) { char[] chars = str.toCharArray(); chars[0] += 32; return String.valueOf(chars); } /** * runnable * * @param scheduleConfig * @return */ private Runnable getRunnable(final SysScheduled scheduleConfig) { return new Runnable() { @Override public void run() { Class<?> clazz; try { //根据id查询对应的 启动状态 clazz = Class.forName(scheduleConfig.getClassName()); String className = lowerFirstCapse(clazz.getSimpleName()); Object bean = (Object) ApplicationContextHelper.getBean(className); Method method = ReflectionUtils.findMethod(bean.getClass(), scheduleConfig.getMethod()); ReflectionUtils.invokeMethod(method, bean); } catch (ClassNotFoundException e) { e.printStackTrace(); } } }; } }
5、当数据库配置发生改变后直接调用
ScheduleSetting.refresh()方法进行刷新任务策略,实现动态启用、停用任务
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。