目 录CONTENT

文章目录

终于懂了系列之EventBus 源码解析(超详细)

小王同学
2024-07-24 / 0 评论 / 0 点赞 / 87 阅读 / 0 字

EventBus 源码解析

使用场景

跨组件通信:不同的Activity、Fragment、Service、ViewModel等之间的通信。
异步任务完成通知:例如,在后台线程完成数据加载后通知UI更新。
全局事件广播:例如,用户登录状态变化、网络连接状态变化等全局事件的广播。
动态模块通信:在模块化开发中,不同模块之间的通信。

EventBus的优点

  1. 松耦合通信:组件之间无需直接引用,通过事件进行通信,降低了代码耦合度,提高了代码的可维护性。
  2. 代码简洁:使用注解的方式订阅和发布事件,代码更加简洁,减少了回调和接口定义。
  3. 异步处理:支持异步事件处理,可以在后台线程处理任务,主线程更新UI,简化了线程间的通信。
  4. 易于扩展:新的组件可以方便地添加和移除事件订阅者,无需修改现有的代码。
  5. 灵活性:任何地方都可以发送和接收事件,增加了应用程序架构的灵活性。
  6. 便捷的线程模式:提供不同的线程模式(如MAIN, BACKGROUND, ASYNC等),方便开发者选择合适的线程来处理事件。

EventBus的缺点:

代码难以追踪:
• 事件的发送和接收是松散耦合的,难以通过代码直接追踪事件的流向,增加了调试和维护的难度。
潜在的性能开销:
• 在事件总线中注册大量事件和订阅者,可能带来一定的性能开销,尤其是在频繁发送大量事件的情况下。
可能导致内存泄漏:
• 如果没有正确地注销订阅者(如Activity或Fragment),可能会导致内存泄漏。
难以调试:
• 由于事件机制的松耦合特性,调试时很难快速定位问题的来源。
事件管理复杂:
• 在大型项目中,事件的种类和数量较多,可能导致事件管理变得复杂,需要特别注意事件的命名和管理。
不适合所有场景:
• 对于简单的组件间通信,使用EventBus可能显得过于复杂,传统的接口和回调方式可能更加合适。

简单使用

第一步:定义事件

public class MyEvent {
    private String message;

    public void MyEventMessage(String message) {
        this.message = message;
    }
}

这里事件被定义成一个普通类,在类的内部创建一个字段,同时创建一个方法。

第二步:准备订阅者


@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(MyEvent event) {
    String message = event.getMessage();
    ...
}

@Override
public void onStart() {
    super.onStart();
    EventBus.getDefault().register(this);
}
 
@Override
public void onStop() {
    EventBus.getDefault().unregister(this);
    super.onStop();
}

这里需要在创建需要订阅者是调用 EventBus.getDefault().register(this)方法进行注册,在适当的位置调用 EventBus.getDefault().unregister(this)方法进行解注册。同时创建一个接收事件的方法 onMessageEvent方法,使用注解 @Subscribe进行标示,同时生命事件接收的线程为主线程。

第三步:发送事件

EventBus.getDefault().post(new MyEvent("Hello,EventBus!!!"));

在需要发送事件的地方调用 EventBus.getDefault().post方法,将第一步创建的事件对象传入即可。
到这里 EventBus的使用方法就算结束了,看起来挺简单的,但是这里面有很多需要注意的地方,别着急,这些都会在代码探究中进行讲解。

知识点准备

在进行源码分析之前,我们要知道 EventBus中的三个知识点:角色分配、线程模型和事件类型。

角色分配

Event

事件,它可以是任意类型,EventBus会根据事件的类型进行全局发送。

Subscriber

事件订阅者,也可以称之为事件的接收者。在 EventBus3.0之前,使用时必须以 OnEvent开头的几种接收方法来接收事件。这些方法分别为:onEventonEventMainThreadonEventBackgroundThreadonEventAsync,但是在3.0之后处理方法可以自定义,但是要以注解 @subscribe进行标示,同时指定线程模型,线程模型默认为 POSTING的方式。

Publisher

事件的发布者,它可以在任意线程中发布事件。在使用时通常调用 EventBus.getDefault()方法获取一个 EventBus对象,再通过链式编程调用 post()方法进行事件发送。

线程模型

事件模型是指事件订阅者所在线程和发布事件者所在线程的关系。

POSTING

事件的订阅和事件的发布处于同一线程。这是默认设置。事件传递意味着最少的开销,因为它完全避免了线程切换。因此,对于已知在非常短的时间内完成而不需要主线程的简单任务,这是推荐的模式。使用此模式的事件处理程序必须快速返回,以避免阻塞可能是主线程的发布线程。

MAIN

Android上,用户将在 Android的主线程(UI线程)中被调用。如果发布线程是主线程,则将直接调用订阅方方法,从而阻塞发布线程。否则,事件将排队等待传递(非阻塞)。使用此模式的订阅服务器必须快速返回,以避免阻塞主线程。如果不在 Android上,则行为与 POSTING相同。

MAIN_ORDERED

Android上,用户将在 Android的主线程(UI线程)中被调用。与{@link#MAIN}不同,事件将始终排队等待传递。这确保了 post调用是非阻塞的。

BACKGROUND

Android上,用户将在后台线程中被调用。如果发布线程不是主线程,则将在发布线程中直接调用订阅方方法。如果发布线程是主线程,则 EventBus使用单个后台线程,该线程将按顺序传递其所有事件。使用此模式的订阅服务器应尝试快速返回,以避免阻塞后台线程。如果不在 Android上,则始终使用后台线程。

ASYNC

订阅服务器将在单独的线程中调用这始终独立于发布线程和主线程。发布事件从不等待使用此模式的订阅服务器方法。如果订户方法的执行可能需要一些时间(例如,用于网络访问),则应使用此模式。避免同时触发大量长时间运行的异步订阅服务器方法来限制并发线程的数量。EventBus使用线程池有效地重用来自已完成的异步订阅服务器通知的线程。

事件类型

普通事件

普通事件是指已有的事件订阅者能够收到事件发送者发送的事件,在事件发送之后注册的事件接收者将无法收到事件。发送普通件可以调用 EventBus.getDefault().post()方法进行发送。

粘性事件

粘性事件是指,不管是在事件发送之前注册的事件接收者还是在事件发送之后注册的事件接收者都能够收到事件。这里于普通事件的区别之处在于事件接收处需要定义事件接收类型,它可以通过 @Subscribe(threadMode = xxx, sticky = true)的方式进行声明;在事件发送时需要调用 EventBus.getDefault().postSticky()方法进行发送。事件类型默认为普通事件。

事件优先级

订阅者优先级以影响事件传递顺序。在同一传递线程 ThreadMode中,优先级较高的订阅者将在优先级较低的其他订阅者之前接收事件。默认优先级为 0。注意:优先级不影响具有不同 ThreadMode的订阅服务器之间的传递顺序!

EventBus源码解析

2.1 注册过程

我们先看 EventBus.register方法,在分析过程中遇到的字段再回过头来分析。
EventBus.java

public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}

上面的代码可以分为两个部分,2~3行完成了注册对象中 @Subscribe方法的解析,后面的代码真正完成了注册。
我们先看看 subscriberMethodFinder.findSubscriberMethods(subscriberClass)是如何完成 @Subscribe方法的解析的。subscriberMethodFinder在EventBus创建的时候就已经确定了,其构造方法如下:

SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean strictMethodVerification,
                       boolean ignoreGeneratedIndex) {
    this.subscriberInfoIndexes = subscriberInfoIndexes;
    this.strictMethodVerification = strictMethodVerification;
    this.ignoreGeneratedIndex = ignoreGeneratedIndex;
}

subscriberInfoIndexes里面只有注解解释器生成的 MyEventBusIndex对象,其他两个参数都是默认值false。下面我们深入 findSubscriberMethods方法:

private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
    if (subscriberMethods != null) {
        return subscriberMethods;
    }

    if (ignoreGeneratedIndex) {
        subscriberMethods = findUsingReflection(subscriberClass);
    } else {
        subscriberMethods = findUsingInfo(subscriberClass);
    }
    if (subscriberMethods.isEmpty()) {
        throw new EventBusException("Subscriber " + subscriberClass
                                    + " and its super classes have no public methods with the @Subscribe annotation");
    } else {
        METHOD_CACHE.put(subscriberClass, subscriberMethods);
        return subscriberMethods;
    }
}

findSubscriberMethods方法,是取缓存的操作。假设一下,由于一个页面的生命周期频繁的发生变化,导致频繁的注册、注销,这时候缓存就非常有用了。方法最后返回返回值前,会进行缓存的更新。这就是一个完成的缓存流程了。
第7行中,由于 ignoreGeneratedIndex默认为false,所以执行 findUsingInfo方法来获取订阅类的回调方法。顺便提一下, findUsingReflection方法顾名思义是使用反射来获取回调方法的, findUsingInfo方法在没有从注解处理器生成的Index文件中找到回调方法时,也会使用反射来获取回调方法。因此,findUsingInfo等于是 findUsingReflection方法的加强版本。两个方法如下:

private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }
        return new FindState();
    }


private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        findUsingReflectionInSingleClass(findState);
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}

private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
    FindState findState = prepareFindState();
    findState.initForSubscriber(subscriberClass);
    while (findState.clazz != null) {
        findState.subscriberInfo = getSubscriberInfo(findState);
        if (findState.subscriberInfo != null) {
            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
            for (SubscriberMethod subscriberMethod : array) {
                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                    findState.subscriberMethods.add(subscriberMethod);
                }
            }
        } else {
            findUsingReflectionInSingleClass(findState);
        }
        findState.moveToSuperclass();
    }
    return getMethodsAndRelease(findState);
}

我们顺着分析一下 findUsingInfo方法。通过 prepareFindState()方法尝试从对象池中获取一个 FindState对象,若对象池中没有可用的对象,则新建一个。

然后初始化 FindState对象,使其内部的 subscriberClassclazz都是订阅类。

image-mdgi.png

接着通过一个循环,遍历 subscriberClass 及其超类,查找所有符合条件的订阅方法:
• 首先尝试使用 getSubscriberInfo 从缓存或预先生成的信息中获取订阅方法信息。
• 如果找到了信息,则遍历所有订阅方法,并使用 checkAdd 方法检查是否需要添加到结果列表中。
• 如果没有找到信息,则使用反射查找当前类中的订阅方法。
• 最后将查找范围移到超类。

接着,看 getSubscriberInfo方法,在该方法中会将注解处理器生成的Index里面的回调方法返回:

private SubscriberInfo getSubscriberInfo(FindState findState) {
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
    SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
    if (findState.clazz == superclassInfo.getSubscriberClass()) {
        return superclassInfo;
    }
}
if (subscriberInfoIndexes != null) {
    for (SubscriberInfoIndex index : subscriberInfoIndexes) {
        SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
        if (info != null) {
            return info;
        }
    }
}
return null;
}

如果 findState 中已经存在当前类的 SubscriberInfo,并且它包含了超类的 SubscriberInfo,则检查当前类是否与超类的 SubscriberInfo 中的订阅者类相同。如果相同,则直接返回该超类的 SubscriberInfo。

否则就从索引中查找,如果 subscriberInfoIndexes(预先生成的索引列表)不为空,则遍历这些索引,尝试从中获取当前类的 SubscriberInfo。如果找到了匹配的信息,则返回该信息。

在通过 findState.checkAdd校验之后,两个示例方法都加入了 findState.subscriberMethods中,然后由 getMethodsAndRelease(findState)方法返回。

image-pgus.png

getMethodsAndRelease方法就是将 findState.subscriberMethods复制了出来,然后回收了 FindState对象。
上面就是注解处理器生成的Index参与的时候的流程。

在这里还是有必要说一下没有Index参与时的流程,也就是 findUsingReflectionInSingleClass方法是如何获取订阅类的回调方法的。
其实不用说我们也知道,肯定是反射,且从方法名上也可以看出来。该方法的源码以及解释如下:

private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
    // This is faster than getMethods, especially when subscribers are fat classes like Activities
    // 获取该类的所有方法,不包括父类
    methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
    // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
    // 获取该类以及父类的所有public方法,同时指定忽略父类
    methods = findState.clazz.getMethods();
    findState.skipSuperClasses = true;
}
for (Method method : methods) {
    int modifiers = method.getModifiers();
    // 判断方法是否是PUBLIC且不是ABSTRACT、STATIC、SYNTHETIC
    if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
        Class<?>[] parameterTypes = method.getParameterTypes();
        // 判断该方法的参数是不是只有一个
        if (parameterTypes.length == 1) {
            // 判断方法是否有Subscribe注解
            Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
            if (subscribeAnnotation != null) {
                // 方法是PUBLIC且不是ABSTRACT、STATIC、SYNTHETIC
                // 同时方法参数只有一个,且带有Subscribe注解
                // 以上条件都满足后,就进行最后的校验,然后添加到findState.subscriberMethods中
                Class<?> eventType = parameterTypes[0];
                if (findState.checkAdd(method, eventType)) {
                    ThreadMode threadMode = subscribeAnnotation.threadMode();
                    findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                                                         subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                }
            }
        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
            // 如果开启了严格验证(默认不开启),且方法有Subscribe注解,则抛出异常
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            throw new EventBusException("@Subscribe method " + methodName +
                                        "must have exactly 1 parameter but has " + parameterTypes.length);
        }
    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
        // 如果开启了严格验证(默认不开启),且方法有Subscribe注解,则抛出异常
        String methodName = method.getDeclaringClass().getName() + "." + method.getName();
        throw new EventBusException(methodName +
                                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
    }
}
}

从上面方法的分析我们可以看出,订阅类的方法要满足以下条件,才能够顺利的进行注册:

  1. 方法必须是 public的,且不能是 abstractstaticsynthetic
  2. 方法参数必须只有一个
  3. 方法必须带有 @Subscribe注解

上面这些内容就是订阅类回调方法的获取过程了,下面说说回调方法是如何进行注册的。方法为 EventBus.subscribe
EventBus.java

// Must be called in synchronized block
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    // eventType为回调方法的入参类型,Object类型
    Class<?> eventType = subscriberMethod.eventType;
    // 将订阅者以及订阅方法封装为一个Subscription类
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    // subscriptionsByEventType:以订阅方法的参数为key,Subscription为value进行保存
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                                        + eventType);
        }
    }

    // 在订阅参数对应的Subscription列表中按照订阅方法的优先级进行排序
    // priority的数值越高,越在列表的前面;相同priority的方法,后来的方法会在后面
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }

    // typesBySubscriber:以订阅者为key,订阅方法参数为value进行保存
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    // 最后,如果订阅方法为粘性的,则会在注册时接收到粘性事件
    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}

在上面的方法中,会先将订阅者以及订阅事件包装成为一个 Subscription对象,然后与其他参数一起保存起来,具体为下面两个保存的地方:

  • subscriptionsByEventType 以订阅事件参数为key,对应 Subscription对象数组
  • typesBySubscriber 以订阅者对象为key,对应订阅事件参数数组

另外,Subscription对象还会根据priority进行排序,具体来说:priority的数值越高,越在列表的前面;相同priority的方法,后来的方法会在后面。
同时我们还可以看出粘性事件的触发机制:

  1. 粘性事件发出时,会主动通知所有可以处理的方法,不管方法是否是粘性的
public void postSticky(Object event) {
synchronized (stickyEvents) {
    stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}
  1. 在订阅者进行注册时,如果有可以响应的粘性事件,粘性方法会被触发
if (subscriberMethod.sticky) {
    if (eventInheritance) {
        ...
        for (...) {
            ...
            if (...) {
                Object stickyEvent = entry.getValue();
                checkPostStickyEventToSubscription(newSubscription, stickyEvent);
            }
        }
    } else {
        Object stickyEvent = stickyEvents.get(eventType);
        checkPostStickyEventToSubscription(newSubscription, stickyEvent);
    }
}

2.2 发送事件

我们可以调用 EventBus.post以及 EventBus.postSticky这两个方法来发送事件,前者是普通事件,后者是粘性事件。
粘性事件的发送比较简单,该方法除了保存了事件之外,还调用了 post方法当做非粘性事件进行了分发。此时,会主动通知所有可以处理的方法,不管方法是否是粘性的:
EventBus.java

public void postSticky(Object event) {
synchronized (stickyEvents) {
    stickyEvents.put(event.getClass(), event);
}
// Should be posted after it is putted, in case the subscriber wants to remove immediately
post(event);
}

另外,在订阅者进行注册时,如果有可以响应的粘性事件,粘性方法会被触发
粘性事件不会被消耗掉,除非手动remove掉

  • removeStickyEvent(Class<T>)
  • removeStickyEvent(Object)
  • removeAllStickyEvents()

更多可以参考Sticky Events
接下来,我们看看 EventBus.post是如何触发订阅者的回调事件的。
EventBus.java

/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>();
    boolean isPosting;
    boolean isMainThread;
    Subscription subscription;
    Object event;
    boolean canceled;
}

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

/** Posts the given event to the event bus. */
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);

// 检查当前状态是否已经在发布事件
if (!postingState.isPosting) {
    // 设置当前线程是否为主线程
    postingState.isMainThread = isMainThread();
    // 标记为正在发布事件
    postingState.isPosting = true;
    // 检查发布状态是否已经被取消,如果是,则抛出异常
    if (postingState.canceled) {
        throw new EventBusException("Internal error. Abort state was not reset");
    }
    try {
        // 当事件队列不为空时,持续发布事件
        while (!eventQueue.isEmpty()) {
            // 发布单个事件,并从事件队列中移除它
            postSingleEvent(eventQueue.remove(0), postingState);
        }
    } finally {
        // 无论是否发生异常,都将发布状态重置为未发布状态
        postingState.isPosting = false;
        // 重置主线程状态
        postingState.isMainThread = false;
    }
}

EventBus.post的相关代码如上所示,post的事件会加入到一个事件队列里面,然后开始执行。当在极短时间内多次调用 post方法时,只会将事件添加到队列里面,会取时间队列的头进行事件分发。最后所有事件处理完成后,标志位复位。
下面我们看看 postSingleEvent的方法,注释都在里面:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    if (eventInheritance) {
        // lookupAllEventTypes方法的作用是查询class的父类以及接口,显然示例中就是一个Object
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            // 调用postSingleEventForEventType方法
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        // 直接以当前的eventClass调用postSingleEventForEventType方法
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    // 最后,当没有订阅者可以处理该事件时,EventBus会抛出一个NoSubscriberEvent事件
    if (!subscriptionFound) {
        if (logNoSubscriberMessages) {
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
            eventClass != SubscriberExceptionEvent.class) {
            post(new NoSubscriberEvent(this, event));
        }
    }
}

接着看看 postSingleEventForEventType方法

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    // subscriptions就是示例中对应的两个方法
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                // 调用postToSubscription进行真正的分发
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}

在上面的方法中,最终调用了 postToSubscription方法完成了最终的事件分发。且这里需要注意一个系列,只要eventClass可以找到对应的Subscription,那么该方法就会返回true,也就是说已经发送给订阅者了。
最后看看 postToSubscription方法,该方法会根据方法的threaMode值,决定在哪如何触发回调方法:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            if (mainThreadPoster != null) {
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // temporary: technically not correct as poster not decoupled from subscriber
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

这里面5种ThreadMode,采取的手段也不一样。从该枚举值的定义可以看出,分为5种情况,下表就是每种情况的含义:
ThreadMode的含义
无论在哪个线程中执行,最后都会调用 invokeSubscriber方法来触发回调任务:

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

这里通过反射调用了订阅者的回调方法。至此,事件的发送已经分析完毕。

2.3 注销过程

注销过程原理比较简单,就是将注册时保存到 subscriptionsByEventTypetypesBySubscriber两个集合中的元素删除。
这两个集合在注册过程中分析过: - subscriptionsByEventType 以订阅事件参数为key,对应Subscription对象数组 - typesBySubscriber 以订阅者对象为key,对应订阅事件参数数组
EventBus.unregister代码如下:

/** Unregisters the given subscriber from all event classes. */
public synchronized void unregister(Object subscriber) {
    // 先通过subscriber订阅者对象从`typesBySubscriber`中获取订阅事件参数数组
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        // 以订阅事件参数为key,从`subscriptionsByEventType`中获取到对应的`Subscrition`对象
        for (Class<?> eventType : subscribedTypes) {
            unsubscribeByEventType(subscriber, eventType);
        }
        typesBySubscriber.remove(subscriber);
    } else {
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}

/** Only updates subscriptionsByEventType, not typesBySubscriber! Caller must update typesBySubscriber. */
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
        int size = subscriptions.size();
        for (int i = 0; i < size; i++) {
            Subscription subscription = subscriptions.get(i);
            if (subscription.subscriber == subscriber) {
                subscription.active = false;
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

显然,先通过subscriber订阅者对象从 typesBySubscriber中获取订阅事件参数数组;然后以订阅事件参数为key,从 subscriptionsByEventType中获取到对应的 Subscrition对象。
以上就是EventBus的注销过程了。

总结

register 注册事件

register方法用于注册一个订阅者(subscriber),使其能够接收事件。注册过程包括以下步骤:

  1. 检查是否已注册:EventBus首先检查传入的订阅者是否已经注册过(从缓存中读取),如果已经注册则直接返回。
  2. 获取订阅方法:通过反射或预生成的订阅信息来获取订阅者类中的所有订阅方法。
    EventBus通过扫描订阅者类及其超类,找到所有使用@Subscribe注解的方法。
  3. 缓存订阅方法:将获取的订阅方法缓存起来,以提高下次注册时的速度。
  4. 添加到订阅者Map中:将订阅者和其对应的订阅方法添加到内部的订阅者Map(subscriptionsByEventType)中,以便在事件发布时快速找到相应的订阅方法。

post 发布事件

post方法用于发布事件,流程如下:

  1. 添加事件到队列:将事件添加到内部的事件队列(eventQueue)中,以确保事件的顺序性。
  2. 处理事件队列:通过postSingleEvent方法逐个处理队列中的事件。
  3. 查找订阅方法:根据事件的类型在内部的订阅者Map中查找所有订阅该事件的订阅方法。
  4. 事件分发:将事件分发给所有找到的订阅方法。EventBus支持在不同的线程模式(如主线程、后台线程、异步线程)下分发事件。

postSticky 粘性事件原理

粘性事件的实现原理主要依赖于一个内部的stickyEvents Map,用来存储所有发布过的粘性事件。
在发布粘性事件时,事件不仅会被存储,还会像普通事件一样发布出去。
新的订阅者在注册时,EventBus会检查是否有对应的粘性事件存在,如果存在,就立即将事件分发给该订阅者。
这样,粘性事件机制确保了事件发布后,新的订阅者仍能接收到该事件。

0

评论区