Java并发编程实战(chapter_3)(线程池ThreadPoolExecutor源码分析)
為什么80%的碼農都做不了架構師?>>> ??
這個系列一直沒再寫,很多原因,中間經歷了換工作,熟悉項目,熟悉新團隊等等一系列的事情。并發課題對于Java來說是一個又重要又難的一大塊,除非氣定神閑、精力滿滿,否則我本身是不敢隨便寫這個話題的。隨便出一個生澀、浮于表面的文章,我覺得意義不大。所以一直就擱置到現在。這一次開啟,有一個小小的契機:我自己面試中,已經被問爛了的構造函數的幾個參數有什么意義,這種問題,發現其實很多人并不了解。就著這次的機會,我就重開這個課題。
一、基本的一些準備知識
李老爺子的線程池堪稱經典,老爺子也因此風靡全球開發者圈子,閱讀了源碼,你才能感受到什么叫做編程思想,我們普普通通的CRUD簡直都弱爆了!老爺子牛逼點也在于,源碼中的注釋非常完備,這不得不佩服:思想牛逼一方面,能把思想完善、由淺入深的表述出來,我覺得更牛逼!其中對于這個ThreadPoolExecutor的基礎知識的了解,我覺得完全可以看注釋就可以學全了。要想了解線程池源碼,我們先要了解如下幾個方面:
- 線程池的幾種狀態
- 線程池的狀態表述
- 狀態的使用的方式
- 線程池的構造函數
1、線程池的幾種狀態
最關鍵的是幾種扭轉的狀態,讓我們直接上老爺子的注釋:
/* The runState provides the main lifecycle control, taking on values:** RUNNING: Accept new tasks and process queued tasks* SHUTDOWN: Don't accept new tasks, but process queued tasks* STOP: Don't accept new tasks, don't process queued tasks,* and interrupt in-progress tasks* TIDYING: All tasks have terminated, workerCount is zero,* the thread transitioning to state TIDYING* will run the terminated() hook method* TERMINATED: terminated() has completed** The numerical order among these values matters, to allow* ordered comparisons. The runState monotonically increases over* time, but need not hit each state. The transitions are:** * (下面是幾種轉態轉換的根本的基本方式,很簡單的英文,不用翻譯)* RUNNING -> SHUTDOWN* On invocation of shutdown(), perhaps implicitly in finalize()* (RUNNING or SHUTDOWN) -> STOP* On invocation of shutdownNow()* SHUTDOWN -> TIDYING* When both queue and pool are empty* STOP -> TIDYING* When pool is empty* TIDYING -> TERMINATED* When the terminated() hook method has completed** Threads waiting in awaitTermination() will return when the* state reaches TERMINATED.*/- RUNNING:接受新的任務,并且也繼續運行阻塞隊列里面的任務
- SHUTDOWN:不接受新的任務了,但是可以繼續執行阻塞隊列里面的任務
- STOP:不接受新的任務了,也不運行阻塞隊列里面的任務了,并且去打斷正在執行的任務
- TIDYING:所有的任務都已經終止了,workerCount(任務數)是0,線程池運行到了這個狀態之后,會去調terminated()這個方法
- TERMINATED:terminated()這個方法執行完成
2、線程池的狀態表述
同樣,上源碼:
// ctl這是一個很重要的參數,使用位標記方式,將當前線程池狀態和當前線程池創建的任務多少雜糅到了一起 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 預留三位 private static final int COUNT_BITS = Integer.SIZE - 3; // 線程池最大線程大小:(2^29)-1 (about 500 million) private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 線程池狀態位,使用int的高三位進行儲存 private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;// 通過ctl值計算出運行線程池狀態值 private static int runStateOf(int c) { return c & ~CAPACITY; } // 通過ctl值計算出線程池當前任務多少的值 private static int workerCountOf(int c) { return c & CAPACITY; } // 通過運行狀態和任務多少的兩個值,生成ctl這個包裝的值 private static int ctlOf(int rs, int wc) { return rs | wc; }思想也很簡單:大家熟知的int類型,是占四字節,32位的。為了狀態操作的高效與空間節約,老爺子使用了位操作來控制。其中32位的高三位用來存儲線程池的狀態;低29位用來控制當前線程池有多少個線程。上面的源碼就是對位操作的基本實現(都是基本的位操作,我這里不在累贅)
3、狀態的使用的方式
這里會給出幾個源碼中,對狀態和線程數量操控的方式:
// (c:ctl,s:state)當前線程池的狀態,是不是小于給定的狀態 private static boolean runStateLessThan(int c, int s) {return c < s; }// (c:ctl,s:state)當前線程池的狀態,是不是大于等于給定的狀態 private static boolean runStateAtLeast(int c, int s) {return c >= s; }// 當前線程池的狀態是RUNNING的嗎 private static boolean isRunning(int c) {return c < SHUTDOWN; }// 使用CAS原理對當前線程池線程數量值加一 private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1); }// 使用CAS原理對當前線程池線程數量值減一 private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1); }// 使用CAS原理對當前線程池線程數量值減一,直到成功為止 private void decrementWorkerCount() {do {} while (! compareAndDecrementWorkerCount(ctl.get())); }下面的源碼是對線程狀態修改源碼:
private void advanceRunState(int targetState) {// 這是一個死循環,直到修改成功才breakfor (;;) {int c = ctl.get();if (runStateAtLeast(c, targetState) ||ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))break;} }這里有兩個判斷條件,只要一個成功就會break循環
- runStateAtLeast:如果當前狀態和要設置的狀態相等,或者比要設置的狀態大。說明線程池狀態的不可逆,說明,如果一個線程池已經是SHUTDOWN了,是不能設置回RUNNING狀態的
- compareAndSet:CAS設置ctl值。根據短路原理,到了這個方法執行已經說明當前狀態是小于要設置狀態了,所以可以修改ctl的狀態位值。如果設置失敗,返回false,繼續死循環。成功,break
3、線程池的構造函數
常用的JDK推薦的,或者各大“api使用”書籍介紹的,無非都是下面的幾個方法,進行創建線程池:
- Executors.newCachedThreadPool
- Executors.newFixedThreadPool
- Executors.newScheduledThreadPool
- Executors.newSingleThreadExecutor
可是當我們深入源碼,才發現:這幾個方法的內部無非都調用了ThreadPoolExecutor的構造函數,即使是newScheduledThreadPool這個方法,表面調用了ScheduledThreadPoolExecutor類,可是深入源碼才發現:ScheduledThreadPoolExecutor類繼承了ThreadPoolExecutor,并且構造函數使用了super進行了構建。這就給我們了一個很好的切入口:只要研究ThreadPoolExecutor構造函數就行。進一步,還會發現,ThreadPoolExecutor有四個構造函數,入參不一樣,也都不約而同,最終調用了入參最多的那個(入參少的時候使用默認值),我們看看ThreadPoolExecutor中入參最多的構造函數的源碼:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 對入參進行合法性校驗if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();// 獲取系統安全管理器(不做分析)this.acc = System.getSecurityManager() == null ?null : AccessController.getContext();// 核心幾大參數的賦值操作this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler; }- corePoolSize:核心運行線程數
- maximumPoolSize:最大運行運行程
- workQueue:阻塞隊列
- keepAliveTime:當線程大于核心線程數時,且阻塞隊列沒有元素,最大等待時間
- threadFactory:生成線程的工廠類
- handler:超出線程池最大承受能力之后的失敗策略方法對象
對于線程池表現出來的各種特性,就是通過這幾個參數控制的,所以很關鍵!
二、線程池的基本執行圖解
對于線程池源碼,我們先主要從execute執行方法入手進行分析,下面主要用一個圖進行大致流程的展示:
配合上代碼,我們先指出對應代碼的大致位置,我們有個大體的概念比較好:
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 下面大約就是①的過程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// 下面大約就是②的過程if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 下面大約就是③的過程else if (!addWorker(command, false))// 下面大約就是④的過程reject(command); }三、線程池細節源碼分析
1、addWorker方法
a、addWorker,我們先來看看
private boolean addWorker(Runnable firstTask, boolean core) {// 死循環,在某些條件下,會返回揮著breakretry:for (;;) {int c = ctl.get();// 當下線程池運行狀態int rs = runStateOf(c);// 下面是對線程池狀態的一些列判斷// 這個判斷稍微有點繞,返回false的條件是:// 線程池是SHUTDOWN、STOP、TIDYING、TERMINATED其中的任意一個狀態// 且(線程池狀態為STOP、TIDYING、TERMINATED 或者 firstTask不為空 或者 阻塞隊列為空)// 同樣是返回false,添加失敗if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 下面是對線程池當下線程數的一系列判斷int wc = workerCountOf(c);// 線程數如果大于等于最大線程池允許數量((2^29)-1)或者大于等于設置的// 核心線程數或者最大線程數// 同樣是返回false,添加失敗if (wc >= CAPACITY ||// 這里也是一個玄妙之處:// 如果傳入的core為true情況,可見線程數量依賴值為核心線程數// 如果為false,數量依賴于最大的線程數。通過這個core值,就可以// 控制什么時候,依賴什么值進行創建線程wc >= (core ? corePoolSize : maximumPoolSize))return false;// 下面是CAS的經典操作:// 這個第一個if如果設置成功,就結束整體的外部循環。沒成功說明有竟態if (compareAndIncrementWorkerCount(c))break retry;// 再次獲取一遍ctl,算是double checkc = ctl.get();// 這里判斷,如果為true,說明線程池當下狀態已經被修改// 要重新通過外層循環的狀態判斷來確定返回值,所以continue了if (runStateOf(c) != rs)continue retry;// 到了這里,說明線程池狀態沒有被翻轉,那就是說當前線程數因為竟態// 原因沒有設置成功,那直接內部循環在執行一次,繼續進行CAS的設置}}// 下面是啟動線程的主要代碼// 線程是否啟動成功boolean workerStarted = false;// 線程是否添加成功boolean workerAdded = false;// 封裝傳入的線程對象Worker,這個也是很關鍵的對象,接下來會分析Worker w = null;try {// 封裝線程的初始化工作,下面會分析w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {// 當下線程池的主鎖,最大的一把鎖,上鎖期間主要對線程池容器進行維護// 這個容器是一個HashSet,保存當前運行的封裝線程Workerfinal ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 再次獲取線程池當前狀態,因為很有可能期間被人更改了int rs = runStateOf(ctl.get());// rs < SHUTDOWN:線程池是RUNNING狀態// rs == SHUTDOWN && firstTask == null:// 線程池是SHUNTDOWN且firstTask為空,這種情況主要是因為// 線程池再SHUNDOWN狀態了,可是阻塞隊列還有沒運行完的線程if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)// 保持一個線程最大線程池狀態largestPoolSize = s;// 到這里線程添加到容器成功workerAdded = true;}} finally {mainLock.unlock();}// 如果添加容器成功,就啟動封裝的線程,且設置啟動標識位為trueif (workerAdded) {t.start();workerStarted = true;}}} finally {// 如果封裝線程啟動失敗,會進行一系列的失敗處理if (! workerStarted)addWorkerFailed(w);}return workerStarted; }b、下面是對addWorkerFailed方法的解說
private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;// 同樣的,獲取主鎖mainLock.lock();try {// 不為空的情況將封裝線程從容器中移除// 為空的情況,主要是new Worker的時候報錯if (w != null)workers.remove(w);// 循環登陸,減少一個線程數decrementWorkerCount();// 試著看看,能不能結束線程池,就是把線程池TERMINASTE了tryTerminate();} finally {mainLock.unlock();} }c、下面是tryTerminate方法的解說
final void tryTerminate() {// 發現沒,又是個死循環,老爺子很喜歡這種方式啊,而且是用for!for (;;) {int c = ctl.get();// 三種情況直接方法返回:// 1、正處在RUNNING狀態的線程池// 2、線程池的狀態是TIDYING或者是TERMINATE// 3、線程池是SHUNDOWN狀態的,但是阻塞隊列不為空if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// 走到這里,線程池的狀態可能是:SHUTDOWN(且阻塞隊列空)、STOP// 如果此時線程數不為0的話,要進行打斷操作了if (workerCountOf(c) != 0) { // 這里入參的意思是只打斷容器里第一個封裝線程里面的線程interruptIdleWorkers(ONLY_ONE);return;}// 執行到這里,說明線程池的狀態是:SHUTDOWN(阻塞隊列為空)、STOP// 此時線程數為0,說明線程池可以進行終結操作了final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// CAS先將線程池設置成TIDYING的狀態if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 執行用戶實現的terminated方法terminated();} finally {// 無論怎么樣都會將線程池設置成TERMINATED狀態ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}// 到這里說明終結成功,不過根據Java原理,返回前// 先執行finally里面的解主鎖的方法return;}} finally {mainLock.unlock();}// 如果能執行到這里,說明CAS設置TIDYING狀態失敗// 說明是竟態狀態} } private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;//線程沒有被打斷且獲取到封裝線程的鎖if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}} finally {mainLock.unlock();} } // 用戶自己實現的結束方法 protected void terminated() { }到這里,已經講完了一個很主要的內部方法:addWorker。下面我們對封裝線程對象Worker進行講解
2、Worker對象
這個東西,是一個很很很很很很很很經典的Java并發模型:AQS。這片文章不做AQS的講解,放到后續
a、具體的Worker對象張什么樣
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{private static final long serialVersionUID = 6138294804551838833L;// 這個就是最終啟動的線程,看到了吧final Thread thread;// 我們傳入的Runnable對象被放到了這里Runnable firstTask;// 這里記錄完成的任務數。// 這里說明下一個理念:一個Worker,是最終被運行的Runnanle對象// 在很大的情況下(下面做分析)Worker這個線程會一直存在// 存在的意義是不斷讀取阻塞隊列里面存儲的我們傳進來的Runnable對象// 然后運行。所以我們實現的Runnable對象的run方法,最終不是被// start方法調用執行的,而是通過直接調用執行的!volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // AQS對象狀態!也是一大難的東西!this.firstTask = firstTask;// 這里的getThreadFactory方法使用的就是我們傳入的threadFactory// 對象this.thread = getThreadFactory().newThread(this);}public void run() {// 看到了吧,這里執行了外層對象的方法,去直接調用傳入的// Runnable中的run方法,等下解說runWorker(this);}// 下面的幾個函數都是AQS必須要實現的方法,這里不累贅protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}}public ThreadFactory getThreadFactory() {return threadFactory; }b、默認的線程工廠DefaultThreadFactory:
static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();// 這里記錄了線程名的前綴,可見會將線程池序號進行遞增操作namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {// 這里就是生成喜聞樂見的Thread對象了,結合上面這里的r就是我們的Worker對象Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;} }3、runWorker方法
a、接下來又是一個關鍵性方法runWorker
final void runWorker(Worker w) {// 獲取當前運行著的Worker線程Thread wt = Thread.currentThread();// 這個就是我們當下傳入的RunnableRunnable task = w.firstTask;// 置空的意思是:Worker其實是一個殼子,以后會一直運行著,不斷執行其他阻塞隊列// 里面的Runnable對象的run方法w.firstTask = null;// 這里做解鎖操作,是表示下面所有操作是可以被打斷的// 另外AQS默認情況下不做unlock操作,lock會阻塞w.unlock(); // 這個標志位表示線程執行過程中有沒有被打斷,或者運行異常boolean completedAbruptly = true;try {// 這個While循環里面的語句相當關鍵,包含了線程池執行流程的樞紐!// 我先大致說一下,下面會詳細分析getTask方法:// 主要就是判斷如果當前Worker里面的Runnable對象不為空// 就會執行這個對象的run方法;執行完之后,還會回到這個循環// 再下面的finally塊里面將task置空了,所以就去調用getTask方法// 而getTask方法是一個很大可能阻塞的方法,阻塞的原因就是等待// 阻塞隊列里面放入對象!所以也就形成了,一個Worker對象,循環// 不停的執行傳入的Runnable對象run方法。這也就構成了corePoolSize// 與maxPoolSize兩個參數控制系統級別的線程多少的目的!也就是說,// 這就是線程池里面,“池”這個概念的由來~while (task != null || (task = getTask()) != null) {w.lock();// 這里主要是判斷是否要打斷當前Worker所在的線程// 要滿足兩個個條件:// 1、當前線程池是STOP、TIDYING、TERMINATED// 2、當前線程是沒有被打斷的情況// 其中Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)// 主要用于清除線程終端標志,因為很大可能線程池剛剛轉換成STOPif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 在執行線程體之前執行的方法,用戶實現beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {// 請看下面幾個異常,都是直接拋了出去// 而并沒有處理,所以處理內部異常也是// 線程池的一個關鍵點thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 在執行完線程體之后的方法,用戶實現// 異常同時也傳入這里了,所以可以自己實現一個子類// 自己實現這個方法,進行異常處理afterExecute(task, thrown);}} finally {// 這個地方肯定會被執行,所以無論run方法怎么樣// Worker運行完成線程數都會加一task = null;w.completedTasks++;// 這里進行解鎖操作w.unlock();}}// 注意代碼執行到了這里說明while循環跳出來了// 大致有如下幾種情況:// 1、阻塞隊列里面沒值了// 2、線程池狀態翻轉,便成了大于等于SHUTDOWN狀態的了// 由于是正常結束,所以異常結束標志是falsecompletedAbruptly = false;} finally {// 這里肯定會被執行,但是有兩種情況跳入這個代碼塊// 1、run方法沒有拋異常,completedAbruptly為false// 2、run方法拋異常,completedAbruptly為true// 下面也會進行解說processWorkerExit(w, completedAbruptly);} }b、我們來看核心的getTask方法
private Runnable getTask() {// 這個標志位主要用于后面的poll方法是否超時boolean timedOut = false; // 又來了,李老爺子!是一個死循環判斷!for (;;) {int c = ctl.get();// 獲取當前線程池運行狀態int rs = runStateOf(c);// 如果同時符合下面兩種情況,直接返回null,并減少線程數量// 1、線程池狀態是:SHUTDOWN、STOP、TIDYING、TERMINTED// 2、線程池的狀態是STOP、TIDYING、TERMINTED或者隊列為空// 這預示著線程池要進行關閉操作了,此Worker要結束聲明周期!if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 這里是循環指導CAS成功設置decrementWorkerCount();return null;}// 這里獲取當前線程池的線程數int wc = workerCountOf(c);// 這個標識位要解釋解釋:// 1、allowCoreThreadTimeOut成員變量,可設置// 2、wc > corePoolSize線程數是否大于核心線程數// 簡單說就是:這個標志位控制線程池的收縮!// 很關鍵是不是!// 正常情況下只要超出核心線程數的線程才要進行收縮的// 收縮的條件是根據傳入的阻塞隊列超時時間// 但是我們可以通過設置allowCoreThreadTimeOut為true// 這樣核心線程也可以收縮!boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 這里maximumPoolSize不能為零和負數// 這里判斷很復雜,簡單理解就是:// 如果線程池線程數超出了設置的最大線程數或者阻塞隊列被打斷了// 且當前Worker所在線程不是最后一個線程或者阻塞隊列空了。// 這里如果wc>maximumPoolSize,那一定大于1,那就說明// 一定會執行if方法體;如果小于等于maximumPoolSize情況,// 那就說明是線程合理的收縮,這種時候,只有allowCoreThreadTimeOut// 被置位或者線程數大于核心線程數,當然如果要是只有一個線程數且隊列不為空// 的情況也不能收縮,要保證有封裝線程能執行阻塞隊列里面線程if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))// 這里如果設置成功說明可以終結這個Worker了return null;// 這里是continue,因為有竟態continue;}try {// 注意這里的timed的取值,timed為true的時候是:// 1、allowCoreThreadTimeOut被置位// 2、或者線程數大于核心線程數// 其他情況是直接take方法,直接阻塞的。除非被打斷Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)// 正常情況是拿到了Runnable,直接返回了return r;// 這種是阻塞隊列超時了timedOut = true;} catch (InterruptedException retry) {// 打斷情況并非阻塞隊列超時,所以這里設置成falsetimedOut = false;}} }c、下面是對processWorkerExit分析
private void processWorkerExit(Worker w, boolean completedAbruptly) {// 這個判斷說明當前Worker所在的線程執行Runnable中的run方法拋了異常// 所以這個時候,要將線程數減一if (completedAbruptly) decrementWorkerCount();// 獲取主鎖final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 將當前Worker存在期間一共執行了多少個Runnable累加到// 線程池的統計字段上面completedTaskCount += w.completedTasks;// 將封裝線程從容器中移除workers.remove(w);} finally {mainLock.unlock();}// 上面的方法在這里執行了,分析請看上面tryTerminate();int c = ctl.get();// 如果現在線程池的狀態是:RUNNING、SHUTDOWN,執行if代碼塊if (runStateLessThan(c, STOP)) {// 如果沒有拋異常情況,執行這個if代碼塊if (!completedAbruptly) {// 這個代碼塊,主要是要保證如果阻塞隊列中還有Runnable線程// 而又走到了即將結束當前WOrker的代碼,線程池要保證,至少還有// 運行著的Worker對阻塞隊列中的線程進行處理,執行int min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 為0的情況表示允許核心線程收縮,或者核心線程直接設成了0// 阻塞隊列不為空要保證最小可用的Worker為1if (min == 0 && ! workQueue.isEmpty())min = 1;// 判斷當前線程數是不是比最小的還要小if (workerCountOf(c) >= min)// 這里表明,有足夠的Worker去執行return; // 代碼運行到這里,表明沒有足夠的Worker了,下面去創建}// 這里添加一個Worker的原因是:// RUNNING和SHUTDOWN狀態都是允許繼續執行阻塞隊列中的線程的// 所以這里創建一個firstTask為null,依賴getTast去獲取隊列中的// 線程去執行。false的原因是創建依據maximumPoolSizeaddWorker(null, false);} }四、結尾
到此為止,線程池的主要源碼,都分析了,剩下,還有幾個附加功能源碼,留著接下來有精力再一點點回補吧。當然,對于下一步的深入,就要到AQS的分析了。可見,這里的Worker本身就是一個AQS,在Worker上面調用lock或是unlock方法,都是進入一個內部的阻塞隊列的管理的。其中最最底層,還會涉及到操作系統中線程的同步原語:mutex!接下來,我會分析那個,敬請期待!
轉載于:https://my.oschina.net/UBW/blog/2055052
總結
以上是生活随笔為你收集整理的Java并发编程实战(chapter_3)(线程池ThreadPoolExecutor源码分析)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jstat和jmap使用
- 下一篇: IDEA快捷键(修改成eclipse版)