Android EventBus保姆级源码解析(二)发送事件post

柯栋
2023-12-01

前一篇文章我们分析了订阅事件的方法register,这篇我们继续淦发送事件的方法post

Android 轻量级线程间通信EventBus
Android EventBus保姆级源码解析(一)注册方法register
Android EventBus保姆级源码解析(二)发送事件post
Android EventBus保姆级源码解析(三)黏性事件原理

总结

按照惯例,先上总结(太长不看版):

  • 获取当前线程的Event事件list,把post的事件(Event)加入list,如果当前没有开始分发,则遍历Event事件出队调用postSingleEvent开始分发
  • postSingleEvent中检查是否允许订阅并分发Event的超类或者接口,然后再交给postSingleEventForEventType处理
  • postSingleEventForEventType通过subscriptionsByEventType列表(<订阅class,<订阅类,订阅方法>>的set),取到监听此Event事件的所有<订阅类,订阅方法>列表,遍历列表用postToSubscription方法执行处理订阅方法
  • postToSubscription判断订阅方法的线程模式,在对应的线程中通过反射调用订阅方法

EventBus.post

post方法的使用很简单,这篇就继续从发送事件的post方法剖析源码。

EventBus.getDefault().post(new MessageEvent())
    /** Posts the given event to the event bus. */
    public void post(Object event) {
     // 获取当前线程的PostingThreadState对象,该对象包含事件队列,保存在ThreadLocal中。
        PostingThreadState postingState = currentPostingThreadState.get();
        //获取当前线程的事件队列,将post的事件添加到队列中
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        // 判断事件是否在分发中,如果没有则遍历事件队列进行实际分发。
        if (!postingState.isPosting) {
            //判断当前线程是否是主线程
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //遍历队列,按顺序出队事件并进行分发
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }
  • 其中 PostingThreadState类保存了事件队列、post状态、当前线程等信息,使用ThreadLocal包装保证了线程一致性。

post方法中主要是对分发状态的控制,将当前Event事件入队,然后遍历事件队列出队并调用postSingleEvent方法对Event事件进行进一步的处理

EventBus.postSingleEvent

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        //这个flag上一篇说过,表示Event是否可以被继承,如果是就查找所有的父类和接口,
        if (eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            //查找所有的父类和接口
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                //查找并分发事件到该Event事件的所有订阅者,并且保存查找是否找到的flag
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            //查找并分发事件到该Event事件的所有订阅者
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        //如果没有查找订阅者,则事件由Event包装为NoSubscriberEvent重新发送
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

显然,EventBus.postSingleEvent检查处理了一下允许监听子类的情况,分发事件的具体逻辑又还在postSingleEventForEventType方法中,我们继续看源码

EventBus.postSingleEventForEventType

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            // 上一篇说过subscriptionsByEventType,它是<事件类型(EventClass),订阅(订阅者,订阅者方法)list>的键值对
            //获取事件类型(EventClass)对应的订阅(订阅者,订阅者方法)list
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        //如果存在订阅者&订阅者方法则继续处理,否则返回false表示没有找到
        if (subscriptions != null && !subscriptions.isEmpty()) {
            for (Subscription subscription : subscriptions) {
                //遍历到订阅者后设置postThreadState
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted = false;
                try {
                    //重点来了,正是这个方法真正去处理了Event事件,进行了订阅方法的回调
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

所以postSingleEventForEventType获取了到所有的监听此Event事件的所有<订阅类,订阅方法>列表,遍历列表用postToSubscription方法执行处理订阅方法

postToSubscription这个方法是不是很熟悉,正是我们在上一篇的结尾中没有讲到的黏性事件的额外处理,关于为什么黏性事件先去执行我们后面再分析,来继续跟进postToSubscription的代码:

EventBus.postToSubscription

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        //判断订阅方法的线程模式,这正是我们写订阅方法时在Subscribe注解中指定的线程模式ThreadMode
        switch (subscription.subscriberMethod.threadMode) {
            //当前线程
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            //主线程
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

由此我们可以看到真正的处理Event事件的真正操作,通过判断当前订阅方法的threadMode的值,按照订阅方法的线程值分为了四个主要部分处理,POSTING代表着当前线程,在哪个线程发送事件,就在哪个线程处理事件;MAIN代表只在主线程处理事件;BACKGROUND代表只在非主线程处理事件;ASYNC也是代表在非主线程处理事件。

POSTINNG当然是最简单的,不需要做额外的处理,当前线程直接用invokeSubscriber通过反射调用订阅方法:

    void invokeSubscriber(Subscription subscription, Object event) {
       try {
           //反射调用包装里的订阅方法
           subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
       } catch (InvocationTargetException e) {
           handleSubscriberException(subscription, event, e.getCause());
       } catch (IllegalAccessException e) {
           throw new IllegalStateException("Unexpected exception", e);
       }
   }

MAIN判断当前线程是不是主线程,如果是就直接调用订阅方法,不是就通过mainThreadPoster.enqueue来执行事件的处理,这个时候又出线了陌生的家伙mainThreadPoster,找一下它是怎么来的

// 在EventBus里面是这么定义的
private final HandlerPoster mainThreadPoster;
//在构造函数里实例化
EventBus(EventBusBuilder builder) {
       ...
       mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
       ...
   }

HandlerPoster

看看HandlerPoster这个类

final class HandlerPoster extends Handler {

   private final PendingPostQueue queue;
   private final int maxMillisInsideHandleMessage;
   private final EventBus eventBus;
   private boolean handlerActive;

   //构造时传入的参数是Looper.getMainLooper(),主线程的looper,又是继承自Handler,明显是主线程处理事物的handler
   HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
       super(looper);
       this.eventBus = eventBus;
       this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
       //这里又使用了一个PendingPostQueue的类
       queue = new PendingPostQueue();
   }
   
   //刚才调用的就是这个方法了,obtainPendingPost这个方法好像又对subscription&event做了一个包装
   void enqueue(Subscription subscription, Object event) {
       PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
       synchronized (this) {
           //queue入队?看起来像是一个处理事件的消息队列
           queue.enqueue(pendingPost);
           if (!handlerActive) {
               handlerActive = true;
               if (!sendMessage(obtainMessage())) {
                   throw new EventBusException("Could not send handler message");
               }
           }
       }
   }

}

看到HanlderPoster.enqueue方法,引入了一个PendingPost和它的队列PendingPostQueue,直觉告诉我应该是类似handler的messageQueue消息队列的东西,接着往里看

//平平无奇的节点类,包含了一个构造对象和清除对象的方法,pendingPostPool用来作为缓存池和锁对象
final class PendingPost {
   private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

   Object event;
   Subscription subscription;
   PendingPost next;

   private PendingPost(Object event, Subscription subscription) {
       this.event = event;
       this.subscription = subscription;
   }

   static PendingPost obtainPendingPost(Subscription subscription, Object event) {
       ...
   }

   static void releasePendingPost(PendingPost pendingPost) {
       ...
   }

}
//平平无奇的队列,包含了出队和入队的方法
final class PendingPostQueue {
   private PendingPost head;
   private PendingPost tail;

   synchronized void enqueue(PendingPost pendingPost) {
       ...
   }

   synchronized PendingPost poll() {
       ...
   }

}

HandlerPoster.enqueue

平平无奇的事件队列,继续回到HandlerPosterenqueue

    void enqueue(Subscription subscription, Object event) {
       PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
       synchronized (this) {
           //pendingPost入队
           queue.enqueue(pendingPost);
           //handlerActive 默认为false
           if (!handlerActive) {
               handlerActive = true;
               //在主线程调用HandlerMessage
               if (!sendMessage(obtainMessage())) {
                   throw new EventBusException("Could not send handler message");
               }
           }
       }
   }
   
   
   @Override
   public void handleMessage(Message msg) {
       boolean rescheduled = false;
       try {
           long started = SystemClock.uptimeMillis();
           while (true) {
               //死循环遍历事件队列
               PendingPost pendingPost = queue.poll();
               if (pendingPost == null) {
                   synchronized (this) {
                       // Check again, this time in synchronized
                       //当取到的时间为空时,同步再取一次避免线程不安全(有另一个线程此时入队事件就可以同时处理)的情况
                       pendingPost = queue.poll();
                       //事件仍为空退出循环
                       if (pendingPost == null) {
                           handlerActive = false;
                           return;
                       }
                   }
               }
               //执行订阅方法
               eventBus.invokeSubscriber(pendingPost);
               long timeInMethod = SystemClock.uptimeMillis() - started;
               //如果处理循环耗时超过maxMillisInsideHandleMessage,则会抛出异常
               if (timeInMethod >= maxMillisInsideHandleMessage) {
                   if (!sendMessage(obtainMessage())) {
                       throw new EventBusException("Could not send handler message");
                   }
                   rescheduled = true;
                   return;
               }
           }
       } finally {
           handlerActive = rescheduled;
       }
   }

所以这就通过这个HanlderPosterhandleMessage方法就可以在主线程遍历并且处理事件队列了,再看看处理这个时间队列节点的invokeSubscriber

    void invokeSubscriber(PendingPost pendingPost) {
       Object event = pendingPost.event;
       Subscription subscription = pendingPost.subscription;
       //清空pendingPost
       PendingPost.releasePendingPost(pendingPost);
       if (subscription.active) {
           //最后还是用invokeSubscriber反射调用了订阅方法
           invokeSubscriber(subscription, event);
       }
   }

所以 MAIN这个case的流程就走完了,BackgroundPosterASYNC后面的两个case和MAIN同理,BACKGROUND的逻辑是如果在主线程,那么会开启一条新的线程处理事件,如果不在主线程,那么直接处理事件;而ASYNC的逻辑则是不管你处于什么线程,我都新开一条线程处理事件。

    case BACKGROUND:
        if (isMainThread) {
            backgroundPoster.enqueue(subscription, event);
        } else {
            invokeSubscriber(subscription, event);
        }
        break;
    case ASYNC:
        asyncPoster.enqueue(subscription, event);
        break;

自此post的流程也讲完了,那黏性事件是怎么实现的呢,相信大家心里也有头绪了,下篇见~

 类似资料: