spring event 事件异步处理方式(发布,监听,异步处理)
作者:qq_34097912
spring event 事件异步处理(发布,监听,异步处理)
// 定义事件 public class EventDemo extends ApplicationEvent { private String supplierCode; private String productCode; public EventDemo(Object source, String supplierCode, String productCode) { super(source); this.supplierCode = supplierCode; this.productCode = productCode; } public String getSupplierCode() { return supplierCode; } public String getProductCode() { return productCode; } }
// 发布事件 @Component public class EventDemoPublish { @Autowired private ApplicationEventPublisher applicationEventPublisher; public void publish(String message) { EventDemo demo = new EventDemo(this, message); applicationEventPublisher.publishEvent(demo); System.out.println("发布事件执行结束"); } }
// 监听事件 @Component public class EventDemoListener implements ApplicationListener<EventDemo> { @Override public void onApplicationEvent(EventDemo event) { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("事件监听开始...... " + "商家编码:" + event.getSupplierCode() + ",商品编码:" + event.getProductCode()); } }
<!--定义事件异步处理--> <bean id="commonTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 线程池维持处于Keep-alive状态的线程数量。如果设置了allowCoreThreadTimeOut为true,该值可能为0。 并发线程数,想达到真正的并发效果,最好对应CPU的线程数及核心数 --> <property name="corePoolSize" value="2" /> <!-- 最大线程池容量 --> <property name="maxPoolSize" value="2" /> <!-- 超过最大线程池容量后,允许的线程队列数 --> <property name="queueCapacity" value="2" /> <!-- 线程池维护线程所允许的空闲时间 .单位毫秒,默认为60s,超过这个时间后会将大于corePoolSize的线程关闭,保持corePoolSize的个数 --> <property name="keepAliveSeconds" value="1000" /> <!-- 允许核心线程超时: false(默认值)不允许超时,哪怕空闲;true则使用keepAliveSeconds来控制等待超时时间,最终corePoolSize的个数可能为0 --> <property name="allowCoreThreadTimeOut" value="true" /> <!-- 线程池对拒绝任务(无线程可用)的处理策略 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> <!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中 --> <!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 --> </property> </bean> <!--名字必须是applicationEventMulticaster,因为AbstractApplicationContext默认找个--> <bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster"> <!--注入任务执行器 这样就实现了异步调用--> <property name="taskExecutor" ref="commonTaskExecutor"></property> </bean>
spring事件之异步线程执行
Spring 不仅为我们提供了IOC
, AOP
功能外,还在这个基础上提供了许多的功能,我们用的最多的可能就是Spring MVC
了吧,但是让我们来看下spring-context
包,其中包含了缓存、调度、校验功能等等
这里主要想介绍一下Spring提供的观察者模式实现(事件发布监听)及异步方法执行,这些功能也都是基于AOP实现的
Spring 事件
观察者模式大家都了解,它可以解耦各个功能,但是自己实现的话比较麻烦,Spring为我们提供了一种事件发布机制,可以按需要发布事件,之后由监听此事件的类或方法来执行各自对应的功能,代码互相不影响,以后修改订单后续的逻辑时不会影响到订单创建,有点类似于使用MQ的感觉~
比如在配置中心apollo项目中,在portal创建了app后会发送app创建事件,监听此事件的逻辑处将此消息同步到各个环境的admin sevice中,大家有兴趣可以看下相关代码
现在我们来看看具体如何使用:假设一个下单场景,订单创建成功后可能有一些后续逻辑要处理,但是和创建订单本身没有关系,此时就可以在创建订单完成后,发送一个消息,又相应部分的代码进行监听处理,避免代码耦合到一起
首先创建对应的事件
import org.springframework.context.ApplicationEvent; public class CreatedOrderEvent extends ApplicationEvent { private final String orderSn; public CreatedOrderEvent(Object source, String orderSn) { super(source); this.orderSn = orderSn; } public String getOrderSn() { return this.orderSn; } }
现在还需要一个事件发布者和监听者,创建一下
发布
import org.springframework.context.ApplicationEventPublisher; private ApplicationEventPublisher applicationEventPublisher; applicationEventPublisher.publishEvent(new CreatedOrderEvent(this, orderSn));
监听的多种实现
1:注解实现 @EventListener
import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @Slf4j @Component public class OrderEventListener { @EventListener public void orderEventListener(CreatedOrderEvent event) { } }
2:代码实现
import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationListener; @Slf4j @Component public class OrderEventListener implements ApplicationListener<CreatedOrderEvent> { @Override public void onApplicationEvent(CreatedOrderEvent event) { } }
简单的事件发布就完成了,其中的其他复杂逻辑由Spring替我们处理了
这里我们要注意一点:发布和监听后处理的逻辑是在一个线程中执行的,不是异步执行
异步方法
有时候我们为了提高响应速度,有些方法可以异步去执行,一般情况下我们可能是手动将方法调用提交到线程池中去执行,但是Spring 为我们提供了简化的写法,在开启了异步情况下,不用修改代码,只使用注解即可完成此功能
这时只需要在要异步执行的方法上添加@Async
注解即可异步执行;@EnableAsync
启动异步线程, 如
import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.stereotype.Component; @Slf4j @Component @EnableAsync public class OrderEventListener { @Async @EventListener public void orderEventListener(CreatedOrderEvent event) { } }
在使用@Async会有一些问题建议看各位看下相关文档及源码
我们通过Spring事件同步线程改为异步线程,默认的线程池是不复用线程
我觉得这是这个注解最坑的地方,没有之一!我们来看看它默认使用的线程池是哪个,在前文的源码分析中,我们可以看到决定要使用线程池的方法是
org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor
其源码如下:
protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; // 可以在@Async注解中配置线程池的名字 String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { // 获取默认的线程池 targetExecutor = this.defaultExecutor.get(); } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; }
最终会调用到
org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor
这个方法中
protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
可以看到,它默认使用的线程池是SimpleAsyncTaskExecutor
。我们不看这个类的源码,只看它上面的文档注释,如下:
主要说了三点
- 为每个任务新起一个线程
- 默认线程数不做限制
- 不复用线程
就这三点,你还敢用吗?只要你的任务耗时长一点,说不定服务器就给你来个OOM
。
解决方案
最好的办法就是使用自定义的线程池,主要有这么几种配置方法
1.在之前的源码分析中,我们可以知道,可以通过AsyncConfigurer
来配置使用的线程池
如下:
import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.lang.NonNull; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; /** * 异步线程池配置 */ @Slf4j @Component public class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor(); executor.setCorePoolSize(5); executor.setMaxPoolSize(200); executor.setKeepAliveSeconds(5 * 60); executor.setQueueCapacity(1000); // 自定义实现拒绝策略 executor.setRejectedExecutionHandler((Runnable runnable, ThreadPoolExecutor exe) -> log.error("当前任务线程池队列已满.")); // 或者选择已经定义好的其中一种拒绝策略 // 丢弃任务并抛出RejectedExecutionException异常 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 丢弃任务,但是不抛出异常 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 由调用线程处理该任务 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // 线程名称前缀 executor.setThreadNamePrefix("Async-"); executor.initialize(); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error("线程池执行任务发生未知异常.", ex); } /** * 增加日志MDC */ public static class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { /** * Gets context for task * * * @return context for task */ private Map<String, String> getContextForTask() { return MDC.getCopyOfContextMap(); } /** * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code execute()} etc.) * all delegate to this. */ @Override public void execute(@NonNull Runnable command) { super.execute(wrap(command, getContextForTask())); } /** * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.) * all delegate to this. */ @NonNull @Override public Future<?> submit(@NonNull Runnable task) { return super.submit(wrap(task, getContextForTask())); } /** * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.) * all delegate to this. */ @NonNull @Override public <T> Future<T> submit(@NonNull Callable<T> task) { return super.submit(wrap(task, getContextForTask())); } /** * Wrap callable * * @param <T> parameter * @param task task * @param context context * @return the callable */ private <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) { return () -> { Map<String, String> previous = MDC.getCopyOfContextMap(); if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } try { return task.call(); } finally { if (previous == null) { MDC.clear(); } else { MDC.setContextMap(previous); } } }; } /** * Wrap runnable * * @param runnable runnable * @param context context * @return the runnable */ private Runnable wrap(final Runnable runnable, final Map<String, String> context) { return () -> { Map<String, String> previous = MDC.getCopyOfContextMap(); if (context == null) { MDC.clear(); } else { MDC.setContextMap(context); } try { runnable.run(); } finally { if (previous == null) { MDC.clear(); } else { MDC.setContextMap(previous); } } }; } } }
该方式实现线程的复用以及,子线程继承父线程全链路traceId,方便定位问题
2.直接在@Async注解中配置要使用的线程池的名称
@Async(value = "自定义线程名")
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。