前一篇文章我们分析了订阅事件的方法register,这篇我们继续淦发送事件的方法post
Android 轻量级线程间通信EventBus
Android EventBus保姆级源码解析(一)注册方法register
Android EventBus保姆级源码解析(二)发送事件post
Android EventBus保姆级源码解析(三)黏性事件原理
按照惯例,先上总结(太长不看版):
postSingleEvent
开始分发postSingleEvent
中检查是否允许订阅并分发Event的超类或者接口,然后再交给postSingleEventForEventType
处理postSingleEventForEventType
通过subscriptionsByEventType
列表(<订阅class,<订阅类,订阅方法>>的set),取到监听此Event事件的所有<订阅类,订阅方法>列表,遍历列表用postToSubscription
方法执行处理订阅方法postToSubscription
判断订阅方法的线程模式,在对应的线程中通过反射调用订阅方法post方法的使用很简单,这篇就继续从发送事件的post方法剖析源码。
EventBus.getDefault().post(new MessageEvent())
/** Posts the given event to the event bus. */
public void post(Object event) {
// 获取当前线程的PostingThreadState对象,该对象包含事件队列,保存在ThreadLocal中。
PostingThreadState postingState = currentPostingThreadState.get();
//获取当前线程的事件队列,将post的事件添加到队列中
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
// 判断事件是否在分发中,如果没有则遍历事件队列进行实际分发。
if (!postingState.isPosting) {
//判断当前线程是否是主线程
postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//遍历队列,按顺序出队事件并进行分发
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post方法中主要是对分发状态的控制,将当前Event事件入队,然后遍历事件队列出队并调用postSingleEvent
方法对Event事件进行进一步的处理
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//这个flag上一篇说过,表示Event是否可以被继承,如果是就查找所有的父类和接口,
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
//查找所有的父类和接口
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
//查找并分发事件到该Event事件的所有订阅者,并且保存查找是否找到的flag
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
//查找并分发事件到该Event事件的所有订阅者
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
//如果没有查找订阅者,则事件由Event包装为NoSubscriberEvent重新发送
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
Log.d(TAG, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
显然,EventBus.postSingleEvent
检查处理了一下允许监听子类的情况,分发事件的具体逻辑又还在postSingleEventForEventType
方法中,我们继续看源码
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 上一篇说过subscriptionsByEventType,它是<事件类型(EventClass),订阅(订阅者,订阅者方法)list>的键值对
//获取事件类型(EventClass)对应的订阅(订阅者,订阅者方法)list
subscriptions = subscriptionsByEventType.get(eventClass);
}
//如果存在订阅者&订阅者方法则继续处理,否则返回false表示没有找到
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
//遍历到订阅者后设置postThreadState
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//重点来了,正是这个方法真正去处理了Event事件,进行了订阅方法的回调
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
所以postSingleEventForEventType
获取了到所有的监听此Event事件的所有<订阅类,订阅方法>列表,遍历列表用postToSubscription
方法执行处理订阅方法
postToSubscription
这个方法是不是很熟悉,正是我们在上一篇的结尾中没有讲到的黏性事件的额外处理,关于为什么黏性事件先去执行我们后面再分析,来继续跟进postToSubscription
的代码:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
//判断订阅方法的线程模式,这正是我们写订阅方法时在Subscribe注解中指定的线程模式ThreadMode
switch (subscription.subscriberMethod.threadMode) {
//当前线程
case POSTING:
invokeSubscriber(subscription, event);
break;
//主线程
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
由此我们可以看到真正的处理Event事件的真正操作,通过判断当前订阅方法的threadMode
的值,按照订阅方法的线程值分为了四个主要部分处理,POSTING
代表着当前线程,在哪个线程发送事件,就在哪个线程处理事件;MAIN
代表只在主线程处理事件;BACKGROUND
代表只在非主线程处理事件;ASYNC
也是代表在非主线程处理事件。
POSTINNG
当然是最简单的,不需要做额外的处理,当前线程直接用invokeSubscriber
通过反射调用订阅方法:
void invokeSubscriber(Subscription subscription, Object event) {
try {
//反射调用包装里的订阅方法
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
MAIN
判断当前线程是不是主线程,如果是就直接调用订阅方法,不是就通过mainThreadPoster.enqueue
来执行事件的处理,这个时候又出线了陌生的家伙mainThreadPoster
,找一下它是怎么来的
// 在EventBus里面是这么定义的
private final HandlerPoster mainThreadPoster;
//在构造函数里实例化
EventBus(EventBusBuilder builder) {
...
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
...
}
看看HandlerPoster
这个类
final class HandlerPoster extends Handler {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
//构造时传入的参数是Looper.getMainLooper(),主线程的looper,又是继承自Handler,明显是主线程处理事物的handler
HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
//这里又使用了一个PendingPostQueue的类
queue = new PendingPostQueue();
}
//刚才调用的就是这个方法了,obtainPendingPost这个方法好像又对subscription&event做了一个包装
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//queue入队?看起来像是一个处理事件的消息队列
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
}
看到HanlderPoster.enqueue
方法,引入了一个PendingPost
和它的队列PendingPostQueue
,直觉告诉我应该是类似handler的messageQueue消息队列的东西,接着往里看
//平平无奇的节点类,包含了一个构造对象和清除对象的方法,pendingPostPool用来作为缓存池和锁对象
final class PendingPost {
private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();
Object event;
Subscription subscription;
PendingPost next;
private PendingPost(Object event, Subscription subscription) {
this.event = event;
this.subscription = subscription;
}
static PendingPost obtainPendingPost(Subscription subscription, Object event) {
...
}
static void releasePendingPost(PendingPost pendingPost) {
...
}
}
//平平无奇的队列,包含了出队和入队的方法
final class PendingPostQueue {
private PendingPost head;
private PendingPost tail;
synchronized void enqueue(PendingPost pendingPost) {
...
}
synchronized PendingPost poll() {
...
}
}
平平无奇的事件队列,继续回到HandlerPoster
的enqueue
void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//pendingPost入队
queue.enqueue(pendingPost);
//handlerActive 默认为false
if (!handlerActive) {
handlerActive = true;
//在主线程调用HandlerMessage
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//死循环遍历事件队列
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
//当取到的时间为空时,同步再取一次避免线程不安全(有另一个线程此时入队事件就可以同时处理)的情况
pendingPost = queue.poll();
//事件仍为空退出循环
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//执行订阅方法
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
//如果处理循环耗时超过maxMillisInsideHandleMessage,则会抛出异常
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
所以这就通过这个HanlderPoster
的handleMessage
方法就可以在主线程遍历并且处理事件队列了,再看看处理这个时间队列节点的invokeSubscriber
void invokeSubscriber(PendingPost pendingPost) {
Object event = pendingPost.event;
Subscription subscription = pendingPost.subscription;
//清空pendingPost
PendingPost.releasePendingPost(pendingPost);
if (subscription.active) {
//最后还是用invokeSubscriber反射调用了订阅方法
invokeSubscriber(subscription, event);
}
}
所以 MAIN
这个case的流程就走完了,BackgroundPoster
、ASYNC
后面的两个case和MAIN
同理,BACKGROUND
的逻辑是如果在主线程,那么会开启一条新的线程处理事件,如果不在主线程,那么直接处理事件;而ASYNC
的逻辑则是不管你处于什么线程,我都新开一条线程处理事件。
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
自此post的流程也讲完了,那黏性事件是怎么实现的呢,相信大家心里也有头绪了,下篇见~