Java Review - 并发编程_ThreadPoolExecutor原理源码剖析
文章目錄
- 線程池主要解決兩個問題
- 類關系圖
- ctl 含義 ---- 記錄線程池狀態和線程池中線程個數
- 線程池狀態 及轉換
- 線程池參數
- 線程池類型
- mainLock & termination
- Worker
- 源碼分析
- public void execute(Runnable command)
- 新增線程addWorkder源碼分析
- 工作線程Worker的執行
- getTask()
- processWorkerExit
- shutdown
- shutdownNow
- awaitTermination
- 小結
線程池主要解決兩個問題
-
一是當執行大量異步任務時線程池能夠提供較好的性能。在不使用線程池時,每當需要執行異步任務時直接new一個線程來運行,而線程的創建和銷毀是需要開銷的。線程池里面的線程是可復用的,不需要每次執行異步任務時都重新創建和銷毀線程。
-
二是線程池提供了一種資源限制和管理的手段,比如可以限制線程的個數,動態新增線程等。每個ThreadPoolExecutor也保留了一些基本的統計數據,比如當前線程池完成的任務數目等。
另外,線程池也提供了許多可調參數和可擴展性接口,以滿足不同情境的需要,程序員可以使用更方便的Executors的工廠方法,比如newCachedThreadPool(線程池線程個數最多可達Integer.MAX_VALUE,線程自動回收)、newFixedThreadPool(固定大小的線程池)和newSingleThreadExecutor(單個線程)等來創建線程池,當然用戶還可以自定義。
類關系圖
在上圖中,Executors其實是個工具類,里面提供了好多靜態方法,這些方法根據用戶選擇返回不同的線程池實例。
ctl 含義 ---- 記錄線程池狀態和線程池中線程個數
ThreadPoolExecutor繼承了AbstractExecutorService,成員變量ctl是一個Integer的原子變量,用來記錄線程池狀態和線程池中線程個數,類似于ReentrantReadWriteLock使用一個變量來保存兩種信息。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));這里假設Integer類型是32位二進制表示,則其中高3位用來表示線程池狀態,后面29位用來記錄線程池線程個數。
/用來標記線程池狀態(高3位),線程個數(低29位) //默認是RUNNING狀態,線程個數為0private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));//線程個數掩碼位數 private static final int COUNT_BITS = Integer.SIZE - 3;//線程最大個數(低29位)00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1;線程池狀態:
//(高3位):11100000000000000000000000000000 private static final int RUNNING = -1 << COUNT_BITS;//(高3位):00000000000000000000000000000000 private static final int SHUTDOWN = 0 << COUNT_BITS;//(高3位):00100000000000000000000000000000 private static final int STOP = 1 << COUNT_BITS;//(高3位):01000000000000000000000000000000 private static final int TIDYING = 2 << COUNT_BITS;//(高3位):01100000000000000000000000000000 private static final int TERMINATED = 3 << COUNT_BITS;// 獲取高三位 運行狀態 private static int runStateOf(int c) { return c & ~CAPACITY; }//獲取低29位 線程個數 private static int workerCountOf(int c) { return c & CAPACITY; }//計算ctl新值,線程狀態 與 線程個數 private static int ctlOf(int rs, int wc) { return rs | wc; }線程池狀態 及轉換
-
RUNNING:接受新任務并且處理阻塞隊列里的任務。
-
SHUTDOWN:拒絕新任務但是處理阻塞隊列里的任務。
-
STOP:拒絕新任務并且拋棄阻塞隊列里的任務,同時會中斷正在處理的任務。
-
TIDYING:所有任務都執行完(包含阻塞隊列里面的任務)后當前線程池活動線程數為0,將要調用terminated方法。
-
TERMINATED:終止狀態。terminated方法調用完成以后的狀態。
線程池狀態轉換列舉如下。
-
RUNNING -> SHUTDOWN :顯式調用shutdown()方法,或者隱式調用了finalize()、方法里面的shutdown()方法。
-
RUNNING 或 SHUTDOWN)-> STOP :顯式調用 shutdownNow()方法時。
-
SHUTDOWN -> TIDYING :當線程池和任務隊列都為空時。
-
STOP -> TIDYING :當線程池為空時。
-
TIDYING -> TERMINATED: 當 terminated() hook 方法執行完成時
線程池參數
-
corePoolSize:線程池核心線程個數。
-
workQueue:用于保存等待執行的任務的阻塞隊列, 比如基于數組的有界ArrayBlockingQueue、基于鏈表的無界LinkedBlockingQueue、最多只有一個元素的同步隊列SynchronousQueue及優先級隊列PriorityBlockingQueue等。
-
maximunPoolSize:線程池最大線程數量。
-
ThreadFactory:創建線程的工廠。
-
RejectedExecutionHandler:飽和策略,當隊列滿并且線程個數達到maximunPoolSize后采取的策略。
比如
AbortPolicy(拋出異常)、
CallerRunsPolicy(使用調用者所在線程來運行任務)、
DiscardOldestPolicy(調用poll丟棄一個任務,執行當前任務)
DiscardPolicy(默默丟棄,不拋出異常) -
keeyAliveTime:存活時間。如果當前線程池中的線程數量比核心線程數量多,并且是閑置狀態,則這些閑置的線程能存活的最大時間。
-
TimeUnit:存活時間的時間單位
線程池類型
- newFixedThreadPool :創建一個核心線程個數和最大線程個數都為nThreads的線程池,并且阻塞隊列長度為Integer.MAX_VALUE。keeyAliveTime=0說明只要線程個數比核心線程個數多并且當前空閑則回收
- newSingleThreadExecutor: 創建一個核心線程個數和最大線程個數都為1的線程池,并且阻塞隊列長度為Integer.MAX_VALUE。keeyAliveTime=0說明只要線程個數比核心線程個數多并且當前空閑則回收。
- newCachedThreadPool :創建一個按需創建線程的線程池,初始線程個數為0,最多線程個數為Integer.MAX_VALUE,并且阻塞隊列為同步隊列。keeyAliveTime=60說明只要當前線程在60s內空閑則回收。這個類型的特殊之處在于,加入同步隊列的任務會被馬上執行,同步隊列里面最多只有一個任務。
mainLock & termination
/*** Lock held on access to workers set and related bookkeeping.* While we could use a concurrent set of some sort, it turns out* to be generally preferable to use a lock. Among the reasons is* that this serializes interruptIdleWorkers, which avoids* unnecessary interrupt storms, especially during shutdown.* Otherwise exiting threads would concurrently interrupt those* that have not yet interrupted. It also simplifies some of the* associated statistics bookkeeping of largestPoolSize etc. We* also hold mainLock on shutdown and shutdownNow, for the sake of* ensuring workers set is stable while separately checking* permission to interrupt and actually interrupting.*/private final ReentrantLock mainLock = new ReentrantLock(); /*** Wait condition to support awaitTermination*/private final Condition termination = mainLock.newCondition();-
mainLock是獨占鎖,用來控制新增Worker線程操作的原子性。
-
termination是該鎖對應的條件隊列,在線程調用awaitTermination時用來存放阻塞的線程。
Worker
Worker繼承AQS和Runnable接口,是具體承載任務的對象。Worker繼承了AQS,自己實現了簡單不可重入獨占鎖,其中state=0表示鎖未被獲取狀態,state=1表示鎖已經被獲取的狀態,state=-1是創建Worker時默認的狀態,創建時狀態設置為-1是為了避免該線程在運行runWorker()方法前被中斷。其中變量firstTask記錄該工作線程執行的第一個任務,thread是具體執行任務的線程。
DefaultThreadFactory是線程工廠,newThread方法是對線程的一個修飾。其中poolNumber是個靜態的原子變量,用來統計線程工廠的個數,threadNumber用來記錄每個線程工廠創建了多少線程,這兩個值也作為線程池和線程的名稱的一部分。
源碼分析
public void execute(Runnable command)
execute方法的作用是提交任務command到線程池進行執行。用戶線程提交任務到線程池的模型圖如下圖所示。
從該圖可以看出,ThreadPoolExecutor的實現實際是一個生產消費模型,當用戶添加任務到線程池時相當于生產者生產元素,workers線程工作集中的線程直接執行任務或者從任務隊列里面獲取任務時則相當于消費者消費元素。
用戶線程提交任務的execute方法的具體代碼如下
public void execute(Runnable command) {// 1 任務為null ,拋出 npe異常if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/// 2 獲取當前線程池的狀態 + 線程個數變量的組合值 int c = ctl.get();// 3 當前線程池中的個數是否小于corePoolSize ,小于的話則開啟新的線程 if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 4 如果線程池處于running狀態,則添加任務到阻塞隊列if (isRunning(c) && workQueue.offer(command)) {// 4.1 二次檢查int recheck = ctl.get();// 4.2 如果當前線程池的狀態不是running 則從隊列中移除任務,并執行拒絕策略if (! isRunning(recheck) && remove(command))reject(command);// 4.3 否則當線程數數量為空,則添加一個線程 else if (workerCountOf(recheck) == 0)addWorker(null, false);} // 5 如果隊列滿,則新增線程,新增線程失敗,觸發拒絕策略 else if (!addWorker(command, false))reject(command);}-
代碼(3)判斷如果當前線程池中線程個數小于corePoolSize,會向workers里面新增一個核心線程(core線程)執行該任務。
-
如果當前線程池中線程個數大于等于corePoolSize則執行代碼(4)。如果當前線程池處于RUNNING狀態則添加當前任務到任務隊列。這里需要判斷線程池狀態是因為有可能線程池已經處于非RUNNING狀態,而在非RUNNING狀態下是要拋棄新任務的。
-
如果向任務隊列添加任務成功,則代碼(4.2)對線程池狀態進行二次校驗,這是因為添加任務到任務隊列后,執行代碼(4.2)前有可能線程池的狀態已經變化了。這里進行二次校驗,如果當前線程池狀態不是RUNNING了則把任務從任務隊列移除,移除后執行拒絕策略;如果二次校驗通過,則執行代碼(4.3)重新判斷當前線程池里面是否還有線程,如果沒有則新增一個線程。
-
如果代碼(4)添加任務失敗,則說明任務隊列已滿,那么執行代碼(5)嘗試新開啟線程(如上的thread3和thread4)來執行該任務,如果當前線程池中線程個數>maximumPoolSize則執行拒絕策略。
新增線程addWorkder源碼分析
/*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked. If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary. // 6 檢查隊列是否只在必要的時候為空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 7 循環CAS增加線程個數 for (;;) {int wc = workerCountOf(c);// 7.1 如果線程個數超過限制 則返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 7.2 cas增加線程個數,同時只能有1個線程成功 if (compareAndIncrementWorkerCount(c))break retry;// 7.3 cas失敗了,則看線程池狀態是否變化了,變化則跳到外層循環重試重新獲取線程池狀態,否者內層循環重新cas。c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}// 8 到這里,說明CAS成功了boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 8.1 創建Workerw = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 8.2 加獨占鎖,為了workers同步,因為可能多個線程調用了線程池的execute方法。final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.//8.3 重新檢查線程池的狀態,避免在獲取鎖前調用了shutdown接口int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// 8.4 添加任務workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// 8.5 添加成功,則啟動線程if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}主要分兩個部分:
- 第一部分雙重循環的目的是通過CAS操作增加線程數;
- 第二部分主要是把并發安全的任務添加到workers里面,并且啟動任務執行。
首先來分析第一部分的代碼6
// 6 檢查隊列是否只在必要的時候為空if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;展開!運算后等價于
rs >= SHUTDOWN &&(rs != SHUTDOWN ||firstTask != null ||workQueue.isEmpty())也就是說下面幾種情況下會返回false:
- 當前線程池狀態為STOP,TIDYING,TERMINATED
- 當前線程池狀態為SHUTDOWN并且已經有了第一個任務
- 當前線程池狀態為SHUTDOWN并且任務隊列為空
內層循環的作用是使用CAS操作增加線程數,代碼(7.1)判斷如果線程個數超限則返回false,否則執行代碼(7.2)CAS操作設置線程個數,CAS成功則退出雙循環,CAS失敗則執行代碼(7.3)看當前線程池的狀態是否變化了,如果變了,則再次進入外層循環重新獲取線程池狀態,否則進入內層循環繼續進行CAS嘗試。
執行到第二部分的代碼(8)時說明使用CAS成功地增加了線程個數,但是現在任務還沒開始執行。這里使用全局的獨占鎖來控制把新增的Worker添加到工作集workers中。代碼(8.1)創建了一個工作線程Worker。
代碼(8.2)獲取了獨占鎖,代碼(8.3)重新檢查線程池狀態,這是為了避免在獲取鎖前其他線程調用了shutdown關閉了線程池。如果線程池已經被關閉,則釋放鎖,新增線程失敗,否則執行代碼(8.4)添加工作線程到線程工作集,然后釋放鎖。代碼(8.5)判斷如果新增工作線程成功,則啟動工作線程。
工作線程Worker的執行
用戶線程提交任務到線程池后,由Worker來執行。先看下Worker的構造函數。
/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;//創建一個線程this.thread = getThreadFactory().newThread(this);}在構造函數內首先設置Worker的狀態為-1,這是為了避免當前Worker在調用runWorker方法前被中斷(當其他線程調用了線程池的shutdownNow時,如果Worker狀態>=0則會中斷該線程)。這里設置了線程的狀態為-1,所以該線程就不會被中斷了。在如下runWorker代碼中,運行代碼(9)時會調用unlock方法,該方法把status設置為了0,所以這時候調用shutdownNow會中斷Worker線程。
/** Delegates main run loop to outer runWorker */public void run() {runWorker(this);} final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 9 將state 置為0 ,允許終端w.unlock(); // allow interruptsboolean completedAbruptly = true;try {// 10 while (task != null || (task = getTask()) != null) {// 10.1 w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 10.2 執行任務前干一些事情beforeExecute(wt, task);Throwable thrown = null;try {// 10.3 執行任務task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 10.4 執行任務完成后干一些事情afterExecute(task, thrown);}} finally {task = null;// 10.5 統計當前Worker完成了多少任務w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 11 執行清理工作 processWorkerExit(w, completedAbruptly);}}-
在如上代碼(10)中,如果當前task==null或者調用getTask從任務隊列獲取的任務返回null,則跳轉到代碼(11)執行。
-
如果task不為null則執行代碼(10.1)獲取工作線程內部持有的獨占鎖,然后執行擴展接口代碼(10.2)在具體任務執行前做一些事情。代碼(10.3)具體執行任務,代碼(10.4)在任務執行完畢后做一些事情,代碼(10.5)統計當前Worker完成了多少個任務,并釋放鎖。
-
這里在執行具體任務期間加鎖,是為了避免在任務運行期間,其他線程調用了shutdown后正在執行的任務被中斷(shutdown只會中斷當前被阻塞掛起的線程)
getTask()
如果當前task為空,則直接執行,否者調用getTask從任務隊列獲取一個任務執行,如果任務隊列為空,則worker退出。
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果當前線程池狀態>=STOP 或者線程池狀態為shutdown并且工作隊列為空則,減少工作線程個數if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}boolean timed; // Are workers subject to culling?for (;;) {int wc = workerCountOf(c);timed = allowCoreThreadTimeOut || wc > corePoolSize;if (wc <= maximumPoolSize && ! (timedOut && timed))break;if (compareAndDecrementWorkerCount(c))return null;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}try {//根據timed選擇調用poll還是阻塞的takeRunnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}} }processWorkerExit
代碼(11)執行清理任務,其代碼如下。
/*** Performs cleanup and bookkeeping for a dying worker. Called* only from worker threads. Unless completedAbruptly is set,* assumes that workerCount has already been adjusted to account* for exit. This method removes thread from worker set, and* possibly terminates the pool or replaces the worker if either* it exited due to user task exception or if fewer than* corePoolSize workers are running or queue is non-empty but* there are no workers.** @param w the worker* @param completedAbruptly if the worker died due to user exception*/private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // If abrupt, then workerCount wasn't adjusteddecrementWorkerCount();//統計整個線程池完成的任務個數final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w);} finally {mainLock.unlock();}//嘗試設置線程池狀態為TERMINATED,如果當前是shutdonw狀態并且工作隊列為空//或者當前是stop狀態當前線程池里面沒有活動線程tryTerminate();//如果當前線程個數小于核心個數,則增加int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return; // replacement not needed}addWorker(null, false);} }shutdown
調用shutdown后,線程池就不會在接受新的任務了,但是工作隊列里面的任務還是要執行的,但是該方法立刻返回的,并不等待隊列任務完成在返回。
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 12 advanceRunState(SHUTDOWN);// 13 interruptIdleWorkers();// 14 onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();// 15 }-
代碼(12)檢查看是否設置了安全管理器,是則看當前調用shutdown命令的線程是否有關閉線程的權限,如果有權限則還要看調用線程是否有中斷工作線程的權限,如果沒有權限則拋出SecurityException或者NullPointerException異常。
-
其中代碼(13)的內容如下,如果當前線程池狀態>=SHUTDOWN則直接返回,否則設置為SHUTDOWN狀態。
- 代碼(14)的內容如下,其設置所有空閑線程的中斷標志。這里首先加了全局鎖,同時只有一個線程可以調用shutdown方法設置中斷標志。然后嘗試獲取Worker自己的鎖,獲取成功則設置中斷標志。由于正在執行的任務已經獲取了鎖,所以正在執行的任務沒有被中斷。這里中斷的是阻塞到getTask()方法并企圖從隊列里面獲取任務的線程,也就是空閑線程。
在如上代碼中,首先使用CAS設置當前線程池狀態為TIDYING,如果設置成功則執行擴展接口terminated在線程池狀態變為TERMINATED前做一些事情,然后設置當前線程池狀態為TERMINATED。最后調用 termination.signalAll()激活因調用條件變量termination的await系列方法而被阻塞的所有線程
shutdownNow
調用shutdownNow方法后,線程池就不會再接受新的任務了,并且會丟棄工作隊列里面的任務,正在執行的任務會被中斷,該方法會立刻返回,并不等待激活的任務執行完成。返回值為這時候隊列里面被丟棄的任務列表。
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess(); // 16advanceRunState(STOP);// 17 interruptWorkers();//18 tasks = drainQueue();//19 } finally {mainLock.unlock();}tryTerminate();return tasks;}在如上代碼中,首先調用代碼(16)檢查權限,然后調用代碼(17)設置當前線程池狀態為STOP,隨后執行代碼(18)中斷所有的工作線程。這里需要注意的是,中斷的所有線程包含空閑線程和正在執行任務的線程。
private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted();} finally {mainLock.unlock();}}然后代碼(19)將當前任務隊列里面的任務移動到tasks列表。
awaitTermination
等待線程池狀態變為TERMINATED則返回,或者時間超時。由于整個過程獨占鎖,所以一般調用shutdown或者shutdownNow后使用。
public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (;;) {if (runStateAtLeast(ctl.get(), TERMINATED))return true;if (nanos <= 0)return false;nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}}-
如上代碼首先獲取獨占鎖,然后在無限循環內部判斷當前線程池狀態是否至少是TERMINATED狀態,如果是則直接返回,否則說明當前線程池里面還有線程在執行 ,則看設置的超時時間nanos是否小于0,小于0則說明不需要等待,那就直接返回,如果大于0則調用條件變量termination的awaitNanos方法等待nanos時間,期望在這段時間內線程池狀態變為TERMINATED。
-
在shutdown方法時提到過,當線程池狀態變為TERMINATED時,會調用termination.signalAll()用來激活調用條件變量termination的await系列方法被阻塞的所有線程,所以如果在調用awaitTermination之后又調用了shutdown方法,并且在shutdown內部將線程池狀態設置為TERMINATED,則termination.awaitNanos方法會返回。
-
另外在工作線程Worker的runWorker方法內,當工作線程運行結束后,會調用processWorkerExit方法,在processWorkerExit方法內部也會調用tryTerminate方法測試當前是否應該把線程池狀態設置為TERMINATED,如果是,則也會調用termination.signalAll()用來激活調用線程池的awaitTermination方法而被阻塞的線程。
-
而且當等待時間超時后,termination.awaitNanos也會返回,這時候會重新檢查當前線程池狀態是否為TERMINATED,如果是則直接返回,否則繼續阻塞掛起自己。
小結
線程池巧妙地使用一個Integer類型的原子變量來記錄線程池狀態和線程池中的線程個數。通過線程池狀態來控制任務的執行,每個Worker線程可以處理多個任務。線程池通過線程的復用減少了線程創建和銷毀的開銷。
總結
以上是生活随笔為你收集整理的Java Review - 并发编程_ThreadPoolExecutor原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_D
- 下一篇: Java Review - 并发编程_S