SpringBoot实现异步事件Event详解
作者:木棉软糖
SpringBoot实现异步事件
为什么需要用到Spring Event?
我简单说一个场景,大家都能明白: 你在公司内部,写好了一个用户注册的功能
然后产品经理根据公司情况,新增以下需求
- 注册新用户,给新用户发邮件
- 发放新用户优惠券
public void registerUser(AddUserRequest request){ //插入用户 userService.insertUser(request); }
实现需求后:
public void registerUser(AddUserRequest request){ //插入用户 User user = convertToUser(request) userService.insertUser(user); //发邮件 sendEmail(user); //发放优惠券 sendCouponToUser(user); }
这样正常写的话,会有以下缺点:
- 发邮件方法里面,如果邮件服务出现问题,就会影响到注册用户的核心业务,无论发邮件成不成功,都不应影响注册用户
- 发放优惠券,产品经理会根据市场需求要求你反复去掉删除,要是没有一些措施,很容易被产品经理"耍猴",而且反复改代码会导致功能不稳定。
更理论的话来说,就是把一些次要的功能耦合到核心功能里面,且经常调整,会导致核心功能不稳定
解决方案: 将发放优惠券,发送邮件做成单独的服务A和B。 注册业务在注册用户成功后,发布一个"注册成功"的消息。
服务A和服务B相当于一个监听者,都监听**"注册成功"的消息**,监听到后,服务A和B就各自做自己的事情了。 服务A和服务B不需要关心到底是谁,哪个地方发出了这个消息,它只需要监听此消息并做出反应。
这种方式的好处是:
- 如果不想要发放优惠券的功能,直接把服务A的代码去掉就好了,而且由于跟注册用户解耦,可以不用担心影响到注册功能。
- 如果想要做更多的次要业务,例如注册时发短信通知,可以增加一个服务C监听**"注册成功"的消息**,然后服务C进行自己的服务就行。不需要更改注册用户的代码。
上面这种模式就是事件模式。
Spring Event 的使用
注解方式实现
我用注解的方式去实现Spring Event的使用 事件对象:
@Data public class RegisterUserEvent { /** * 用户id */ private Integer userId; /** * 用户名 */ private String userName; }
接口:
@RestController @Api(tags="测试前端控制器") @RequiredArgsConstructor public class TestController { private final TestService testService; @ApiOperation(value="模拟注册用户功能的发送事件", notes="\n 开发者:") @PostMapping("/sendEvent") public JsonResult sendEvent(){ testService.sendEvent(); return JsonResult.success(); } }
注册功能:
/** * @author zhengbingyuan * @date 2023/2/6 */ @Slf4j @Service @RequiredArgsConstructor public class TestService { private final ApplicationEventPublisher eventPublisher; /** * 模拟一个注册用户的功能 */ @Transactional(rollbackFor = Exception.class) public void sendEvent() { log.info("开始注册用户...."); UserDto dto = saveUser(); RegisterUserEvent userEvent = new RegisterUserEvent(); userEvent.setUserId(dto.getId()); userEvent.setUserName(dto.getUserName()); eventPublisher.publishEvent(userEvent); } private UserDto saveUser() { int id = 1; String userName = "超人"; log.info("保存用户id: {},name:{}",id,userName); UserDto dto = new UserDto(); dto.setId(id); dto.setUserName(userName); return dto; } }
次要业务的事件监听:
/** * @author zhengbingyuan * @date 2023/2/6 */ @Slf4j @Component public class RegisterUserEventListener { @EventListener public void processSendCouponToUser(RegisterUserEvent event){ log.info("发放优惠券给用户:{}",event.getUserName()); } @EventListener public void processSendEmailToUser(RegisterUserEvent event){ log.info("发放邮件给用户:{}",event.getUserName()); } }
结果:
2023-02-06 16:47:30,228:INFO http-nio-8083-exec-2 [] (TestService.java:28) - 开始注册用户....
2023-02-06 16:47:30,229:INFO http-nio-8083-exec-2 [] (TestService.java:40) - 保存用户id: 1,name:超人
2023-02-06 16:47:30,232:INFO http-nio-8083-exec-2 [] (RegisterUserEventListener.java:17) - 发放优惠券给用户:超人
2023-02-06 16:47:30,232:INFO http-nio-8083-exec-2 [] (RegisterUserEventListener.java:23) - 发放邮件给用户:超人
小结
上面将注册的主要逻辑(用户信息落库)和次要的业务逻辑(发送邮件)通过事件的方式解耦了。次要的业务做成了可插拔的方式,比如不想发送邮件了,只需要将邮件监听器上面的@Component注释就可以了,非常方便扩展。
Spring Event异步模式
对于上面的程序,如果发送邮件出现异常的话,根据实践,整个注册功能会受到影响,也就是上面的程序仅只实现了代码可拔插的效果。 如果将发送邮件这一个功能完全解耦出来,还需要做成异步事件模式。
先看看事件监听器是怎么实现的 在注解方式的publishEvent方法底层,会通过getApplicationEventMulticaster().multicastEvent(event)来派发事件。这个getApplicationEventMulticaster()获得的对象是SimpleApplicationEventMulticaster。
SimpleApplicationEventMulticaster 里面有一个taskExecutor 的线程池,如果这个线程池不是null,那么将会使用这个线程池去消费事件消息。
@Override public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event)); Executor executor = getTaskExecutor(); for (ApplicationListener<?> listener : getApplicationListeners(event, type)) { if (executor != null) { //线程池调用 executor.execute(() -> invokeListener(listener, event)); } else { //直接调用 invokeListener(listener, event); } } }
所以,只要让executor 不为null,就能使用异步事件了。但是默认情况下executor是空的,此时需要我们来给其设置一个值。
怎么设置这个值,这需要看回去ApplicationEventMulticaster是怎么初始化的,这个对象是在AbstractApplication.refresh()中的initApplicationEventMulticaster()方法执行。
protected void initApplicationEventMulticaster() { ConfigurableListableBeanFactory beanFactory = getBeanFactory(); if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) { this.applicationEventMulticaster = beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class); if (logger.isTraceEnabled()) { logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]"); } } else { this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster); if (logger.isTraceEnabled()) { logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " + "[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]"); } } }
通过初始化方法,可以得知,只要存在name = "applicationEventMulticaster" 的bean,那么就不会创建SimpleApplicationEventMulticaster 实例。 换句话说,只要开发者在配置类,提供一个设置好taskExecutor的SimpleApplicationEventMulticaster 就可以使用异步事件了。
/** * @author zhengbingyuan * @date 2023/2/6 */ @Configuration @RequiredArgsConstructor public class AsyncEventConfiguration { @Bean public SimpleApplicationEventMulticaster applicationEventMulticaster(BeanFactory beanFactory) { SimpleApplicationEventMulticaster applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); //设置线程池 applicationEventMulticaster.setTaskExecutor(eventExecutor()); return applicationEventMulticaster; } @Bean public TaskExecutor eventExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); //核心线程数 int corePoolSize = 5; threadPoolTaskExecutor.setCorePoolSize(corePoolSize); //最大线程数 int maxPoolSize = 10; threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize); //队列容量 int queueCapacity = 10; threadPoolTaskExecutor.setQueueCapacity(queueCapacity); //拒绝策略 threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); //线程名前缀 String threadNamePrefix = "eventExecutor-"; threadPoolTaskExecutor.setThreadNamePrefix(threadNamePrefix); threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); // 使用自定义的跨线程的请求级别线程工厂类19 int awaitTerminationSeconds = 5; threadPoolTaskExecutor.setAwaitTerminationSeconds(awaitTerminationSeconds); threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; } }
继续使用上面所说的例子,由于我log日志有加线程前缀,这里就不用加线程阻塞手段去测试了。
结果:可以看出,次要业务和核心业务已经是发生在不同的线程上了
2023-02-06 18:22:19,865:INFO http-nio-8083-exec-2 [] (TestService.java:28) - 开始注册用户....
2023-02-06 18:22:19,866:INFO http-nio-8083-exec-2 [] (TestService.java:41) - 保存用户id: 1,name:超人
2023-02-06 18:22:19,866:INFO http-nio-8083-exec-2 [] (TestService.java:35) - 注册用户完成
2023-02-06 18:22:19,866:INFO eventExecutor-3 [] (RegisterUserEventListener.java:17) - 发放优惠券给用户:超人
2023-02-06 18:22:19,866:INFO eventExecutor-7 [] (RegisterUserEventListener.java:23) - 发放邮件给用户:超人
小结: 异步线程的使用,在次要业务代码可拔插的情况下,进一步解耦,即使次要业务出问题,也不影响核心业务。
事件使用建议
异步事件的模式,通常将一些非主要的业务放在监听器中执行,因为监听器中存在失败的风险,所以使用的时候需要注意。
如果只是为了解耦,但是被解耦的次要业务也是必须要成功的,可以使用消息中间件的方式(落地+重试机制)来解决这些问题。
到此这篇关于SpringBoot实现异步事件Event详解的文章就介绍到这了,更多相关SpringBoot实现异步事件内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!