SpringBoot Event 事件如何实现异步延迟执行
作者:peak_paradise
SpringBoot Event 事件实现异步延迟执行
Spring的事件(Application Event)非常好用,虽然有一点会出现代码污染,但是在做不使用其他框架来做异步的情况先,还是非常方便的。
使用它只需要三样东西
- 自定义事件:继承 ApplicationEvent,创建一个你想传的数据的对象,会在监听器那边收到该对象。
- 定义监听器,实现 ApplicationListener 或者通过 @EventListener 注解到方法上,两种方式都行,但是推荐使用@EventListener,只要参数是你写的继承ApplicationEvent的对象,就会自动找到执行方法。
- 定义发布者,通过 ApplicationEventPublisher,自带的bean,不需要单独声明,直接@Autowired就能使用,主要只需要publishEvent方法。
但是有时候我需要做延时执行,自带的功能缺不支持,但是我发现ApplicationEvent对象里面有两个成员变量,source和timestamp,构造函数(@since 5.3.8)也提供了同时注入这两个变量数据。
/** * Create a new {@code ApplicationEvent} with its {@link #getTimestamp() timestamp} * set to the value returned by {@link Clock#millis()} in the provided {@link Clock}. * <p>This constructor is typically used in testing scenarios. * @param source the object on which the event initially occurred or with * which the event is associated (never {@code null}) * @param clock a clock which will provide the timestamp * @since 5.3.8 * @see #ApplicationEvent(Object) */ public ApplicationEvent(Object source, Clock clock) { super(source); this.timestamp = clock.millis(); }
但是,看了说明timestamp只是标志执行的时间,并不是为了延迟执行,可惜了。
于是查了一些资料,找到java.util.concurrent.DelayQueue对象,JDK自带了延迟的队列对象,我们可以考虑利用自带的timestamp和延迟队列DelayQueue结合一起来实现,具体DelayQueue的使用请自行查询,非常的简单。
首先,继承的ApplicationEvent重新实现一下。
不单单要继承ApplicationEvent,还需要实现Delayed,主要是因为DelayQueue队列中必须是Delayed的实现类
import java.time.Clock; import java.time.Duration; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import org.springframework.context.ApplicationEvent; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.extern.slf4j.Slf4j; @Data @EqualsAndHashCode(callSuper = false) public class ApplicationDelayedEvent extends ApplicationEvent implements Delayed { private static final long serialVersionUID = 1L; public ApplicationDelayedEvent(Object source) { this(source, 0L); } public ApplicationDelayedEvent(Object source, long delaySeconds) { super(source, Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(delaySeconds))); } @Override public int compareTo(Delayed o) { // 最好用NANOSECONDS,更精确,但是用处不大 long delta = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS); return (int) delta; } @Override public long getDelay(TimeUnit unit) { // 最好用NANOSECONDS,更精确,但是用处不大,负数也会认为到时间了 long millis = this.getTimestamp(); long currentTimeMillis = System.currentTimeMillis(); long sourceDuration = millis - currentTimeMillis; return unit.convert(sourceDuration, unit); } }
多了两个必须实现的方法,compareTo是排序,应该是队列中的顺序。
getDelay是主要的方法,目的是归0的时候会从DelayQueue释放出来,当然那必须是NANOSECONDS级别的,我使用MILLISECONDS,就会出现负数,但也是可以的,也能释放出来。
另一个需要改的就是发布者,所以重新写一个ApplicationDelayEventPublisher
import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.DelayQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.ApplicationEventPublisher; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import lombok.extern.slf4j.Slf4j; @Slf4j @Component public class ApplicationDelayEventPublisher implements ApplicationRunner { // ApplicationDelayedEvent需要import进来 private DelayQueue<ApplicationDelayedEvent> delayQueue = new DelayQueue<>(); @Autowired private ApplicationEventPublisher eventPublisher; @Autowired @Qualifier("watchTaskExecutor") private ThreadPoolTaskExecutor poolTaskExecutor; public void publishEvent(ApplicationDelayedEvent event) { boolean result = delayQueue.offer(event); log.info("加入延迟队列。。。。{}", result); } @Override public void run(ApplicationArguments args) throws Exception { poolTaskExecutor.execute(() -> watchThread()); } private void watchThread() { while (true) { try { log.info("启动延时任务的监听线程。。。。"); ApplicationDelayedEvent event = this.delayQueue.take(); log.info("接收到延时任务执行。。。{}", ZonedDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); eventPublisher.publishEvent(event); } catch (InterruptedException e) { log.info("启动延时任务的监听线程关闭"); this.delayQueue.clear(); break; } } } }
需要实现ApplicationRunner作为Spring boot的启动时候运行的bean,目的就是开启监听线程,有事件到了执行时间take方法会得到数据,然后调用Spring原生的事件发布。
另外特别说明的就是监听线程不能随便创建,脱离了Spring容器的线程池会造成关闭服务的时候造成无法关闭的现象,所以建议还是自定义一个ThreadPoolTaskExecutor
@Bean public ThreadPoolTaskExecutor watchTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(1); executor.setQueueCapacity(1); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("watch_task_"); // 线程池对拒绝任务的处理策略 // ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常 // ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 // ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面。 // ThreadPoolExecutor.CallerRunsPolicy:由调用者处理该任务 。 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 初始化 executor.initialize(); return executor; }
最后就是接收事件,跟传统的接收是一样的,异步只需要在配置类上加上@EnableAsync注解就行了,然后在监听的方法上加@Async
import java.util.concurrent.ThreadPoolExecutor; import javax.annotation.PostConstruct; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import lombok.extern.slf4j.Slf4j; @Slf4j @Configuration @EnableAsync @ConditionalOnClass(ApplicationDelayEventPublisher.class) public class DelayEventConfiguration { @PostConstruct public void init() { log.info("延迟Spring事件模块启动中。。。"); } // 不能和监听线程放到一个线程池,不然无法执行 @Bean public ThreadPoolTaskExecutor poolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(3); executor.setMaxPoolSize(50); executor.setQueueCapacity(10000); executor.setKeepAliveSeconds(30); executor.setThreadNamePrefix("my_task_"); // 线程池对拒绝任务的处理策略 // ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常 // ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 // ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面。 // ThreadPoolExecutor.CallerRunsPolicy:由调用者处理该任务 。 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 初始化 executor.initialize(); return executor; } @Bean public ThreadPoolTaskExecutor watchTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(1); executor.setMaxPoolSize(1); executor.setQueueCapacity(1); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("watch_task_"); // 线程池对拒绝任务的处理策略 // ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常 // ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。 // ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面。 // ThreadPoolExecutor.CallerRunsPolicy:由调用者处理该任务 。 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 初始化 executor.initialize(); return executor; } } @Async("poolTaskExecutor") @EventListener public void listenDelayEvent(ApplicationDelayedEvent event) { log.info("收到执行事件:{}", event.getSource()); }
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。