戏(细)说Executor框架线程池任务执行全过程(上)
原文鏈接 ??歸檔下發(fā)表于infoq.com 2015年6月的兩篇文章。
內(nèi)容綜述
基于Executor接口中將任務(wù)提交和任務(wù)執(zhí)行解耦的設(shè)計(jì),ExecutorService和其各種功能強(qiáng)大的實(shí)現(xiàn)類提供了非常簡(jiǎn)便方式來(lái)提交任務(wù)并獲取任務(wù)執(zhí)行結(jié)果,封裝了任務(wù)執(zhí)行的全部過(guò)程。本文嘗試通過(guò)對(duì)j.u.c.下該部分源碼的解析以ThreadPoolExecutor為例來(lái)追蹤任務(wù)提交、執(zhí)行、獲取執(zhí)行結(jié)果的整個(gè)過(guò)程。為了避免陷入枯燥的源碼解釋,將該過(guò)程和過(guò)程中涉及的角色與我們工作中的場(chǎng)景和場(chǎng)景中涉及的角色進(jìn)行映射,力圖生動(dòng)和深入淺出。
一、前言
1.5后引入的Executor框架的最大優(yōu)點(diǎn)是把任務(wù)的提交和執(zhí)行解耦。要執(zhí)行任務(wù)的人只需把Task描述清楚,然后提交即可。這個(gè)Task是怎么被執(zhí)行的,被誰(shuí)執(zhí)行的,什么時(shí)候執(zhí)行的,提交的人就不用關(guān)心了。具體點(diǎn)講,提交一個(gè)Callable對(duì)象給ExecutorService(如最常用的線程池ThreadPoolExecutor),將得到一個(gè)Future對(duì)象,調(diào)用Future對(duì)象的get方法等待執(zhí)行結(jié)果就好了。
經(jīng)過(guò)這樣的封裝,對(duì)于使用者來(lái)說(shuō),提交任務(wù)獲取結(jié)果的過(guò)程大大簡(jiǎn)化,調(diào)用者直接從提交的地方就可以等待獲取執(zhí)行結(jié)果。而封裝最大的效果是使得真正執(zhí)行任務(wù)的線程們變得不為人知。有沒(méi)有覺(jué)得這個(gè)場(chǎng)景似曾相識(shí)?我們工作中當(dāng)老大的老大(且稱作LD^2)把一個(gè)任務(wù)交給我們老大(LD)的時(shí)候,到底是LD自己干,還是轉(zhuǎn)過(guò)身來(lái)拉來(lái)一幫苦逼的兄弟加班加點(diǎn)干,那LD^2是不管的。LD^2只用把人描述清楚提及給LD,然后喝著咖啡等著收LD的report即可。等LD一封郵件非常優(yōu)雅地報(bào)告LD^2report結(jié)果時(shí),實(shí)際操作中是碼農(nóng)A和碼農(nóng)B干了一個(gè)月,還是碼農(nóng)ABCDE加班干了一個(gè)禮拜,大多是不用體現(xiàn)的。這套機(jī)制的優(yōu)點(diǎn)就是LD^2找個(gè)合適的LD出來(lái)提交任務(wù)即可,接口友好有效,不用為具體怎么干費(fèi)神費(fèi)力。
二、 一個(gè)最簡(jiǎn)單的例子
看上去這個(gè)執(zhí)行過(guò)程是這個(gè)樣子。調(diào)用這段代碼的是老大的老大了,他所需要干的所有事情就是找到一個(gè)合適的老大(如下面例子中l(wèi)aodaA就榮幸地被選中了),提交任務(wù)就好了。
使用上非常簡(jiǎn)單,其實(shí)只有兩行語(yǔ)句來(lái)完成所有功能:創(chuàng)建一個(gè)線程池,提交任務(wù)并等待獲取執(zhí)行結(jié)果。
例子中生成線程池采用了工具類Executors的靜態(tài)方法。除了newFixedThreadPool可以生成固定大小的線程池,newCachedThreadPool可以生成一個(gè)無(wú)界、可以自動(dòng)回收的線程池,newSingleThreadScheduledExecutor可以生成一個(gè)單個(gè)線程的線程池。newScheduledThreadPool還可以生成支持周期任務(wù)的線程池。一般用戶場(chǎng)景下各種不同設(shè)置要求的線程池都可以這樣生成,不用自己new一個(gè)線程池出來(lái)。
三、代碼剖析
這套機(jī)制怎么用,上面兩句語(yǔ)句就做到了,非常方便和友好。但是submit的task是怎么被執(zhí)行的?是誰(shuí)執(zhí)行的?如何做到在調(diào)用的時(shí)候只有等待執(zhí)行結(jié)束才能get到結(jié)果。這些都是1.5之后Executor接口下的線程池、Future接口下的可獲得執(zhí)行結(jié)果的的任務(wù),配合AQS和原有的Runnable來(lái)做到的。在下文中我們嘗試通過(guò)剖析每部分的代碼來(lái)了解Task提交,Task執(zhí)行,獲取Task執(zhí)行結(jié)果等幾個(gè)主要步驟。為了控制篇幅,突出主要邏輯,文章中引用的代碼片段去掉了異常捕獲、非主要條件判斷、非主要操作。文中只是以最常用的ThreadPoolExecutor線程池舉例,其實(shí)ExecutorService接口下定義了很多功能豐富的其他類型,有各自的特點(diǎn),但風(fēng)格類似。本文重點(diǎn)是介紹任務(wù)提交的過(guò)程,過(guò)程中涉及的ExecutorService、ThreadPoolExecutor、AQS、Future、FutureTask等只會(huì)介紹該過(guò)程中用到的內(nèi)容,不會(huì)對(duì)每個(gè)類都詳細(xì)展開。
1、 任務(wù)提交
從類圖上可以看到,接口ExecutorService繼承自Executor。不像Executor中只定義了一個(gè)方法來(lái)執(zhí)行任務(wù),在ExecutorService中,正如其名字暗示的一樣,定義了一個(gè)服務(wù),定義了完整的線程池的行為,可以接受提交任務(wù)、執(zhí)行任務(wù)、關(guān)閉服務(wù)。抽象類AbstractExecutorService類實(shí)現(xiàn)了ExecutorService接口,也實(shí)現(xiàn)了接口定義的默認(rèn)行為。
?
AbstractExecutorService任務(wù)提交的submit方法有三個(gè)實(shí)現(xiàn)。第一個(gè)接收一個(gè)Runnable的Task,沒(méi)有執(zhí)行結(jié)果;第二個(gè)是兩個(gè)參數(shù):一個(gè)任務(wù),一個(gè)執(zhí)行結(jié)果;第三個(gè)一個(gè)Callable,本身就包含執(zhí)任務(wù)內(nèi)容和執(zhí)行結(jié)果。 submit方法的返回結(jié)果是Future類型,調(diào)用該接口定義的get方法即可獲得執(zhí)行結(jié)果。?V get() 方法的返回值類型V是在提交任務(wù)時(shí)就約定好了的。
除了submit任務(wù)的方法外,作為對(duì)服務(wù)的管理,在ExecutorService接口中還定義了服務(wù)的關(guān)閉方法shutdown和shutdownNow方法,可以平緩或者立即關(guān)閉執(zhí)行服務(wù),實(shí)現(xiàn)該方法的子類根據(jù)自身特征支持該定義。在ThreadPoolExecutor中,維護(hù)了RUNNING、SHUTDOWN、STOP、TERMINATED四種狀態(tài)來(lái)實(shí)現(xiàn)對(duì)線程池的管理。線程池的完整運(yùn)行機(jī)制不是本文的重點(diǎn),重點(diǎn)還是關(guān)注submit過(guò)程中的邏輯。
1) 看AbstractExecutorService中代碼提交部分,構(gòu)造好一個(gè)FutureTask對(duì)象后,調(diào)用execute()方法執(zhí)行任務(wù)。我們知道這個(gè)方法是頂級(jí)接口Executor中定義的最重要的方法。。FutureTask類型實(shí)現(xiàn)了Runnable接口,因此滿足Executor中execute()方法的約定。同時(shí)比較有意思的是,該對(duì)象在execute執(zhí)行后,就又作為submit方法的返回值返回,因?yàn)镕utureTask同時(shí)又實(shí)現(xiàn)了Future接口,滿足Future接口的約定。
2) Submit傳入的參數(shù)都被封裝成了FutureTask類型來(lái)execute的,對(duì)應(yīng)前面三個(gè)不同的參數(shù)類型都會(huì)封裝成FutureTask。
?
3) Executor接口中定義的execute方法的作用就是執(zhí)行提交的任務(wù),該方法在抽象類AbstractExecutorService中沒(méi)有實(shí)現(xiàn),留到子類中實(shí)現(xiàn)。我們觀察下子類ThreadPoolExecutor,使用最廣泛的線程池如何來(lái)execute那些submit的任務(wù)的。這個(gè)方法看著比較簡(jiǎn)單,但是線程池什么時(shí)候創(chuàng)建新的作業(yè)線程來(lái)處理任務(wù),什么時(shí)候只接收任務(wù)不創(chuàng)建作業(yè)線程,另外什么時(shí)候拒絕任務(wù)。線程池的接收任務(wù)、維護(hù)工作線程的策略都要在其中體現(xiàn)。
作為必要的預(yù)備知識(shí),先補(bǔ)充下ThreadPoolExecutor有兩個(gè)最重要的集合屬性,分別是存儲(chǔ)接收任務(wù)的任務(wù)隊(duì)列和用來(lái)干活的作業(yè)集合。
?
其中阻塞隊(duì)列workQueue是來(lái)存儲(chǔ)待執(zhí)行的任務(wù)的,在構(gòu)造線程池時(shí)可以選擇滿足該BlockingQueue 接口定義的SynchronousQueue、LinkedBlockingQueue或者DelayedWorkQueue等不同阻塞隊(duì)列來(lái)實(shí)現(xiàn)不同特征的線程池。
關(guān)注下execute(Runnable command)方法中調(diào)用到的addIfUnderCorePoolSize,workQueue.offer(command) , ensureQueuedTaskHandled(command),addIfUnderMaximumPoolSize(command)這幾個(gè)操作。尤其幾個(gè)名字較長(zhǎng)的private方法,把方法名的駝峰式的單詞分開,加上對(duì)方法上下文的了解就能理解其功能。
因?yàn)榍懊嬲f(shuō)到的幾個(gè)方法在里面即是操作,又返回一個(gè)布爾值,影響后面的邏輯,所以不大方便在方法體中為每條語(yǔ)句加注釋來(lái)說(shuō)明,需要大致關(guān)聯(lián)起來(lái)看。所以首先需要把execute方法的主要邏輯說(shuō)明下,再看其中各自方法的作用。
- 如果線程池的狀態(tài)是RUNNING,線程池的大小小于配置的核心線程數(shù),說(shuō)明還可以創(chuàng)建新線程,則啟動(dòng)新的線程執(zhí)行這個(gè)任務(wù)。
- 如果線程池的狀態(tài)是RUNNING ,線程池的大小小于配置的最大線程數(shù),并且任務(wù)隊(duì)列已經(jīng)滿了,說(shuō)明現(xiàn)有線程已經(jīng)不能支持當(dāng)前的任務(wù)了,并且線程池還有繼續(xù)擴(kuò)充的空間,就可以創(chuàng)建一個(gè)新的線程來(lái)處理提交的任務(wù)。
- 如果線程池的狀態(tài)是RUNNING,當(dāng)前線程池的大小大于等于配置的核心線程數(shù),說(shuō)明根據(jù)配置當(dāng)前的線程數(shù)已經(jīng)夠用,不用創(chuàng)建新線程,只需把任務(wù)加入任務(wù)隊(duì)列即可。如果任務(wù)隊(duì)列不滿,則提交的任務(wù)在任務(wù)隊(duì)列中等待處理;如果任務(wù)隊(duì)列滿了則需要考慮是否要擴(kuò)展線程池的容量。
- 當(dāng)線程池已經(jīng)關(guān)閉或者上面的條件都不能滿足時(shí),則進(jìn)行拒絕策略,拒絕策略在RejectedExecutionHandler接口中定義,可以有多種不同的實(shí)現(xiàn)。
上面其實(shí)也是對(duì)最主要思路的解析,詳細(xì)展開可能還會(huì)更復(fù)雜。簡(jiǎn)單梳理下思路:構(gòu)建線程池時(shí)定義了一個(gè)額定大小,當(dāng)線程池內(nèi)工作線程數(shù)小于額定大小,有新任務(wù)進(jìn)來(lái)就創(chuàng)建新工作線程,如果超過(guò)該閾值,則一般就不創(chuàng)建了,只是把接收任務(wù)加到任務(wù)隊(duì)列里面。但是如果任務(wù)隊(duì)列里的任務(wù)實(shí)在太多了,那還是要申請(qǐng)額外的工作線程來(lái)幫忙。如果還是不夠用就拒絕服務(wù)。這個(gè)場(chǎng)景其實(shí)也是每天我們工作中會(huì)碰到的場(chǎng)景。我們管人的老大,手里都有一定HC(Head Count),當(dāng)上面老大有活分下來(lái),手里人不夠,但是不超過(guò)HC,我們就自己招人;如果超過(guò)了還是忙不過(guò)來(lái),那就向上門老大申請(qǐng)借調(diào)人手來(lái)幫忙;如果還是干不完,那就沒(méi)辦法了,新任務(wù)咱就不接了。
4) addIfUnderCorePoolSize方法檢查如果當(dāng)前線程池的大小小于配置的核心線程數(shù),說(shuō)明還可以創(chuàng)建新線程,則啟動(dòng)新的線程執(zhí)行這個(gè)任務(wù)。
5)? 和上一個(gè)方法類似,addIfUnderMaximumPoolSize檢查如果線程池的大小小于配置的最大線程數(shù),并且任務(wù)隊(duì)列已經(jīng)滿了(就是execute方法試圖把當(dāng)前線程加入任務(wù)隊(duì)列時(shí)不成功),說(shuō)明現(xiàn)有線程已經(jīng)不能支持當(dāng)前的任務(wù)了,但線程池還有繼續(xù)擴(kuò)充的空間,就可以創(chuàng)建一個(gè)新的線程來(lái)處理提交的任務(wù)。
6)? 在ensureQueuedTaskHandled方法中,判斷如果當(dāng)前狀態(tài)不是RUNING,則當(dāng)前任務(wù)不加入到任務(wù)隊(duì)列中,判斷如果狀態(tài)是停止,線程數(shù)小于允許的最大數(shù),且任務(wù)隊(duì)列還不空,則加入一個(gè)新的工作線程到線程池來(lái)幫助處理還未處理完的任務(wù)。
7)?? 在前面方法中都會(huì)調(diào)用adThread方法創(chuàng)建一個(gè)工作線程,差別是創(chuàng)建的有些工作線程上面關(guān)聯(lián)接收到的任務(wù)firstTask,有些沒(méi)有。該方法為當(dāng)前接收到的任務(wù)firstTask創(chuàng)建Worker,并將Worker添加到作業(yè)集合HashSet<Worker> workers中,并啟動(dòng)作業(yè)。
至此,任務(wù)提交過(guò)程簡(jiǎn)單描述完畢,并介紹了任務(wù)提交后ExecutorService框架下線程池的主要應(yīng)對(duì)邏輯,其實(shí)就是接收任務(wù),根據(jù)需要?jiǎng)?chuàng)建或者維護(hù)管理線程。
維護(hù)這些工作線程干什么用?先不用看后面的代碼,想想我們老大每月辛苦地把老板豐厚的薪水遞到我們手里,定期還要領(lǐng)著大家出去happy下,又是定期的關(guān)心下個(gè)人生活,所有做的這些都是為什么呢?木訥的代碼工不往這邊使勁動(dòng)腦子,但是猜還是能猜的到的,就讓干活唄。本文想著重表達(dá)細(xì)節(jié),諸如線程池里的Worker是怎么工作的,Task到底是不是在這些工作線程中執(zhí)行的,如何保證執(zhí)行完成后,外面等待任務(wù)的老大拿到想要結(jié)果,我們將在下篇文章中詳細(xì)介紹。?
總結(jié)
以上是生活随笔為你收集整理的戏(细)说Executor框架线程池任务执行全过程(上)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: eclipse使用maven tomca
- 下一篇: Mysql 1030 Got error