2万字,看完这篇才敢说自己真的懂线程池!
前言
線程池可以說是 Java 進階必備的知識點了,也是面試中必備的考點,可能不少人看了一些文章后能對線程池工作原理說上一二,但這還遠(yuǎn)遠(yuǎn)不夠,如果碰到比較有經(jīng)驗的面試官再繼續(xù)追問,很可能會被吊打,考慮如下問題:
Tomcat 的線程池和 JDK 的線程池實現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實現(xiàn)嗎?
我司網(wǎng)關(guān) dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過這樣的一個問題:壓測時接口可以正常返回,但接口 RT 很高,假設(shè)設(shè)置的核心線程大小為 500,最大線程為 800,緩沖隊列為 5000,你能從這個設(shè)置中發(fā)現(xiàn)出一些問題并對這些參數(shù)進行調(diào)優(yōu)嗎?
線程池里的線程真的有核心線程和非核心線程之分?
線程池被 shutdown 后,還能產(chǎn)生新的線程?
線程把任務(wù)丟給線程池后肯定就馬上返回了?
線程池里的線程異常后會再次新增線程嗎,如何捕獲這些線程拋出的異常?
線程池的大小如何設(shè)置,如何動態(tài)設(shè)置線程池的參數(shù)
線程池的狀態(tài)機畫一下?
阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池?
使用線程池應(yīng)該避免哪些問題,能否簡單說下線程池的最佳實踐?
如何優(yōu)雅關(guān)閉線程池
如何對線程池進行監(jiān)控
相信不少人看了這些問題會有些懵逼
其實這些問題的答案大多數(shù)都藏在線程池的源碼里,所以深入了解線程池的源碼非常重要,本章我們將會來學(xué)習(xí)一下線程池的源碼,相信看完之后,以上的問題大部分都能回答,另外一些問題我們也會在文中與大家一起探討。
本文將會從以下幾個方面來介紹線程池的原理。
為什么要用線程池
線程池是如何工作的
線程池提交任務(wù)的兩種方式
ThreadPoolExecutor 源碼剖析
解答開篇的問題
線程池的最佳實踐
總結(jié)
相信大家看完對線程池的理解會更進一步,肝文不易,看完別完了三連哦。
為什么要用線程池
創(chuàng)建線程有三大開銷,如下:
1、其實 Java 中的線程模型是基于操作系統(tǒng)原生線程模型實現(xiàn)的,也就是說 Java 中的線程其實是基于內(nèi)核線程實現(xiàn)的,線程的創(chuàng)建,析構(gòu)與同步都需要進行系統(tǒng)調(diào)用,而系統(tǒng)調(diào)用需要在用戶態(tài)與內(nèi)核中來回切換,代價相對較高,線程的生命周期包括「線程創(chuàng)建時間」,「線程執(zhí)行任務(wù)時間」,「線程銷毀時間」,創(chuàng)建和銷毀都需要導(dǎo)致系統(tǒng)調(diào)用。2、每個 Thread 都需要有一個內(nèi)核線程的支持,也就意味著每個 Thread 都需要消耗一定的內(nèi)核資源(如內(nèi)核線程的??臻g),因此能創(chuàng)建的 Thread 是有限的,默認(rèn)一個線程的線程棧大小是 1 M,有圖有真相
圖中所示,在 Java 8 下,創(chuàng)建 19 個線程(thread #19)需要創(chuàng)建 19535 KB,即 1 M 左右,reserved 代表如果創(chuàng)建 19 個線程,操作系統(tǒng)保證會為其分配這么多空間(實際上并不一定分配),committed 則表示實際已分配的空間大小。
畫外音:注意,這是在 Java 8 下的線程占用空間情況,但在 Java 11 中,對線程作了很大的優(yōu)化,創(chuàng)建一個線程大概只需要 40 KB,空間消耗大大減少
3、線程多了,導(dǎo)致不可忽視的上下文切換開銷。
由此可見,線程的創(chuàng)建是昂貴的,所以必須以線程池的形式來管理這些線程,在線程池中合理設(shè)置線程大小和管理線程,以達(dá)到以合理的創(chuàng)建線程大小以達(dá)到最大化收益,最小化風(fēng)險的目的,對于開發(fā)人員來說,要完成任務(wù)不用關(guān)心線程如何創(chuàng)建,如何銷毀,如何協(xié)作,只需要關(guān)心提交的任務(wù)何時完成即可,對線程的調(diào)優(yōu),監(jiān)控等這些細(xì)枝末節(jié)的工作通通交給線程池來實現(xiàn),所以也讓開發(fā)人員得到極大的解脫!
類似線程池的這種池化思想應(yīng)用在很多地方,比如數(shù)據(jù)庫連接池,Http 連接池等,避免了昂貴資源的創(chuàng)建,提升了性能,也解放了開發(fā)人員。
ThreadPoolExecutor 設(shè)計架構(gòu)圖
首先我們來看看 Executor 框架的設(shè)計圖
Executor: 最頂層的 Executor 接口只提供了一個 execute 接口,實現(xiàn)了提交任務(wù)與執(zhí)行任務(wù)的解藕,這個方法是最核心的,也是我們源碼剖析的重點,此方法最終是由 ThreadPoolExecutor 實現(xiàn)的,
ExecutorService 擴展了 Executor 接口,實現(xiàn)了終止執(zhí)行器,單個/批量提交任務(wù)等方法
AbstractExecutorService 實現(xiàn)了 ExecutorService 接口,實現(xiàn)了除 execute 以外的所有方法,只將一個最重要的 execute 方法交給 ThreadPoolExecutor 實現(xiàn)。
這樣的分層設(shè)計雖然層次看起來挺多,但每一層每司其職,邏輯清晰,值得借鑒。
線程池是如何工作的
首先我們來看下如何創(chuàng)建一個線程池
ThreadPoolExecutor?threadPool?=?new?ThreadPoolExecutor(10,?20,?600L,TimeUnit.SECONDS,?new?LinkedBlockingQueue<>(4096),new?NamedThreadFactory("common-work-thread")); //?設(shè)置拒絕策略,默認(rèn)為?AbortPolicy threadPool.setRejectedExecutionHandler(new?ThreadPoolExecutor.AbortPolicy());看下其構(gòu)造方法簽名如下
public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,BlockingQueue<Runnable>?workQueue,ThreadFactory?threadFactory,RejectedExecutionHandler?handler)?{//?省略代碼若干 }要理解這些參數(shù)具體代表的意義,必須清楚線程池提交任務(wù)與執(zhí)行任務(wù)流程,如下
圖片來自美團技術(shù)團隊
步驟如下
1、corePoolSize:如果提交任務(wù)后線程還在運行,當(dāng)線程數(shù)小于 corePoolSize 值時,無論線程池中的線程是否忙碌,都會創(chuàng)建線程,并把任務(wù)交給此新創(chuàng)建的線程進行處理,如果線程數(shù)少于等于 corePoolSize,那么這些線程不會回收,除非將 allowCoreThreadTimeOut 設(shè)置為 true,但一般不這么干,因為頻繁地創(chuàng)建銷毀線程會極大地增加系統(tǒng)調(diào)用的開銷。
2、workQueue:如果線程數(shù)大于核心數(shù)(corePoolSize)且小于最大線程數(shù)(maximumPoolSize),則會將任務(wù)先丟到阻塞隊列里,然后線程自己去阻塞隊列中拉取任務(wù)執(zhí)行。
3、maximumPoolSize: 線程池中最大可創(chuàng)建的線程數(shù),如果提交任務(wù)時隊列滿了且線程數(shù)未到達(dá)這個設(shè)定值,則會創(chuàng)建線程并執(zhí)行此次提交的任務(wù),如果提交任務(wù)時隊列滿了但線池數(shù)已經(jīng)到達(dá)了這個值,此時說明已經(jīng)超出了線池程的負(fù)載能力,就會執(zhí)行拒絕策略,這也好理解,總不能讓源源不斷地任務(wù)進來把線程池給壓垮了吧,我們首先要保證線程池能正常工作。
4、RejectedExecutionHandler:一共有以下四種拒絕策略
AbortPolicy:丟棄任務(wù)并拋出異常,這也是默認(rèn)策略;
CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù),所以開頭的問題「線程把任務(wù)丟給線程池后肯定就馬上返回了?」我們可以回答了,如果用的是 CallerRunsPolicy 策略,提交任務(wù)的線程(比如主線程)提交任務(wù)后并不能保證馬上就返回,當(dāng)觸發(fā)了這個 reject 策略不得不親自來處理這個任務(wù)。
DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù)。
DiscardPolicy:直接丟棄任務(wù),不拋出任何異常,這種策略只適用于不重要的任務(wù)。
5、keepAliveTime: 線程存活時間,如果在此時間內(nèi)超出 corePoolSize 大小的線程處于 idle 狀態(tài),這些線程會被回收
6、threadFactory:可以用此參數(shù)設(shè)置線程池的命名,指定 defaultUncaughtExceptionHandler(有啥用,后文闡述),甚至可以設(shè)定線程為守護線程。
現(xiàn)在問題來了,該如何合理設(shè)置這些參數(shù)呢。
首先來看線程大小設(shè)置
<<Java 并發(fā)編程實戰(zhàn)>>告訴我們應(yīng)該分兩種情況
針對 CPU 密集型的任務(wù),在有 Ncpu個處理器的系統(tǒng)上,當(dāng)線程池的大小為 Ncpu + 1 時,通常能實現(xiàn)最優(yōu)的利用率,+1 是因為當(dāng)計算密集型線程偶爾由于缺頁故障或其他原因而暫停工作時,這個"額外"的線程也能確保 CPU 的時鐘周期不會被浪費,所謂 CPU 密集,就是線程一直在忙碌,這樣將線程池的大小設(shè)置為 Ncpu + 1 避免了線程的上下文切換,讓線程時刻處于忙碌狀態(tài),將 CPU 的利用率最大化。
針對 IO 密集型的任務(wù),它也給出了如下計算公式
這些公式看看就好,實際的業(yè)務(wù)場景中基本用不上,這些公式太過理論化了,脫離業(yè)務(wù)場景,僅可作個理論參考,舉個例子,你說 CPU 密集型任務(wù)設(shè)置線程池大小為 N + 1個,但實際上在業(yè)務(wù)中往往不只設(shè)置一個線程池,這種情況套用的公式就懵逼了
再來看 workQueue 的大小設(shè)置
由上文可知,如果最大線程大于核心線程數(shù),當(dāng)且僅當(dāng)核心線程滿了且 workQueue 也滿的情況下,才會新增新的線程,也就是說如果 workQueue 是無界隊列,那么當(dāng)線程數(shù)增加到 corePoolSize 后,永遠(yuǎn)不會再新增新的線程了,也就是說此時 maximumPoolSize 的設(shè)置就無效了,也無法觸發(fā) RejectedExecutionHandler 拒絕策略,任務(wù)只會源源不斷地填充到 workQueue,直到 OOM。
所以 workQueue 應(yīng)該為有界隊列,至少保證在任務(wù)過載的情況下線程池還能正常工作,那么哪些是有有界隊列,哪些是無界隊列呢。
有界隊列我們常用的以下兩個
LinkedBlockingQueue: 鏈表構(gòu)成的有界隊列,按先進先出(FIFO)的順序?qū)υ剡M行排列,但注意在創(chuàng)建時需指定其大小,否則其大小默認(rèn)為 Integer.MAX_VALUE,相當(dāng)于無界隊列了
ArrayBlockingQueue: 數(shù)組實現(xiàn)的有界隊列,按先進先出(FIFO)的順序?qū)υ剡M行排列。
無界隊列我們常用 PriorityBlockingQueue 這個優(yōu)先級隊列,任務(wù)插入的時候可以指定其權(quán)重以讓這些任務(wù)優(yōu)先執(zhí)行,但這個隊列很少用,原因很簡單,線程池里的任務(wù)執(zhí)行順序一般是平等的,如果真有必須某些類型的任務(wù)需要優(yōu)先執(zhí)行,大不了再開個線程池好了,將不同的任務(wù)類型用不同的線程池隔離開來,也是合理利用線程池的一種實踐。
說到這我相信大家應(yīng)該能回答開頭的問題「阿里 Java 代碼規(guī)范為什么不允許使用 Executors 快速創(chuàng)建線程池?」,最常見的是以下兩種創(chuàng)建方式
image-20201109002227476newCachedThreadPool 方法的最大線程數(shù)設(shè)置成了 Integer.MAX_VALUE,而 newSingleThreadExecutor 方法創(chuàng)建 workQueue 時 LinkedBlockingQueue 未聲明大小,相當(dāng)于創(chuàng)建了無界隊列,一不小心就會導(dǎo)致 OOM。
threadFactory 如何設(shè)置
一般業(yè)務(wù)中會有多個線程池,如果某個線程池出現(xiàn)了問題,定位是哪一個線程出問題很重要,所以為每個線程池取一個名字就很有必要了,我司用的 dubbo 的 NamedThreadFactory 來生成 threadFactory,創(chuàng)建很簡單
new?NamedThreadFactory("demo-work")它的實現(xiàn)還是很巧妙的,有興趣地可以看看它的源碼,每調(diào)用一次,底層有個計數(shù)器會加一,會依次命名為 「demo-work-thread-1」, 「demo-work-thread-2」, 「demo-work-thread-3」這樣遞增的字符串。
在實際的業(yè)務(wù)場景中,一般很難確定 corePoolSize, workQueue,maximumPoolSize 的大小,如果出問題了,一般來說只能重新設(shè)置一下這些參數(shù)再發(fā)布,這樣往往需要耗費一些時間,美團的這篇文章給出了讓人眼前一亮的解決方案,當(dāng)發(fā)現(xiàn)問題(線程池監(jiān)控告警)時,動態(tài)調(diào)整這些參數(shù),可以讓這些參數(shù)實時生效,能在發(fā)現(xiàn)問題時及時解決,確實是個很好的思路。
線程池提交任務(wù)的兩種方式
線程池創(chuàng)建好了,該怎么給它提交任務(wù),有兩種方式,調(diào)用 execute 和 submit 方法,來看下這兩個方法的方法簽名
//?方式一:execute 方法 public?void?execute(Runnable?command)?{ }//?方式二:ExecutorService 中 submit 的三個方法 <T>?Future<T>?submit(Callable<T>?task); <T>?Future<T>?submit(Runnable?task,?T?result); Future<?>?submit(Runnable?task);區(qū)別在于調(diào)用 execute 無返回值,而調(diào)用 ?submit 可以返回 Future,那么這個 Future 能到底能干啥呢,看它的接口
public?interface?Future<V>?{/***?取消正在執(zhí)行的任務(wù),如果任務(wù)已執(zhí)行或已被取消,或者由于某些原因不能取消則返回?false*?如果任務(wù)未開始或者任務(wù)已開始但可以中斷(mayInterruptIfRunning?為?true),則*?可以取消/中斷此任務(wù)*/boolean?cancel(boolean?mayInterruptIfRunning);/***?任務(wù)在完成前是否已被取消*/boolean?isCancelled();/***?正常的執(zhí)行完流程流程,或拋出異常,或取消導(dǎo)致的任務(wù)完成都會返回?true*/boolean?isDone();/***?阻塞等待任務(wù)的執(zhí)行結(jié)果*/V?get()?throws?InterruptedException,?ExecutionException;/***?阻塞等待任務(wù)的執(zhí)行結(jié)果,不過這里指定了時間,如果在?timeout?時間內(nèi)任務(wù)還未執(zhí)行完成,*?則拋出?TimeoutException?異常*/V?get(long?timeout,?TimeUnit?unit)throws?InterruptedException,?ExecutionException,?TimeoutException; }可以用 Future 取消任務(wù),判斷任務(wù)是否已取消/完成,甚至可以阻塞等待結(jié)果。
submit 為啥能提交任務(wù)(Runnable)的同時也能返回任務(wù)(Future)的執(zhí)行結(jié)果呢
原來在最后執(zhí)行 execute 前用 newTaskFor 將 task 封裝成了 RunnableFuture,newTaskFor 返回了 FutureTask 這個類,結(jié)構(gòu)圖如下
可以看到 FutureTask 這個接口既實現(xiàn)了 Runnable 接口,也實現(xiàn) Future 接口,所以在提交任務(wù)的同時也能利用 Future 接口來執(zhí)行任務(wù)的取消,獲取任務(wù)的狀態(tài),等待執(zhí)行結(jié)果這些操作。
execute 與 submit 除了是否能返回執(zhí)行結(jié)果這一區(qū)別外,還有一個重要區(qū)別,那就是使用 execute 執(zhí)行如果發(fā)生了異常,是捕獲不到的,默認(rèn)會執(zhí)行 ThreadGroup 的 uncaughtException 方法(下圖數(shù)字 2 對應(yīng)的邏輯)
所以如果你想監(jiān)控執(zhí)行 execute 方法時發(fā)生的異常,需要通過 threadFactory 來指定一個 UncaughtExceptionHandler,這樣就會執(zhí)行上圖中的 1,進而執(zhí)行 UncaughtExceptionHandler 中的邏輯,如下所示:
//1.實現(xiàn)一個自己的線程池工廠 ThreadFactory?factory?=?(Runnable?r)?->?{//創(chuàng)建一個線程Thread?t?=?new?Thread(r);//給創(chuàng)建的線程設(shè)置UncaughtExceptionHandler對象?里面實現(xiàn)異常的默認(rèn)邏輯t.setDefaultUncaughtExceptionHandler((Thread?thread1,?Throwable?e)?->?{//?在此設(shè)置統(tǒng)計監(jiān)控邏輯System.out.println("線程工廠設(shè)置的exceptionHandler"?+?e.getMessage());});return?t; };//?2.創(chuàng)建一個自己定義的線程池,使用自己定義的線程工廠 ExecutorService?service?=?new?ThreadPoolExecutor(1,?1,?0,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue(10),factory);//3.提交任務(wù) service.execute(()->{int?i=1/0; });執(zhí)行以上邏輯最終會輸出「線程工廠設(shè)置的exceptionHandler/ by zero」,通過這樣的方式就能通過設(shè)定的 defaultUncaughtExceptionHandler 來執(zhí)行我們的監(jiān)控邏輯了。
如果用 submit ,如何捕獲異常呢,當(dāng)我們調(diào)用 future.get 就可以捕獲
Callable?testCallable?=?xxx; Future?future?=?executor.submit(myCallable); try?{future1.get(3)); }?catch?(InterruptedException?e)?{e.printStackTrace(); }?catch?(ExecutionException?e)?{e.printStackTrace(); }那么 future 為啥在 get 的時候才捕獲異步呢,因為在執(zhí)行 submit 時拋出異常后此異常被保存了起來,而在 get 的時候才被拋出
關(guān)于 execute 和 submit 的執(zhí)行流程 why 神的這篇文章寫得非常透徹,我就不拾人牙慧了,建議大家好好品品,收獲會很大!
ThreadPoolExecutor 源碼剖析
前面鋪墊了這么多,終于到了最核心的源碼剖析環(huán)節(jié)了。
對于線程池來說,我們最關(guān)心的是它的「狀態(tài)」和「可運行的線程數(shù)量」,一般來說我們可以選擇用兩個變量來記錄,不過 Doug Lea 只用了一個變量(ctl)就達(dá)成目的了,我們知道變量越多,代碼的可維護性就越差,也越容易出 bug, 所以只用一個變量就達(dá)成了兩個變量的效果,這讓代碼的可維護性大大提高,那么他是怎么設(shè)計的呢
//?ThreadPoolExecutor.java public?class?ThreadPoolExecutor?extends?AbstractExecutorService?{private?final?AtomicInteger?ctl?=?new?AtomicInteger(ctlOf(RUNNING,?0));private?static?final?int?COUNT_BITS?=?Integer.SIZE?-?3;private?static?final?int?CAPACITY???=?(1?<<?COUNT_BITS)?-?1;//?結(jié)果:111 00000000000000000000000000000private?static?final?int?RUNNING????=?-1?<<?COUNT_BITS;//?結(jié)果:?000?00000000000000000000000000000private?static?final?int?SHUTDOWN???=??0?<<?COUNT_BITS;//?結(jié)果:?001 00000000000000000000000000000private?static?final?int?STOP???????=??1?<<?COUNT_BITS;//?結(jié)果:?010?00000000000000000000000000000private?static?final?int?TIDYING????=??2?<<?COUNT_BITS;//?結(jié)果:?011 00000000000000000000000000000private?static?final?int?TERMINATED?=??3?<<?COUNT_BITS;//?獲取線程池的狀態(tài)private?static?int?runStateOf(int?c)?????{?return?c?&?~CAPACITY;?}//?獲取線程數(shù)量private?static?int?workerCountOf(int?c)??{?return?c?&?CAPACITY;?} }可以看到,ctl 是一個 原子類的 Integer 變量,有 32 位,低 29 位表示線程數(shù)量, 29 位最大可以表示 (2^29)-1 (大概 5 億多),足夠記錄線程大小了,如果未來還是不夠,可以把 ctl 聲明為 AtomicLong,高 3 位用來表示線程池的狀態(tài),3 位可以表示 8 個線程池的狀態(tài),由于線程池總共只有五個狀態(tài),所以 3 位也是足夠了,線程池的五個狀態(tài)如下
RUNNING: 接收新的任務(wù),并能繼續(xù)處理 workQueue 中的任務(wù)
SHUTDOWN: 不再接收新的任務(wù),不過能繼續(xù)處理 workQueue 中的任務(wù)
STOP: 不再接收新的任務(wù),也不再處理 workQueue 中的任務(wù),并且會中斷正在處理任務(wù)的線程
TIDYING: 所有的任務(wù)都完結(jié)了,并且線程數(shù)量(workCount)為 0 時即為此狀態(tài),進入此狀態(tài)后會調(diào)用 terminated() 這個鉤子方法進入 TERMINATED 狀態(tài)
TERMINATED: 調(diào)用 terminated() 方法后即為此狀態(tài)
線程池的狀態(tài)流轉(zhuǎn)及觸發(fā)條件如下
有了這些基礎(chǔ),我們來分析下 execute 的源碼
public?void?execute(Runnable?command)?{if?(command?==?null)throw?new?NullPointerException();int?c?=?ctl.get();//?如果當(dāng)前線程數(shù)少于核心線程數(shù)(corePoolSize),無論核心線程是否忙碌,都創(chuàng)建線程,直到達(dá)到?corePoolSize?為止if?(workerCountOf(c)?<?corePoolSize)?{//?創(chuàng)建線程并將此任務(wù)交給?worker?處理(此時此任務(wù)即?worker?中的?firstTask)if?(addWorker(command,?true))return;c?=?ctl.get();}//?如果線程池處于?RUNNING?狀態(tài),并且線程數(shù)大于?corePoolSize?或者?//?線程數(shù)少于?corePoolSize?但創(chuàng)建線程失敗了,則將任務(wù)丟進?workQueue?中if?(isRunning(c)?&&?workQueue.offer(command))?{int?recheck?=?ctl.get();//?這里需要再次檢查線程池是否處于?RUNNING?狀態(tài),因為在任務(wù)入隊后可能線程池狀態(tài)會發(fā)生變化,(比如調(diào)用了?shutdown?方法等),如果線程狀態(tài)發(fā)生變化了,則移除此任務(wù),執(zhí)行拒絕策略if?(!?isRunning(recheck)?&&?remove(command))reject(command);//?如果線程池在?RUNNING?狀態(tài)下,線程數(shù)為?0,則新建線程加速處理?workQueue?中的任務(wù)else?if?(workerCountOf(recheck)?==?0)addWorker(null,?false);}//?這段邏輯說明線程數(shù)大于?corePoolSize?且任務(wù)入隊失敗了,此時會以最大線程數(shù)(maximumPoolSize)為界來創(chuàng)建線程,如果失敗,說明線程數(shù)超過了?maximumPoolSize,則執(zhí)行拒絕策略else?if?(!addWorker(command,?false))reject(command); }從這段代碼中可以看到,創(chuàng)建線程是調(diào)用 addWorker 實現(xiàn)的,在分析 addWorker 之前,有必要簡單提一下 Worker,線程池把每一個執(zhí)行任務(wù)的線程都封裝為 Worker 的形式,取名為 Worker 很形象,線程池的本質(zhì)是生產(chǎn)者-消費者模型,生產(chǎn)者不斷地往 workQueue 中丟 task, workQueue 就像流水線一樣不斷地輸送著任務(wù),而 worker(工人) 不斷地取任務(wù)來執(zhí)行
那么問題來了,為啥要把線程封裝到 worker 中呢,線程池拿到 task 后直接丟給線程處理或者讓線程自己去 workQueue 中處理不就完了?將線程封裝為 worker 主要是為了更好地管理線程的中斷
來看下 Worker 的定義
//?此處可以看出?worker?既是一個?Runnable?任務(wù),也實現(xiàn)了?AQS(實際上是用?AQS?實現(xiàn)了一個獨占鎖,這樣由于?worker?運行時會上鎖,執(zhí)行?shutdown,setCorePoolSize,setMaximumPoolSize等方法時會試著中斷線程(interruptIdleWorkers)?,在這個方法中斷方法中會先嘗試獲取?worker?的鎖,如果不成功,說明?worker?在運行中,此時會先讓?worker?執(zhí)行完任務(wù)再關(guān)閉?worker?的線程,實現(xiàn)優(yōu)雅關(guān)閉線程的目的) private?final?class?Workerextends?AbstractQueuedSynchronizerimplements?Runnable{private?static?final?long?serialVersionUID?=?6138294804551838833L;//?實際執(zhí)行任務(wù)的線程final?Thread?thread;//?上文提到,如果當(dāng)前線程數(shù)少于核心線程數(shù),創(chuàng)建線程并將提交的任務(wù)交給?worker?處理處理,此時?firstTask?即為此提交的任務(wù),如果?worker?從?workQueue?中獲取任務(wù),則?firstTask?為空Runnable?firstTask;//?統(tǒng)計完成的任務(wù)數(shù)volatile?long?completedTasks;Worker(Runnable?firstTask)?{//?初始化為?-1,這樣在線程運行前(調(diào)用runWorker)禁止中斷,在?interruptIfStarted()?方法中會判斷?getState()>=0setState(-1);?this.firstTask?=?firstTask;//?根據(jù)線程池的?threadFactory?創(chuàng)建一個線程,將?worker?本身傳給線程(因為?worker?實現(xiàn)了?Runnable?接口)this.thread?=?getThreadFactory().newThread(this);}public?void?run()?{//?thread?啟動后會調(diào)用此方法runWorker(this);}//?1?代表被鎖住了,0?代表未鎖protected?boolean?isHeldExclusively()?{return?getState()?!=?0;}//?嘗試獲取鎖protected?boolean?tryAcquire(int?unused)?{//?從這里可以看出它是一個獨占鎖,因為當(dāng)獲取鎖后,cas?設(shè)置?state?不可能成功,這里我們也能明白上文中將?state?設(shè)置為?-1?的作用,這種情況下永遠(yuǎn)不可能獲取得鎖,而?worker?要被中斷首先必須獲取鎖if?(compareAndSetState(0,?1))?{setExclusiveOwnerThread(Thread.currentThread());return?true;}return?false;}//?嘗試釋放鎖protected?boolean?tryRelease(int?unused)?{setExclusiveOwnerThread(null);setState(0);return?true;}????public?void?lock()????????{?acquire(1);?}public?boolean?tryLock()??{?return?tryAcquire(1);?}public?void?unlock()??????{?release(1);?}public?boolean?isLocked()?{?return?isHeldExclusively();?}//?中斷線程,這個方法會被?shutdowNow?調(diào)用,從中可以看出?shutdownNow?要中斷線程不需要獲取鎖,也就是說如果線程正在運行,照樣會給你中斷掉,所以一般來說我們不用?shutdowNow?來中斷線程,太粗暴了,中斷時線程很可能在執(zhí)行任務(wù),影響任務(wù)執(zhí)行void?interruptIfStarted()?{Thread?t;//?中斷也是有條件的,必須是?state?>=?0?且?t?!=?null?且線程未被中斷//?如果?state?==?-1?,不執(zhí)行中斷,再次明白了為啥上文中?setState(-1)?的意義if?(getState()?>=?0?&&?(t?=?thread)?!=?null?&&?!t.isInterrupted())?{try?{t.interrupt();}?catch?(SecurityException?ignore)?{}}}}通過上文對 Worker 類的分析,相信大家不難理解 將線程封裝為 worker 主要是為了更好地管理線程的中斷 這句話。
理解了 Worker 的意義,我們再來看 addWorker 的方法
private?boolean?addWorker(Runnable?firstTask,?boolean?core)?{retry:for?(;;)?{int?c?=?ctl.get();//?獲取線程池的狀態(tài)int?rs?=?runStateOf(c);//?如果線程池的狀態(tài)?>=?SHUTDOWN,即為?SHUTDOWN,STOP,TIDYING,TERMINATED?這四個狀態(tài),只有一種情況有可能創(chuàng)建線程,即線程狀態(tài)為?SHUTDOWN,?且隊列非空時,firstTask?==?null?代表創(chuàng)建一個不接收新任務(wù)的線程(此線程會從?workQueue?中獲取任務(wù)再執(zhí)行),這種情況下創(chuàng)建線程是為了加速處理完?workQueue?中的任務(wù)if?(rs?>=?SHUTDOWN?&&!?(rs?==?SHUTDOWN?&&firstTask?==?null?&&!?workQueue.isEmpty()))return?false;for?(;;)?{//?獲取線程數(shù)int?wc?=?workerCountOf(c);//?如果超過了線程池的最大?CAPACITY(5?億多,基本不可能)//?或者?超過了?corePoolSize(core?為?true)?或者?maximumPoolSize(core?為?false)?時//?則返回?falseif?(wc?>=?CAPACITY?||wc?>=?(core???corePoolSize?:?maximumPoolSize))return?false;//?否則?CAS?增加線程的數(shù)量,如果成功跳出雙重循環(huán)if?(compareAndIncrementWorkerCount(c))break?retry;c?=?ctl.get();??//?Re-read?ctl//?如果線程運行狀態(tài)發(fā)生變化,跳到外層循環(huán)繼續(xù)執(zhí)行if?(runStateOf(c)?!=?rs)continue?retry;//?說明是因為?CAS?增加線程數(shù)量失敗所致,繼續(xù)執(zhí)行?retry?的內(nèi)層循環(huán)}}boolean?workerStarted?=?false;boolean?workerAdded?=?false;Worker?w?=?null;try?{//?能執(zhí)行到這里,說明滿足增加?worker?的條件了,所以創(chuàng)建?worker,準(zhǔn)備添加進線程池中執(zhí)行任務(wù)w?=?new?Worker(firstTask);final?Thread?t?=?w.thread;if?(t?!=?null)?{//?加鎖,是因為下文要把?w?添加進?workers?中,?workers?是?HashSet,不是線程安全的,所以需要加鎖予以保證final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();try?{//??再次?check?線程池的狀態(tài)以防執(zhí)行到此步時發(fā)生中斷等int?rs?=?runStateOf(ctl.get());//?如果線程池狀態(tài)小于?SHUTDOWN(即為?RUNNING),//?或者狀態(tài)為?SHUTDOWN?但?firstTask?==?null(代表不接收任務(wù),只是創(chuàng)建線程處理?workQueue?中的任務(wù)),則滿足添加?worker?的條件if?(rs?<?SHUTDOWN?||(rs?==?SHUTDOWN?&&?firstTask?==?null))?{//?如果線程已啟動,顯然有問題(因為創(chuàng)建?worker?后,還沒啟動線程呢),拋出異常if?(t.isAlive())?throw?new?IllegalThreadStateException();workers.add(w);int?s?=?workers.size();//?記錄最大的線程池大小以作監(jiān)控之用if?(s?>?largestPoolSize)largestPoolSize?=?s;workerAdded?=?true;}}?finally?{mainLock.unlock();}//?說明往?workers?中添加?worker?成功,此時啟動線程if?(workerAdded)?{t.start();workerStarted?=?true;}}}?finally?{//?添加線程失敗,執(zhí)行?addWorkerFailed?方法,主要做了將?worker?從?workers?中移除,減少線程數(shù),并嘗試著關(guān)閉線程池這樣的操作if?(!?workerStarted)addWorkerFailed(w);}return?workerStarted; }從這段代碼我們可以看到多線程下情況的不可預(yù)料性,我們發(fā)現(xiàn)在滿足條件情況下,又對線程狀態(tài)重新進行了 check,以防期間出現(xiàn)中斷等線程池狀態(tài)發(fā)生變更的操作,這也給我們以啟發(fā):多線程環(huán)境下的各種臨界條件一定要考慮到位。
執(zhí)行 addWorker 創(chuàng)建 worker 成功后,線程開始執(zhí)行了(t.start()),由于在創(chuàng)建 Worker 時,將 Worker ?自己傳給了此線程,所以啟動線程后,會調(diào)用 ?Worker 的 run 方法
public?void?run()?{runWorker(this); }可以看到最終會調(diào)用 ?runWorker 方法,接下來我們來分析下 runWorker 方法
final?void?runWorker(Worker?w)?{Thread?wt?=?Thread.currentThread();Runnable?task?=?w.firstTask;w.firstTask?=?null;//?unlock?會調(diào)用?tryRelease?方法將?state?設(shè)置成?0,代表允許中斷,允許中斷的條件上文我們在?interruptIfStarted()?中有提過,即?state?>=?0w.unlock();boolean?completedAbruptly?=?true;try?{//?如果在提交任務(wù)時創(chuàng)建了線程,并把任務(wù)丟給此線程,則會先執(zhí)行此?task//?否則從任務(wù)隊列中獲取?task?來執(zhí)行(即?getTask()?方法)while?(task?!=?null?||?(task?=?getTask())?!=?null)?{w.lock();//?如果線程池狀態(tài)為?>=?STOP(即?STOP,TIDYING,TERMINATED?)時,則線程應(yīng)該中斷//?如果線程池狀態(tài)?<?STOP,?線程不應(yīng)該中斷,如果中斷了(Thread.interrupted()?返回?true,并清除標(biāo)志位),再次判斷線程池狀態(tài)(防止在清除標(biāo)志位時執(zhí)行了?shutdownNow()?這樣的方法),如果此時線程池為?STOP,執(zhí)行線程中斷if?((runStateAtLeast(ctl.get(),?STOP)?||(Thread.interrupted()?&&runStateAtLeast(ctl.get(),?STOP)))?&&!wt.isInterrupted())wt.interrupt();try?{//?執(zhí)行任務(wù)前,子類可實現(xiàn)此鉤子方法作為統(tǒng)計之用beforeExecute(wt,?task);Throwable?thrown?=?null;try?{task.run();}?catch?(RuntimeException?x)?{thrown?=?x;?throw?x;}?catch?(Error?x)?{thrown?=?x;?throw?x;}?catch?(Throwable?x)?{thrown?=?x;?throw?new?Error(x);}?finally?{//?執(zhí)行任務(wù)后,子類可實現(xiàn)此鉤子方法作為統(tǒng)計之用afterExecute(task,?thrown);}}?finally?{task?=?null;w.completedTasks++;w.unlock();}}completedAbruptly?=?false;}?finally?{//?如果執(zhí)行到這只有兩種可能,一種是執(zhí)行過程中異常中斷了,一種是隊列里沒有任務(wù)了,從這里可以看出線程沒有核心線程與非核心線程之分,哪個任務(wù)異常了或者正常退出了都會執(zhí)行此方法,此方法會根據(jù)情況將線程數(shù)-1processWorkerExit(w,?completedAbruptly);} }來看看 processWorkerExit 方法是咋樣的
private?void?processWorkerExit(Worker?w,?boolean?completedAbruptly)?{//?如果異常退出,cas?執(zhí)行線程池減?1?操作if?(completedAbruptly)?decrementWorkerCount();final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();try?{completedTaskCount?+=?w.completedTasks;//?加鎖確保線程安全地移除?worker?workers.remove(w);}?finally?{mainLock.unlock();}//?woker?既然異常退出,可能線程池狀態(tài)變了(如執(zhí)行?shutdown?等),嘗試著關(guān)閉線程池tryTerminate();int?c?=?ctl.get();//??如果線程池處于?STOP?狀態(tài),則如果?woker?是異常退出的,重新新增一個?woker,如果是正常退出的,在?wokerQueue?為非空的條件下,確保至少有一個線程在運行以執(zhí)行?wokerQueue?中的任務(wù)????if?(runStateLessThan(c,?STOP))?{if?(!completedAbruptly)?{int?min?=?allowCoreThreadTimeOut???0?:?corePoolSize;if?(min?==?0?&&?!?workQueue.isEmpty())min?=?1;if?(workerCountOf(c)?>=?min)return;?//?replacement?not?needed}addWorker(null,?false);} }接下來我們分析 woker 從 workQueue 中取任務(wù)的方法 getTask
private?Runnable?getTask()?{boolean?timedOut?=?false;?//?Did?the?last?poll()?time?out?for?(;;)?{int?c?=?ctl.get();int?rs?=?runStateOf(c);//?如果線程池狀態(tài)至少為?STOP?或者//?線程池狀態(tài)?==?SHUTDOWN?并且任務(wù)隊列是空的//?則減少線程數(shù)量,返回?null,這種情況下上文分析的?runWorker?會執(zhí)行?processWorkerExit?從而讓獲取此?Task?的?woker?退出if?(rs?>=?SHUTDOWN?&&?(rs?>=?STOP?||?workQueue.isEmpty()))?{decrementWorkerCount();return?null;}int?wc?=?workerCountOf(c);//?如果?allowCoreThreadTimeOut?為?true,代表任何線程在?keepAliveTime?時間內(nèi)處于?idle?狀態(tài)都會被回收,如果線程數(shù)大于?corePoolSize,本身在?keepAliveTime?時間內(nèi)處于?idle?狀態(tài)就會被回收boolean?timed?=?allowCoreThreadTimeOut?||?wc?>?corePoolSize;//?worker?應(yīng)該被回收的幾個條件,這個比較簡單,就此略過if?((wc?>?maximumPoolSize?||?(timed?&&?timedOut))&&?(wc?>?1?||?workQueue.isEmpty()))?{if?(compareAndDecrementWorkerCount(c))return?null;continue;}try?{//?阻塞獲取?task,如果在?keepAliveTime?時間內(nèi)未獲取任務(wù),說明超時了,此時?timedOut?為?trueRunnable?r?=?timed??workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS)?:workQueue.take();if?(r?!=?null)return?r;timedOut?=?true;}?catch?(InterruptedException?retry)?{timedOut?=?false;}} }經(jīng)過以上源碼剖析,相信我們對線程池的工作原理了解得八九不離十了,再來簡單過一下其他一些比較有用的方法,開頭我們提到線程池的監(jiān)控問題,我們看一下可以監(jiān)控哪些指標(biāo)
int getCorePoolSize():獲取核心線程數(shù)。
int getLargestPoolSize():歷史峰值線程數(shù)。
int getMaximumPoolSize():最大線程數(shù)(線程池線程容量)。
int getActiveCount():當(dāng)前活躍線程數(shù)
int getPoolSize():當(dāng)前線程池中的線程總數(shù)
BlockingQueuegetQueue() 當(dāng)前線程池的任務(wù)隊列,據(jù)此可以獲取積壓任務(wù)的總數(shù),getQueue.size()
監(jiān)控思路也很簡單,開啟一個定時線程 ScheduledThreadPoolExecutor,定期對這些線程池指標(biāo)進行采集,一般會采用一些開源工具如 Grafana + Prometheus + MicroMeter 來實現(xiàn)。
如何實現(xiàn)核心線程池的預(yù)熱
使用 ?prestartAllCoreThreads() 方法,這個方法會一次性創(chuàng)建 corePoolSize 個線程,無需等到提交任務(wù)時才創(chuàng)建,提交創(chuàng)建好線程的話,一有任務(wù)提交過來,這些線程就可以立即處理。
如何實現(xiàn)動態(tài)調(diào)整線程池參數(shù)
setCorePoolSize(int corePoolSize) 調(diào)整核心線程池大小
setMaximumPoolSize(int maximumPoolSize)
setKeepAliveTime() 設(shè)置線程的存活時間
解答開篇的問題
其它問題基本都在源碼剖析環(huán)節(jié)回答了,這里簡單說下其他問題
1、Tomcat 的線程池和 JDK 的線程池實現(xiàn)有啥區(qū)別, Dubbo 中有類似 Tomcat 的線程池實現(xiàn)嗎? Dubbo 中一個叫 EagerThreadPool 的東西,可以看看它的使用說明
從注釋里可以看出,如果核心線程都處于 busy 狀態(tài),如果有新的請求進來,EagerThreadPool 會選擇先創(chuàng)建線程,而不是將其放入任務(wù)隊列中,這樣可以更快地響應(yīng)這些請求。
Tomcat 實現(xiàn)也是與此類似的,只不過稍微有所不同,當(dāng) Tomcat 啟動時,會先創(chuàng)建 minSpareThreads 個線程,如果經(jīng)過一段時間收到請求時這些線程都處于忙碌狀態(tài),每次都會以 minSpareThreads 的步長創(chuàng)建線程,本質(zhì)上也是為了更快地響應(yīng)處理請求。具體的源碼可以看它的 ThreadPool 實現(xiàn),這里就不展開了。
2、我司網(wǎng)關(guān) dubbo 調(diào)用線程池曾經(jīng)出現(xiàn)過這樣的一個問題:壓測時接口可以正常返回,但接口 RT 很高,假設(shè)設(shè)置的核心線程大小為 500,最大線程為 800,緩沖隊列為 5000,你能從這個設(shè)置中發(fā)現(xiàn)出一些問題并對這些參數(shù)進行調(diào)優(yōu)嗎?這個參數(shù)明顯能看出問題來,首先任務(wù)隊列設(shè)置過大,任務(wù)達(dá)到核心線程后,如果再有請求進來會先進入任務(wù)隊列,隊列滿了之后才創(chuàng)建線程,創(chuàng)建線程也是需要不少開銷的,所以我們后來把核心線程設(shè)置成了與最大線程一樣,并且調(diào)用 prestartAllCoreThreads() 來預(yù)熱核心線程,就不用等請求來時再創(chuàng)建線程了。
線程池的幾個最佳實踐
1、線程池執(zhí)行的任務(wù)應(yīng)該是互相獨立的,如果互相依賴的話,可能導(dǎo)致死鎖,比如下面這樣的代碼
ExecutorService?pool?=?Executors.newSingleThreadExecutor(); pool.submit(()?->?{try?{String?qq=pool.submit(()->"QQ").get();System.out.println(qq);}?catch?(Exception?e)?{} });2、核心任務(wù)與非核心任務(wù)最好能用多個線程池隔離開來
曾經(jīng)我們業(yè)務(wù)上就出現(xiàn)這樣的一個故障:突然很多用戶反饋短信收不到了,排查才發(fā)現(xiàn)發(fā)短信是在一個線程池里,而另外的定時腳本也是用的這個線程池來執(zhí)行任務(wù),這個腳本一分鐘可能產(chǎn)生幾百上千條任務(wù),導(dǎo)致發(fā)短信的方法在線程池里基本沒機會執(zhí)行,后來我們用了兩個線程池把發(fā)短信和執(zhí)行腳本隔離開來解決了問題。
3、添加線程池監(jiān)控,動態(tài)設(shè)置線程池
如前文所述,線程池的各個參數(shù)很難一次性確定,既然難以確定,又要保證發(fā)現(xiàn)問題后及時解決,我們就需要為線程池增加監(jiān)控,監(jiān)控隊列大小,線程數(shù)量等,我們可以設(shè)置 3 分鐘內(nèi)比如隊列任務(wù)一直都是滿了的話,就觸發(fā)告警,這樣可以提前預(yù)警,如果線上因為線程池參數(shù)設(shè)置不合理而觸發(fā)了降級等操作,可以通過動態(tài)設(shè)置線程池的方式來實時修改核心線程數(shù),最大線程數(shù)等,將問題及時修復(fù)。
總結(jié)
本文詳細(xì)剖析了線程池的工作原理,相信大家對其工作機制應(yīng)該有了較深入的了解,也對開頭的幾個問題有了較清楚的認(rèn)識,本質(zhì)上設(shè)置線程池的目的是為了利用有效的資源最大化性能,最小化風(fēng)險,同時線程池的使用本質(zhì)上是為了更好地為用戶服務(wù),據(jù)此也不難明白 Tomcat, Dubbo 要另起爐灶來設(shè)置自己的線程池了。
最后歡迎大家加我私人微信,一起討論,共同進步,拉你進讀者群,2020 難過,我們一起抱團取暖!
巨人的肩膀
https://dzone.com/articles/how-much-memory-does-a-java-thread-take
https://segmentfault.com/a/1190000021047279
https://www.cnblogs.com/trust-freedom/p/6681948.html
深入理解線程池 https://tinyurl.com/y675j928
有的線程它死了,于是它變成一道面試題 https://mp.weixin.qq.com/s/wrTVGLDvhE-eb5lhygWEqQ
Java 并發(fā)編程實戰(zhàn)
Java線程池實現(xiàn)原理及其在美團業(yè)務(wù)中的實踐: https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww
最后歡迎大家加我好友,拉你進技術(shù)交流群,群里有很多 BAT 的大咖,可以提問,互相交流,內(nèi)推等,進群一起抱團取暖
巨人的肩膀
https://dzone.com/articles/how-much-memory-does-a-java-thread-take
https://segmentfault.com/a/1190000021047279
https://www.cnblogs.com/trust-freedom/p/6681948.html
深入理解線程池 https://tinyurl.com/y675j928
有的線程它死了,于是它變成一道面試題 https://mp.weixin.qq.com/s/wrTVGLDvhE-eb5lhygWEqQ
Java 并發(fā)編程實戰(zhàn)
Java線程池實現(xiàn)原理及其在美團業(yè)務(wù)中的實踐: https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww
線程池異常處理詳解,一文搞懂!https://www.cnblogs.com/ncy1/articles/11629933.html
23張圖!萬字詳解「鏈表」,從小白到大佬!
面試官:你說說互斥鎖、自旋鎖、讀寫鎖、悲觀鎖、樂觀鎖的應(yīng)用場景?
25 張圖,1.4 w字!徹底搞懂分布式事務(wù)原理
關(guān)注我,每天陪你進步一點點!
總結(jié)
以上是生活随笔為你收集整理的2万字,看完这篇才敢说自己真的懂线程池!的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: switch 的性能提升了 3 倍,我只
- 下一篇: 面试官 | AJAX请求为什么不安全?