EventBus设计与实现分析——事件的发布
前面在 EventBus設(shè)計(jì)與實(shí)現(xiàn)分析——特性介紹中介紹了EventBus的基本用法,及其提供的大多數(shù)特性的用法;在EventBus設(shè)計(jì)與實(shí)現(xiàn)分析——訂閱者的注冊(cè) 中介紹了EventBus中訂閱者注冊(cè)的過程。這里就繼續(xù)分析EventBus的代碼,來了解其事件發(fā)布的過程。
事件的發(fā)布
如我們前面已經(jīng)了解到的,在EventBus中,有兩種不同類型得事件,一種是普通事件,事件被通知給訂閱者之后即被丟棄,另一種是Sticky事件,事件在被通知給訂閱者之后會(huì)被保存起來,下次有訂閱者注冊(cè)針對(duì)這種事件的訂閱時(shí),訂閱者會(huì)直接得到通知。
在EventBus中,會(huì)以兩個(gè)不同的方法來發(fā)布這兩種不同類型的事件,這兩個(gè)方法分別是post(Object event)和postSticky(Object event):
private final Map<Class<?>, Object> stickyEvents;private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {@Overrideprotected PostingThreadState initialValue() {return new PostingThreadState();}}; ....../** Posts the given event to the event bus. */public void post(Object event) {PostingThreadState postingState = currentPostingThreadState.get();List<Object> eventQueue = postingState.eventQueue;eventQueue.add(event);if (!postingState.isPosting) {postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();postingState.isPosting = true;if (postingState.canceled) {throw new EventBusException("Internal error. Abort state was not reset");}try {while (!eventQueue.isEmpty()) {postSingleEvent(eventQueue.remove(0), postingState);}} finally {postingState.isPosting = false;postingState.isMainThread = false;}}} ....../*** Posts the given event to the event bus and holds on to the event (because it is sticky). The most recent sticky* event of an event's type is kept in memory for future access by subscribers using {@link Subscribe#sticky()}.*/public void postSticky(Object event) {synchronized (stickyEvents) {stickyEvents.put(event.getClass(), event);}// Should be posted after it is putted, in case the subscriber wants to remove immediatelypost(event);} ....../** For ThreadLocal, much faster to set (and get multiple values). */final static class PostingThreadState {final List<Object> eventQueue = new ArrayList<Object>();boolean isPosting;boolean isMainThread;Subscription subscription;Object event;boolean canceled;}postSticky()僅是在保存了事件之后調(diào)用post()來發(fā)布事件而已。而在post()中,會(huì)借助于PostingThreadState來執(zhí)行事件發(fā)布的過程。PostingThreadState為發(fā)布的事件提供了排隊(duì)功能,同時(shí)它還描述一些發(fā)布的線程狀態(tài)。PostingThreadState還是發(fā)布過程跟外界交流的一個(gè)窗口,外部可通過EventBus類提供的一些方法來控制這個(gè)狀態(tài),進(jìn)而影響發(fā)布過程,比如取消發(fā)布等操作。PostingThreadState對(duì)象在ThreadLocal變量中保存,可見發(fā)布的事件的隊(duì)列是每個(gè)線程一個(gè)的。post()方法會(huì)逐個(gè)取出事件隊(duì)列中的每一個(gè)事件,調(diào)用postSingleEvent()方法來發(fā)布。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {Class<?> eventClass = event.getClass();boolean subscriptionFound = false;if (eventInheritance) {List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);int countTypes = eventTypes.size();for (int h = 0; h < countTypes; h++) {Class<?> clazz = eventTypes.get(h);subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);}} else {subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);}if (!subscriptionFound) {if (logNoSubscriberMessages) {Log.d(TAG, "No subscribers registered for event " + eventClass);}if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&eventClass != SubscriberExceptionEvent.class) {post(new NoSubscriberEvent(this, event));}}}private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {CopyOnWriteArrayList<Subscription> subscriptions;synchronized (this) {subscriptions = subscriptionsByEventType.get(eventClass);}if (subscriptions != null && !subscriptions.isEmpty()) {for (Subscription subscription : subscriptions) {postingState.event = event;postingState.subscription = subscription;boolean aborted = false;try {postToSubscription(subscription, event, postingState.isMainThread);aborted = postingState.canceled;} finally {postingState.event = null;postingState.subscription = null;postingState.canceled = false;}if (aborted) {break;}}return true;}return false;}....../** Looks up all Class objects including super classes and interfaces. Should also work for interfaces. */private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {synchronized (eventTypesCache) {List<Class<?>> eventTypes = eventTypesCache.get(eventClass);if (eventTypes == null) {eventTypes = new ArrayList<>();Class<?> clazz = eventClass;while (clazz != null) {eventTypes.add(clazz);addInterfaces(eventTypes, clazz.getInterfaces());clazz = clazz.getSuperclass();}eventTypesCache.put(eventClass, eventTypes);}return eventTypes;}}/** Recurses through super interfaces. */static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {for (Class<?> interfaceClass : interfaces) {if (!eventTypes.contains(interfaceClass)) {eventTypes.add(interfaceClass);addInterfaces(eventTypes, interfaceClass.getInterfaces());}}}postSingleEvent()要發(fā)布事件,首先需要找到訂閱者,我們前面在 訂閱者的注冊(cè) 中看到,訂閱者注冊(cè)時(shí)會(huì)在subscriptionsByEventType中保存事件類型和訂閱者的映射關(guān)系,那要找到訂閱者豈不是很容易?
其實(shí)不完全是。關(guān)鍵是對(duì)于事件類型的處理。要通知的事件類型的訂閱者不一定僅僅包含事件對(duì)象本身的類型的訂閱者,還可能要通知事件類型的父類或?qū)崿F(xiàn)的接口的類型的訂閱者。在eventInheritance被置為true時(shí),就需要通知事件類型的父類或?qū)崿F(xiàn)的接口的類型的訂閱者。lookupAllEventTypes()和addInterfaces()就用于查找所有這樣的類型。
postSingleEvent()會(huì)逐個(gè)事件類型的去通知相應(yīng)得訂閱者,這一任務(wù)由postSingleEventForEventType()來完成。而在postSingleEventForEventType()中則是根據(jù)subscriptionsByEventType找到所有的訂閱者方法,并通過postToSubscription方法來逐個(gè)的向這些訂閱者方法通知事件。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {switch (subscription.subscriberMethod.threadMode) {case POSTING:invokeSubscriber(subscription, event);break;case MAIN:if (isMainThread) {invokeSubscriber(subscription, event);} else {mainThreadPoster.enqueue(subscription, event);}break;case BACKGROUND:if (isMainThread) {backgroundPoster.enqueue(subscription, event);} else {invokeSubscriber(subscription, event);}break;case ASYNC:asyncPoster.enqueue(subscription, event);break;default:throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);}}....../*** Invokes the subscriber if the subscriptions is still active. Skipping subscriptions prevents race conditions* between {@link #unregister(Object)} and event delivery. Otherwise the event might be delivered after the* subscriber unregistered. This is particularly important for main thread delivery and registrations bound to the* live cycle of an Activity or Fragment.*/void invokeSubscriber(PendingPost pendingPost) {Object event = pendingPost.event;Subscription subscription = pendingPost.subscription;PendingPost.releasePendingPost(pendingPost);if (subscription.active) {invokeSubscriber(subscription, event);}}void invokeSubscriber(Subscription subscription, Object event) {try {subscription.subscriberMethod.method.invoke(subscription.subscriber, event);} catch (InvocationTargetException e) {handleSubscriberException(subscription, event, e.getCause());} catch (IllegalAccessException e) {throw new IllegalStateException("Unexpected exception", e);}}在postToSubscription()中事件的通知又分為同步的通知和異步的通知。同步的通知是直接調(diào)用invokeSubscriber(Subscription subscription, Object event)方法,這會(huì)將事件對(duì)象傳遞給訂閱者方法進(jìn)行調(diào)用。而異步的通知?jiǎng)t是將事件及訂閱者拋給某個(gè)poster就結(jié)束。
對(duì)于某個(gè)訂閱者的通知要采用同步通知還是異步通知?jiǎng)t需要根據(jù)訂閱者的ThreadMode及事件發(fā)布的線程來定。具體得規(guī)則為:
訂閱者的線程模式是POSTING --------------------------------> 同步通知
訂閱者的線程模式是MAIN + 事件發(fā)布線程是主線程 ---------------> 同步通知
訂閱者的線程模式是BACKGROUND + 事件發(fā)布線程不是主線程 ------> 同步通知
訂閱者的線程模式是BACKGROUND + 事件發(fā)布線程是主線程 --------> 異步通知
訂閱者的線程模式是MAIN + 事件發(fā)布線程不是主線程 --------------> 異步通知
訂閱者的線程模式是ASYNC ----------------------------------> 異步通知
同步通知和異步通知各三種。但三種異步通知本身又各不相同,它們分別由三種不同的Poster來處理,訂閱者的線程模式是BACKGROUND + 事件發(fā)布線程是主線程的異步通知由BackgroundPoster來處理,訂閱者的線程模式是MAIN + 事件發(fā)布線程不是主線程的異步通知由HandlerPoster來處理,而訂閱者的線程模式是ASYNC的異步通知由AsyncPoster來處理。
接著就來看一下這些Poster。首先是HandlerPoster:
package org.greenrobot.eventbus;import android.os.Handler; import android.os.Looper; import android.os.Message; import android.os.SystemClock;final class HandlerPoster extends Handler {private final PendingPostQueue queue;private final int maxMillisInsideHandleMessage;private final EventBus eventBus;private boolean handlerActive;HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {super(looper);this.eventBus = eventBus;this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;queue = new PendingPostQueue();}void enqueue(Subscription subscription, Object event) {PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);synchronized (this) {queue.enqueue(pendingPost);if (!handlerActive) {handlerActive = true;if (!sendMessage(obtainMessage())) {throw new EventBusException("Could not send handler message");}}}}@Overridepublic void handleMessage(Message msg) {boolean rescheduled = false;try {long started = SystemClock.uptimeMillis();while (true) {PendingPost pendingPost = queue.poll();if (pendingPost == null) {synchronized (this) {// Check again, this time in synchronizedpendingPost = queue.poll();if (pendingPost == null) {handlerActive = false;return;}}}eventBus.invokeSubscriber(pendingPost);long timeInMethod = SystemClock.uptimeMillis() - started;if (timeInMethod >= maxMillisInsideHandleMessage) {if (!sendMessage(obtainMessage())) {throw new EventBusException("Could not send handler message");}rescheduled = true;return;}}} finally {handlerActive = rescheduled;}} }這是一個(gè)Handler。其內(nèi)部有一個(gè)PendingPostQueue queue,enqueue()操作即是用描述訂閱者方法的Subscription對(duì)象和事件對(duì)象構(gòu)造一個(gè)PendingPost對(duì)象,然后將這個(gè)PendingPost對(duì)象放入queue中,并在Handler沒有在處理事件分發(fā)時(shí)發(fā)送一個(gè)消息來喚醒對(duì)于事件分發(fā)的處理。
而在handleMessage()中,則是逐個(gè)從queue中取出PendingPost對(duì)象,并通過EventBus的invokeSubscriber(PendingPost pendingPost)來傳遞事件對(duì)象調(diào)用訂閱者方法。這里調(diào)用的invokeSubscriber()方法與前面那個(gè)同步版本略有差異,它會(huì)將Subscription對(duì)象和事件對(duì)象從PendingPost對(duì)象中提取出來,并調(diào)用同步版的方法,同時(shí)還會(huì)釋放PendingPost對(duì)象。
這里有一個(gè)蠻巧妙得設(shè)計(jì),就是那個(gè)maxMillisInsideHandleMessage,它用于限制一次事件發(fā)布所能消耗的最多的主線程時(shí)間。如果事件限制到了的時(shí)候訂閱者沒有通知完,則會(huì)發(fā)送一個(gè)消息,在下一輪中繼續(xù)處理。
這是一個(gè)典型的生產(chǎn)者-消費(fèi)者模型,生產(chǎn)者是事件的發(fā)布者線程,而消費(fèi)者則是主線程。
PendingPost對(duì)象是通過一個(gè)鏈表來組織的。
package org.greenrobot.eventbus;final class PendingPostQueue {private PendingPost head;private PendingPost tail;synchronized void enqueue(PendingPost pendingPost) {if (pendingPost == null) {throw new NullPointerException("null cannot be enqueued");}if (tail != null) {tail.next = pendingPost;tail = pendingPost;} else if (head == null) {head = tail = pendingPost;} else {throw new IllegalStateException("Head present, but no tail");}notifyAll();}synchronized PendingPost poll() {PendingPost pendingPost = head;if (head != null) {head = head.next;if (head == null) {tail = null;}}return pendingPost;}synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {if (head == null) {wait(maxMillisToWait);}return poll();}}還有PendingPost:
package org.greenrobot.eventbus;import java.util.ArrayList; import java.util.List;final class PendingPost {private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();Object event;Subscription subscription;PendingPost next;private PendingPost(Object event, Subscription subscription) {this.event = event;this.subscription = subscription;}static PendingPost obtainPendingPost(Subscription subscription, Object event) {synchronized (pendingPostPool) {int size = pendingPostPool.size();if (size > 0) {PendingPost pendingPost = pendingPostPool.remove(size - 1);pendingPost.event = event;pendingPost.subscription = subscription;pendingPost.next = null;return pendingPost;}}return new PendingPost(event, subscription);}static void releasePendingPost(PendingPost pendingPost) {pendingPost.event = null;pendingPost.subscription = null;pendingPost.next = null;synchronized (pendingPostPool) {// Don't let the pool grow indefinitelyif (pendingPostPool.size() < 10000) {pendingPostPool.add(pendingPost);}}}}PendingPostQueue是一個(gè)線程安全的鏈表,其中鏈表的節(jié)點(diǎn)是PendingPost,它提供了最最基本的入隊(duì)和出隊(duì)操作而已。PendingPost再次用了對(duì)象池,它提供了獲取對(duì)象和釋放對(duì)象的方法。EventBus的作者真的還是蠻喜歡用對(duì)象池的嘛。
然后再來看BackgroundPoster:
package org.greenrobot.eventbus;import android.util.Log;/*** Posts events in background.* * @author Markus*/ final class BackgroundPoster implements Runnable {private final PendingPostQueue queue;private final EventBus eventBus;private volatile boolean executorRunning;BackgroundPoster(EventBus eventBus) {this.eventBus = eventBus;queue = new PendingPostQueue();}public void enqueue(Subscription subscription, Object event) {PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);synchronized (this) {queue.enqueue(pendingPost);if (!executorRunning) {executorRunning = true;eventBus.getExecutorService().execute(this);}}}@Overridepublic void run() {try {try {while (true) {PendingPost pendingPost = queue.poll(1000);if (pendingPost == null) {synchronized (this) {// Check again, this time in synchronizedpendingPost = queue.poll();if (pendingPost == null) {executorRunning = false;return;}}}eventBus.invokeSubscriber(pendingPost);}} catch (InterruptedException e) {Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);}} finally {executorRunning = false;}}}BackgroundPoster與HandlerPoster還是挺像的。兩者的差別在于BackgroundPoster是一個(gè)Runnable,它的enqueue()操作喚醒對(duì)于事件分發(fā)的處理的方法,是將對(duì)象本身放進(jìn)EventBus的ExecutorService中執(zhí)行來實(shí)現(xiàn)的;另外在處理事件分發(fā)的run()方法中,無需像HandlerPoster的handleMessage()方法那樣考慮時(shí)間限制,它會(huì)一次性的將隊(duì)列中所有的PendingPost處理完才結(jié)束。
對(duì)于某一個(gè)特定事件,一次性的將所有的PendingPost遞交給BackgroundPoster,因而大概率的它們會(huì)在同一個(gè)線程被通知。但如果訂閱者對(duì)事件的處理過快,在下一個(gè)PendingPost還沒來得及入隊(duì)時(shí)即執(zhí)行結(jié)束,則還是有可能在不同的線程中被通知。
最后再來看一下AsyncPoster:
class AsyncPoster implements Runnable {private final PendingPostQueue queue;private final EventBus eventBus;AsyncPoster(EventBus eventBus) {this.eventBus = eventBus;queue = new PendingPostQueue();}public void enqueue(Subscription subscription, Object event) {PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);queue.enqueue(pendingPost);eventBus.getExecutorService().execute(this);}@Overridepublic void run() {PendingPost pendingPost = queue.poll();if(pendingPost == null) {throw new IllegalStateException("No pending post available");}eventBus.invokeSubscriber(pendingPost);}}它會(huì)對(duì)每一個(gè)通知(訂閱者方法 + 訂閱者對(duì)象 + 事件對(duì)象)都起一個(gè)不同的task來進(jìn)行。
用一張圖來總結(jié)EventBus中事件通知的過程:
EventBus發(fā)布事件的過程大體如此。
總結(jié)
以上是生活随笔為你收集整理的EventBus设计与实现分析——事件的发布的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: EventBus设计与实现分析——特性介
- 下一篇: OkHttp3 HTTP请求执行流程分析