写的很好!细数 Java 线程池的原理
今天我們就來詳細講解一下Java的線程池,首先我們從最核心的ThreadPoolExecutor類中的方法講起,然后再講述它的實現原理,接著給出了它的使用示例,最后討論了一下如何合理配置線程池的大小。
Java 中的 ThreadPoolExecutor 類
java.uitl.concurrent.ThreadPoolExecutor?類是線程池中最核心的一個類,因此如果要透徹地了解Java 中的線程池,必須先了解這個類。下面我們來看一下 ThreadPoolExecutor 類的具體實現源碼。
在 ThreadPoolExecutor 類中提供了四個構造方法:
public?class?ThreadPoolExecutor?extends?AbstractExecutorService?{.....public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,BlockingQueue<Runnable>?workQueue);public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,BlockingQueue<Runnable>?workQueue,ThreadFactory?threadFactory);public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,BlockingQueue<Runnable>?workQueue,RejectedExecutionHandler?handler);public?ThreadPoolExecutor(int?corePoolSize,int?maximumPoolSize,long?keepAliveTime,TimeUnit?unit,BlockingQueue<Runnable>?workQueue,ThreadFactory?threadFactory,RejectedExecutionHandler?handler);... }從上面的代碼可以得知,ThreadPoolExecutor 繼承了?AbstractExecutorService?類,并提供了四個構造器,事實上,通過觀察每個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工作。
下面解釋下一下構造器中各個參數的含義:
-
corePoolSize:核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()?或者?prestartCoreThread()方法,從這 2 個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建 corePoolSize 個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到 corePoolSize 后,就會把到達的任務放到緩存隊列當中;
-
maximumPoolSize:線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程;
-
keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大于 corePoolSize 時,keepAliveTime 才會起作用,直到線程池中的線程數不大于 corePoolSize,即當線程池中的線程數大于 corePoolSize 時,如果一個線程空閑的時間達到 keepAliveTime,則會終止,直到線程池中的線程數不超過 corePoolSize。但是如果調用了?allowCoreThreadTimeOut(boolean)?方法,在線程池中的線程數不大于 corePoolSize 時,keepAliveTime 參數也會起作用,直到線程池中的線程數為0;
-
unit:參數 keepAliveTime 的時間單位,有 7 種取值,在?TimeUnit?類中有 7 種靜態屬性:
-
workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:
ArrayBlockingQueue 和 PriorityBlockingQueue 使用較少,一般使用 LinkedBlockingQueue 和 Synchronous。線程池的排隊策略與 BlockingQueue 有關。
-
threadFactory:線程工廠,主要用來創建線程;
-
handler:表示當拒絕處理任務時的策略,有以下四種取值:
具體參數的配置與線程池的關系將在下一節講述。
從上面給出的?ThreadPoolExecutor?類的代碼可以知道,ThreadPoolExecutor 繼承了AbstractExecutorService,我們來看一下 AbstractExecutorService 的實現:
public?abstract?class?AbstractExecutorService?implements?ExecutorService?{protected?<T>?RunnableFuture<T>?newTaskFor(Runnable?runnable,?T?value)?{?};protected?<T>?RunnableFuture<T>?newTaskFor(Callable<T>?callable)?{?};public?Future<?>?submit(Runnable?task)?{};public?<T>?Future<T>?submit(Runnable?task,?T?result)?{?};public?<T>?Future<T>?submit(Callable<T>?task)?{?};private?<T>?T?doInvokeAny(Collection<??extends?Callable<T>>?tasks,boolean?timed,?long?nanos)throws?InterruptedException,?ExecutionException,?TimeoutException?{};public?<T>?T?invokeAny(Collection<??extends?Callable<T>>?tasks)throws?InterruptedException,?ExecutionException?{};public?<T>?T?invokeAny(Collection<??extends?Callable<T>>?tasks,long?timeout,?TimeUnit?unit)throws?InterruptedException,?ExecutionException,?TimeoutException?{};public?<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks)throws?InterruptedException?{};public?<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks,long?timeout,?TimeUnit?unit)throws?InterruptedException?{}; }?AbstractExecutorService 是一個抽象類,它實現了 ExecutorService 接口。
我們接著看 ExecutorService 接口的實現:
public?interface?ExecutorService?extends?Executor?{void?shutdown();boolean?isShutdown();boolean?isTerminated();boolean?awaitTermination(long?timeout,?TimeUnit?unit)throws?InterruptedException;<T>?Future<T>?submit(Callable<T>?task);<T>?Future<T>?submit(Runnable?task,?T?result);Future<?>?submit(Runnable?task);<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks)throws?InterruptedException;<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks,long?timeout,?TimeUnit?unit)throws?InterruptedException;<T>?T?invokeAny(Collection<??extends?Callable<T>>?tasks)throws?InterruptedException,?ExecutionException;<T>?T?invokeAny(Collection<??extends?Callable<T>>?tasks,long?timeout,?TimeUnit?unit)throws?InterruptedException,?ExecutionException,?TimeoutException; }而 ExecutorService 又是繼承了 Executor 接口,我們看一下 Executor 接口的實現:
public?interface?Executor?{void?execute(Runnable?command); }到這里,大家應該明白了?ThreadPoolExecutor、AbstractExecutorService、ExecutorService 和 Executor幾個之間的關系了。
Executor 是一個頂層接口,在它里面只聲明了一個方法?execute(Runnable),返回值為 void,參數為Runnable 類型,從字面意思可以理解,就是用來執行傳進去的任務的;
然后 ExecutorService 接口繼承了 Executor 接口,并聲明了一些方法:submit、invokeAll、invokeAny 以及shutDown 等;
抽象類AbstractExecutorService實現了 ExecutorService 接口,基本實現了 ExecutorService 中聲明的所有方法;
然后ThreadPoolExecutor?繼承了類 AbstractExecutorService。
在 ThreadPoolExecutor 類中有幾個非常重要的方法:
execute() submit() shutdown() shutdownNow()execute()?方法實際上是 Executor 中聲明的方法,在 ThreadPoolExecutor 進行了具體的實現,這個方法是ThreadPoolExecutor 的核心方法,通過這個方法可以向線程池提交一個任務,交由線程池去執行。
submit()方法是在 ExecutorService 中聲明的方法,在 AbstractExecutorService 就已經有了具體的實現,在ThreadPoolExecutor 中并沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和 execute() 方法不同,它能夠返回任務執行的結果,去看 submit() 方法的實現,會發現它實際上還是調用的 execute() 方法,只不過它利用了?Future?來獲取任務執行結果(Future相關內容將在下一篇講述)。
shutdown()和shutdownNow()是用來關閉線程池的。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount() 等獲取與線程池相關屬性的方法,有興趣的朋友可以自行查閱 API。
深入剖析線程池實現原理
在上一節我們從宏觀上介紹了 ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實現原理,將從下面幾個方面講解:
1.線程池狀態
2.任務的執行
3.線程池中的線程初始化
4.任務緩存隊列及排隊策略
5.任務拒絕策略
6.線程池的關閉
7.線程池容量的動態調整
線程池狀態
在 ThreadPoolExecutor 中定義了一個 volatile 變量,另外定義了幾個 static final 變量表示線程池的各個狀態:
volatile?int?runState; static?final?int?RUNNING????=?0; static?final?int?SHUTDOWN???=?1; static?final?int?STOP???????=?2; static?final?int?TERMINATED?=?3;runState 表示當前線程池的狀態,它是一個 volatile 變量用來保證線程之間的可見性;
下面的幾個 static final 變量表示 runState 可能的幾個取值。
當創建線程池后,初始時,線程池處于?RUNNING?狀態;
如果調用了 shutdown() 方法,則線程池處于?SHUTDOWN?狀態,此時線程池不能夠接受新的任務,它會等待所有任務執行完畢;
如果調用了shutdownNow()方法,則線程池處于STOP狀態,此時線程池不能接受新的任務,并且會去嘗試終止正在執行的任務;
當線程池處于 SHUTDOWN 或 STOP 狀態,并且所有工作線程已經銷毀,任務緩存隊列已經清空或執行結束后,線程池被設置為TERMINATED狀態。
任務的執行
在了解將任務提交給線程池到任務執行完畢整個過程之前,我們先來看一下 ThreadPoolExecutor 類中其他的一些比較重要成員變量:
private?final?BlockingQueue<Runnable>?workQueue;??????????????//任務緩存隊列,用來存放等待執行的任務 private?final?ReentrantLock?mainLock?=?new?ReentrantLock();???//線程池的主要狀態鎖,對線程池狀態(比如線程池大小//、runState等)的改變都要使用這個鎖 private?final?HashSet<Worker>?workers?=?new?HashSet<Worker>();??//用來存放工作集private?volatile?long??keepAliveTime;????//線程存貨時間??? private?volatile?boolean?allowCoreThreadTimeOut;???//是否允許為核心線程設置存活時間 private?volatile?int???corePoolSize;?????//核心池的大小(即線程池中的線程數目大于這個參數時,提交的任務會被放進任務緩存隊列) private?volatile?int???maximumPoolSize;???//線程池最大能容忍的線程數private?volatile?int???poolSize;???????//線程池中當前的線程數private?volatile?RejectedExecutionHandler?handler;?//任務拒絕策略private?volatile?ThreadFactory?threadFactory;???//線程工廠,用來創建線程private?int?largestPoolSize;???//用來記錄線程池中曾經出現過的最大線程數private?long?completedTaskCount;???//用來記錄已經執行完畢的任務個數每個變量的作用都已經標明出來了,這里要重點解釋一下 corePoolSize、maximumPoolSize、largestPoolSize 三個變量。
corePoolSize 在很多地方被翻譯成核心池大小,其實我的理解這個就是線程池的大小。舉個簡單的例子:
假如有一個工廠,工廠里面有 10 個工人,每個工人同時只能做一件任務。
因此只要當 10 個工人中有工人是空閑的,來了任務就分配給空閑的工人做;
當 10 個工人都有任務在做時,如果還來了任務,就把任務進行排隊等待;
如果說新任務數目增長的速度遠遠大于工人做任務的速度,那么此時工廠主管可能會想補救措施,比如重新招4個臨時工人進來;
然后就將任務也分配給這 4 個臨時工人做;
如果說著 14 個工人做任務的速度還是不夠,此時工廠主管可能就要考慮不再接收新的任務或者拋棄前面的一些任務了。
當這 14 個工人當中有人空閑時,而新任務增長的速度又比較緩慢,工廠主管可能就考慮辭掉 4 個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。
這個例子中的 corePoolSize 就是 10,而 maximumPoolSize 就是14(10+4)。
也就是說 corePoolSize 就是線程池大小,maximumPoolSize 在我看來是線程池的一種補救措施,即任務量突然過大時的一種補救措施。
不過為了方便理解,在本文后面還是將 corePoolSize 翻譯成核心池大小。
largestPoolSize 只是一個用來起記錄作用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關系。
下面我們進入正題,看一下任務從提交到最終執行完畢經歷了哪些過程。
在 ThreadPoolExecutor 類中,最核心的任務提交方法是 execute() 方法,雖然通過 submit 也可以提交任務,但是實際上 submit 方法里面最終調用的還是 execute() 方法,所以我們只需要研究 execute() 方法的實現原理即可:
public?void?execute(Runnable?command)?{if?(command?==?null)throw?new?NullPointerException();if?(poolSize?>=?corePoolSize?||?!addIfUnderCorePoolSize(command))?{if?(runState?==?RUNNING?&&?workQueue.offer(command))?{if?(runState?!=?RUNNING?||?poolSize?==?0)ensureQueuedTaskHandled(command);}else?if?(!addIfUnderMaximumPoolSize(command))reject(command);?//?is?shutdown?or?saturated} }上面的代碼可能看起來不是那么容易理解,下面我們一句一句解釋:
首先,判斷提交的任務 command 是否為 null,若是 null,則拋出空指針異常;
接著是這句,這句要好好理解一下:
if?(poolSize?>=?corePoolSize?||?!addIfUnderCorePoolSize(command))由于是或條件運算符,所以先計算前半部分的值,如果線程池中當前線程數不小于核心池大小,那么就會直接進入下面的if語句塊了。
如果線程池中當前線程數小于核心池大小,則接著執行后半部分,也就是執行
addIfUnderCorePoolSize(command)
如果執行完 addIfUnderCorePoolSize 這個方法返回 false,則繼續執行下面的 if 語句塊,否則整個方法就直接執行完畢了。
如果執行完 addIfUnderCorePoolSize 這個方法返回 false,然后接著判斷:
if?(runState?==?RUNNING?&&?workQueue.offer(command))
如果當前線程池處于 RUNNING 狀態,則將任務放入任務緩存隊列;如果當前線程池不處于 RUNNING 狀態或者任務放入緩存隊列失敗,則執行:
addIfUnderMaximumPoolSize(command)
如果執行 addIfUnderMaximumPoolSize 方法失敗,則執行 reject() 方法進行任務拒絕處理。
回到前面:
if?(runState?==?RUNNING?&&?workQueue.offer(command))
這句的執行,如果說當前線程池處于RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:
if?(runState?!=?RUNNING?||?poolSize?==?0)
這句判斷是為了防止在將此任務添加進任務緩存隊列的同時其他線程突然調用 shutdown 或者 shutdownNow 方法關閉了線程池的一種應急措施。如果是這樣就執行:
ensureQueuedTaskHandled(command)
進行應急處理,從名字可以看出是保證添加到任務緩存隊列中的任務得到處理。
我們接著看 2 個關鍵方法的實現:addIfUnderCorePoolSize 和 addIfUnderMaximumPoolSize:
private?boolean?addIfUnderCorePoolSize(Runnable?firstTask)?{Thread?t?=?null;final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();try?{if?(poolSize?<?corePoolSize?&&?runState?==?RUNNING)t?=?addThread(firstTask);????????//創建線程去執行firstTask任務???}?finally?{mainLock.unlock();}if?(t?==?null)return?false;t.start();return?true; }這個是?addIfUnderCorePoolSize?方法的具體實現,從名字可以看出它的意圖就是當低于核心吃大小時執行的方法。
下面看其具體實現,首先獲取到鎖,因為這地方涉及到線程池狀態的變化,先通過 if 語句判斷當前線程池中的線程數目是否小于核心池大小,有朋友也許會有疑問:前面在 execute() 方法中不是已經判斷過了嗎,只有線程池當前線程數目小于核心池大小才會執行 addIfUnderCorePoolSize 方法的,為何這地方還要繼續判斷?原因很簡單,前面的判斷過程中并沒有加鎖,因此可能在execute方法判斷的時候 poolSize 小于 corePoolSize,而判斷完之后,在其他線程中又向線程池提交了任務,就可能導致 poolSize 不小于 corePoolSize 了,所以需要在這個地方繼續判斷。然后接著判斷線程池的狀態是否為 RUNNING,原因也很簡單,因為有可能在其他線程中調用了shutdown 或者 shutdownNow 方法。然后就是執行
t?=?addThread(firstTask);
這個方法也非常關鍵,傳進去的參數為提交的任務,返回值為Thread類型。然后接著在下面判斷 t 是否為空,為空則表明創建線程失敗(即 poolSize >= corePoolSize 或者 runState 不等于 RUNNING),否則調用 t.start() 方法啟動線程。
我們來看一下addThread方法的實現:
private?Thread?addThread(Runnable?firstTask)?{Worker?w?=?new?Worker(firstTask);Thread?t?=?threadFactory.newThread(w);??//創建一個線程,執行任務???if?(t?!=?null)?{w.thread?=?t;????????????//將創建的線程的引用賦值為w的成員變量???????workers.add(w);int?nt?=?++poolSize;?????//當前線程數加1???????if?(nt?>?largestPoolSize)largestPoolSize?=?nt;}return?t; }在 addThread 方法中,首先用提交的任務創建了一個 Worker 對象,然后調用線程工廠 threadFactory 創建了一個新的線程 t,然后將線程t的引用賦值給了 Worker 對象的成員變量 thread,接著通過 workers.add(w) 將 Worker對象添加到工作集當中。
下面我們看一下 Worker 類的實現:
private?final?class?Worker?implements?Runnable?{private?final?ReentrantLock?runLock?=?new?ReentrantLock();private?Runnable?firstTask;volatile?long?completedTasks;Thread?thread;Worker(Runnable?firstTask)?{this.firstTask?=?firstTask;}boolean?isActive()?{return?runLock.isLocked();}void?interruptIfIdle()?{final?ReentrantLock?runLock?=?this.runLock;if?(runLock.tryLock())?{try?{if?(thread?!=?Thread.currentThread())thread.interrupt();}?finally?{runLock.unlock();}}}void?interruptNow()?{thread.interrupt();}private?void?runTask(Runnable?task)?{final?ReentrantLock?runLock?=?this.runLock;runLock.lock();try?{if?(runState?<?STOP?&&Thread.interrupted()?&&runState?>=?STOP)boolean?ran?=?false;beforeExecute(thread,?task);???//beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶可以根據//自己需要重載這個方法和后面的afterExecute方法來進行一些統計信息,比如某個任務的執行時間等???????????try?{task.run();ran?=?true;afterExecute(task,?null);++completedTasks;}?catch?(RuntimeException?ex)?{if?(!ran)afterExecute(task,?ex);throw?ex;}}?finally?{runLock.unlock();}}public?void?run()?{try?{Runnable?task?=?firstTask;firstTask?=?null;while?(task?!=?null?||?(task?=?getTask())?!=?null)?{runTask(task);task?=?null;}}?finally?{workerDone(this);???//當任務隊列中沒有任務時,進行清理工作???????}} }它實際上實現了 Runnable 接口,因此上面的 Thread t = threadFactory.newThread(w) 效果跟下面這句的效果基本一樣:
Thread?t?=?new?Thread(w);
相當于傳進去了一個Runnable 任務,在線程t中執行這個 Runnable。
既然 Worker 實現了 Runnable 接口,那么自然最核心的方法便是 run() 方法了:
public?void?run()?{try?{Runnable?task?=?firstTask;firstTask?=?null;while?(task?!=?null?||?(task?=?getTask())?!=?null)?{runTask(task);task?=?null;}}?finally?{workerDone(this);} }從 run 方法的實現可以看出,它首先執行的是通過構造器傳進來的任務 firstTask,在調用 runTask() 執行完firstTask 之后,在 while 循環里面不斷通過 getTask() 去取新的任務來執行,那么去哪里取呢?自然是從任務緩存隊列里面去取,getTask 是 ThreadPoolExecutor 類中的方法,并不是 Worker 類中的方法,下面是 getTask 方法的實現:
Runnable?getTask()?{for?(;;)?{try?{int?state?=?runState;if?(state?>?SHUTDOWN)return?null;Runnable?r;if?(state?==?SHUTDOWN)??//?Help?drain?queuer?=?workQueue.poll();else?if?(poolSize?>?corePoolSize?||?allowCoreThreadTimeOut)?//如果線程數大于核心池大小或者允許為核心池線程設置空閑時間,//則通過poll取任務,若等待一定的時間取不到任務,則返回nullr?=?workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS);elser?=?workQueue.take();if?(r?!=?null)return?r;if?(workerCanExit())?{????//如果沒取到任務,即r為null,則判斷當前的worker是否可以退出if?(runState?>=?SHUTDOWN)?//?Wake?up?othersinterruptIdleWorkers();???//中斷處于空閑狀態的workerreturn?null;}//?Else?retry}?catch?(InterruptedException?ie)?{//?On?interruption,?re-check?runState}} }在 getTask 中,先判斷當前線程池狀態,如果 runState 大于 SHUTDOWN(即為 STOP 或者 TERMINATED),則直接返回 null。
如果 runState 為 SHUTDOWN 或者 RUNNING,則從任務緩存隊列取任務。
如果當前線程池的線程數大于核心池大小 corePoolSize 或者允許為核心池中的線程設置空閑存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待一定的時間,如果取不到任務就返回 null。
然后判斷取到的任務 r 是否為 null,為 null 則通過調用?workerCanExit()?方法來判斷當前 worker 是否可以退出,我們看一下 workerCanExit() 的實現:
private?boolean?workerCanExit()?{final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();boolean?canExit;//如果runState大于等于STOP,或者任務緩存隊列為空了//或者??允許為核心池線程設置空閑存活時間并且線程池中的線程數目大于1try?{canExit?=?runState?>=?STOP?||workQueue.isEmpty()?||(allowCoreThreadTimeOut?&&poolSize?>?Math.max(1,?corePoolSize));}?finally?{mainLock.unlock();}return?canExit; }也就是說如果線程池處于 STOP 狀態、或者任務隊列已為空或者允許為核心池線程設置空閑存活時間并且線程數大于 1 時,允許 worker 退出。如果允許 worker 退出,則調用interruptIdleWorkers()中斷處于空閑狀態的 worker,我們看一下 interruptIdleWorkers() 的實現:
void?interruptIdleWorkers()?{final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();try?{for?(Worker?w?:?workers)??//實際上調用的是worker的interruptIfIdle()方法w.interruptIfIdle();}?finally?{mainLock.unlock();} }從實現可以看出,它實際上調用的是 worker 的?interruptIfIdle()方法,在 worker 的 interruptIfIdle() 方法中:
void?interruptIfIdle()?{final?ReentrantLock?runLock?=?this.runLock;if?(runLock.tryLock())?{????//注意這里,是調用tryLock()來獲取鎖的,因為如果當前worker正在執行任務,鎖已經被獲取了,是無法獲取到鎖的//如果成功獲取了鎖,說明當前worker處于空閑狀態try?{if?(thread?!=?Thread.currentThread())??thread.interrupt();}?finally?{runLock.unlock();}} }這里有一個非常巧妙的設計方式,假如我們來設計線程池,可能會有一個任務分派線程,當發現有線程空閑時,就從任務緩存隊列中取一個任務交給空閑線程執行。但是在這里,并沒有采用這樣的方式,因為這樣會要額外地對任務分派線程進行管理,無形地會增加難度和復雜度,這里直接讓執行完任務的線程去任務緩存隊列里面取任務來執行。
我們再看 addIfUnderMaximumPoolSize 方法的實現,這個方法的實現思想和 addIfUnderCorePoolSize 方法的實現思想非常相似,唯一的區別在于 addIfUnderMaximumPoolSize 方法是在線程池中的線程數達到了核心池大小并且往任務隊列中添加任務失敗的情況下執行的:
private?boolean?addIfUnderMaximumPoolSize(Runnable?firstTask)?{Thread?t?=?null;final?ReentrantLock?mainLock?=?this.mainLock;mainLock.lock();try?{if?(poolSize?<?maximumPoolSize?&&?runState?==?RUNNING)t?=?addThread(firstTask);}?finally?{mainLock.unlock();}if?(t?==?null)return?false;t.start();return?true; }看到沒有,其實它和 addIfUnderCorePoolSize 方法的實現基本一模一樣,只是 if 語句判斷條件中的 poolSize < maximumPoolSize 不同而已。
到這里,大部分朋友應該對任務提交給線程池之后到被執行的整個過程有了一個基本的了解,下面總結一下
1)首先,要清楚 corePoolSize 和 maximumPoolSize 的含義;
2)其次,要知道 Worker 是用來起到什么作用的;
3)要知道任務提交給線程池之后的處理策略,這里總結一下主要有 4 點:
-
如果當前線程池中的線程數目小于 corePoolSize,則每來一個任務,就會創建一個線程去執行這個任務;
-
如果當前線程池中的線程數目 >= corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閑線程將其取出去執行;若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務;
-
如果當前線程池中的線程數目達到 maximumPoolSize,則會采取任務拒絕策略進行處理;
-
如果線程池中的線程數量大于 corePoolSize 時,如果某線程空閑時間超過 keepAliveTime,線程將被終止,直至線程池中的線程數目不大于 corePoolSize ;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過 keepAliveTime ,線程也會被終止。
線程池中的線程初始化
默認情況下,創建線程池之后,線程池中是沒有線程的,需要提交任務之后才會創建線程。
在實際中如果需要線程池創建之后立即創建線程,可以通過以下兩個方法辦到:
-
prestartCoreThread():初始化一個核心線程;
-
prestartAllCoreThreads():初始化所有核心線程
下面是這2個方法的實現:
public?boolean?prestartCoreThread()?{return?addIfUnderCorePoolSize(null);?//注意傳進去的參數是null }public?int?prestartAllCoreThreads()?{int?n?=?0;while?(addIfUnderCorePoolSize(null))//注意傳進去的參數是null++n;return?n; }注意上面傳進去的參數是 null,根據第 2 小節的分析可知如果傳進去的參數為 null,則最后執行線程會阻塞在getTask方法中的
r?=?workQueue.take();
即等待任務隊列中有任務。
任務緩存隊列及排隊策略
在前面我們多次提到了任務緩存隊列,即 workQueue,它用來存放等待執行的任務。
workQueue 的類型為?BlockingQueue<Runnable>,通常可以取下面三種類型:
1)ArrayBlockingQueue:基于數組的先進先出隊列,此隊列創建時必須指定大小;
2)LinkedBlockingQueue:基于鏈表的先進先出隊列,如果創建時沒有指定此隊列大小,則默認為Integer.MAX_VALUE;
3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
任務拒絕策略
當線程池的任務緩存隊列已滿并且線程池中的線程數目達到 maximumPoolSize,如果還有任務到來就會采取任務拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程) ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務線程池的關閉
ThreadPoolExecutor 提供了兩個方法,用于線程池的關閉,分別是 shutdown() 和 shutdownNow(),其中:
-
shutdown():不會立即終止線程池,而是要等所有任務緩存隊列中的任務都執行完后才終止,但再也不會接受新的任務
-
shutdownNow():立即終止線程池,并嘗試打斷正在執行的任務,并且清空任務緩存隊列,返回尚未執行的任務
線程池容量的動態調整
ThreadPoolExecutor 提供了動態調整線程池容量大小的方法:setCorePoolSize() 和 setMaximumPoolSize(),
-
setCorePoolSize:設置核心池大小
-
setMaximumPoolSize:設置線程池最大能創建的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor 進行線程賦值,還可能立即創建新的線程來執行任務。
使用示例
前面我們討論了關于線程池的實現原理,這一節我們來看一下它的具體使用:
public?class?Test?{public?static?void?main(String[]?args)?{???ThreadPoolExecutor?executor?=?new?ThreadPoolExecutor(5,?10,?200,?TimeUnit.MILLISECONDS,new?ArrayBlockingQueue<Runnable>(5));for(int?i=0;i<15;i++){MyTask?myTask?=?new?MyTask(i);executor.execute(myTask);System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());}executor.shutdown();} }class?MyTask?implements?Runnable?{private?int?taskNum;public?MyTask(int?num)?{this.taskNum?=?num;}@Overridepublic?void?run()?{System.out.println("正在執行task?"+taskNum);try?{Thread.currentThread().sleep(4000);}?catch?(InterruptedException?e)?{e.printStackTrace();}System.out.println("task?"+taskNum+"執行完畢");} }執行結果:
正在執行task?0 線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task?1 線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task?2 線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task?3 線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0 正在執行task?4 線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩別的任務數目:0 線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task?10 線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task?11 線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task?12 線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task?13 線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0 正在執行task?14 task?3執行完畢 task?0執行完畢 task?2執行完畢 task?1執行完畢 正在執行task?8 正在執行task?7 正在執行task?6 正在執行task?5 task?4執行完畢 task?10執行完畢 task?11執行完畢 task?13執行完畢 task?12執行完畢 正在執行task?9 task?14執行完畢 task?8執行完畢 task?5執行完畢 task?7執行完畢 task?6執行完畢 task?9執行完畢從執行結果可以看出,當線程池中線程的數目大于 5 時,便將任務放入任務緩存隊列里面,當任務緩存隊列滿了之后,便創建新的線程。如果上面程序中,將 for 循環中改成執行 20 個任務,就會拋出任務拒絕異常了。
不過在 java doc中,并不提倡我們直接使用 ThreadPoolExecutor,而是使用 Executors 類中提供的幾個靜態方法來創建線程池:
Executors.newCachedThreadPool();????????//創建一個緩沖池,緩沖池容量大小為Integer.MAX_VALUE Executors.newSingleThreadExecutor();???//創建容量為1的緩沖池 Executors.newFixedThreadPool(int);????//創建固定容量大小的緩沖池下面是這三個靜態方法的具體實現;
public?static?ExecutorService?newFixedThreadPool(int?nThreads)?{return?new?ThreadPoolExecutor(nThreads,?nThreads,0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>()); } public?static?ExecutorService?newSingleThreadExecutor()?{return?new?FinalizableDelegatedExecutorService(new?ThreadPoolExecutor(1,?1,0L,?TimeUnit.MILLISECONDS,new?LinkedBlockingQueue<Runnable>())); } public?static?ExecutorService?newCachedThreadPool()?{return?new?ThreadPoolExecutor(0,?Integer.MAX_VALUE,60L,?TimeUnit.SECONDS,new?SynchronousQueue<Runnable>()); }從它們的具體實現來看,它們實際上也是調用了 ThreadPoolExecutor,只不過參數都已配置好了。
newFixedThreadPoo l創建的線程池 corePoolSize 和 maximumPoolSize 值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor 將 corePoolSize 和 maximumPoolSize 都設置為1,也使用的 LinkedBlockingQueue;
newCachedThreadPool 將 corePoolSize 設置為0,將 maximumPoolSize 設置為 Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就創建線程運行,當線程空閑超過60秒,就銷毀線程。
實際中,如果 Executors 提供的三個靜態方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor 的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
另外,如果 ThreadPoolExecutor 達不到要求,可以自己繼承 ThreadPoolExecutor 類進行重寫。
如何合理配置線程池的大小
本節來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。
一般需要根據任務的類型來配置線程池大小:
如果是 CPU 密集型任務,就需要盡量壓榨 CPU,參考值可以設為?NCPU+1
如果是 IO 密集型任務,參考值可以設置為2*NCPU
當然,這只是一個參考值,具體的設置還需要根據實際情況進行調整,比如可以先將線程池大小設置為參考值,再觀察任務運行情況和系統負載、資源利用率來進行適當調整。
總結
以上是生活随笔為你收集整理的写的很好!细数 Java 线程池的原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 真强啊!建议每一位Java程序员都读读D
- 下一篇: 一篇来自前端同学对后端接口的吐槽:痛!