ThreadPoolExecutor源码解析(一)
1.ThreadPoolExcuter原理說(shuō)明
首先我們要知道為什么要使用ThreadPoolExcuter,具體可以看看文檔中的說(shuō)明:
線程池可以解決兩個(gè)不同問(wèn)題:由于減少了每個(gè)任務(wù)的調(diào)用開(kāi)銷,在執(zhí)行大量的異步任務(wù)時(shí),它通常能夠提供更好的性能,并且還可以提供綁定和管理資源(包括執(zhí)行集合任務(wù)時(shí)使用的線程)的方法。每個(gè) ThreadPoolExecutor還維護(hù)著一些基本的統(tǒng)計(jì)數(shù)據(jù),如完成的任務(wù)數(shù)。
線程池做的其實(shí)可以看得很簡(jiǎn)單,其實(shí)就是把你提交的任務(wù)(task)進(jìn)行調(diào)度管理運(yùn)行,但這個(gè)調(diào)度的過(guò)程以及其中的狀態(tài)控制是比較復(fù)雜的。
2.初始化參數(shù)介紹
可以直接看最完整的ThreadPoolExcuter的初始化函數(shù):
逐個(gè)介紹如下:
corePoolSize:核心線程數(shù),在ThreadPoolExcutor中有一個(gè)與它相關(guān)的配置:allowCoreThreadTimeOut(默認(rèn)為false),當(dāng)allowCoreThreadTimeOut為false時(shí),核心線程會(huì)一直存活,哪怕是一直空閑著。而當(dāng)allowCoreThreadTimeOut為true時(shí)核心線程空閑時(shí)間超過(guò)keepAliveTime時(shí)會(huì)被回收。
maximumPoolSize:最大線程數(shù),線程池能容納的最大線程數(shù),當(dāng)線程池中的線程達(dá)到最大時(shí),此時(shí)添加任務(wù)將會(huì)采用拒絕策略,默認(rèn)的拒絕策略是拋出一個(gè)運(yùn)行時(shí)錯(cuò)誤(RejectedExecutionException)。值得一提的是,當(dāng)初始化時(shí)用的工作隊(duì)列為L(zhǎng)inkedBlockingDeque時(shí),這個(gè)值將無(wú)效。
keepAliveTime:存活時(shí)間,當(dāng)非核心空閑超過(guò)這個(gè)時(shí)間將被回收,同時(shí)空閑核心線程是否回收受allowCoreThreadTimeOut影響。
unit:keepAliveTime的單位。
workQueue:任務(wù)隊(duì)列,常用有三種隊(duì)列,即SynchronousQueue,LinkedBlockingDeque(無(wú)界隊(duì)列),ArrayBlockingQueue(有界隊(duì)列)。
threadFactory:線程工廠,ThreadFactory是一個(gè)接口,用來(lái)創(chuàng)建worker。通過(guò)線程工廠可以對(duì)線程的一些屬性進(jìn)行定制。默認(rèn)直接新建線程。
RejectedExecutionHandler:也是一個(gè)接口,只有一個(gè)方法,當(dāng)線程池中的資源已經(jīng)全部使用,添加新線程被拒絕時(shí),會(huì)調(diào)用RejectedExecutionHandler的rejectedExecution法。
默認(rèn)是拋出一個(gè)運(yùn)行時(shí)異常。
這么多參數(shù)看起來(lái)好像很復(fù)雜,所以Java貼心得為我們準(zhǔn)備了便捷的API,即可以直接用Executors創(chuàng)建各種線程池。分別是:
//創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過(guò)處理需要,可靈活回收空閑線程,若無(wú)可回收,則新建線程。//通過(guò)設(shè)置corePoolSize為0,而maximumPoolSize為Integer.Max_VALUE(Int型數(shù)據(jù)最大值)實(shí)現(xiàn)。ExecutorService cache = Executors.newCachedThreadPool();//創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。//通過(guò)將corePoolSize和maximumPoolSize的值設(shè)置為一樣的值來(lái)實(shí)現(xiàn)。ExecutorService fixed = Executors.newFixedThreadPool(num);//創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。//通過(guò)將隊(duì)列參數(shù)workQueue設(shè)置為DelayWorkQueue來(lái)實(shí)現(xiàn)。ExecutorService schedule = Executors.newScheduledThreadPool(5);//創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來(lái)執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。//通過(guò)將corePoolSize和maximumPoolSize都設(shè)置為1來(lái)實(shí)現(xiàn)。ExecutorService single = Executors.newSingleThreadExecutor();? 這幾個(gè)API會(huì)根據(jù)具體的情況而使用預(yù)設(shè)定好默認(rèn)的初始化參數(shù)去創(chuàng)建一個(gè)ThreadPoolExecutor。
這里需要做一個(gè)額外說(shuō)明,在ThreadPoolExcuter中,worker和task是有區(qū)別的,task是用戶提交的任務(wù),而worker則是用來(lái)執(zhí)行task的線程。在初始化參數(shù)中,corePoolSize和maximumPoolSize都是針對(duì)worker的,而workQueue是用來(lái)存放task的。
3.worker介紹
前面有介紹了一下worker和task的區(qū)別,其中task是用戶提交的線程任務(wù),而worker則是ThreadPoolExecutor自己內(nèi)部實(shí)現(xiàn)的一個(gè)類了。
具體源碼如下:
/*** Woker主要維護(hù)著運(yùn)行task的worker的中斷控制信息,以及其他小記錄。這個(gè)類拓展AbstractQueuedSynchronizer* 而來(lái)簡(jiǎn)化獲取和釋放每一個(gè)任務(wù)執(zhí)行中的鎖。這可以防止中斷那些打算喚醒正在等待其他線程任務(wù)的任務(wù),而不是* 中斷正在運(yùn)行的任務(wù)。我們實(shí)現(xiàn)一個(gè)簡(jiǎn)單的不可重入鎖而不是ReentrantLo,因?yàn)槲覀儾幌氘?dāng)其調(diào)用setCorePoolSize* 這樣的方法的時(shí)候能獲得鎖。*///worker主要是對(duì)進(jìn)行中的任務(wù)進(jìn)行中斷控制,順帶著對(duì)其他行為進(jìn)行記錄private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. *///正在跑的線程,如果是null標(biāo)識(shí)factory失敗final Thread thread;/** Initial task to run. Possibly null. *///初始化一個(gè)任務(wù)以運(yùn)行Runnable firstTask;/** Per-thread task counter *///每個(gè)線程計(jì)數(shù)volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)* 用給定的first task和從threadFactory創(chuàng)建*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker *///主要調(diào)用了runWorkerpublic void run() {runWorker(this);}// Lock methods//// The value 0 represents the unlocked state.// The value 1 represents the locked state.//鎖方法//protected boolean isHeldExclusively() {return getState() != 0;}//嘗試獲取鎖protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}//嘗試釋放鎖protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}
? Worker其實(shí)可以看作高級(jí)一點(diǎn)的線程。其中繼承AbstractQueuedSynchronizer主要是為了實(shí)現(xiàn)鎖控制。ThreadPoolExecutor會(huì)持有并管理Worker,在Worker中firstTask其實(shí)就是存放task的,而thread則是存放當(dāng)前Worker本身的線程。
其中比較重要的就是run方法了,但這個(gè)方法其實(shí)又是去調(diào)用ThreadPoolExecutor里面的runWorker()方法,具體可以看下一節(jié)的介紹。
4.ctl介紹以及運(yùn)行狀態(tài)說(shuō)明
首先需要介紹線程池有五種運(yùn)行狀態(tài):
RUNNING(狀態(tài)值-1): 接收新任務(wù)并處理隊(duì)列中的任務(wù)
SHUTDOWN(狀態(tài)值0): 不接收新任務(wù)但會(huì)處理隊(duì)列中的任務(wù)。
STOP(狀態(tài)值1): 不接收新任務(wù),不處理隊(duì)列中的任務(wù),并中斷正在處理的任務(wù)
TIDYING(狀態(tài)值2): 所有任務(wù)已終止,workerCount為0,處于TIDYING狀態(tài)的線程將調(diào)用鉤子方法terminated()。
TERMINATED(狀態(tài)值3): terminated()方法完成。
然后我們可以看看ThreadPoolExcuter中的ctl這個(gè)變量。
ctl是ThreadPoolExcuter中比較有意思的一個(gè)實(shí)現(xiàn),它是一個(gè)AtomicInteger,這里不對(duì)AtomicInteger多做討論,只要知道可以把它看成有原子性的Integer就夠了,其實(shí)它具有原子性的原理是使用了CAS的技術(shù),這是一種樂(lè)觀鎖的具體實(shí)現(xiàn)。
ThreadPoolExcuter是將兩個(gè)內(nèi)部值打包成一個(gè)值,即將workerCount和runState(運(yùn)行狀態(tài))這兩個(gè)值打包在一個(gè)ctl中,因?yàn)閞unState有5個(gè)值,需要3位,所以有3位表示
runState,而其他29位表示為workerCount。
而運(yùn)行時(shí)要獲取其他數(shù)據(jù)時(shí),只需要對(duì)ctl進(jìn)行拆包即可。具體這部分代碼如下:
5.拒絕策略
當(dāng)執(zhí)行器(Executor)處于終止?fàn)顟B(tài),或者執(zhí)行器在max threads和工作隊(duì)列都是有界并且處于飽和的時(shí)候,新提交的任務(wù)會(huì)被拒絕。在任一情況下,執(zhí)行的任務(wù)將調(diào)用RejectedExecutionHandler的方法rejectedExecution(Runnable,?ThreadPoolExecutor)。有以下四種拒絕策略:
1.默認(rèn)的是ThreadPoolExecutor.AbortPolicy,在這種策略下,處理器會(huì)在拒絕后拋出一個(gè)運(yùn)行異常RejectedExecutionException。
2.在ThreadPoolExecutor.CallerRunsPolicy的策略下,線程會(huì)調(diào)用它直接的execute來(lái)運(yùn)行這個(gè)任務(wù)。這種方式提供簡(jiǎn)單的反饋控制機(jī)制來(lái)減緩新任務(wù)提交的速度。
3.在ThreadPoolExecutor.DiscardPolicy策略下,無(wú)法執(zhí)行的任務(wù)將被簡(jiǎn)單得刪除掉。
4.在ThreadPoolExecutor.DiscardOldestPolicy策略下,如果executor沒(méi)有處于終止?fàn)顟B(tài),在工作隊(duì)列頭的任務(wù)將被刪除,然后會(huì)重新執(zhí)行(可能會(huì)再次失敗,這會(huì)導(dǎo)致重復(fù)這個(gè)過(guò)程)。
?
總結(jié):本篇初步介紹了ThreadPoolExcuter的基本原理,解決了什么問(wèn)題。而后說(shuō)明了ThreadPoolExcuter中的初始化參數(shù),對(duì)其中的各個(gè)參數(shù)做初步介紹。再之后介紹ctl變量的作用,并初步介紹了任務(wù)提交失敗后的拒絕策略。
from:?https://www.cnblogs.com/listenfwind/p/9043479.html
總結(jié)
以上是生活随笔為你收集整理的ThreadPoolExecutor源码解析(一)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 利用模拟退火提高Kmeans的聚类精度
- 下一篇: Java-- String源码分析