Java:ThreadPoolExecutor解析
目錄
?
功能介紹
線程池相關(guān)類圖
源碼解析
基本概念
字段域
常量
線程構(gòu)造重要字段
線程控制重要字段
方法
執(zhí)行任務(wù)
ThreadPoolExecutor的關(guān)閉
功能介紹
線程池,顧名思義一個線程的池子,池子里存放了很多可以復用的線程,如果不用線程池類似的容器,每當我們需要創(chuàng)建新的線程時都需要去new Thread(),用完之后就被回收了,線程的啟動回收都需要用戶態(tài)到內(nèi)核態(tài)的交互,頻繁的創(chuàng)建開銷比較大。而且隨著線程數(shù)的增加,會引起CPU頻繁的上下文切換嚴重影響性能。這時候線程池類似的容器就發(fā)揮出了作用。線程池里面的線程不但可以復用,而且還可以控制線程并發(fā)的數(shù)量,是CPU的性能達到最優(yōu)。
線程池相關(guān)類圖
源碼解析
基本概念
工作線程:即用于執(zhí)行任務(wù)的線程。
任務(wù):將要執(zhí)行的任務(wù)
緩存隊列:工作線程全部被占用時,緩存任務(wù)的隊列。
字段域
常量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; //0001 1111 1111 1111 1111 1111 1111 1111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //1110 0000 0000 0000 0000 0000 0000 0000 //能接受新任務(wù),隊列中的任務(wù)可繼續(xù)運行 private static final int RUNNING = -1 << COUNT_BITS; //0000 0000 0000 0000 0000 0000 0000 0000 //不再接受新任務(wù),隊列中的任務(wù)仍可繼續(xù)執(zhí)行 private static final int SHUTDOWN = 0 << COUNT_BITS; //0010 0000 0000 0000 0000 0000 0000 0000 //不再接受新任務(wù),不再執(zhí)行隊列中的任務(wù),中斷所有執(zhí)行中的任務(wù)(發(fā)中斷消息) private static final int STOP = 1 << COUNT_BITS; //0100 0000 0000 0000 0000 0000 0000 0000 //所有任務(wù)均已終止,workerCount的值為0,轉(zhuǎn)到TIDYING狀態(tài)的線程即將要執(zhí)行terminated()鉤子方法. private static final int TIDYING = 2 << COUNT_BITS; //0110 0000 0000 0000 0000 0000 0000 0000 //terminated()方法執(zhí)行結(jié)束 private static final int TERMINATED = 3 << COUNT_BITS;?線程池的狀態(tài)控制由AtomicInteger類型變量ctl 控制,其值為32位整形,其中前3位用于線程池狀態(tài),后29位用于線程數(shù)量控制。
線程池具有5個狀態(tài):
RUNNING = 111 SHUTDOWN = 000 STOP = 001 TIDYING = 010 TERMINATED = 011?各狀態(tài)之間可能的轉(zhuǎn)變有以下幾種:
RUNNING -> SHUTDOWN調(diào)用了shutdown方法,線程池實現(xiàn)了finalize方法。在finalize內(nèi)調(diào)用了shutdown方法。因此shutdown可能是在finalize中被隱式調(diào)用的 (RUNNING or SHUTDOWN) -> STOP調(diào)用了shutdownNow方法 SHUTDOWN -> TIDYING當隊列和線程池均為空的時候 STOP -> TIDYING當線程池為空的時候 TIDYING -> TERMINATEDterminated()鉤子方法調(diào)用完畢?狀態(tài)的解析
//初始化線程數(shù) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //獲取當前線程的運行狀態(tài) //~CAPACITY = 1110 0000 0000 0000 0000 0000 0000 0000 private static int runStateOf(int c) { return c & ~CAPACITY; } //獲取當前的線程數(shù) //CAPACITY = 0001 1111 1111 1111 1111 1111 1111 1111 private static int workerCountOf(int c) { return c & CAPACITY; } //或操作 // 111x xxxx xxxx xxxx xxxx xxxx xxxx xxxx | 0000 0000 0000 0000 0000 0000 0000 0100 // = 1110 0000 0000 0000 0000 0000 0000 0100 private static int ctlOf(int rs, int wc) { return rs | wc; }private static boolean runStateLessThan(int c, int s) {return c < s;}private static boolean runStateAtLeast(int c, int s) {return c >= s;}private static boolean isRunning(int c) {return c < SHUTDOWN;}?狀態(tài)的設(shè)計理念
- 將運行狀態(tài)設(shè)置為小于0的數(shù),便于判斷當前線程池是否處于running狀態(tài);
- 僅使用32位存儲線程池狀態(tài)與線程池內(nèi)的線程數(shù),若需要獲取線程池信息,只需要一個int即可
- 采用位運算,提高了執(zhí)行效率;
- 同時,線程池狀態(tài)還有擴增空間(23=8,目前只有5種狀態(tài)),而線程池最大容量2^29,也可保證在絕大部分應用中是不會溢出的。而在源碼中也聲明了:如果在未來這個也成為一個問題,那么可以擴增為AtomicLong。
線程構(gòu)造重要字段
- corePoolSize:核心池的大小(即任務(wù)線程的個數(shù)),這個參數(shù)與后面講述的線程池的實現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個線程或者一個線程。默認情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),當線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務(wù)放到緩存隊列當中;緩存隊列即可以是有界的,也可以是無界的,僅當緩存隊列不可加入任務(wù),并且任務(wù)線程數(shù)不大于maximumPoolSize時,才又創(chuàng)建任務(wù)線程。
- maximumPoolSize:線程池最大線程數(shù),它表示在線程池中最多能創(chuàng)建多少個線程;
- keepAliveTime:表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize:即當線程池中的線程數(shù)大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數(shù)不超過corePoolSize;但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用,直到線程池中的線程數(shù)為0;
- unit:參數(shù)keepAliveTime的時間單位
線程控制重要字段
//待執(zhí)行線程隊列 private final BlockingQueue<Runnable> workQueue; //鎖,基于重入鎖,線程池核心之一 private final ReentrantLock mainLock = new ReentrantLock(); //線程隊列,這是該線程池內(nèi)已有線程 //注意與workQueue的區(qū)別 private final HashSet<Worker> workers = new HashSet<Worker>(); //多線程協(xié)調(diào)通信 private final Condition termination = mainLock.newCondition(); //拒絕handler,用于線程池不接受新加線程時的處理方式 //分為系統(tǒng)拒絕(線程池要關(guān)閉等),與線程池飽和(已達線程池最大容量) private volatile RejectedExecutionHandler handler; //線程工廠,新建線程池時帶入 private volatile ThreadFactory threadFactory; //默認拒絕向線程池中新加線程的方式:丟棄 private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();方法
執(zhí)行任務(wù)
使用ThreadPoolExecutor執(zhí)行任務(wù)的時候,可以使用execute或submit方法,submit方法如下:
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}submit方法內(nèi)部使用了execute方法,而且submit方法是有返回值的。在調(diào)用execute方法之前,使用FutureTask包裝一個Runnable,這個FutureTask就是返回值。
execute()方法
源碼如下:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) { // 如果工作線程的數(shù)量小于corePoolSize,表示可以創(chuàng)建線程直接用于運行任務(wù)。if (addWorker(command, true)) return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) { //工作線程的數(shù)量>=corePoolSize,則不能再創(chuàng)建工作線程了,需要把任務(wù)加入緩存隊列中去。int recheck = ctl.get();//檢查當前線程池狀態(tài)是否不在Running狀態(tài)了//若是,將線程cmd從等待隊列內(nèi)移除//這個時候存在一種case,線程池不處于running狀態(tài)//但是remove失敗了,這個時候看具體的queue處理了//線程池還是很忠實的去嘗試interruptif (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}//如果緩存隊列加入不了任務(wù)了,則又可以創(chuàng)建新的工作線程了。else if (!addWorker(command, false)) //創(chuàng)建工作線程失敗,則執(zhí)行reject策略。reject(command); }提交一個新的任務(wù)的3種情況如下:
- 如果當前正在執(zhí)行的Worker數(shù)量比corePoolSize(基本大小)要小。直接創(chuàng)建一個新的Worker執(zhí)行任務(wù),會調(diào)用addWorker方法
- 如果當前正在執(zhí)行的Worker數(shù)量大于等于corePoolSize(基本大小)。將任務(wù)放到阻塞隊列里,如果阻塞隊列沒滿并且狀態(tài)是RUNNING的話,直接丟到阻塞隊列,否則執(zhí)行第3步。丟到阻塞隊列之后,還需要再做一次驗證(丟到阻塞隊列之后可能另外一個線程關(guān)閉了線程池或者剛剛加入到隊列的線程死了)。如果這個時候線程池不在RUNNING狀態(tài),把剛剛丟入隊列的任務(wù)remove掉,調(diào)用reject方法,否則查看Worker數(shù)量,如果Worker數(shù)量為0,起一個新的Worker去阻塞隊列里拿任務(wù)執(zhí)行
- 丟到阻塞失敗的話,會調(diào)用addWorker方法嘗試起一個新的Worker去阻塞隊列拿任務(wù)并執(zhí)行任務(wù),如果這個新的Worker創(chuàng)建失敗,調(diào)用reject方法
注意:
每次判斷狀態(tài)時,都必須重新獲取狀態(tài)。
addWorker()方法
addWorker關(guān)系著如何起一個線程,Worker是一個AQS的實現(xiàn)類(參考:AbstractQueuedSynchronizer源碼解析),同時也是一個實現(xiàn)Runnable的類,使用獨占鎖,它的構(gòu)造函數(shù)只接受一個Runnable參數(shù),內(nèi)部保存著這個Runnable屬性,還有一個thread線程屬性用于包裝這個Runnable(這個thread屬性使用ThreadFactory構(gòu)造,在構(gòu)造函數(shù)內(nèi)完成thread線程的構(gòu)造),另外還有一個completedTasks計數(shù)器表示這個Worker完成的任務(wù)數(shù)。Worker類復寫了run方法,使用ThreadPoolExecutor的runWorker方法(在addWorker方法里調(diào)用),直接啟動Worker的話,會調(diào)用ThreadPoolExecutor的runWork方法。需要特別注意的是這個Worker是實現(xiàn)了Runnable接口的,thread線程屬性使用ThreadFactory構(gòu)造Thread的時候,構(gòu)造的Thread中使用的Runnable其實就是Worker。下面的Worker的源碼:
private final class Worker extends AbstractQueuedSynchronizerimplements Runnable {private static final long serialVersionUID = 6138294804551838833L;/** worker綁定的線程.null表示失敗 */final Thread thread;/** 初始化時指定的任務(wù),可為null. */Runnable firstTask;/** 完成任務(wù)數(shù) */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {//把狀態(tài)位設(shè)置成-1,這樣任何線程都不能得到Worker的鎖,除非調(diào)用了unlock方法。//這個unlock方法會在runWorker方法中一開始就調(diào)用,這是為了確保Worker構(gòu)造出來之后,沒有任何線程能夠得到它的鎖,//除非調(diào)用了runWorker之后,其他線程才能獲得Worker的鎖setState(-1); this.firstTask = firstTask;// 使用ThreadFactory構(gòu)造Thread,這個構(gòu)造的Thread內(nèi)部的Runnable就是本身,也就是Worker。//所以得到Worker的thread并start的時候,會執(zhí)行Worker的run方法,也就是執(zhí)行ThreadPoolExecutor的runWorker方法this.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);}//0表示unlock,1表示lockprotected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}} }addWork代碼
//嘗試向線程池內(nèi)新增一個線程 private boolean addWorker(Runnable firstTask, boolean core) {//用于跳出外層循環(huán)的標簽retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.//若線程池處于非運行狀態(tài)//且//或rs不處于SHUTDOWN狀態(tài)(STOP、TIDYING、TERMINATED 之一)//或firstTask不為空 (非運行狀態(tài),不可以再增加Task了,所以firstTask不為空要返回false)//或緩沖隊列為空 (size== 0,表示不可以插入元素了)//那么返回false,表明新增一個線程失敗(執(zhí)行firstTask 也失敗)if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;//此時線程池處于running狀態(tài),firstTask不為空//且緩沖隊列不為空,此時需要新增一個線程for (;;) {//獲取線程池當前線程數(shù)量int wc = workerCountOf(c);//若線程池超過最大容量,或大于設(shè)定的容量//corePoolSize與maximumPoolSize均為傳入的參數(shù)//那么直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;//線程池未過限,那么采用cas機制,將線程池計數(shù)器擴增1,跳出標簽if (compareAndIncrementWorkerCount(c))break retry;//獲取當前線程池信息c = ctl.get(); // Re-read ctl//若線程池狀態(tài)有變更,從標簽處重新循環(huán)if (runStateOf(c) != rs)continue retry;//若線程池狀態(tài)未變化,繼續(xù)內(nèi)層的for循環(huán)}}//上面若將線程池計數(shù)器加1了//這里就要對線程池擴增了(即增加一個工作線程)boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {//創(chuàng)建一個線程實例w = new Worker(firstTask);//獲取線程//不在創(chuàng)建實例時直接run該線程,是避免構(gòu)造函數(shù)未執(zhí)行完,就run導致的異常final Thread t = w.thread;if (t != null) {//重入鎖final ReentrantLock mainLock = this.mainLock;//鎖上,走起mainLock.lock();try {//獲取線程池狀態(tài)int rs = runStateOf(ctl.get());//若線程池狀態(tài)為running狀態(tài)//或為SHUTDOWN且傳入的線程為空if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {//若新起的線程狀態(tài)為活躍狀態(tài)if (t.isAlive()) // precheck that t is startable//拋異常throw new IllegalThreadStateException();//否則向運行線程池內(nèi)加上該線程workers.add(w);int s = workers.size();//判斷當前線程池是否為線程池最大//是的話交換,做統(tǒng)計用if (s > largestPoolSize)largestPoolSize = s;//新增標志位置為成功workerAdded = true;}} finally {//重入鎖解鎖mainLock.unlock();}//判斷線程是否加入成功if (workerAdded) {//加入成功//啟動當前線程,將當前線程交有os管理t.start();//設(shè)置標志位workerStarted = true;}}} finally {//若未啟動成功if (! workerStarted)//回滾當前新起線程操作//移除當前新增失敗的線程//將線程池計數(shù)器減1//嘗試中斷線程池或者中斷當前線程addWorkerFailed(w);}//返回標志位,是否新增線程成功return workerStarted; }邏輯圖如下:
上圖來源于:https://blog.csdn.net/varyall/article/details/82392048
runWorker()方法
Worker中的線程start的時候,調(diào)用Worker本身run方法,又調(diào)用外部類ThreadPoolExecutor的runWorker方法,runWorker方法代碼如下:
final void runWorker(Worker w) {Thread wt = Thread.currentThread(); // 得到當前線程Runnable task = w.firstTask; // 得到Worker中的任務(wù)task,也就是用戶傳入的taskw.firstTask = null; // 將Worker中的任務(wù)置空w.unlock(); // allow interrupts。 boolean completedAbruptly = true;try {// 如果worker中的任務(wù)不為空,繼續(xù)執(zhí)行,//如果worker中的任務(wù)為空,則使用getTask獲得任務(wù)。//一直死循環(huán),除非得到的任務(wù)為空才退出while (task != null || (task = getTask()) != null) {w.lock(); // 在執(zhí)行任務(wù)之前先做一些處理。 //1. 如果線程池已經(jīng)處于STOP狀態(tài)并且當前線程沒有被中斷,中斷線程 //2. 如果線程池還處于RUNNING或SHUTDOWN狀態(tài),并且當前線程已經(jīng)被中斷了,重新檢查一下線程池狀態(tài),如果處于STOP狀態(tài)并且沒有被中斷,那么中斷線程if ((runStateAtLeast(ctl.get(), STOP) //如果線程池已經(jīng)處于STOP狀態(tài)或者之后的狀態(tài)||(Thread.interrupted() //本線程已處于中斷狀態(tài)&&runStateAtLeast(ctl.get(), STOP) //再檢查一次)) &&!wt.isInterrupted() //worker未狀態(tài))wt.interrupt();try {beforeExecute(wt, task); // 任務(wù)執(zhí)行前需要做什么,ThreadPoolExecutor是個空實現(xiàn)Throwable thrown = null;try {task.run(); // 真正的開始執(zhí)行任務(wù),調(diào)用的是run方法,而不是start方法。這里run的時候可能會被中斷,比如線程池調(diào)用了shutdownNow方法} catch (RuntimeException x) { // 任務(wù)執(zhí)行發(fā)生的異常全部拋出,不在runWorker中處理thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown); // 任務(wù)執(zhí)行結(jié)束需要做什么,ThreadPoolExecutor是個空實現(xiàn)}} finally {task = null;w.completedTasks++; // 記錄執(zhí)行任務(wù)的個數(shù)w.unlock(); // 執(zhí)行完任務(wù)之后,解鎖,Worker變成閑置Worker}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly); // 回收Worker方法} }getTask()方法:
用于緩存隊列中獲取一個Task。阻塞隊列參考:Java并發(fā)包--阻塞隊列(BlockingQueue)
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);//如果線程池處于shutdown狀態(tài),//并且隊列為空,或者線程池處于stop或者terminate狀態(tài),//則:線程池數(shù)量-1,返回null,回收線程if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}//標識當前線程在空閑時,是否應該超時回收boolean timed; for (;;) {int wc = workerCountOf(c);//如果allowCoreThreadTimeOut 為ture//或者當前線程數(shù)量大于核心線程池數(shù)目,//則需要超時回收timed = allowCoreThreadTimeOut || wc > corePoolSize;//(1)//如果線程數(shù)目小于最大線程數(shù)目,//且不允許超時回收或者未超時,//則跳出循環(huán),繼續(xù)去阻塞隊列中取任務(wù)(2)if (wc <= maximumPoolSize && ! (timedOut && timed))break;//如果上面if沒有成立,則當前線程數(shù)-1,返回null,回收該線程if (compareAndDecrementWorkerCount(c))return null;//如果上面if沒有成立,則CAS修改ctl失敗,重讀,cas循環(huán)重新嘗試修改c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}(2)try {//如果允許空閑回收,則調(diào)用阻塞隊列的poll,//否則take,一直等到隊列中有可取任務(wù)Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();//取到任務(wù),返回任務(wù),//否則超時timedOut = true;進入下一個循環(huán),//并且在(1)處會不成立,進而進入到cas修改ctl的程序中if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }processWorkerExit()方法
如果getTask返回的是null,那說明阻塞隊列已經(jīng)沒有任務(wù)并且當前調(diào)用getTask的Worker需要被回收,那么會調(diào)用processWorkerExit方法進行回收:
private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 如果Worker沒有正常結(jié)束流程調(diào)用processWorkerExit方法,worker數(shù)量減一。如果是正常結(jié)束的話,在getTask方法里worker數(shù)量已經(jīng)減一了decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 加鎖,防止并發(fā)問題try {completedTaskCount += w.completedTasks; // 記錄總的完成任務(wù)數(shù)workers.remove(w); // 線程池的worker集合刪除掉需要回收的Worker} finally {mainLock.unlock(); // 解鎖}tryTerminate(); // 嘗試結(jié)束線程池int c = ctl.get();if (runStateLessThan(c, STOP)) { // 如果線程池還處于RUNNING或者SHUTDOWN狀態(tài)// Worker是正常結(jié)束流程的話if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // 不需要新開一個Worker}// 新開一個Worker代替原先的Worker// 新開一個Worker需要滿足以下3個條件中的任意一個:// 1. 用戶執(zhí)行的任務(wù)發(fā)生了異常// 2. Worker數(shù)量比線程池基本大小要小// 3. 阻塞隊列不空但是沒有任何Worker在工作addWorker(null, false);} }tryTerminate()方法
在回收Worker的時候線程池會嘗試結(jié)束自己的運行,tryTerminate方法:
final void tryTerminate() {for (;;) {int c = ctl.get();// 滿足3個條件中的任意一個,不終止線程池// 1. 線程池還在運行,不能終止// 2. 線程池處于TIDYING或TERMINATED狀態(tài),說明已經(jīng)在關(guān)閉了,不允許繼續(xù)處理// 3. 線程池處于SHUTDOWN狀態(tài)并且阻塞隊列不為空,這時候還需要處理阻塞隊列的任務(wù),不能終止線程池if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 走到這一步說明線程池已經(jīng)不在運行,阻塞隊列已經(jīng)沒有任務(wù),但是還要回收正在工作的Workerif (workerCountOf(c) != 0) {// 由于線程池不運行了,調(diào)用了線程池的關(guān)閉方法,在解釋線程池的關(guān)閉原理的時候會說道這個方法interruptIdleWorkers(ONLY_ONE); // 中斷閑置Worker,直到回收全部的Worker。這里沒有那么暴力,只中斷一個,中斷之后退出方法,中斷了Worker之后,Worker會回收,然后還是會調(diào)用tryTerminate方法,如果還有閑置線程,那么繼續(xù)中斷return;}// 走到這里說明worker已經(jīng)全部回收了,并且線程池已經(jīng)不在運行,阻塞隊列已經(jīng)沒有任務(wù)。可以準備結(jié)束線程池了final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 加鎖,防止并發(fā)try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { // cas操作,將線程池狀態(tài)改成TIDYINGtry {terminated(); // 調(diào)用terminated方法} finally {ctl.set(ctlOf(TERMINATED, 0)); // terminated方法調(diào)用完畢之后,狀態(tài)變?yōu)門ERMINATEDtermination.signalAll();}return;}} finally {mainLock.unlock(); // 解鎖}// else retry on failed CAS} }ThreadPoolExecutor的關(guān)閉
shutdown方法,關(guān)閉線程池,關(guān)閉之后阻塞隊列里的任務(wù)不受影響,會繼續(xù)被Worker處理,但是新的任務(wù)不會被接受:
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 關(guān)閉的時候需要加鎖,防止并發(fā)try {checkShutdownAccess(); // 檢查關(guān)閉線程池的權(quán)限advanceRunState(SHUTDOWN); // 把線程池狀態(tài)更新到SHUTDOWNinterruptIdleWorkers(); // 中斷閑置的WorkeronShutdown(); // 鉤子方法,默認不處理。ScheduledThreadPoolExecutor會做一些處理} finally {mainLock.unlock(); // 解鎖}tryTerminate(); // 嘗試結(jié)束線程池,上面已經(jīng)分析過了 }interruptIdleWorkers方法,注意,這個方法中斷的是閑置Worker,中斷閑置Worker之后,getTask方法會返回null,然后Worker會被回收。那什么是閑置Worker呢?
閑置Worker是這樣解釋的:Worker運行的時候會去阻塞隊列拿數(shù)據(jù)(getTask方法),拿的時候如果沒有設(shè)置超時時間,那么會一直阻塞直到等待阻塞隊列進數(shù)據(jù),即worker沒有獲取到任務(wù),這樣的Worker就被稱為閑置Worker。由于Worker也是一個AQS,在runWorker方法里會有一對lock和unlock操作,這對lock操作是為了確保Worker不是一個閑置Worker。
所以Worker被設(shè)計成一個AQS是為了根據(jù)Worker的鎖來判斷是否是閑置線程,是否可以被強制中斷。
下面我們看下interruptIdleWorkers方法:
// 調(diào)用他的一個重載方法,傳入了參數(shù)false,表示要中斷所有的正在運行的閑置Worker,如果為true表示只打斷一個閑置Worker private void interruptIdleWorkers() {interruptIdleWorkers(false); }private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 中斷閑置Worker需要加鎖,防止并發(fā)try {for (Worker w : workers) { Thread t = w.thread; // 拿到worker中的線程if (!t.isInterrupted() // Worker中的線程沒有被中斷 && w.tryLock() //并且Worker可以獲取鎖,這里Worker能獲取鎖說明Worker是個閑置Worker,在阻塞隊列里拿數(shù)據(jù)一直被阻塞,沒有數(shù)據(jù)進來。如果沒有獲取到Worker鎖,說明Worker還在執(zhí)行任務(wù),不進行中斷(shutdown方法不會中斷正在執(zhí)行的任務(wù)) ) { try {t.interrupt(); // 中斷Worker線程} catch (SecurityException ignore) {} finally {w.unlock(); // 釋放Worker鎖}}if (onlyOne) // 如果只打斷1個Worker的話,直接break退出,否則,遍歷所有的Workerbreak;}} finally {mainLock.unlock(); // 解鎖} }shutdown方法
將線程池狀態(tài)改成SHUTDOWN,線程池還能繼續(xù)處理阻塞隊列里的任務(wù),并且會回收一些閑置的Worker。
但是shutdownNow方法不一樣,它會把線程池狀態(tài)改成STOP狀態(tài),這樣不會處理阻塞隊列里的任務(wù),也不會處理新的任務(wù):
// shutdownNow方法會有返回值的,返回的是一個任務(wù)列表,而shutdown方法沒有返回值 public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // shutdownNow操作也需要加鎖,防止并發(fā)try {checkShutdownAccess(); // 檢查關(guān)閉線程池的權(quán)限advanceRunState(STOP); // 把線程池狀態(tài)更新到STOPinterruptWorkers(); // 中斷Worker的運行tasks = drainQueue();} finally {mainLock.unlock(); // 解鎖}tryTerminate(); // 嘗試結(jié)束線程池,上面已經(jīng)分析過了return tasks; }shutdownNow的中斷和shutdown方法不一樣,調(diào)用的是interruptWorkers方法:
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock(); // 中斷Worker需要加鎖,防止并發(fā)try {for (Worker w : workers)w.interruptIfStarted(); // 中斷Worker的執(zhí)行} finally {mainLock.unlock(); // 解鎖} }Worker的interruptIfStarted方法中斷Worker的執(zhí)行:
void interruptIfStarted() {Thread t;// Worker無論是否被持有鎖,只要還沒被中斷,那就中斷Workerif (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt(); // 強行中斷Worker的執(zhí)行} catch (SecurityException ignore) {}} }線程池關(guān)閉總結(jié):
線程池的關(guān)閉主要是兩個方法,shutdown和shutdownNow方法。
shutdown方法會更新狀態(tài)到SHUTDOWN,不會影響阻塞隊列里任務(wù)的執(zhí)行,但是不會執(zhí)行新進來的任務(wù)。同時也會回收閑置的Worker,閑置Worker的定義上面已經(jīng)說過了。
shutdownNow方法會更新狀態(tài)到STOP,會影響阻塞隊列的任務(wù)執(zhí)行,也不會執(zhí)行新進來的任務(wù)。同時會回收所有的Worker。
?
參考:
https://fangjian0423.github.io/2016/03/22/java-threadpool-analysis/
?
總結(jié)
以上是生活随笔為你收集整理的Java:ThreadPoolExecutor解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: FutureTask源码
- 下一篇: TimingWheel 时间轮详解