Spring定时任务并行(异步)处理方式
作者:BlueKitty1210
这篇文章主要介绍了Spring定时任务并行(异步)处理方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
Spring定时任务并行(异步)
最近项目中遇到一个问题 , 在SpringBoot中设置了定时任务之后 , 在某个点总是没有执行 . 经过搜索研究发现 , spring 定时器任务scheduled-tasks默认配置是单线程串行执行的 .
即在当前时间点之内 . 如果同时有两个定时任务需要执行的时候 , 排在第二个的任务就必须等待第一个任务执行完毕执行才能正常运行.
如果第一个任务耗时较久的话 , 就会造成第二个任务不能及时执行 .
这样就可能由于时效性造成其他问题 . 而在实际项目中 , 我们也往往需要这些定时任务是"各干各的" , 而不是排队执行.
以下为默认串行的定时任务代码
package com.xbz.timerTask.task; import java.text.SimpleDateFormat; import java.util.Date; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; /** * @title 测试spring定时任务执行 * @createDate 2017年8月18日 * @version 1.0 */ @Component @Configuration @EnableScheduling public class MyTestTask { private static final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Scheduled(fixedDelay = 1000) public void executeUpdateYqTask() { System.out.println(Thread.currentThread().getName() + " >>> task one " + format.format(new Date())); } @Scheduled(fixedDelay = 1000) public void executeRepaymentTask() throws InterruptedException { System.out.println(Thread.currentThread().getName() + " >>> task two " + format.format(new Date())); Thread.sleep(5000); } }
启动项目之后 , 发现控制台输出如下 :
可以发现 , 一直是pool-5-thread-1一个线程在执行定时任务 , 这显然不符合我们的业务需求.
如何把定时任务改造成异步呢 , 在spring中网上文档较多 , 不再叙述 . 但在SpringBoot找到的相关资料也是新建xml文件的方式配置 , 实际上这就违背了SpringBoot减少配置文件的初衷 .
在SpringBoot可以自定义以下线程池配置
package com.xbz.config; @Configuration @EnableScheduling public class ScheduleConfig implements SchedulingConfigurer, AsyncConfigurer{ /** 异步处理 */ public void configureTasks(ScheduledTaskRegistrar taskRegistrar){ TaskScheduler taskScheduler = taskScheduler(); taskRegistrar.setTaskScheduler(taskScheduler); } /** 定时任务多线程处理 */ @Bean(destroyMethod = "shutdown") public ThreadPoolTaskScheduler taskScheduler(){ ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(20); scheduler.setThreadNamePrefix("task-"); scheduler.setAwaitTerminationSeconds(60); scheduler.setWaitForTasksToCompleteOnShutdown(true); return scheduler; } /** 异步处理 */ public Executor getAsyncExecutor(){ Executor executor = taskScheduler(); return executor; } /** 异步处理 异常 */ public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler(){ return new SimpleAsyncUncaughtExceptionHandler(); } }
此时再启动定时任务 , 就发现已经是异步处理的了 .
如果项目中同时配置了异步任务的线程池和定时任务的异步线程处理
配置类如下 :
package com.xbz.config; import java.lang.reflect.Method; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.SchedulingConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.scheduling.config.ScheduledTaskRegistrar; /** * @title 使用自定义的线程池执行异步任务 , 并设置定时任务的异步处理 * @version 1.0 */ @Configuration @EnableAsync @EnableScheduling public class ExecutorConfig implements SchedulingConfigurer, AsyncConfigurer { private static final Logger LOG = LogManager.getLogger(ExecutorConfig.class.getName()); @Autowired private TaskThreadPoolConfig config; @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(config.getCorePoolSize()); executor.setMaxPoolSize(config.getMaxPoolSize()); executor.setQueueCapacity(config.getQueueCapacity()); executor.setKeepAliveSeconds(config.getKeepAliveSeconds()); executor.setThreadNamePrefix("taskExecutor-"); // rejection-policy:当pool已经达到max size的时候,如何处理新任务 // CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } /** * @title 异步任务中异常处理 * @description * @author Xingbz * @createDate 2017年9月11日 * @return * @see org.springframework.scheduling.annotation.AsyncConfigurer#getAsyncUncaughtExceptionHandler() * @version 1.0 */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncUncaughtExceptionHandler() { @Override public void handleUncaughtException(Throwable ex, Method method, Object... params) { LOG.error("==========================" + ex.getMessage() + "=======================", ex); LOG.error("exception method:" + method.getName()); } }; } @Override public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { TaskScheduler taskScheduler = taskScheduler(); taskRegistrar.setTaskScheduler(taskScheduler); } /** * 并行任务使用策略:多线程处理 * * @return ThreadPoolTaskScheduler 线程池 */ @Bean(destroyMethod = "shutdown") public ThreadPoolTaskScheduler taskScheduler() { ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); scheduler.setPoolSize(config.getCorePoolSize()); scheduler.setThreadNamePrefix("task-"); scheduler.setAwaitTerminationSeconds(60); scheduler.setWaitForTasksToCompleteOnShutdown(true); return scheduler; } }
需要注意:
- 这两个配置类只能同时配置一个
- 如果配置了第二个 , 则第一个就无需再用
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。