java

关注公众号 jb51net

关闭
首页 > 软件编程 > java > Spring+Quartz动态任务

Spring+Quartz实现动态任务调度详解

作者:it_lihongmin

这篇文章主要介绍了Spring+Quartz实现动态任务调度详解,最近经常基于spring boot写定时任务,并且是使用注解的方式进行实现,分成的方便将自己的类注入spring容器,需要的朋友可以参考下

一、定时任务的实现

1、配置文件方式

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
 
<beans>
	<bean id="userTimer" class="com.kevin.timer.XmlTimer"></bean>
	
	<bean id="userTask"
		class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">
		<property name="targetObject">
			<ref bean="userTimer" />
		</property>
		<property name="targetMethod">
			<value>execute</value>
		</property>
	</bean>
	
	<!-- 每隔5秒钟执行一次,我要在运行时动态修改为每小时执行一次 -->
	<bean id="userTrigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
		<property name="jobDetail">
			<ref bean="userTask" />
		</property>
		<property name="cronExpression">
			<value>0/5 * * * * ?</value>
		</property>
	</bean>
	
	
	<bean id="startQuertz" lazy-init="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
		<property name="triggers">
			<list>
				<ref bean="userTrigger" />
			</list>
		</property>
	</bean>
 
</beans>

2、注解方式

我最近经常基于spring boot写定时任务,并且是使用注解的方式进行实现,分成的方便将自己的类注入spring容器(@conponent等),

并在要实现父方法上添加注解 @Scheduled(cron="0/10 * * * * ?") ,告知任务调度器质性方法中的业务逻辑的质性时间。

二、任务调用的工作原理

三、动态任务调用的实现

首先定义任务实体应该具有的属性:

package com.kevin.schedule.entity;
/**
 * 调度任务定义
 * @author Tom
 *
 */
public class Task {
	private String id;				//任务ID,默认系统时间戳
	private String parentId = "";	//父级任务ID
	private String name = "";		//任务名称
	private String desc = "";		//任务描述
	private int planExe = 0;		//计划执行次数,默认为0,表示满足条件循环执行
	private String group = "";		//任务组名称
	private String groupDesc = "";	//任务组描述
	private String cron = "";		//任务表达式
	private String cronDesc = "";	//表达式描述
	private String trigger = "";	//触发器
	private String triggerDesc = "";//触发器描述
	private int execute = 0;		//任务被执行过多少次
	private Long lastExeTime = 0L;	//最后一次开始执行时间
	private Long lastFinishTime = 0L;//最后一次执行完成时间
	private int state = 1;			//任务状态0禁用、1启动、2删除
	private int deply = 0;			//延时启动,默认为0,表示不延时启动
	public Task(String taskId){
		this.id = taskId;
	}
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getParentId() {
		return parentId;
	}
	public void setParentId(String parentId) {
		this.parentId = parentId;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public String getCron() {
		return cron;
	}
	public void setCron(String cron) {
		this.cron = cron;
	}
	public String getCronDesc() {
		return cronDesc;
	}
	public void setCronDesc(String cronDesc) {
		this.cronDesc = cronDesc;
	}
//	public List<ScheduleJob> getChildren() {
//		return children;
//	}
//	public void setChildren(List<ScheduleJob> children) {
//		this.children = children;
//	}
	public int getExecute() {
		return execute;
	}
	public void setExecute(int execute) {
		this.execute = execute;
	}
	public String getDesc() {
		return desc;
	}
	public void setDesc(String desc) {
		this.desc = desc;
	}
	public String getGroup() {
		return group;
	}
	public void setGroup(String group) {
		this.group = group;
	}
	public String getGroupDesc() {
		return groupDesc;
	}
	public void setGroupDesc(String groupDesc) {
		this.groupDesc = groupDesc;
	}
	public int getState() {
		return state;
	}
	public void setState(int state) {
		this.state = state;
	}
	public Long getLastExeTime() {
		return lastExeTime;
	}
	public void setLastExeTime(Long lastExeTime) {
		this.lastExeTime = lastExeTime;
	}
	public String getTrigger() {
		return trigger;
	}
	public void setTrigger(String trigger) {
		this.trigger = trigger;
	}
	public String getTriggerDesc() {
		return triggerDesc;
	}
	public void setTriggerDesc(String triggerDesc) {
		this.triggerDesc = triggerDesc;
	}
	public int getDeply() {
		return deply;
	}
	public void setDeply(int deply) {
		this.deply = deply;
	}
	public int getPlanExe() {
		return planExe;
	}
	public void setPlanExe(int planExe) {
		this.planExe = planExe;
	}
//	public String getTriggerGroup() {
//		return triggerGroup;
//	}
//	public void setTriggerGroup(String triggerGroup) {
//		this.triggerGroup = triggerGroup;
//	}
//	public String getTriggerGroupDesc() {
//		return triggerGroupDesc;
//	}
//	public void setTriggerGroupDesc(String triggerGroupDesc) {
//		this.triggerGroupDesc = triggerGroupDesc;
//	}
	public Long getLastFinishTime() {
		return lastFinishTime;
	}
	public void setLastFinishTime(Long lastFinishTime) {
		this.lastFinishTime = lastFinishTime;
	}
}
package com.kevin.schedule.proxy;
import java.lang.reflect.Method;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import com.gupaoedu.schedule.entity.Task;
/**
 * 
 * @author Tom
 *
 */
public class TriggerProxy implements Job{
	public static final String DATA_TARGET_KEY = "target";		//目标对象,实例
	public static final String DATA_TRIGGER_KEY = "trigger";	//方法名
	public static final String DATA_TRIGGER_PARAMS_KEY = "trigger_params";//方法的参数值
	public static final String DATA_TASK_KEY = "task";			//自己封装的任务对象
	private ThreadLocal<Entry> local = new ThreadLocal<Entry>();
	//是由调度器自动调用的
	public void execute(JobExecutionContext context) throws JobExecutionException {
//		TriggerProxy.class.getResource("")
		try {
			local.set(new Entry());
			//获取参数信息
			JobDataMap data = context.getTrigger().getJobDataMap();
			Object target = data.get(DATA_TARGET_KEY);
			Method method = (Method)data.get(DATA_TRIGGER_KEY);
			Object[] params = (Object[])data.get(DATA_TRIGGER_PARAMS_KEY);
			//修改任务执行次数
			Task task = (Task)data.get(DATA_TASK_KEY);
			//任务没执行一次,需要累加1
			task.setExecute(task.getExecute() + 1);
			local.get().start = System.currentTimeMillis();
			//调用触发器,用反射调用我们自己定义的方法
			method.invoke(target,params);
			local.get().end = System.currentTimeMillis();
			//记录任务的最后一次执行时间
			task.setLastExeTime(local.get().start);
			//记录任务完成的时间
			task.setLastFinishTime(local.get().end);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	class Entry{
		public long start = 0L;
		public long end = 0L;
	}
}
@Component
public class AnnotationTimer {
	Logger LOG = Logger.getLogger(this.getClass());
	@Scheduled(cron="0/10 * * * * ?")
	@Async
	public void a(){
		LOG.info("annotation配置的任务执行");
	}
}

定义动态任务调度需要的方法,在web项目中,通过界面的方式调用方法以动态控制和查看任务调度的执行情况:

package com.kevin.schedule.service;
import java.util.List;
import com.gupaoedu.schedule.entity.Task;
public interface IScheduleService {
	/**
	 * 获取任务列表
	 * @return
	 */
	public List<Task> getAllTask();
	/**
	 * 根据任务ID获取一个任务
	 * @return
	 */
	public Task getTask(String taskId);
	/**
	 * 新建一个任务
	 * @param taskName 任务名称
	 * @param taskClassName	任务Class名称
	 * @param triggerName 触发器名称
	 * @param cron	执行表达式
	 * @throws Exception
	 */
	public Task createTask(String taskName,String taskClassName,String triggerName,String cron) throws Exception;
    /** 
     * 修改一个任务的触发时间(使用默认的任务组名,触发器名,触发器组名) 
     *  
     */  
    public Task modifyTaskCron(String taskId, String cron);
    /** 
     * 移除一个任务(使用默认的任务组名,触发器名,触发器组名) 
     *  
     */  
    public Task removeTask(String taskId);
    /**
     * 重启任务
     * @param taskId
     * @return
     */
    public Task restartTask(String taskId);
    /**
     * 暂停定时任务
     * @param taskId
     * @return
     */
    public Task pauseTask(String taskId);
    /**
     * 关闭定时任务
     * @param taskId
     * @return
     */
    public Task shutdownTask(String taskId);
    /** 
     * 启动所有定时任务 
     *  
     */
    public void startAllTask();
    /** 
     * 关闭所有定时任务 
     */  
    public void shutdownAllTask();
}

实现方法:

package com.kevin.schedule.service.impl;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.quartz.CronTrigger;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import com.gupaoedu.schedule.entity.Task;
import com.gupaoedu.schedule.proxy.TriggerProxy;
import com.gupaoedu.schedule.service.IScheduleService;
/**
 * 任务管理,负责输出和管理日志
 * @author Tom
 */
@Service
public class ScheduleService implements IScheduleService,ApplicationContextAware{
	//自己不能再重新搞一个工厂出来,必须和Spring引用的调度工厂是同一个
	//才能实现无缝集成,而且可以动态修改配置文件已经配置好的任务
	@Autowired private SchedulerFactoryBean schedulerFactory;
	//保存所有任务表达式的描述
	private Map<String,String> cronDesc = new LinkedHashMap<String,String>(); 
	//所有的任务都放到任务池中
	private static Map<String,Task> taskPool = new LinkedHashMap<String,Task>();
	private static ApplicationContext app;
	public ScheduleService(){}
	/**
	 * 创建一个调度任务
	 * @param m
	 * @return
	 * @throws Exception
	 */
	private Task createTask(Method m) throws Exception{
		//任务ID就是时间戳,再加上一个随机数
		Task task = new Task("" + System.currentTimeMillis());
		//通过方法,可以获取到方法所在的class
		Class clazz = m.getDeclaringClass();
		//设置任务组
		task.setGroup(clazz.getName());
		//设置触发器信息
		//一个触发器对应一个方法,在此处,所谓的触发器还只是一个概念
		//概念是一个字符描述,它记录了触发的目标信息
		task.setTrigger(clazz.getName() + "." + m.getName());
		Annotation sc = m.getAnnotation(Scheduled.class);
		Method cronM = sc.getClass().getMethod("cron",null);
		String cron = cronM.invoke(sc, null).toString();
		task.setTriggerDesc(cronDesc.get(cron));
		task.setCron(cron);
		task.setCronDesc(cronDesc.get(cron));
		//此时,就已经完成一个task的封装
		return task;
	}
	@Override
	public void setApplicationContext(ApplicationContext app) throws BeansException {
		this.app = app;
		//加载所有任务到任务队列
 		for(String name : app.getBeanDefinitionNames()){
			try{
				Class<?> c = app.getBean(name).getClass();
				for(Method m : c.getMethods()){
					//只要是添加了Scheduled注解的都给他添加到调度工厂中
					if(m.isAnnotationPresent(Scheduled.class)){
						Task task = createTask(m);
						//将任务加入队列准备启动
						createTask(task);
					}
				}
			}catch(Exception e){
				continue;
			}
		}
	}
	public Task getTask(String taskId) {
		return taskPool.get(taskId);
	}
	@Override
	public List<Task> getAllTask() {
		if(taskPool.size() == 0){
			return new ArrayList<Task>();
		}
		List<Task> r = new ArrayList<Task>();
		r.addAll(taskPool.values());
		return r;
	}
	private Task createTask(Task task) throws Exception{
		if(null == task.getGroup() || task.getGroup().trim().length() == 0){ return null; }
		//还是拿到字节码
		Class<?> clazz = Class.forName(task.getGroup());
		//先从容器中获取
		Object target = null;
		try{
			//从Spring容器中提取已经创建好的对象引用
			target = app.getBean(clazz);
		}catch(Exception e){
		}
		//如果Spring容器没有帮我们创建,那么就自己创建实例
		if(target == null){
			target = clazz.newInstance();
		}
		//把触发器需要调用的方法找出来,还是用反射
		Method m = clazz.getMethod(task.getTrigger().replaceAll(task.getGroup() + ".", ""));
		//把任务ID取出来,时间戳
		String taskId = task.getId();
		//================ 事前准备  ====================
		//拿到Quartz中的调度器
		Scheduler sched = schedulerFactory.getScheduler();
		//创建一个Detail
        JobDetail taskDetail = new JobDetail(taskId, task.getGroup(), TriggerProxy.class);// 任务名,任务组,任务执行类  
        // 触发器
        CronTrigger trigger = new CronTrigger(taskId, task.getTrigger());// 触发器名,触发器组 
        //在这里设置CronExpression表达式
        trigger.setCronExpression(task.getCron());// 触发器时间设定  
        //JobDataMap   用来存储附加信息
        //利用这么一个API,把自定义的信息添加到Map中
        trigger.getJobDataMap().put(TriggerProxy.DATA_TARGET_KEY, target);
        trigger.getJobDataMap().put(TriggerProxy.DATA_TRIGGER_KEY, m);
//        m.getParameterTypes()
        trigger.getJobDataMap().put(TriggerProxy.DATA_TRIGGER_PARAMS_KEY, new Object[]{});
        trigger.getJobDataMap().put(TriggerProxy.DATA_TASK_KEY, task);
        sched.scheduleJob(taskDetail, trigger);
        // 如果这个任务没有被主动关闭,我们就给他启动
        if (!sched.isShutdown()) {  
            sched.start();  
        }
        //放入我们的任务池
        if(!taskPool.containsKey(taskId)){
        	taskPool.put(taskId, task);
		}
        return task;
	}
	/** 
     * 添加一个定时任务,使用默认的任务组名,触发器名,触发器组名 
     */  
    public Task createTask(String taskName,String taskClassName,String triggerName,String cron) throws Exception{
    	return createTask(taskName,null,taskClassName,null,triggerName,cron);
    }
    /** 
     * 添加一个定时任务 
     */  
    private Task createTask(String taskName, String taskGroupName, String taskClassName,String triggerGroupName, String triggerName,String cron)  throws Exception{  
    	//根据类名,利用反射机制获取到类的字节码
    	//约定优于配置
    	Class<?> clazz = Class.forName(taskClassName); //就是类名全程,包名.类名
    	Method m = clazz.getMethod(triggerName);		//显然就是方法名
    	Task task = createTask(m);
    	task.setName(taskName);
    	if(null != taskGroupName){
    		task.setGroup(taskGroupName);
    	}
    	task.setCron(cron);
    	return createTask(task);
    }
    /** 
     * 修改一个任务的触发时间(使用默认的任务组名,触发器名,触发器组名) 
     *  
     */  
    public Task modifyTaskCron(String taskId, String cron) {  
    	Task task = taskPool.get(taskId);
        try {
            Scheduler sched = schedulerFactory.getScheduler();  
            CronTrigger trigger = (CronTrigger) sched.getTrigger(taskId,task.getTrigger());  
            String oldTime = trigger.getCronExpression();  
            if (!oldTime.equalsIgnoreCase(cron)) {  
                JobDetail taskDetail = sched.getJobDetail(taskId,task.getGroup());  
                Class objJobClass = taskDetail.getJobClass();
                removeTask(taskId);
                //重新生成ID
                task.setId("" + System.currentTimeMillis());
                task.setCron(cron);
                createTask(task);
            }  
        } catch (Exception e) {  
            throw new RuntimeException(e);  
        }
        return task;
    }
    /** 
     * 移除一个任务(使用默认的任务组名,触发器名,触发器组名) 
     *  
     */  
    public Task removeTask(String taskId) {  
    		Task task = taskPool.get(taskId);
        try {
            Scheduler sched = schedulerFactory.getScheduler();  
            sched.pauseTrigger(taskId, task.getTrigger());// 停止触发器  
            sched.unscheduleJob(taskId, task.getGroup());// 移除触发器  
            sched.deleteJob(taskId, task.getGroup());// 删除任务 
            taskPool.remove(taskId);
        } catch (Exception e) {  
            throw new RuntimeException(e);  
        }
        return task;
    }
    /**
     * 暂停任务
     * @param taskId
     */
    public Task pauseTask(String taskId){
    		Task task = taskPool.get(taskId);
    		try {
            Scheduler sched = schedulerFactory.getScheduler();  
            sched.pauseTrigger(task.getId(), task.getTrigger());// 停止触发器  
        } catch (Exception e) {  
            throw new RuntimeException(e);  
        }
    		return task;
    }
    /** 
     * 
     *  
     */
    public Task restartTask(String taskId) {
    		Task task = taskPool.get(taskId);
		try {
			Scheduler sched = schedulerFactory.getScheduler();  
		      // 重启触发器  
			sched.resumeTrigger(task.getId(),task.getTrigger());  
		} catch (Exception e) {  
			throw new RuntimeException(e);  
		} 
        return task;
    }
    /**
     * 关闭任务
     * @param taskId
     */
    public Task shutdownTask(String taskId){
    		Task task = taskPool.get(taskId);
    	 	try {
             Scheduler sched = schedulerFactory.getScheduler();  
             sched.pauseTrigger(taskId, task.getTrigger());// 停止触发器  
             sched.unscheduleJob(taskId, task.getGroup());// 移除触发器  
             sched.deleteJob(taskId, task.getGroup());// 删除任务 
         } catch (Exception e) {  
             throw new RuntimeException(e);  
         }
    	 	return task;
    }
    /** 
     * 启动所有定时任务 
     *  
     */
    public void startAllTask() {  
        try {  
            Scheduler sched = schedulerFactory.getScheduler();  
            sched.start();  
        } catch (Exception e) {  
            throw new RuntimeException(e);  
        }
    }
    /** 
     * 关闭所有定时任务 
     */  
    public void shutdownAllTask() {  
        try {  
            Scheduler sched = schedulerFactory.getScheduler();  
            if (!sched.isShutdown()) {  
                sched.shutdown();  
            }  
        } catch (Exception e) {  
            throw new RuntimeException(e);  
        } 
    }
}

到此这篇关于Spring+Quartz实现动态任务调度详解的文章就介绍到这了,更多相关Spring+Quartz动态任务内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文