android的消息队列机制
android下的線程,Looper線程,MessageQueue,Handler,Message等之間的關(guān)系,以及Message的send/post及Message dispatch的過程。
Looper線程
我們知道,線程是進程中某個單一順序的控制流,它是內(nèi)核做CPU調(diào)度的單位。那何為Looper線程呢?所謂Looper線程,即是借助于Looper和MessageQueue來管理控制流的一類線程。在android系統(tǒng)中,app的主線程即是借助于Looper和MessageQueue來管理控制流的,因而主線就是一個特殊的Looper線程。其實,不僅僅只有主線程可以用Looper和MessageQueue來管理控制流,其它的線程也一樣可以。我們可以先看一下android源代碼(Looper類,位置為frameworks/base/core/java/android/os/Looper.java)的注釋中給出的一種Looper線程的實現(xiàn)方式:
package?com.example.messagequeuedemo;import?android.os.Handler; import?android.os.Looper; import?android.util.Log;public?class?LooperThread?extends?Thread?{public?static?final?String?TAG?=?MainActivity.TAG;private?static?final?String?CompTAG?=?"LooperThread";public?Handler?mHandler;@Overridepublic?void?run()?{Log.d(TAG,?CompTAG?+?":?LooperThread=>run");Looper.prepare();mHandler?=?new?Handler()?{public?void?handleMessage(android.os.Message?msg)?{Log.d(TAG,?CompTAG?+?":?LooperThread=>Handler=>handleMessage");//?process?incoming?message?here}};Looper.loop();} }
可以看到,就是在線程的run()方法中,調(diào)用Looper.prepare()做一些初始化,然后創(chuàng)建一個Handler對象,最后執(zhí)行Looper.loop()啟動整個的事件循環(huán)。就是這么簡單的幾行代碼,一個可以使用消息隊列來管理線程執(zhí)行流程的Looper 線程就創(chuàng)建好了。
那么上面的每一行代碼,具體又都做了些什么呢?接著我們就先來看下,神秘的Looper.prepare()到底都干了些什么:
?????//?sThreadLocal.get()?will?return?null?unless?you've?called?prepare().static?final?ThreadLocal<Looper>?sThreadLocal?=?new?ThreadLocal<Looper>();/**?Initialize?the?current?thread?as?a?looper.*?This?gives?you?a?chance?to?create?handlers?that?then?reference*?this?looper,?before?actually?starting?the?loop.?Be?sure?to?call*?{@link?#loop()}?after?calling?this?method,?and?end?it?by?calling*?{@link?#quit()}.*/public?static?void?prepare()?{prepare(true);}private?static?void?prepare(boolean?quitAllowed)?{if?(sThreadLocal.get()?!=?null)?{throw?new?RuntimeException("Only?one?Looper?may?be?created?per?thread");}sThreadLocal.set(new?Looper(quitAllowed));}private?Looper(boolean?quitAllowed)?{mQueue?=?new?MessageQueue(quitAllowed);mRun?=?true;mThread?=?Thread.currentThread();}
由這段代碼可知,Looper.prepare()是一個靜態(tài)方法,它做的事情就是簡單地為當(dāng)前的線程創(chuàng)建一個Looper對象,并存儲在一個靜態(tài)的線程局部存儲變量中。在Looper的構(gòu)造函數(shù)中又創(chuàng)建了一個MessageQueue對象。同時Looper會引用到當(dāng)前的線程,并將一個表示執(zhí)行狀態(tài)的變量mRun設(shè)置為true。對于此處的線程局部存儲變量sThreadLocal,可以簡單地理解為一個HashMap,該HashMap中存放的數(shù)據(jù)其類型為Looper,key則為Thread,而每個線程又都只能獲得特定于這個線程的key,從而訪問到專屬于這個線程的數(shù)據(jù)。
啟動Looper線程就和啟動普通的線程一樣,比如:
public?class?MainActivity?extends?Activity?{public?static?final?String?TAG?=?"MessageQueueDemo";private?static?final?String?CompTAG?=?"MainActivity";private?LooperThread?mLooperThread;@Overrideprotected?void?onCreate(Bundle?savedInstanceState)?{Log.d(TAG,?CompTAG?+?":?MainActivity=>onCreate");super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);mLooperThread?=?new?LooperThread();mLooperThread.start();}
同樣是new一個對象,然后調(diào)用該對象的start()方法。
Android SDK本身也提供了一個class來實現(xiàn)LooperThread那樣的機制,以方便在app中創(chuàng)建一個執(zhí)行事件處理循環(huán)的后臺線程,但提供的功能要完善得多,這個class就是HandlerThread。我們可以看一下這個class的定義:
它不僅有事件處理循環(huán),還給app提供了靈活地設(shè)置HandlerThread的線程優(yōu)先級的方法;事件循環(huán)實際啟動前的回調(diào)(onLooperPrepared());獲取HandlerThread的Looper的方法(這個方法對于Handler比較重要);以及多種停掉HandlerThread的方法——quit()和quitSafely()——以避免資源的leak。
Looper線程有兩種,一種是我們上面看到的那種由app自己創(chuàng)建,并在后臺運行的類型;另外一種則是android Java應(yīng)用的主線程。創(chuàng)建前者的Looper對象需要使用Looper.prepare()方法,而創(chuàng)建后者的,則需使用Looper.prepareMainLooper()方法。我們可以看一下Looper.prepareMainLooper()的實現(xiàn):
????/***?Initialize?the?current?thread?as?a?looper,?marking?it?as?an*?application's?main?looper.?The?main?looper?for?your?application*?is?created?by?the?Android?environment,?so?you?should?never?need*?to?call?this?function?yourself.??See?also:?{@link?#prepare()}*/public?static?void?prepareMainLooper()?{prepare(false);synchronized?(Looper.class)?{if?(sMainLooper?!=?null)?{throw?new?IllegalStateException("The?main?Looper?has?already?been?prepared.");}sMainLooper?=?myLooper();}}
比較特別的地方即在于,此處調(diào)用prepare()方法傳進去的quitAllowed參數(shù)為false,即表示這個Looper不能夠被quit掉。其他倒是基本一樣。整個android系統(tǒng)中,調(diào)用到prepareMainLooper()方法的地方有兩個:
/frameworks/base/services/java/com/android/server/ H?A?D SystemServer.java 94?Looper.prepareMainLooper(); /frameworks/base/core/java/android/app/ H?A?D ActivityThread.java 5087?Looper.prepareMainLooper();
一處在SystemServer——system_server的主線程——的run()方法中,用于為system_server主線程初始化消息處理隊列;另外一處在ActivityThread的run()方法中,自然即是創(chuàng)建android app主線程的消息處理隊列了。
通過消息與Looper線程交互
那Looper線程的特別之處究竟在哪里呢?如前所述,這種線程有一個Looper與之關(guān)聯(lián),會使用消息隊列,或者稱為事件循環(huán)來管理執(zhí)行的流程。那這種特別之處又如何體現(xiàn)呢?答案即是其它線程可以向此類線程中丟消息進來(當(dāng)然此類線程本身也可以往自己的消息隊列里面丟消息),然后在事件處理循環(huán)中,這些事件會得到處理。那究竟要如何往Looper線程的消息隊列中發(fā)送消息呢?
回憶前面我們創(chuàng)建Looper線程的那段代碼,不是有創(chuàng)建一個Handler出來嘛。沒錯,就是通過Handler來向Looper線程的MessageQueue中發(fā)送消息的。可以看一下使用Handler向Looper線程發(fā)送消息的方法。LooperThread的代碼與上面的一樣,向Looper線程發(fā)送消息的部分的寫法:
package?com.intel.helloworld;import?android.os.Bundle; import?android.os.Handler; import?android.os.Message; import?android.app.Activity; import?android.util.Log; import?android.view.Menu;public?class?MainActivity?extends?Activity?{public?static?final?String?TAG?=?"LifecycleDemoApp";private?static?final?String?CompTAG?=?"MainActivity";public?static?final?int?MESSAGE_WHAT_CREATE?=?1;public?static?final?int?MESSAGE_WHAT_START?=?2;public?static?final?int?MESSAGE_WHAT_RESUME?=?3;public?static?final?int?MESSAGE_WHAT_PAUSE?=?4;public?static?final?int?MESSAGE_WHAT_STOP?=?5;public?static?final?int?MESSAGE_WHAT_DESTROY?=?6;LooperThread?mThread;@Overrideprotected?void?onCreate(Bundle?savedInstanceState)?{Log.d(TAG,?CompTAG?+?":?"?+?"Activity-->onCreate");super.onCreate(savedInstanceState);setContentView(R.layout.activity_main);mThread?=?new?LooperThread();mThread.start();}@Overrideprotected?void?onStart()?{Log.d(TAG,?CompTAG?+?":?"?+?"Activity-->onStart");super.onStart();Handler?handler?=?mThread.mHandler;Message?msg?=?Message.obtain();msg.what?=?MESSAGE_WHAT_START;handler.sendMessage(msg);}@Overrideprotected?void?onResume()?{Log.d(TAG,?CompTAG?+?":?"?+?"Activity-->onResume");super.onResume();Handler?handler?=?mThread.mHandler;Message?msg?=?Message.obtain();msg.what?=?MESSAGE_WHAT_RESUME;handler.sendMessage(msg);}@Overrideprotected?void?onPause()?{Log.d(TAG,?CompTAG?+?":?"?+?"Activity-->onPause");super.onPause();Handler?handler?=?mThread.mHandler;Message?msg?=?Message.obtain();msg.what?=?MESSAGE_WHAT_PAUSE;handler.sendMessage(msg);}@Overrideprotected?void?onStop()?{Log.d(TAG,?CompTAG?+?":?"?+?"Activity-->onStop");super.onStop();Handler?handler?=?mThread.mHandler;Message?msg?=?Message.obtain();msg.what?=?MESSAGE_WHAT_STOP;handler.sendMessage(msg);}@Overrideprotected?void?onDestroy()?{Log.d(TAG,?CompTAG?+?":?"?+?"Activity-->onDestroy");super.onDestroy();Handler?handler?=?mThread.mHandler;Message?msg?=?Message.obtain();msg.what?=?MESSAGE_WHAT_DESTROY;handler.sendMessage(msg);}@Override?public?boolean?onCreateOptionsMenu(Menu?menu)?{//?Inflate?the?menu;?this?adds?items?to?the?action?bar?if?it?is?present.getMenuInflater().inflate(R.menu.main,?menu);return?true;}@Overrideprotected?void?onSaveInstanceState(Bundle?outState)?{Log.d(TAG,?CompTAG?+?":?"?+?"Activity-->onSaveInstanceState");super.onSaveInstanceState(outState);} }
使用Handler向一個Looper線程發(fā)送消息的過程,基本上即是,調(diào)用Message.obtain()或Handler.obtainMessage()獲取一個Message對象->設(shè)置Message對象->調(diào)用在Looper線程中創(chuàng)建的Handler對象的方法發(fā)送消息。
Handler究竟是如何知道要向哪個MessageQueue發(fā)送消息的呢?從前面的代碼中,我們似乎看不到任何Handler與MessageQueue能關(guān)聯(lián)起來的跡象。這究竟是怎么回事呢?這也是我們特別強調(diào)要使用Looper線程中創(chuàng)建的Handler對象來向該Looper線程中發(fā)送消息的原因。我們可以看一下Handler對象構(gòu)造的過程:
????/***?Default?constructor?associates?this?handler?with?the?{@link?Looper}?for?the*?current?thread.**?If?this?thread?does?not?have?a?looper,?this?handler?won't?be?able?to?receive?messages*?so?an?exception?is?thrown.*/public?Handler()?{this(null,?false);}/***?Constructor?associates?this?handler?with?the?{@link?Looper}?for?the*?current?thread?and?takes?a?callback?interface?in?which?you?can?handle*?messages.**?If?this?thread?does?not?have?a?looper,?this?handler?won't?be?able?to?receive?messages*?so?an?exception?is?thrown.**?@param?callback?The?callback?interface?in?which?to?handle?messages,?or?null.*/public?Handler(Callback?callback)?{this(callback,?false);}/***?Use?the?provided?{@link?Looper}?instead?of?the?default?one.**?@param?looper?The?looper,?must?not?be?null.*/public?Handler(Looper?looper)?{this(looper,?null,?false);}/***?Use?the?provided?{@link?Looper}?instead?of?the?default?one?and?take?a?callback*?interface?in?which?to?handle?messages.**?@param?looper?The?looper,?must?not?be?null.*?@param?callback?The?callback?interface?in?which?to?handle?messages,?or?null.*/public?Handler(Looper?looper,?Callback?callback)?{this(looper,?callback,?false);}/***?Use?the?{@link?Looper}?for?the?current?thread*?and?set?whether?the?handler?should?be?asynchronous.**?Handlers?are?synchronous?by?default?unless?this?constructor?is?used?to?make*?one?that?is?strictly?asynchronous.**?Asynchronous?messages?represent?interrupts?or?events?that?do?not?require?global?ordering*?with?represent?to?synchronous?messages.??Asynchronous?messages?are?not?subject?to*?the?synchronization?barriers?introduced?by?{@link?MessageQueue#enqueueSyncBarrier?long)}.**?@param?async?If?true,?the?handler?calls?{@link?Message#setAsynchronous(boolean)}?for*?each?{@link?Message}?that?is?sent?to?it?or?{@link?Runnable}?that?is?posted?to?it.**?@hide*/public?Handler(boolean?async)?{this(null,?async);}/***?Use?the?{@link?Looper}?for?the?current?thread?with?the?specified?callback?interface*?and?set?whether?the?handler?should?be?asynchronous.**?Handlers?are?synchronous?by?default?unless?this?constructor?is?used?to?make*?one?that?is?strictly?asynchronous.**?Asynchronous?messages?represent?interrupts?or?events?that?do?not?require?global?ordering*?with?represent?to?synchronous?messages.??Asynchronous?messages?are?not?subject?to*?the?synchronization?barriers?introduced?by?{@link?MessageQueue#enqueueSyncBarrier?long)}.**?@param?callback?The?callback?interface?in?which?to?handle?messages,?or?null.*?@param?async?If?true,?the?handler?calls?{@link?Message#setAsynchronous(boolean)}?for*?each?{@link?Message}?that?is?sent?to?it?or?{@link?Runnable}?that?is?posted?to?it.**?@hide*/public?Handler(Callback?callback,?boolean?async)?{if?(FIND_POTENTIAL_LEAKS)?{final?Class<??extends?Handler>?klass?=?getClass();if?((klass.isAnonymousClass()?||?klass.isMemberClass()?||?klass.isLocalClass())?&&(klass.getModifiers()?&?Modifier.STATIC)?==?0)?{Log.w(TAG,?"The?following?Handler?class?should?be?static?or?leaks?might?occur:?"?+klass.getCanonicalName());}}mLooper?=?Looper.myLooper();if?(mLooper?==?null)?{throw?new?RuntimeException("Can't?create?handler?inside?thread?that?has?not?called?Looper.prepare()");}mQueue?=?mLooper.mQueue;mCallback?=?callback;mAsynchronous?=?async;}/***?Use?the?provided?{@link?Looper}?instead?of?the?default?one?and?take?a?callback*?interface?in?which?to?handle?messages.??Also?set?whether?the?handler*?should?be?asynchronous.**?Handlers?are?synchronous?by?default?unless?this?constructor?is?used?to?make*?one?that?is?strictly?asynchronous.**?Asynchronous?messages?represent?interrupts?or?events?that?do?not?require?global?ordering*?with?respect?to?synchronous?messages.??Asynchronous?messages?are?not?subject?to*?the?synchronization?barriers?introduced?by?{@link?MessageQueue#enqueueSyncBarrier(long)}.**?@param?looper?The?looper,?must?not?be?null.*?@param?callback?The?callback?interface?in?which?to?handle?messages,?or?null.*?@param?async?If?true,?the?handler?calls?{@link?Message#setAsynchronous(boolean)}?for*?each?{@link?Message}?that?is?sent?to?it?or?{@link?Runnable}?that?is?posted?to?it.**?@hide*/public?Handler(Looper?looper,?Callback?callback,?boolean?async)?{mLooper?=?looper;mQueue?=?looper.mQueue;mCallback?=?callback;mAsynchronous?=?async;}
Handler的構(gòu)造函數(shù)共有7個,其中4個不需要傳遞Looper參數(shù),3個需要。前面我們用的是不需要傳遞Looper對象的構(gòu)造函數(shù),因而現(xiàn)在我們主要關(guān)注那4個不需要傳遞Looper參數(shù)的。它們都是Handler(Callback callback, boolean async)不同形式的封裝,而在這個構(gòu)造函數(shù)中是通過Looper.myLooper()獲取到當(dāng)前線程的Looper對象,并與相關(guān)的MessageQueue關(guān)聯(lián)起來的。這也是前面我們在實現(xiàn)Looper線程時,要在其run方法中創(chuàng)建一個public Handler的依據(jù)。
當(dāng)然我們也可以使用那些能夠手動傳遞Looper對象的構(gòu)造函數(shù),在構(gòu)造Handler對象時,顯式地使其與特定的Looper/消息隊列關(guān)聯(lián)起來。比如配合HandlerThread.getLooper()方法來用。
(這個地方我們看到,創(chuàng)建Handler對象時,可以傳遞另外兩個參數(shù),一個是Callback,另一個是async,那這兩個參數(shù)在這套消息隊列機制中,又起到一個什么樣的作用呢?后面我們在來解答這個問題。)
Handler提供了兩組函數(shù)用于向一個Looper線程的MessageQueue中發(fā)送消息,分別是postXXX()族和sendXXX()族,這兩組函數(shù)的調(diào)用關(guān)系大體如下所示:
我們可以先看一下sendXXX()族消息發(fā)送方法:
????/***?Pushes?a?message?onto?the?end?of?the?message?queue?after?all?pending?messages*?before?the?current?time.?It?will?be?received?in?{@link?#handleMessage},*?in?the?thread?attached?to?this?handler.*??*?@return?Returns?true?if?the?message?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.*/public?final?boolean?sendMessage(Message?msg){return?sendMessageDelayed(msg,?0);}/***?Sends?a?Message?containing?only?the?what?value.*??*?@return?Returns?true?if?the?message?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.*/public?final?boolean?sendEmptyMessage(int?what){return?sendEmptyMessageDelayed(what,?0);}/***?Sends?a?Message?containing?only?the?what?value,?to?be?delivered*?after?the?specified?amount?of?time?elapses.*?@see?#sendMessageDelayed(android.os.Message,?long)?*?*?@return?Returns?true?if?the?message?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.*/public?final?boolean?sendEmptyMessageDelayed(int?what,?long?delayMillis)?{Message?msg?=?Message.obtain();msg.what?=?what;return?sendMessageDelayed(msg,?delayMillis);}/***?Sends?a?Message?containing?only?the?what?value,?to?be?delivered?*?at?a?specific?time.*?@see?#sendMessageAtTime(android.os.Message,?long)*??*?@return?Returns?true?if?the?message?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.*/public?final?boolean?sendEmptyMessageAtTime(int?what,?long?uptimeMillis)?{Message?msg?=?Message.obtain();msg.what?=?what;return?sendMessageAtTime(msg,?uptimeMillis);}/***?Enqueue?a?message?into?the?message?queue?after?all?pending?messages*?before?(current?time?+?delayMillis).?You?will?receive?it?in*?{@link?#handleMessage},?in?the?thread?attached?to?this?handler.*??*?@return?Returns?true?if?the?message?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.??Note?that?a*?????????result?of?true?does?not?mean?the?message?will?be?processed?--?if*?????????the?looper?is?quit?before?the?delivery?time?of?the?message*?????????occurs?then?the?message?will?be?dropped.*/public?final?boolean?sendMessageDelayed(Message?msg,?long?delayMillis){if?(delayMillis?<?0)?{delayMillis?=?0;}return?sendMessageAtTime(msg,?SystemClock.uptimeMillis()?+?delayMillis);}/***?Enqueue?a?message?into?the?message?queue?after?all?pending?messages*?before?the?absolute?time?(in?milliseconds)?<var>uptimeMillis</var>.*?<b>The?time-base?is?{@link?android.os.SystemClock#uptimeMillis}.</b>*?You?will?receive?it?in?{@link?#handleMessage},?in?the?thread?attached*?to?this?handler.*?*?@param?uptimeMillis?The?absolute?time?at?which?the?message?should?be*?????????delivered,?using?the*?????????{@link?android.os.SystemClock#uptimeMillis}?time-base.*?????????*?@return?Returns?true?if?the?message?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.??Note?that?a*?????????result?of?true?does?not?mean?the?message?will?be?processed?--?if*?????????the?looper?is?quit?before?the?delivery?time?of?the?message*?????????occurs?then?the?message?will?be?dropped.*/public?boolean?sendMessageAtTime(Message?msg,?long?uptimeMillis)?{MessageQueue?queue?=?mQueue;if?(queue?==?null)?{RuntimeException?e?=?new?RuntimeException(this?+?"?sendMessageAtTime()?called?with?no?mQueue");Log.w("Looper",?e.getMessage(),?e);return?false;}return?enqueueMessage(queue,?msg,?uptimeMillis);}/***?Enqueue?a?message?at?the?front?of?the?message?queue,?to?be?processed?on*?the?next?iteration?of?the?message?loop.??You?will?receive?it?in*?{@link?#handleMessage},?in?the?thread?attached?to?this?handler.*?<b>This?method?is?only?for?use?in?very?special?circumstances?--?it*?can?easily?starve?the?message?queue,?cause?ordering?problems,?or?have*?other?unexpected?side-effects.</b>*??*?@return?Returns?true?if?the?message?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.*/public?final?boolean?sendMessageAtFrontOfQueue(Message?msg)?{MessageQueue?queue?=?mQueue;if?(queue?==?null)?{RuntimeException?e?=?new?RuntimeException(this?+?"?sendMessageAtTime()?called?with?no?mQueue");Log.w("Looper",?e.getMessage(),?e);return?false;}return?enqueueMessage(queue,?msg,?0);}private?boolean?enqueueMessage(MessageQueue?queue,?Message?msg,?long?uptimeMillis)?{msg.target?=?this;if?(mAsynchronous)?{msg.setAsynchronous(true);}return?queue.enqueueMessage(msg,?uptimeMillis);}
這些方法之間大多只有一些細微的差別,它們最終都調(diào)用Handler.enqueueMessage()/MessageQueue.enqueueMessage()方法。(哈哈,這個地方,被我們逮到一個Handler類某貢獻者所犯的copy-paste錯誤:仔細看一下sendMessageAtTime()和sendMessageAtFrontOfQueue()這兩個方法,創(chuàng)建RuntimeException的部分,消息中都是"sendMessageAtTime()"。)Handler實際的職責(zé),并不完全如它的名稱所示,在處理message外,它還要負責(zé)發(fā)送Message到MessageQueue。
注意,在Handler.enqueueMessage()中,會將Message的target設(shè)為this,而且是不加檢查,強制覆蓋原來的值。后面我們將會看到,Message的target就是Message被dispatched到的Handler,也是將會處理這條Message的Handler。這個地方強制覆蓋的邏輯表明,我們用哪個Handler來發(fā)送一條消息,那么這條消息就將會被dispatched給哪個Handler來處理,消息的發(fā)送者即接收者。這使得Handler那些obtainMessage()方法,及Message需要Handler參數(shù)的那些obtain()方法顯得非常多余。通過這些方法會在獲取Message時設(shè)置它的target,但這些設(shè)置又總是會在后面被無條件地覆蓋掉。
再來看一下MessageQueue.enqueueMessage()方法:
????boolean?enqueueMessage(Message?msg,?long?when)?{if?(msg.isInUse())?{throw?new?AndroidRuntimeException(msg?+?"?This?message?is?already?in?use.");}if?(msg.target?==?null)?{throw?new?AndroidRuntimeException("Message?must?have?a?target.");}boolean?needWake;synchronized?(this)?{if?(mQuiting)?{RuntimeException?e?=?new?RuntimeException(msg.target?+?"?sending?message?to?a?Handler?on?a?dead?thread");Log.w("MessageQueue",?e.getMessage(),?e);return?false;}msg.when?=?when;Message?p?=?mMessages;if?(p?==?null?||?when?==?0?||?when?<?p.when)?{//?New?head,?wake?up?the?event?queue?if?blocked.msg.next?=?p;mMessages?=?msg;needWake?=?mBlocked;}?else?{//?Inserted?within?the?middle?of?the?queue.??Usually?we?don't?have?to?wake//?up?the?event?queue?unless?there?is?a?barrier?at?the?head?of?the?queue//?and?the?message?is?the?earliest?asynchronous?message?in?the?queue.needWake?=?mBlocked?&&?p.target?==?null?&&?msg.isAsynchronous();Message?prev;for?(;;)?{prev?=?p;p?=?p.next;if?(p?==?null?||?when?<?p.when)?{break;}if?(needWake?&&?p.isAsynchronous())?{needWake?=?false;}}msg.next?=?p;?//?invariant:?p?==?prev.nextprev.next?=?msg;}}if?(needWake)?{nativeWake(mPtr);}return?true;}
此處我們看到,MessageQueue用一個單向鏈表來保存所有的Messages,在鏈表中各個Message按照其請求執(zhí)行的時間先后來排序。將Message插入MessageQueue的算法也還算清晰簡潔,不必贅述。但此處,MessageQueue/Message的author為什么要自己實現(xiàn)一個單向鏈表,而沒有用Java標(biāo)準庫提供的容器組件呢?是為了插入一條新的Message方便,還是僅僅為了練習(xí)怎么用Java實現(xiàn)單向鏈表?wake的邏輯后面我們會再來研究。
向MessageQueue中發(fā)送消息的postXXX()方法:
????/***?Causes?the?Runnable?r?to?be?added?to?the?message?queue.*?The?runnable?will?be?run?on?the?thread?to?which?this?handler?is*?attached.**?@param?r?The?Runnable?that?will?be?executed.**?@return?Returns?true?if?the?Runnable?was?successfully?placed?in?to?the*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.*/public?final?boolean?post(Runnable?r){return??sendMessageDelayed(getPostMessage(r),?0);}/***?Causes?the?Runnable?r?to?be?added?to?the?message?queue,?to?be?run*?at?a?specific?time?given?by?<var>uptimeMillis</var>.*?<b>The?time-base?is?{@link?android.os.SystemClock#uptimeMillis}.</b>*?Time?spent?in?deep?sleep?will?add?an?additional?delay?to?execution.*?The?runnable?will?be?run?on?the?thread?to?which?this?handler?is?attached.**?@param?r?The?Runnable?that?will?be?executed.*?@param?uptimeMillis?The?absolute?time?at?which?the?callback?should?run,*?????????using?the?{@link?android.os.SystemClock#uptimeMillis}?time-base.*??*?@return?Returns?true?if?the?Runnable?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.??Note?that?a*?????????result?of?true?does?not?mean?the?Runnable?will?be?processed?--?if*?????????the?looper?is?quit?before?the?delivery?time?of?the?message*?????????occurs?then?the?message?will?be?dropped.*/public?final?boolean?postAtTime(Runnable?r,?long?uptimeMillis){return?sendMessageAtTime(getPostMessage(r),?uptimeMillis);}/***?Causes?the?Runnable?r?to?be?added?to?the?message?queue,?to?be?run*?at?a?specific?time?given?by?<var>uptimeMillis</var>.*?<b>The?time-base?is?{@link?android.os.SystemClock#uptimeMillis}.</b>*?Time?spent?in?deep?sleep?will?add?an?additional?delay?to?execution.*?The?runnable?will?be?run?on?the?thread?to?which?this?handler?is?attached.**?@param?r?The?Runnable?that?will?be?executed.*?@param?uptimeMillis?The?absolute?time?at?which?the?callback?should?run,*?????????using?the?{@link?android.os.SystemClock#uptimeMillis}?time-base.*?*?@return?Returns?true?if?the?Runnable?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.??Note?that?a*?????????result?of?true?does?not?mean?the?Runnable?will?be?processed?--?if*?????????the?looper?is?quit?before?the?delivery?time?of?the?message*?????????occurs?then?the?message?will?be?dropped.*?????????*?@see?android.os.SystemClock#uptimeMillis*/public?final?boolean?postAtTime(Runnable?r,?Object?token,?long?uptimeMillis){return?sendMessageAtTime(getPostMessage(r,?token),?uptimeMillis);}/***?Causes?the?Runnable?r?to?be?added?to?the?message?queue,?to?be?run*?after?the?specified?amount?of?time?elapses.*?The?runnable?will?be?run?on?the?thread?to?which?this?handler*?is?attached.*?<b>The?time-base?is?{@link?android.os.SystemClock#uptimeMillis}.</b>*?Time?spent?in?deep?sleep?will?add?an?additional?delay?to?execution.*??*?@param?r?The?Runnable?that?will?be?executed.*?@param?delayMillis?The?delay?(in?milliseconds)?until?the?Runnable*????????will?be?executed.*????????*?@return?Returns?true?if?the?Runnable?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.??Note?that?a*?????????result?of?true?does?not?mean?the?Runnable?will?be?processed?--*?????????if?the?looper?is?quit?before?the?delivery?time?of?the?message*?????????occurs?then?the?message?will?be?dropped.*/public?final?boolean?postDelayed(Runnable?r,?long?delayMillis){return?sendMessageDelayed(getPostMessage(r),?delayMillis);}/***?Posts?a?message?to?an?object?that?implements?Runnable.*?Causes?the?Runnable?r?to?executed?on?the?next?iteration?through?the*?message?queue.?The?runnable?will?be?run?on?the?thread?to?which?this*?handler?is?attached.*?<b>This?method?is?only?for?use?in?very?special?circumstances?--?it*?can?easily?starve?the?message?queue,?cause?ordering?problems,?or?have*?other?unexpected?side-effects.</b>*??*?@param?r?The?Runnable?that?will?be?executed.*?*?@return?Returns?true?if?the?message?was?successfully?placed?in?to?the?*?????????message?queue.??Returns?false?on?failure,?usually?because?the*?????????looper?processing?the?message?queue?is?exiting.*/public?final?boolean?postAtFrontOfQueue(Runnable?r){return?sendMessageAtFrontOfQueue(getPostMessage(r));}private?static?Message?getPostMessage(Runnable?r)?{Message?m?=?Message.obtain();m.callback?=?r;return?m;}private?static?Message?getPostMessage(Runnable?r,?Object?token)?{Message?m?=?Message.obtain();m.obj?=?token;m.callback?=?r;return?m;}
這組方法相對于前面的sendXXX()族方法而言,其特殊之處在于,它們都需要傳入一個Runnable參數(shù),post的消息,其特殊之處也正在于,Message的callback將是傳入的Runnable對象。這些特別的地方將影響這些消息在dispatch時的行為。
消息隊列中消息的處理
消息隊列中的消息是在Looper.loop()中被取出處理的:
????/***?Run?the?message?queue?in?this?thread.?Be?sure?to?call*?{@link?#quit()}?to?end?the?loop.*/public?static?void?loop()?{final?Looper?me?=?myLooper();if?(me?==?null)?{throw?new?RuntimeException("No?Looper;?Looper.prepare()?wasn't?called?on?this?thread.");}final?MessageQueue?queue?=?me.mQueue;//?Make?sure?the?identity?of?this?thread?is?that?of?the?local?process,//?and?keep?track?of?what?that?identity?token?actually?is.Binder.clearCallingIdentity();final?long?ident?=?Binder.clearCallingIdentity();for?(;;)?{Message?msg?=?queue.next();?//?might?blockif?(msg?==?null)?{//?No?message?indicates?that?the?message?queue?is?quitting.return;}//?This?must?be?in?a?local?variable,?in?case?a?UI?event?sets?the?loggerPrinter?logging?=?me.mLogging;if?(logging?!=?null)?{logging.println(">>>>>?Dispatching?to?"?+?msg.target?+?"?"?+msg.callback?+?":?"?+?msg.what);}msg.target.dispatchMessage(msg);if?(logging?!=?null)?{logging.println("<<<<<?Finished?to?"?+?msg.target?+?"?"?+?msg.callback);}//?Make?sure?that?during?the?course?of?dispatching?the//?identity?of?the?thread?wasn't?corrupted.final?long?newIdent?=?Binder.clearCallingIdentity();if?(ident?!=?newIdent)?{Log.wtf(TAG,?"Thread?identity?changed?from?0x"+?Long.toHexString(ident)?+?"?to?0x"+?Long.toHexString(newIdent)?+?"?while?dispatching?to?"+?msg.target.getClass().getName()?+?"?"+?msg.callback?+?"?what="?+?msg.what);}msg.recycle();}}
在取出的msg為NULL之前,消息處理循環(huán)都一直運行。為NULL的msg表明消息隊列被停掉了。在這個循環(huán)中,被取出的消息會被dispatch給一個Handler處理,即是msg.target,執(zhí)行Handler.dispatchMessage()方法。
注意:Looper.loop()循環(huán)體的末尾,調(diào)用了從消息隊列中取出且已經(jīng)被dispatched處理的Message的recycle()方法,這表明Looper會幫我們自動回收發(fā)送到它的消息隊列的消息。在我們將一條消息發(fā)送給Looper線程的消息隊列之后,我們就不需要再擔(dān)心消息的回收問題了,Looper自會幫我們很好的處理
接著來看Handler.dispatchMessage():
????/***?Callback?interface?you?can?use?when?instantiating?a?Handler?to?avoid*?having?to?implement?your?own?subclass?of?Handler.**?@param?msg?A?{@link?android.os.Message?Message}?object*?@return?True?if?no?further?handling?is?desired*/public?interface?Callback?{public?boolean?handleMessage(Message?msg);}/***?Subclasses?must?implement?this?to?receive?messages.*/public?void?handleMessage(Message?msg)?{}/***?Handle?system?messages?here.*/public?void?dispatchMessage(Message?msg)?{if?(msg.callback?!=?null)?{handleCallback(msg);}?else?{if?(mCallback?!=?null)?{if?(mCallback.handleMessage(msg))?{return;}}handleMessage(msg);}}private?static?void?handleCallback(Message?message)?{message.callback.run();}
可以認為有三種對象可能會實際處理一條消息,分別是消息的Runnable callback,Handler的Callback mCallback和Handler對象本身。但這三種對象在獲取消息的處理權(quán)方面有一定的優(yōu)先級,消息的Runnable callback優(yōu)先級最高,在它不為空時,只執(zhí)行這個callback;優(yōu)先級次高的是Handler的Callback mCallback,在它不為空時,Handler會先把消息丟給它處理,如果它不處理返回了false,Handler才會調(diào)用自己的handleMessage()來處理。
通常,Handler的handleMessage()方法通常需要override,來實現(xiàn)消息處理的主要邏輯。Handler的mCallback,使得開發(fā)者可以比較方便的將消息處理的邏輯和發(fā)送消息的Handler完全分開。
此處也可見post消息的特殊之處,此類消息將完全繞過Handler中用于處理消息的handleMessage() 方法,而只會執(zhí)行消息的sender所實現(xiàn)的Runnable。
Sleep-Wakeup機制
還有一個問題,當(dāng)MessageQueue中沒有Messages時,Looper線程會做什么呢?它會不停地輪詢,并檢查消息隊列中是否有消息嗎?計算機科學(xué)發(fā)展到現(xiàn)在,閉上眼睛我們都能猜到,Looper線程一定不會去輪詢的。Looper線程也確實沒有去輪詢消息隊列。在消息隊列為空時,Looper線程會去休眠,然后在消息隊列中有了消息之后,再被喚醒。但這樣的機制又是如何實現(xiàn)的呢?
Sleep-Wakeup機制所需設(shè)施的建立
我們從Sleep-Wakeup機制所需設(shè)施的建立開始。回憶前面的Looper構(gòu)造函數(shù),它會創(chuàng)建一個MessageQueue對象,而Sleep-Wakeup機制所需設(shè)施正是在MessageQueue對象的創(chuàng)建過程中創(chuàng)建出來的。(在android消息隊列機制中,消息取出和壓入的主要邏輯都在MessageQueue中完成,MessageQueue實現(xiàn)一個定制的阻塞隊列,將等待-喚醒的邏輯都放在這個類里想必也沒什么讓人吃驚的地方吧。)我們來看MessageQueue的構(gòu)造函數(shù):
????MessageQueue(boolean?quitAllowed)?{mQuitAllowed?=?quitAllowed;mPtr?=?nativeInit();}private?native?static?long?nativeInit();
這個方法調(diào)用nativeInit()方法來創(chuàng)建Sleep-Wakeup機制所需設(shè)施。來看nativeInit()的實現(xiàn)(在frameworks/base/core/jni/android_os_MessageQueue.cpp):
NativeMessageQueue::NativeMessageQueue()?:?mInCallback(false),?mExceptionObj(NULL)?{mLooper?=?Looper::getForThread();if?(mLooper?==?NULL)?{mLooper?=?new?Looper(false);Looper::setForThread(mLooper);} }static?jint?android_os_MessageQueue_nativeInit(JNIEnv*?env,?jclass?clazz)?{NativeMessageQueue*?nativeMessageQueue?=?new?NativeMessageQueue();if?(!nativeMessageQueue)?{jniThrowRuntimeException(env,?"Unable?to?allocate?native?queue");return?0;}nativeMessageQueue->incStrong(env);return?reinterpret_cast<jint>(nativeMessageQueue); }static?JNINativeMethod?gMessageQueueMethods[]?=?{/*?name,?signature,?funcPtr?*/{?"nativeInit",?"()I",?(void*)android_os_MessageQueue_nativeInit?},{?"nativeDestroy",?"(I)V",?(void*)android_os_MessageQueue_nativeDestroy?},{?"nativePollOnce",?"(II)V",?(void*)android_os_MessageQueue_nativePollOnce?},{?"nativeWake",?"(I)V",?(void*)android_os_MessageQueue_nativeWake?} };
可以看到,nativeInit()所做的事情,就是創(chuàng)建一個NativeMessageQueue對象,在NativeMessageQueue的構(gòu)造函數(shù)中,會來創(chuàng)建一個Looper對象。與Java層的Looper對象類似,native層的這種Looper對象也是保存在線程局部存儲變量中的,每個線程一個。接著我們來看Looper類的構(gòu)造函數(shù)和Looper::getForThread()函數(shù),來了解一下,native層的線程局部存儲API的用法(Looper類的實現(xiàn)在frameworks/native/libs/utils/Looper.cpp):
//?Hint?for?number?of?file?descriptors?to?be?associated?with?the?epoll?instance. static?const?int?EPOLL_SIZE_HINT?=?8;//?Maximum?number?of?file?descriptors?for?which?to?retrieve?poll?events?each?iteration. static?const?int?EPOLL_MAX_EVENTS?=?16;static?pthread_once_t?gTLSOnce?=?PTHREAD_ONCE_INIT; static?pthread_key_t?gTLSKey?=?0;Looper::Looper(bool?allowNonCallbacks)?:mAllowNonCallbacks(allowNonCallbacks),?mSendingMessage(false),mResponseIndex(0),?mNextMessageUptime(LLONG_MAX)?{int?wakeFds[2];int?result?=?pipe(wakeFds);LOG_ALWAYS_FATAL_IF(result?!=?0,?"Could?not?create?wake?pipe.??errno=%d",?errno);mWakeReadPipeFd?=?wakeFds[0];mWakeWritePipeFd?=?wakeFds[1];result?=?fcntl(mWakeReadPipeFd,?F_SETFL,?O_NONBLOCK);LOG_ALWAYS_FATAL_IF(result?!=?0,?"Could?not?make?wake?read?pipe?non-blocking.??errno=%d",errno);result?=?fcntl(mWakeWritePipeFd,?F_SETFL,?O_NONBLOCK);LOG_ALWAYS_FATAL_IF(result?!=?0,?"Could?not?make?wake?write?pipe?non-blocking.??errno=%d",errno);//?Allocate?the?epoll?instance?and?register?the?wake?pipe.mEpollFd?=?epoll_create(EPOLL_SIZE_HINT);LOG_ALWAYS_FATAL_IF(mEpollFd?<?0,?"Could?not?create?epoll?instance.??errno=%d",?errno);struct?epoll_event?eventItem;memset(&?eventItem,?0,?sizeof(epoll_event));?//?zero?out?unused?members?of?data?field?unioneventItem.events?=?EPOLLIN;eventItem.data.fd?=?mWakeReadPipeFd;result?=?epoll_ctl(mEpollFd,?EPOLL_CTL_ADD,?mWakeReadPipeFd,?&?eventItem);LOG_ALWAYS_FATAL_IF(result?!=?0,?"Could?not?add?wake?read?pipe?to?epoll?instance.??errno=%d",errno); }void?Looper::initTLSKey()?{int?result?=?pthread_key_create(&?gTLSKey,?threadDestructor);LOG_ALWAYS_FATAL_IF(result?!=?0,?"Could?not?allocate?TLS?key."); }void?Looper::threadDestructor(void?*st)?{Looper*?const?self?=?static_cast<Looper*>(st);if?(self?!=?NULL)?{self->decStrong((void*)threadDestructor);} }void?Looper::setForThread(const?sp<Looper>&?looper)?{sp<Looper>?old?=?getForThread();?//?also?has?side-effect?of?initializing?TLSif?(looper?!=?NULL)?{looper->incStrong((void*)threadDestructor);}pthread_setspecific(gTLSKey,?looper.get());if?(old?!=?NULL)?{old->decStrong((void*)threadDestructor);} }sp<Looper>?Looper::getForThread()?{int?result?=?pthread_once(&?gTLSOnce,?initTLSKey);LOG_ALWAYS_FATAL_IF(result?!=?0,?"pthread_once?failed");return?(Looper*)pthread_getspecific(gTLSKey); }
關(guān)于pthread庫提供的線程局部存儲API的用法,可以看到,每個線程局部存儲對象,都需要一個key,通過pthread_key_create()函數(shù)創(chuàng)建,隨后各個線程就可以通過這個key并借助于pthread_setspecific()和pthread_getspecific()函數(shù)來保存或者獲取相應(yīng)的線程局部存儲的變量了。再來看Looper的構(gòu)造函數(shù)。它創(chuàng)建了一個pipe,兩個文件描述符。然后設(shè)置管道的兩個文件描述屬性為非阻塞I/O。接著是創(chuàng)建并設(shè)置epoll實例。由此我們了解到,android的消息隊列是通過epoll機制來實現(xiàn)其Sleep-Wakeup機制的。
喚醒
然后來看當(dāng)其他線程向Looper線程的MessageQueue中插入了消息時,Looper線程是如何被叫醒的。回憶我們前面看到的MessageQueue類的enqueueMessage()方法,它在最后插入消息之后,有調(diào)用一個nativeWake()方法。沒錯,正是這個nativeWake()方法執(zhí)行了叫醒Looper線程的動作。那它又是如何叫醒Looper線程的呢?來看它的實現(xiàn):
static?void?android_os_MessageQueue_nativeWake(JNIEnv*?env,?jclass?clazz,?jint?ptr)?{NativeMessageQueue*?nativeMessageQueue?=?reinterpret_cast<NativeMessageQueue*>(ptr);return?nativeMessageQueue->wake(); }//?----------------------------------------------------------------------------static?JNINativeMethod?gMessageQueueMethods[]?=?{/*?name,?signature,?funcPtr?*/{?"nativeInit",?"()I",?(void*)android_os_MessageQueue_nativeInit?},{?"nativeDestroy",?"(I)V",?(void*)android_os_MessageQueue_nativeDestroy?},{?"nativePollOnce",?"(II)V",?(void*)android_os_MessageQueue_nativePollOnce?},{?"nativeWake",?"(I)V",?(void*)android_os_MessageQueue_nativeWake?} };
它只是調(diào)用了native層的Looper對象的wake()函數(shù)。接著再來看native Looper的wake()函數(shù):
void?Looper::wake()?{ #if?DEBUG_POLL_AND_WAKEALOGD("%p?~?wake",?this); #endifssize_t?nWrite;do?{nWrite?=?write(mWakeWritePipeFd,?"W",?1);}?while?(nWrite?==?-1?&&?errno?==?EINTR);if?(nWrite?!=?1)?{if?(errno?!=?EAGAIN)?{ALOGW("Could?not?write?wake?signal,?errno=%d",?errno);}} }
它所做的事情,就是向管道的用于寫的那個文件中寫入一個“W”字符。
休眠
Looper線程休眠的過程。我們知道,Looper線程在Looper.loop()方法中,不斷地從MessageQueue中取出消息,然后處理,如此循環(huán)往復(fù),永不止息。不難想象,休眠的時機應(yīng)該是在取出消息的時候。Looper.loop()通過MessageQueue.next()從消息隊列中取出消息。來看MessageQueue.next()方法:
????Message?next()?{int?pendingIdleHandlerCount?=?-1;?//?-1?only?during?first?iterationint?nextPollTimeoutMillis?=?0;for?(;;)?{if?(nextPollTimeoutMillis?!=?0)?{Binder.flushPendingCommands();}nativePollOnce(mPtr,?nextPollTimeoutMillis);synchronized?(this)?{//?Try?to?retrieve?the?next?message.??Return?if?found.final?long?now?=?SystemClock.uptimeMillis();Message?prevMsg?=?null;Message?msg?=?mMessages;if?(msg?!=?null?&&?msg.target?==?null)?{//?Stalled?by?a?barrier.??Find?the?next?asynchronous?message?in?the?queue.do?{prevMsg?=?msg;msg?=?msg.next;}?while?(msg?!=?null?&&?!msg.isAsynchronous());}if?(msg?!=?null)?{if?(now?<?msg.when)?{//?Next?message?is?not?ready.??Set?a?timeout?to?wake?up?when?it?is?ready.nextPollTimeoutMillis?=?(int)?Math.min(msg.when?-?now,?Integer.MAX_VALUE);}?else?{//?Got?a?message.mBlocked?=?false;if?(prevMsg?!=?null)?{prevMsg.next?=?msg.next;}?else?{mMessages?=?msg.next;}msg.next?=?null;if?(false)?Log.v("MessageQueue",?"Returning?message:?"?+?msg);msg.markInUse();return?msg;}}?else?{//?No?more?messages.nextPollTimeoutMillis?=?-1;}//?Process?the?quit?message?now?that?all?pending?messages?have?been?handled.if?(mQuiting)?{dispose();return?null;}//?If?first?time?idle,?then?get?the?number?of?idlers?to?run.//?Idle?handles?only?run?if?the?queue?is?empty?or?if?the?first?message//?in?the?queue?(possibly?a?barrier)?is?due?to?be?handled?in?the?future.if?(pendingIdleHandlerCount?<?0&&?(mMessages?==?null?||?now?<?mMessages.when))?{pendingIdleHandlerCount?=?mIdleHandlers.size();}if?(pendingIdleHandlerCount?<=?0)?{//?No?idle?handlers?to?run.??Loop?and?wait?some?more.mBlocked?=?true;continue;}if?(mPendingIdleHandlers?==?null)?{mPendingIdleHandlers?=?new?IdleHandler[Math.max(pendingIdleHandlerCount,?4)];}mPendingIdleHandlers?=?mIdleHandlers.toArray(mPendingIdleHandlers);}//?Run?the?idle?handlers.//?We?only?ever?reach?this?code?block?during?the?first?iteration.for?(int?i?=?0;?i?<?pendingIdleHandlerCount;?i++)?{final?IdleHandler?idler?=?mPendingIdleHandlers[i];mPendingIdleHandlers[i]?=?null;?//?release?the?reference?to?the?handlerboolean?keep?=?false;try?{keep?=?idler.queueIdle();}?catch?(Throwable?t)?{Log.wtf("MessageQueue",?"IdleHandler?threw?exception",?t);}if?(!keep)?{synchronized?(this)?{mIdleHandlers.remove(idler);}}}//?Reset?the?idle?handler?count?to?0?so?we?do?not?run?them?again.pendingIdleHandlerCount?=?0;//?While?calling?an?idle?handler,?a?new?message?could?have?been?delivered//?so?go?back?and?look?again?for?a?pending?message?without?waiting.nextPollTimeoutMillis?=?0;}}
值得注意的是上面那個對于nativePollOnce()的調(diào)用。wait機制的實現(xiàn)正在于此。來看這個方法的實現(xiàn),在native的JNI code里面:
class?MessageQueue?:?public?RefBase?{ public:/*?Gets?the?message?queue's?looper.?*/inline?sp<Looper>?getLooper()?const?{return?mLooper;}/*?Checks?whether?the?JNI?environment?has?a?pending?exception.**?If?an?exception?occurred,?logs?it?together?with?the?specified?message,*?and?calls?raiseException()?to?ensure?the?exception?will?be?raised?when*?the?callback?returns,?clears?the?pending?exception?from?the?environment,*?then?returns?true.**?If?no?exception?occurred,?returns?false.*/bool?raiseAndClearException(JNIEnv*?env,?const?char*?msg);/*?Raises?an?exception?from?within?a?callback?function.*?The?exception?will?be?rethrown?when?control?returns?to?the?message?queue?which*?will?typically?cause?the?application?to?crash.**?This?message?can?only?be?called?from?within?a?callback?function.??If?it?is?called*?at?any?other?time,?the?process?will?simply?be?killed.**?Does?nothing?if?exception?is?NULL.**?(This?method?does?not?take?ownership?of?the?exception?object?reference.*?The?caller?is?responsible?for?releasing?its?reference?when?it?is?done.)*/virtual?void?raiseException(JNIEnv*?env,?const?char*?msg,?jthrowable?exceptionObj)?=?0;protected:MessageQueue();virtual?~MessageQueue();protected:sp<Looper>?mLooper; };class?NativeMessageQueue?:?public?MessageQueue?{ public:NativeMessageQueue();virtual?~NativeMessageQueue();virtual?void?raiseException(JNIEnv*?env,?const?char*?msg,?jthrowable?exceptionObj);void?pollOnce(JNIEnv*?env,?int?timeoutMillis);void?wake();private:bool?mInCallback;jthrowable?mExceptionObj; };void?NativeMessageQueue::pollOnce(JNIEnv*?env,?int?timeoutMillis)?{mInCallback?=?true;mLooper->pollOnce(timeoutMillis);mInCallback?=?false;if?(mExceptionObj)?{env->Throw(mExceptionObj);env->DeleteLocalRef(mExceptionObj);mExceptionObj?=?NULL;} }static?void?android_os_MessageQueue_nativePollOnce(JNIEnv*?env,?jclass?clazz,jint?ptr,?jint?timeoutMillis)?{NativeMessageQueue*?nativeMessageQueue?=?reinterpret_cast<NativeMessageQueue*>(ptr);nativeMessageQueue->pollOnce(env,?timeoutMillis); }
繼續(xù)追Looper::pollOnce()的實現(xiàn)(在frameworks/native/libs/utils/Looper.cpp):
int?Looper::pollOnce(int?timeoutMillis,?int*?outFd,?int*?outEvents,?void**?outData)?{int?result?=?0;for?(;;)?{while?(mResponseIndex?<?mResponses.size())?{const?Response&?response?=?mResponses.itemAt(mResponseIndex++);int?ident?=?response.request.ident;if?(ident?>=?0)?{int?fd?=?response.request.fd;int?events?=?response.events;void*?data?=?response.request.data; #if?DEBUG_POLL_AND_WAKEALOGD("%p?~?pollOnce?-?returning?signalled?identifier?%d:?""fd=%d,?events=0x%x,?data=%p",this,?ident,?fd,?events,?data); #endifif?(outFd?!=?NULL)?*outFd?=?fd;if?(outEvents?!=?NULL)?*outEvents?=?events;if?(outData?!=?NULL)?*outData?=?data;return?ident;}}if?(result?!=?0)?{ #if?DEBUG_POLL_AND_WAKEALOGD("%p?~?pollOnce?-?returning?result?%d",?this,?result); #endifif?(outFd?!=?NULL)?*outFd?=?0;if?(outEvents?!=?NULL)?*outEvents?=?0;if?(outData?!=?NULL)?*outData?=?NULL;return?result;}result?=?pollInner(timeoutMillis);} }int?Looper::pollInner(int?timeoutMillis)?{ #if?DEBUG_POLL_AND_WAKEALOGD("%p?~?pollOnce?-?waiting:?timeoutMillis=%d",?this,?timeoutMillis); #endif//?Adjust?the?timeout?based?on?when?the?next?message?is?due.if?(timeoutMillis?!=?0?&&?mNextMessageUptime?!=?LLONG_MAX)?{nsecs_t?now?=?systemTime(SYSTEM_TIME_MONOTONIC);int?messageTimeoutMillis?=?toMillisecondTimeoutDelay(now,?mNextMessageUptime);if?(messageTimeoutMillis?>=?0&&?(timeoutMillis?<?0?||?messageTimeoutMillis?<?timeoutMillis))?{timeoutMillis?=?messageTimeoutMillis;} #if?DEBUG_POLL_AND_WAKEALOGD("%p?~?pollOnce?-?next?message?in?%lldns,?adjusted?timeout:?timeoutMillis=%d",this,?mNextMessageUptime?-?now,?timeoutMillis); #endif}//?Poll.int?result?=?ALOOPER_POLL_WAKE;mResponses.clear();mResponseIndex?=?0;struct?epoll_event?eventItems[EPOLL_MAX_EVENTS];int?eventCount?=?epoll_wait(mEpollFd,?eventItems,?EPOLL_MAX_EVENTS,?timeoutMillis);//?Acquire?lock.mLock.lock();//?Check?for?poll?error.if?(eventCount?<?0)?{if?(errno?==?EINTR)?{goto?Done;}ALOGW("Poll?failed?with?an?unexpected?error,?errno=%d",?errno);result?=?ALOOPER_POLL_ERROR;goto?Done;}//?Check?for?poll?timeout.if?(eventCount?==?0)?{ #if?DEBUG_POLL_AND_WAKEALOGD("%p?~?pollOnce?-?timeout",?this); #endifresult?=?ALOOPER_POLL_TIMEOUT;goto?Done;}//?Handle?all?events. #if?DEBUG_POLL_AND_WAKEALOGD("%p?~?pollOnce?-?handling?events?from?%d?fds",?this,?eventCount); #endiffor?(int?i?=?0;?i?<?eventCount;?i++)?{int?fd?=?eventItems[i].data.fd;uint32_t?epollEvents?=?eventItems[i].events;if?(fd?==?mWakeReadPipeFd)?{if?(epollEvents?&?EPOLLIN)?{awoken();}?else?{ALOGW("Ignoring?unexpected?epoll?events?0x%x?on?wake?read?pipe.",?epollEvents);}}?else?{ssize_t?requestIndex?=?mRequests.indexOfKey(fd);if?(requestIndex?>=?0)?{int?events?=?0;if?(epollEvents?&?EPOLLIN)?events?|=?ALOOPER_EVENT_INPUT;if?(epollEvents?&?EPOLLOUT)?events?|=?ALOOPER_EVENT_OUTPUT;if?(epollEvents?&?EPOLLERR)?events?|=?ALOOPER_EVENT_ERROR;if?(epollEvents?&?EPOLLHUP)?events?|=?ALOOPER_EVENT_HANGUP;pushResponse(events,?mRequests.valueAt(requestIndex));}?else?{ALOGW("Ignoring?unexpected?epoll?events?0x%x?on?fd?%d?that?is?""no?longer?registered.",?epollEvents,?fd);}}} Done:?;//?Invoke?pending?message?callbacks.mNextMessageUptime?=?LLONG_MAX;while?(mMessageEnvelopes.size()?!=?0)?{nsecs_t?now?=?systemTime(SYSTEM_TIME_MONOTONIC);const?MessageEnvelope&?messageEnvelope?=?mMessageEnvelopes.itemAt(0);if?(messageEnvelope.uptime?<=?now)?{//?Remove?the?envelope?from?the?list.//?We?keep?a?strong?reference?to?the?handler?until?the?call?to?handleMessage//?finishes.??Then?we?drop?it?so?that?the?handler?can?be?deleted?*before*//?we?reacquire?our?lock.{?//?obtain?handlersp<MessageHandler>?handler?=?messageEnvelope.handler;Message?message?=?messageEnvelope.message;mMessageEnvelopes.removeAt(0);mSendingMessage?=?true;mLock.unlock();#if?DEBUG_POLL_AND_WAKE?||?DEBUG_CALLBACKSALOGD("%p?~?pollOnce?-?sending?message:?handler=%p,?what=%d",this,?handler.get(),?message.what); #endifhandler->handleMessage(message);}?//?release?handlermLock.lock();mSendingMessage?=?false;result?=?ALOOPER_POLL_CALLBACK;}?else?{//?The?last?message?left?at?the?head?of?the?queue?determines?the?next?wakeup?time.mNextMessageUptime?=?messageEnvelope.uptime;break;}}//?Release?lock.mLock.unlock();//?Invoke?all?response?callbacks.for?(size_t?i?=?0;?i?<?mResponses.size();?i++)?{Response&?response?=?mResponses.editItemAt(i);if?(response.request.ident?==?ALOOPER_POLL_CALLBACK)?{int?fd?=?response.request.fd;int?events?=?response.events;void*?data?=?response.request.data; #if?DEBUG_POLL_AND_WAKE?||?DEBUG_CALLBACKSALOGD("%p?~?pollOnce?-?invoking?fd?event?callback?%p:?fd=%d,?events=0x%x,?data=%p",this,?response.request.callback.get(),?fd,?events,?data); #endifint?callbackResult?=?response.request.callback->handleEvent(fd,?events,?data);if?(callbackResult?==?0)?{removeFd(fd);}//?Clear?the?callback?reference?in?the?response?structure?promptly?because?we//?will?not?clear?the?response?vector?itself?until?the?next?poll.response.request.callback.clear();result?=?ALOOPER_POLL_CALLBACK;}}return?result; }
它通過調(diào)用epoll_wait()函數(shù)來等待消息的到來。
HandlerThread的退出
HandlerThread或使用Looper/MessageQueue自定義的類似東西,必須在不需要時被停掉。如前所見,每次創(chuàng)建MessageQueue,都會占用好幾個描述符,但在Linux/Android上,一個進程所能打開的文件描述符的最大個數(shù)是有限制的,大多為1024個。如果一個app打開的文件描述符達到了這個上限,則將會出現(xiàn)許多各式各樣的古怪問題,像進程間通信,socket,輸入系統(tǒng)等很多機制,都會依賴于類文件的東西。同時,HandlerThread也總會占用一個線程。這些資源都是只有在顯式地停掉之后才會被釋放的。
我們來看一下HandlerThread的退出機制。先來看一個使用了HandlerThread,并適時地退出的例子,代碼在frameworks/base/core/java/android/app/IntentService.java:
????@Overridepublic?void?onCreate()?{//?TODO:?It?would?be?nice?to?have?an?option?to?hold?a?partial?wakelock//?during?processing,?and?to?have?a?static?startService(Context,?Intent)//?method?that?would?launch?the?service?&?hand?off?a?wakelock.super.onCreate();HandlerThread?thread?=?new?HandlerThread("IntentService["?+?mName?+?"]");thread.start();mServiceLooper?=?thread.getLooper();mServiceHandler?=?new?ServiceHandler(mServiceLooper);}@Overridepublic?void?onStart(Intent?intent,?int?startId)?{Message?msg?=?mServiceHandler.obtainMessage();msg.arg1?=?startId;msg.obj?=?intent;mServiceHandler.sendMessage(msg);}/***?You?should?not?override?this?method?for?your?IntentService.?Instead,*?override?{@link?#onHandleIntent},?which?the?system?calls?when?the?IntentService*?receives?a?start?request.*?@see?android.app.Service#onStartCommand*/@Overridepublic?int?onStartCommand(Intent?intent,?int?flags,?int?startId)?{onStart(intent,?startId);return?mRedelivery???START_REDELIVER_INTENT?:?START_NOT_STICKY;}@Overridepublic?void?onDestroy()?{mServiceLooper.quit();}
在這個例子中,Service的onCreate()方法創(chuàng)建了HandlerThread,保存了HandlerThread的Looper,然后在Service的onDestroy()方法中停掉了Looper,實際上也即是退出了HandlerThread。
來看一下Looper停止方法具體的實現(xiàn):
????/***?Quits?the?looper.*?<p>*?Causes?the?{@link?#loop}?method?to?terminate?without?processing?any*?more?messages?in?the?message?queue.*?</p><p>*?Any?attempt?to?post?messages?to?the?queue?after?the?looper?is?asked?to?quit?will?fail.*?For?example,?the?{@link?Handler#sendMessage(Message)}?method?will?return?false.*?</p><p?class="note">*?Using?this?method?may?be?unsafe?because?some?messages?may?not?be?delivered*?before?the?looper?terminates.??Consider?using?{@link?#quitSafely}?instead?to?ensure*?that?all?pending?work?is?completed?in?an?orderly?manner.*?</p>**?@see?#quitSafely*/public?void?quit()?{mQueue.quit(false);}/***?Quits?the?looper?safely.*?<p>*?Causes?the?{@link?#loop}?method?to?terminate?as?soon?as?all?remaining?messages*?in?the?message?queue?that?are?already?due?to?be?delivered?have?been?handled.*?However?pending?delayed?messages?with?due?times?in?the?future?will?not?be*?delivered?before?the?loop?terminates.*?</p><p>*?Any?attempt?to?post?messages?to?the?queue?after?the?looper?is?asked?to?quit?will?fail.*?For?example,?the?{@link?Handler#sendMessage(Message)}?method?will?return?false.*?</p>*/public?void?quitSafely()?{mQueue.quit(true);}
Looper有提供兩個退出方法,quit()和quitSafely(),這兩個方法都會阻止再向消息隊列中發(fā)送消息。但它們的主要區(qū)別在于,前者不會再處理消息隊列中還沒有被處理的所有消息,而后者則會在處理完那些已經(jīng)到期的消息之后才真的退出。
由前面我們對Looper.loop()方法的分析,也不難理解,MessageQueue退出即Looper退出。此處也是直接調(diào)用了MessageQueue.quit()方法。那我們就來看一下MessageQueue的quit()方法:
????void?quit(boolean?safe)?{if?(!mQuitAllowed)?{throw?new?RuntimeException("Main?thread?not?allowed?to?quit.");}synchronized?(this)?{if?(mQuitting)?{return;}mQuitting?=?true;if?(safe)?{removeAllFutureMessagesLocked();}?else?{removeAllMessagesLocked();}//?We?can?assume?mPtr?!=?0?because?mQuitting?was?previously?false.nativeWake(mPtr);}}
主要做了幾個事情:第一,設(shè)置標(biāo)記mQuitting為true,以表明HandlerThread要退出了;第二,根據(jù)傳入的參數(shù)safe,刪除隊列里面適當(dāng)類型的消息;第三,調(diào)用nativeWake(mPtr),將HandlerThread線程喚醒。我們都知道,Java里面是沒有辦法直接終止另外一個線程的,同時強制中止一個線程也是很不安全的,所以,這里也是只設(shè)置一個標(biāo)記,然后在MessageQueue.next()中獲取消息時檢查此標(biāo)記,以在適當(dāng)?shù)臅r候退出。MessageQueue.next()里中止消息處理循環(huán)的,主要是下面這幾行:
????????????????//?Process?the?quit?message?now?that?all?pending?messages?have?been?handled.if?(mQuitting)?{dispose();return?null;}
然后在 MessageQueue.dispose()方法中完成最終的退出,及資源清理的工作:
????private?void?dispose()?{if?(mPtr?!=?0)?{nativeDestroy(mPtr);mPtr?=?0;}}
android中消息隊列機制,大體如此。
Done。
轉(zhuǎn)載于:https://my.oschina.net/wolfcs/blog/160601
總結(jié)
以上是生活随笔為你收集整理的android的消息队列机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 随机过程及其在金融领域中的应用 第三章
- 下一篇: 混乱开发,既伤身体又伤感情