Java EventBus手把手带你实现
作者:鲲鹏飞九万里
一、说明
在Guava中,EventBus简化了观察者模式的实现。理解EventBus的原理来,自动动手实现一个简单的EventBus。
二、Guava的EventBus
EventBus叫做“时间总线”,它提供了实现观察者模式的骨架代码。可以基于此框架,非常容易地在自己的业务场景中实现观察者模式。它不仅支持异步非阻塞模式,同时支持同步阻塞模式。
基于EventBus,不需要定义Observer接口(观察者接口),任意类型的对象都可以注册到EventBus中。通过@Subscribe
注解来表明类中哪个函数可以接收观察者发送的消息。
Guava EventBus中的几个主要的类和函数:
EventBus、SyncEventBus
EventBus类中封装了对外暴露的所有可调用接口。其中EventBus实现了同步阻塞的观察者模式,SyncEventBus继承EventBus提供了异步非阻塞的观察者模式。
// 同步阻塞的方式 EventBus eventBus = new EventBus(); // 异步非阻塞的方式 final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20; // 异步非阻塞线程池大小 ExecutorService executorService = Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE); EventBus eventBus = new AsyncEventBus(executorService);
register()
函数
EventBus通过register()
函数来注册观察者。它可以接收任意类型(Object)的观察者。具体的函数定义如下:
public void register(Object object) { //...... }
unregister()
函数
相对于register()
,unregister()
函数是从EventBus中删除某个观察者。
public void unregister(Object object) { //...... }
post
函数
EventBus提供post()
函数 ,用来给观察者发消息。
public void post(Object event) { //...... }
post发送消息的时候,并不是把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,能接收的消息类型是发送消息(post函数中定义的父类)。
比如,AObserver能接收的消息类型是XMsg,BObserver能接收的消息类型是YMsg,CObserver能接收的消息类型是ZMsg。其中,XMsg是YMsg的父类。
XMsg xMsg= new XMsg(); YMsg yMsg= new YMsg(); ZMsg zMsg= new ZMsg(); post(xMsg);// AObserver 接收消息 post(yMsg);// AObserver和BObserver接收到消息 post(zMsg);// CObserver接收到消息
Observer(观察者)能接收到消息类型是通过@Subscribe
注解定义的。
@Subscribe
注解
EventBus通过@Subscribe
注解类标明,某个函数能接收哪种类型的消息。(类型不能是基本类型)
三、EventBus的原理
四、动手实现一个EventBus
@Beat
标注一个公共的API(公共的类、方法或字段) 在未来的发行版本中会发生不兼容的变化。带有此注释的 API 不受其包含库所做的任何兼容性保证。请注意,此注释的存在并不意味着所讨论 API 的质量或性能,只是它不是“API 冻结”的事实。
应用程序依赖 beta API 通常是安全的,但需要在升级期间进行一些额外的工作。然而,不建议在类库(包含在用户的CLASSPATH中,不受开发人员的控制)上这么做。
4.1 定义Subscribe注解
定义Subscribe注解,用于标明哪个函数可以接收消息。
/** * 定义一个注解,表明观察者中的哪个函数可以接收消息 */ @Retention(RetentionPolicy.RUNTIME) // 注解的声明周期 @Target(ElementType.METHOD) // 注解作用的地方 @Beta // 标注API在未来发行的版本是可能有不兼容的变化 public @interface MySubscribe { }
4.2 ObserverAction
用来表示@MySubscribe
注解的方法。
/** * 用来表示 @MySubscribe 注解方法 */ public class MyObserverAction { private Object target; private Method method; public MyObserverAction(Object target, Method method) { this.target = checkNotNull(target); this.method = method; this.method.setAccessible(true); } /** * event是method方法的参数 * @param event */ public void execute(Object event) { try { method.invoke(target, event); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } }
4.3 ObserverRegister
Observer 注册表。
/** * Observer 注册表 */ public class MyObserverRegister { // 注册表, 消息类型: 观察者方法 private ConcurrentMap<Class<?>, CopyOnWriteArraySet<MyObserverAction>> registry = new ConcurrentHashMap<>(); /** * 将观察者注册到 注册表中 * @param observer 观察者 */ public void register(Object observer) { Map<Class<?>, Collection<MyObserverAction>> observerActions = findAllObserverActions(observer); for (Map.Entry<Class<?>, Collection<MyObserverAction>> entry : observerActions.entrySet()) { Class<?> eventType = entry.getKey(); Collection<MyObserverAction> evenActions = entry.getValue(); CopyOnWriteArraySet<MyObserverAction> registryEvenActions = registry.getOrDefault(eventType, new CopyOnWriteArraySet<>()); registryEvenActions.addAll(evenActions); registry.put(eventType, registryEvenActions); } } /** * 获取匹配的观察者事件 * @param event * @return */ public List<MyObserverAction> getMatchedMyObserverActions(Object event) { List<MyObserverAction> result = new ArrayList<>(); Class<?> postedEventClass = event.getClass(); for (Map.Entry<Class<?>, CopyOnWriteArraySet<MyObserverAction>> entry : registry.entrySet()) { Class<?> eventClass = entry.getKey(); // 匹配相同类型或父类型 if (postedEventClass.isAssignableFrom(eventClass)) { result.addAll(entry.getValue()); } } return result; } // 消息类型(观察者类型类型及其父类型) 观察者方法 public Map<Class<?>, Collection<MyObserverAction>> findAllObserverActions(Object observer) { Map<Class<?>, Collection<MyObserverAction>> result = new HashMap<>(); // 观察者类型 Class<?> observerClass = observer.getClass(); for (Method method : getAnnotatedMethods(observerClass)) { Class<?>[] parameterTypes = method.getParameterTypes(); Class<?> eventType = parameterTypes[0]; result.putIfAbsent(eventType, new ArrayList<>()); result.get(eventType).add(new MyObserverAction(observer, method)); } return result; } /** * 根据观察者类型,查找方法列表 * @param clazz * @return */ public List<Method> getAnnotatedMethods(Class<?> clazz) { List<Method> result = new ArrayList<>(); for (Method method : clazz.getDeclaredMethods()) { if (method.isAnnotationPresent(MySubscribe.class)) { Class<?>[] parameterTypes = method.getParameterTypes(); checkArgument(parameterTypes.length==1, "方法%s 有一个注解@MySubscribe ,它有%s个参数,实际要求有且只有一个参数", method, parameterTypes.length); result.add(method); } } return result; } }
4.4 EventBus
/** * 实现 同步阻塞的 EventBus */ public class MyEventBus { private Executor executor; private MyObserverRegister register = new MyObserverRegister(); public MyEventBus() { // MoreExecutors.directExecutor() 是 Google Guava 提供的工具类,看似是多线程,实际上是单线程。 // 之所以要这么实现,主要还是为了跟 AsyncEventBus 统一代码逻辑,做到代码复用 this(MoreExecutors.directExecutor()); } // 注意这里的修饰符 protected MyEventBus(Executor executor) { this.executor = executor; } public void register(Object observer) { register.register(observer); } public void post(Object event) { List<MyObserverAction> observerActions = register.getMatchedMyObserverActions(event); for (MyObserverAction observerAction : observerActions) { executor.execute(new Runnable() { @Override public void run() { observerAction.execute(event); } }); } } }
4.5 SyncEventBus
/** * 异步非阻塞的EventBus */ public class MySyncEventBus extends MyEventBus { public MySyncEventBus(Executor executor) { super(executor); } }
五、使用自定义的EventBus
public static void main(String[] args) { // 自定义的EventBus MyEventBus myEventBus = new MyEventBus(); // 注册一个观察者 myEventBus.register(new CurrentConditionsDisplayListener()); // 向观察者发送消息 myEventBus.post(23.0f); }
六、扩展
到此这篇关于Java EventBus手把手带你实现的文章就介绍到这了,更多相关Java EventBus内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!