Spring源码之事件监听机制(实现EventListener接口方式)
作者:it_lihongmin
一、Spring实现自定义事件的发布订阅
1、事件定义
/** * 定义事件类型 * * @author lihongmin * @date 2019/11/3 20:30 */ public class OrderEvent extends ApplicationEvent { public OrderEvent(Object source) { super(source); } }
2、事件监听(泛型)
/** * 订单事件监听 * @author lihongmin * @date 2019/11/3 20:33 */ @Component public class OrderEventListener implements ApplicationListener<OrderEvent> { @Override public void onApplicationEvent(OrderEvent orderEvent) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("我受到了一个事件:" + orderEvent.getSource()); } }
3、模拟事件发送
/** * 事件触发模拟 * * 我受到了一个事件:我发布了事件!!! * 我执行完毕了!!! * * @author lihongmin * @date 2019/11/3 20:35 */ @Controller public class OrderEventController implements ApplicationContextAware { private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @GetMapping("publishOrderEvent") public String publishOrderEvent() { applicationContext.publishEvent(new OrderEvent("我发布了事件!!!")); System.out.println("我执行完毕了!!!"); return "发送事件了!"; } }
4、启动项目,调用 127.0.0.1:8080/publishOrderEvent
我受到了一个事件:我发布了事件!!!
我执行完毕了!!!
总结:事件发送非常的简单,一个事件类型,一个监听,一个触发机制。并且该事件为同步机制(后续在Spring Boot中可以方便切换为异步)。
二、Spring事件驱动原理分析(Spring版本为5.1.7)
1、ApplicationContext委派ApplicationEventPublisher发送事件
我们调用的是 ApplicationContext的
publishEvent(new OrderEvent("我发布了事件!!!"));
查看ApplicationContext 结构,发现调用的是父类 ApplicationEventPublisher的接口
如下:
public interface ApplicationContext extends EnvironmentCapable, ListableBeanFactory, HierarchicalBeanFactory, MessageSource, ApplicationEventPublisher, ResourcePatternResolver { @Nullable String getId(); String getApplicationName(); String getDisplayName(); long getStartupDate(); @Nullable ApplicationContext getParent(); AutowireCapableBeanFactory getAutowireCapableBeanFactory() throws IllegalStateException; }
public interface ApplicationEventPublisher { default void publishEvent(ApplicationEvent event) { this.publishEvent((Object)event); } void publishEvent(Object var1); }
那么就是其子类 AbstractApplicationContext 实现的发送操作
public void publishEvent(Object event) { this.publishEvent(event, (ResolvableType)null); } protected void publishEvent(Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null"); Object applicationEvent; if (event instanceof ApplicationEvent) { applicationEvent = (ApplicationEvent)event; } else { applicationEvent = new PayloadApplicationEvent(this, event); if (eventType == null) { eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType(); } } if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType); } if (this.parent != null) { if (this.parent instanceof AbstractApplicationContext) { ((AbstractApplicationContext)this.parent).publishEvent(event, eventType); } else { this.parent.publishEvent(event); } } }
发现执行到
getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType);
那么其实这里算是一个委派模式了(个人认为),spring容器将发送事件委派给 AbstractApplicationContext的ApplicationEventMulticaster applicationEventMulticaster对象。
2、ApplicationEventMutulcaster类型的确认和初始化
不难发现(或者对Spring ApplicationContext比较熟悉的话)是项目启动时,不同类型的ApplicationContext(如:ClassPathXmlApplicationContext)
在调用父类 AbstractApplicationContext的refresh方法(之前分析过是一个模板方法)时, initApplicationEventMulticaster()
如下:
protected void initApplicationEventMulticaster() { ConfigurableListableBeanFactory beanFactory = this.getBeanFactory(); if (beanFactory.containsLocalBean("applicationEventMulticaster")) { this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class); if (this.logger.isTraceEnabled()) { this.logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]"); } } else { this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); beanFactory.registerSingleton("applicationEventMulticaster", this.applicationEventMulticaster); if (this.logger.isTraceEnabled()) { this.logger.trace("No 'applicationEventMulticaster' bean, using [" + this.applicationEventMulticaster.getClass().getSimpleName() + "]"); } } }
逻辑比较简单,在BeanFactory中获取名称为 applicationEventMulticaster的Bean,当然如果我们没有自定义并且注册为该名称的Bean,肯定是获取不到的。
那么会new一个 SimpleApplicationEventMulticaster类型的bean注册到容器中。
也就是说上面的getApplicationEventMulticaster()获取到的就是SimpleApplicationEventMulticaster。
但是还需要注意使用的是有参数构造进行初始化,如下:
public SimpleApplicationEventMulticaster(BeanFactory beanFactory) { this.setBeanFactory(beanFactory); }
在父类中实现:
public void setBeanFactory(BeanFactory beanFactory) { this.beanFactory = beanFactory; if (beanFactory instanceof ConfigurableBeanFactory) { ConfigurableBeanFactory cbf = (ConfigurableBeanFactory)beanFactory; if (this.beanClassLoader == null) { this.beanClassLoader = cbf.getBeanClassLoader(); } this.retrievalMutex = cbf.getSingletonMutex(); } }
获取bean工厂中所以的所以单例对象放入属性retrievalMutex 中,将类加载器也进行赋值,后续会用到。
3、SimpleApplicationEventMulticaster的发送事件方法
public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event); Iterator var4 = this.getApplicationListeners(event, type).iterator(); while(var4.hasNext()) { ApplicationListener<?> listener = (ApplicationListener)var4.next(); Executor executor = this.getTaskExecutor(); if (executor != null) { executor.execute(() -> { this.invokeListener(listener, event); }); } else { this.invokeListener(listener, event); } } }
分析一下这个方法:
- 1)、获取或确认 ResolvableType 类型
- 2)、根据事件对象和ResolvableType 类型,获取订阅者列表
- 3)、发现如果 SimpleApplicationEventMulticaster对象的线程池属性 Executor taskExecutor不为null则异步执行监听方法。但是我们看到的是自己new了一个对象,所以如果想 事件监听使用线程池异步执行的话(自己想到应该可以这样玩,自己比较喜欢自定义线程参数,心里有数,当前一般还会设置线程池前缀名称):
@Component public class DesignpatternApplication implements BeanFactoryAware { private BeanFactory beanFactory; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.beanFactory = beanFactory; } @Bean("APPLICATION_EVENT_MULTICASTER_BEAN_NAME") public SimpleApplicationEventMulticaster init() { ThreadPoolExecutor MulticasterExecutor = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy()); SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster(); multicaster.setTaskExecutor(MulticasterExecutor); multicaster.setBeanFactory(beanFactory); return multicaster; } }
- 4)、最后肯定是invokeListener(listener, event);
4、ResolvableType类型确认
首先我们传入的eventType是null,所以先根据我们传入的对象调用resolveDefaultEventType方法
如下:
private ResolvableType resolveDefaultEventType(ApplicationEvent event) { return ResolvableType.forInstance(event); }
再调用,肯定OrderEvent肯定没有实现ResolvableTypeProvider接口:
public static ResolvableType forInstance(Object instance) { Assert.notNull(instance, "Instance must not be null"); if (instance instanceof ResolvableTypeProvider) { ResolvableType type = ((ResolvableTypeProvider) instance).getResolvableType(); if (type != null) { return type; } } return ResolvableType.forClass(instance.getClass()); }
再调用:
public static ResolvableType forClass(@Nullable Class<?> clazz) { return new ResolvableType(clazz); }
所以我们或者到了一个新创建的 ResolvableType 对象,对象的clazz字段为我们的 OrderEvent。
为什么追这么深,是因为下面就是根据类型来获取监听器的。
5、获取所有的监听列表,并且看看是怎么做到监听泛型类型
protected Collection<ApplicationListener<?>> getApplicationListeners(ApplicationEvent event, ResolvableType eventType) { Object source = event.getSource(); Class<?> sourceType = source != null ? source.getClass() : null; AbstractApplicationEventMulticaster.ListenerCacheKey cacheKey = new AbstractApplicationEventMulticaster.ListenerCacheKey(eventType, sourceType); AbstractApplicationEventMulticaster.ListenerRetriever retriever = (AbstractApplicationEventMulticaster.ListenerRetriever)this.retrieverCache.get(cacheKey); if (retriever != null) { return retriever.getApplicationListeners(); } else if (this.beanClassLoader == null || ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader))) { Object var7 = this.retrievalMutex; synchronized(this.retrievalMutex) { retriever = (AbstractApplicationEventMulticaster.ListenerRetriever)this.retrieverCache.get(cacheKey); if (retriever != null) { return retriever.getApplicationListeners(); } else { retriever = new AbstractApplicationEventMulticaster.ListenerRetriever(true); Collection<ApplicationListener<?>> listeners = this.retrieveApplicationListeners(eventType, sourceType, retriever); this.retrieverCache.put(cacheKey, retriever); return listeners; } } } else { return this.retrieveApplicationListeners(eventType, sourceType, (AbstractApplicationEventMulticaster.ListenerRetriever)null); } }
在自己的 ConcurrentHashMap类型的retrieverCache缓存中获取,key是根据 OrderEvent类型和我发送的数据源(当前为String类型)如下:
- Map的key:
private static final class ListenerCacheKey implements Comparable<AbstractApplicationEventMulticaster.ListenerCacheKey> { private final ResolvableType eventType; @Nullable private final Class<?> sourceType; // ..... }
- Map的value类型:
private class ListenerRetriever { public final Set<ApplicationListener<?>> applicationListeners = new LinkedHashSet(); public final Set<String> applicationListenerBeans = new LinkedHashSet(); private final boolean preFiltered; }
很清楚的结构,两个LinkedHashSet, 就是为了保证两个Set个数相同,并且顺序一一对应。用于存放当前的监听对象和监听的类型。
当前的缓存是在AbstractApplicationContext的refresh的registerBeanPostProcessors(注册所有的BeanPostProcess),的最后一步,注册了ApplicationListenerDetector类型。
并且在refresh的最后会将所有懒加载的Bean都初始化,则会将所有的实现了该接口的Bean放入容器中。
则重点是 retrieveApplicationListeners方法,比较长:
private Collection<ApplicationListener<?>> retrieveApplicationListeners(ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable AbstractApplicationEventMulticaster.ListenerRetriever retriever) { List<ApplicationListener<?>> allListeners = new ArrayList(); Object var7 = this.retrievalMutex; LinkedHashSet listeners; LinkedHashSet listenerBeans; synchronized(this.retrievalMutex) { listeners = new LinkedHashSet(this.defaultRetriever.applicationListeners); listenerBeans = new LinkedHashSet(this.defaultRetriever.applicationListenerBeans); } Iterator var14 = listeners.iterator(); while(var14.hasNext()) { ApplicationListener<?> listener = (ApplicationListener)var14.next(); if (this.supportsEvent(listener, eventType, sourceType)) { if (retriever != null) { retriever.applicationListeners.add(listener); } allListeners.add(listener); } } if (!listenerBeans.isEmpty()) { BeanFactory beanFactory = this.getBeanFactory(); Iterator var16 = listenerBeans.iterator(); while(var16.hasNext()) { String listenerBeanName = (String)var16.next(); try { Class<?> listenerType = beanFactory.getType(listenerBeanName); if (listenerType == null || this.supportsEvent(listenerType, eventType)) { ApplicationListener<?> listener = (ApplicationListener)beanFactory.getBean(listenerBeanName, ApplicationListener.class); if (!allListeners.contains(listener) && this.supportsEvent(listener, eventType, sourceType)) { if (retriever != null) { if (beanFactory.isSingleton(listenerBeanName)) { retriever.applicationListeners.add(listener); } else { retriever.applicationListenerBeans.add(listenerBeanName); } } allListeners.add(listener); } } } catch (NoSuchBeanDefinitionException var13) { ; } } } AnnotationAwareOrderComparator.sort(allListeners); if (retriever != null && retriever.applicationListenerBeans.isEmpty()) { retriever.applicationListeners.clear(); retriever.applicationListeners.addAll(allListeners); } return allListeners; }
分析该方法,上面锁住的是 retrievalMutex对象,现在又是同步锁该对象。
为了保证LinkedHashSet中的值不会乱(monitor enter两次exit两次),去缓存中的每个查看每个监听器是否是对象的类型,检查了监听器的泛型对象和事件源类型。
6、根据监听列表,循环调用(同步或异步)
我们实现的 onApplicationEvent(OrderEvent orderEvent)方法
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) { ErrorHandler errorHandler = this.getErrorHandler(); if (errorHandler != null) { try { this.doInvokeListener(listener, event); } catch (Throwable var5) { errorHandler.handleError(var5); } } else { this.doInvokeListener(listener, event); } }
所以 ErrorHandler想在这里处理,则需要在该对象中创建该异常处理器(可以有很多中方式处理,利用bean的生命周期,这是一个很好的扩展点,后续可以去实现),继续 doInvokeListener方法
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { listener.onApplicationEvent(event); } catch (ClassCastException var6) { String msg = var6.getMessage(); if (msg != null && !this.matchesClassCastMessage(msg, event.getClass())) { throw var6; } Log logger = LogFactory.getLog(this.getClass()); if (logger.isTraceEnabled()) { logger.trace("Non-matching event type for listener: " + listener, var6); } } }
最后看见 listener.onApplicationEvent(event);
it is over!!!
总结
1、ApplicationContext发送事件是委托给了一个 Spring容器在refresh时初始化的SimpleApplicationEventMulticaster bean(由于没有初始化内部线程池对象,所以事件是同步发送的)。
2、发送前先获取事件的ResolvableType类型(当前为OrderEvent clazz)和事件源类型(当前为String)
3、获取监听者列表。 先去自己Bean内部先查询缓存,否则从BeanFactory中获取所有单利bean进行匹配(再放入缓存ConturrentHashMap)。
4、监听者列表循环(同步或异步)地调用我们自己写的监听方法OnApplicationEvent。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。