聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析
ThreadPoolExecutor是Executor執(zhí)行框架最重要的一個(gè)實(shí)現(xiàn)類,提供了線程池管理和任務(wù)管理是兩個(gè)最基本的能力。這篇通過分析ThreadPoolExecutor的源碼來看看如何設(shè)計(jì)和實(shí)現(xiàn)一個(gè)基于生產(chǎn)者消費(fèi)者模型的執(zhí)行器。
?
生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型包含三個(gè)角色:生產(chǎn)者,工作隊(duì)列,消費(fèi)者。對(duì)于ThreadPoolExecutor來說,
1. 生產(chǎn)者是任務(wù)的提交者,是外部調(diào)用ThreadPoolExecutor的線程
2. 工作隊(duì)列是一個(gè)阻塞隊(duì)列的接口,具體的實(shí)現(xiàn)類可以有很多種。BlockingQueue<Runnable> workQueue;
3. 消費(fèi)者是封裝了線程的Worker類的集合。HashSet<Worker> workers = new HashSet<Worker>();
?
?
主要屬性
明確了ThreadPoolExecutor的基本執(zhí)行模型之后,來看下它的幾個(gè)主要屬性:
1.?private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));??? 一個(gè)32位的原子整形作為線程池的狀態(tài)控制描述符。低29位作為工作者線程的數(shù)量。所以工作者線程最多有2^29 -1個(gè)。高3位來保持線程池的狀態(tài)。ThreadPoolExecutor總共有5種狀態(tài):
???? *?? RUNNING:? 可以接受新任務(wù)并執(zhí)行
???? *?? SHUTDOWN: 不再接受新任務(wù),但是仍然執(zhí)行工作隊(duì)列中的任務(wù)
???? *?? STOP:???? 不再接受新任務(wù),不執(zhí)行工作隊(duì)列中的任務(wù),并且中斷正在執(zhí)行的任務(wù)
???? *?? TIDYING:? 所有任務(wù)被終止,工作線程的數(shù)量為0,會(huì)去執(zhí)行terminated()鉤子方法
???? *?? TERMINATED: terminated()執(zhí)行結(jié)束
?
下面是一系列ctl這個(gè)變量定義和工具方法
?
?private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
2. private final BlockingQueue<Runnable> workQueue; 工作隊(duì)列,采用了BlockingQueue阻塞隊(duì)列的接口,具體實(shí)現(xiàn)類可以按照不同的策略來選擇,比如有邊界的ArrayBlockingQueue,無邊界的LinkedBlockingQueue。
?
3. private final ReentrantLock mainLock = new ReentrantLock();? 控制ThreadPoolExecutor的全局可重入鎖,所有需要同步的操作都要被這個(gè)鎖保護(hù)
4. private final Condition termination = mainLock.newCondition(); mainLock的條件隊(duì)列,來進(jìn)行wait()和notify()等條件操作
5. private final HashSet<Worker> workers = new HashSet<Worker>();? 工作線程集合
6. private volatile ThreadFactory threadFactory; 創(chuàng)建線程的工廠,可以自定義線程創(chuàng)建的邏輯
7. private volatile RejectedExecutionHandler handler;? 拒絕執(zhí)行任務(wù)的處理器,可以自定義拒絕的策略
8. private volatile long keepAliveTime;?? 空閑線程的存活時(shí)間。可以根據(jù)這個(gè)存活時(shí)間來判斷空閑線程是否等待超時(shí),然后采取相應(yīng)的線程回收操作
9. private volatile boolean allowCoreThreadTimeOut;? 是否允許coreThread線程超時(shí)回收
10. private volatile int corePoolSize;? 可存活的線程的最小值。如果設(shè)置了allowCoreThreadTimeOut, 那么corePoolSize的值可以為0。
11. private volatile int maximumPoolSize;? 可存活的線程的最大值
?
工作線程創(chuàng)建和回收策略
ThreadPoolExecutor通過corePoolSize,maximumPoolSize, allowCoreThreadTimeOut,keepAliveTime等幾個(gè)參數(shù)提供一個(gè)靈活的工作線程創(chuàng)建和回收的策略。
創(chuàng)建策略:
1. 當(dāng)工作線程數(shù)量小于corePoolSize時(shí),不管其他線程是否空閑,都創(chuàng)建新的工作線程來處理新加入的任務(wù)
2. 當(dāng)工作線程數(shù)量大于corePoolSize,小于maximumPoolSize時(shí),只有當(dāng)工作隊(duì)列滿了,才會(huì)創(chuàng)建新的工作線程來處理新加入的任務(wù)。當(dāng)工作隊(duì)列有空余時(shí),只把新任務(wù)加入隊(duì)列
3. 把corePoolSize和maximumPoolSize 設(shè)置成相同的值時(shí),線程池就是一個(gè)固定(fixed)工作線程數(shù)的線程。
回收策略:
1. keepAliveTime變量設(shè)置了空閑工作線程超時(shí)的時(shí)間,當(dāng)工作線程數(shù)量超過了corePoolSize后,空閑的工作線程等待超過了keepAliveTime后,會(huì)被回收。后面會(huì)說怎么確定一個(gè)工作線程是否“空閑”。
2. 如果設(shè)置了allowCoreThreadTimeOut,那么core Thread也可以被回收,即當(dāng)core thread也空閑時(shí),也可以被回收,直到工作線程集合為0。
工作隊(duì)列策略
?
工作隊(duì)列BlockingQueue<Runnable> workQueue 是用來存放提交的任務(wù)的。它有4個(gè)基本的策略,并且根據(jù)不同的阻塞隊(duì)列的實(shí)現(xiàn)類可以引入更多的工作隊(duì)列的策略。
4個(gè)基本策略:
1. 當(dāng)工作線程數(shù)量小于corePoolSize時(shí),新提交的任務(wù)總是會(huì)由新創(chuàng)建的工作線程執(zhí)行,不入隊(duì)列
2. 當(dāng)工作線程數(shù)量大于corePoolSize,如果工作隊(duì)列沒滿,新提交的任務(wù)就入隊(duì)列
3. 當(dāng)工作線程數(shù)量大于corePoolSize,小于MaximumPoolSize時(shí),如果工作隊(duì)列滿了,新提交的任務(wù)就交給新創(chuàng)建的工作線程,不入隊(duì)列
4. 當(dāng)工作線程數(shù)量大于MaximumPoolSize,并且工作隊(duì)列滿了,那么新提交的任務(wù)會(huì)被拒絕執(zhí)行。具體看采用何種拒絕策略
根據(jù)不同的阻塞隊(duì)列的實(shí)現(xiàn)類,又有幾種額外的策略
1. 采用SynchronousQueue直接將任務(wù)傳遞給空閑的線程執(zhí)行,不額外存儲(chǔ)任務(wù)。這種方式需要無限制的MaximumPoolSize,可以創(chuàng)建無限制的工作線程來處理提交的任務(wù)。這種方式的好處是任務(wù)可以很快被執(zhí)行,適用于任務(wù)到達(dá)時(shí)間大于任務(wù)處理時(shí)間的情況。缺點(diǎn)是當(dāng)任務(wù)量很大時(shí),會(huì)占用大量線程
2. 采用無邊界的工作隊(duì)列LinkedBlockingQueue。這種情況下,由于工作隊(duì)列永遠(yuǎn)不會(huì)滿,那么工作線程的數(shù)量最大就是corePoolSize,因?yàn)楫?dāng)工作線程數(shù)量達(dá)到corePoolSize時(shí),只有工作隊(duì)列滿的時(shí)候才會(huì)創(chuàng)建新的工作線程。這種方式好處是使用的線程數(shù)量是穩(wěn)定的,當(dāng)內(nèi)存足夠大時(shí),可以處理足夠多的請(qǐng)求。缺點(diǎn)是如果任務(wù)直接有依賴,很有可能形成死鎖,因?yàn)楫?dāng)工作線程被消耗完時(shí),不會(huì)創(chuàng)建新的工作現(xiàn)場,只會(huì)把任務(wù)加入工作隊(duì)列。并且可能由于內(nèi)存耗盡引發(fā)內(nèi)存溢出OOM
3. 采用有界的工作隊(duì)列AraayBlockingQueue。這種情況下對(duì)于內(nèi)存資源是可控的,但是需要合理調(diào)節(jié)MaximumPoolSize和工作隊(duì)列的長度,這兩個(gè)值是相互影響的。當(dāng)工作隊(duì)列長度比較小的時(shí),必定會(huì)創(chuàng)建更多的線程。而更多的線程會(huì)引起上下文切換等額外的消耗。當(dāng)工作隊(duì)列大,MaximumPoolSize小的時(shí)候,會(huì)影響吞吐量,并且會(huì)觸發(fā)拒絕機(jī)制。
?
拒絕執(zhí)行策略
當(dāng)Executor處于shutdown狀態(tài)或者工作線程超過MaximumPoolSize并且工作隊(duì)列滿了之后,新提交的任務(wù)將會(huì)被拒絕執(zhí)行。RejectedExecutionHandler接口定義了拒絕執(zhí)行的策略。具體的策略有
CallerRunsPolicy:由調(diào)用者線程來執(zhí)行被拒絕的任務(wù),屬于同步執(zhí)行
AbortPolicy:中止執(zhí)行,拋出RejectedExecutionException異常
DiscardPolicy:丟棄任務(wù)
DiscardOldestPolicy:丟棄最老的任務(wù)
?
?public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
?
工作線程Worker的設(shè)計(jì)
工作線程沒有直接使用Thread,而是采用了Worker類封裝了Thread,目的是更好地進(jìn)行中斷控制。Worker直接繼承了AbstractQueuedSynchronizer來進(jìn)行同步操作,它實(shí)現(xiàn)了一個(gè)不可重入的互斥結(jié)構(gòu)。當(dāng)它的state屬性為0時(shí)表示unlock,state為1時(shí)表示lock。任務(wù)執(zhí)行時(shí)必須在lock狀態(tài)的保護(hù)下,防止出現(xiàn)同步問題。因此當(dāng)Worker處于lock狀態(tài)時(shí),表示它正在運(yùn)行,當(dāng)它處于unlock狀態(tài)時(shí),表示它“空閑”。當(dāng)它空閑超過keepAliveTime時(shí),就有可能被回收。
Worker還實(shí)現(xiàn)了Runnable接口, 執(zhí)行它的線程是Worker包含的Thread對(duì)象,在Worker的構(gòu)造函數(shù)可以看到Thread創(chuàng)建時(shí),把Worker對(duì)象傳遞給了它。
?
?private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 把Worker對(duì)象作為Runnable的實(shí)例傳遞給了新創(chuàng)建Thread對(duì)象
?this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
Worker被它的線程執(zhí)行時(shí),run方法調(diào)用了ThreadPoolExecutor的runWorker方法。
1. wt指向當(dāng)前執(zhí)行Worker的run方法的線程,也就是指向了Worker包含的工作線程對(duì)象
2. task指向Worker包含的firstTask對(duì)象,表示當(dāng)前要執(zhí)行的任務(wù)
3. 當(dāng)task不為null或者從工作隊(duì)列中取到了新任務(wù),那么先加鎖w.lock表示正在運(yùn)行任務(wù)。在真正開始執(zhí)行task.run()之前,先判斷線程池的狀態(tài)是否已經(jīng)STOP,如果是,就中斷Worker的線程。
4. 一旦判斷當(dāng)前線程不是STOP并且工作線程沒有中斷。那么就開始執(zhí)行task.run()了。Worker的interruptIfStarted方法可以中斷這個(gè)Worker的線程,從而中斷正在執(zhí)行任務(wù)。
5.?beforeExecute(wt, task)和afterExecute(wt,task)是兩個(gè)鉤子方法,支持在任務(wù)真正開始執(zhí)行前就行擴(kuò)展。
?
?final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
工作線程Worker創(chuàng)建和回收的源碼
首先看一下ThreadPoolExecutor的execute方法,這個(gè)方式是任務(wù)提交的入口。可以看到它的邏輯符合之前說的工作線程創(chuàng)建的基本策略
1. 當(dāng)工作線程數(shù)量小于corePoolSize時(shí),通過addWorker(command,true)來新建工作線程處理新建的任務(wù),不入工作隊(duì)列
2. 當(dāng)工作線程數(shù)量大于等于corePoolSize時(shí),先入隊(duì)列,使用的是BlockingQueue的offer方法。當(dāng)工作線程數(shù)量為0時(shí),還會(huì)通過addWorker(null, false)添加一個(gè)新的工作線程
3. 當(dāng)工作隊(duì)列滿了并且工作線程數(shù)量在corePoolSize和MaximumPoolSize之間,就創(chuàng)建新的工作線程去執(zhí)行新添加的任務(wù)。當(dāng)工作線程數(shù)量超過了MaximumPoolSize,就拒絕任務(wù)。
?
?public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到addWorker方法是創(chuàng)建Worker工作線程的所在。
1. retry這個(gè)循環(huán)判斷線程池的狀態(tài)和當(dāng)前工作線程數(shù)量的邊界。如果允許創(chuàng)建工作現(xiàn)場,首先修改ctl變量表示的工作線程的數(shù)量
2. 把工作線程添加到workers集合中的操作要在mainLock這個(gè)鎖的保護(hù)下進(jìn)行。所有和ThreadPoolExecutor狀態(tài)相關(guān)的操作都要在mainLock鎖的保護(hù)下進(jìn)行
3. w = new Worker(firstTask); 創(chuàng)建Worker實(shí)例,把firstTask作為它當(dāng)前的任務(wù)。firstTask為null時(shí)表示先只創(chuàng)建Worker線程,然后去工作隊(duì)列中取任務(wù)執(zhí)行
4. 把新創(chuàng)建的Worker實(shí)例加入到workers集合,修改相關(guān)統(tǒng)計(jì)變量。
5. 當(dāng)加入集合成功后,開始啟動(dòng)這個(gè)Worker實(shí)例。啟動(dòng)的方法是調(diào)用Worker封裝的Thread的start()方法。之前說了,這個(gè)Thread對(duì)應(yīng)的Runnable是Worker本身,會(huì)去調(diào)用Worker的run方法,然后調(diào)用ThreadPoolExecutor的runWorker方法。在runWorker方法中真正去執(zhí)行任務(wù)。
?
?private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
工作線程回收的方法是processWorkerExit(),它在runWorker方法執(zhí)行結(jié)束的時(shí)候被調(diào)用。之前說了空閑的工作線程可能會(huì)在keepAliveTime時(shí)間之后被回收。這個(gè)邏輯隱含在runWorker方法和getTask方法中,會(huì)在下面說如何從工作隊(duì)列取任務(wù)時(shí)說明。processWorkerExit方法單純只是處理工作線程的回收。
1. 結(jié)合runWorker方法看,如果Worker執(zhí)行task.run()的時(shí)候拋出了異常,那么completedAbruptly為true,需要從workers集合中把這個(gè)工作線程移除掉。
2. 如果是completedAbruptly為true,并且線程池不是STOP狀態(tài),那么就創(chuàng)建一個(gè)新的Worker工作線程
3. 如果是completedAbruptly為false,并且線程池不是STOP狀態(tài),首先檢查是否allowCoreThreadTimeout,如果運(yùn)行,那么最少線程數(shù)可以為0,否則是corePoolSize。如果最少線程數(shù)為0,并且工作隊(duì)列不為空,那么最小值為1。最后檢查當(dāng)前的工作線程數(shù)量,如果小于最小值,就創(chuàng)建新的工作線程。
?
?private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
任務(wù)的獲取
工作線程從工作隊(duì)列中取任務(wù)的代碼在getTask方法中
1. timed變量表示是否要計(jì)時(shí),當(dāng)計(jì)時(shí)超過keepAliveTime后還沒取到任務(wù),就返回null。結(jié)合runWorker方法可以知道,當(dāng)getTask返回null時(shí),該Worker線程會(huì)被回收,這就是如何回收空閑工作線程的方法。
timed變量當(dāng)allowCoreThreadTimeout為true或者當(dāng)工作線程數(shù)大于corePoolSize時(shí)為true。
2. 如果timed為true,就用BlockingQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法來計(jì)時(shí)從隊(duì)頭取任務(wù),否則直接用take()方法從隊(duì)頭取任務(wù)
?
?private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
線程池的關(guān)閉
線程池有SHUTDOWN, STOP, TIDYING, TERMINATED這幾個(gè)狀態(tài)和線程池關(guān)閉相關(guān)。通常我們把關(guān)閉分為優(yōu)雅的關(guān)閉和強(qiáng)制立刻關(guān)閉。
所謂優(yōu)雅的關(guān)閉就是調(diào)用shutdown()方法,線程池進(jìn)入SHUTDOWN狀態(tài),不在接收新的任務(wù),會(huì)把工作隊(duì)列的任務(wù)執(zhí)行完畢后再結(jié)束。
強(qiáng)制立刻關(guān)閉就是調(diào)用shutdownNow()方法,線程池直接進(jìn)入STOP狀態(tài),會(huì)中斷正在執(zhí)行的工作線程,清空工作隊(duì)列。
1. 在shutdown方法中,先設(shè)置線程池狀態(tài)為SHUTDOWN,然后先去中斷空閑的工作線程,再調(diào)用onShutdown鉤子方法。最后tryTerminate()
2. 在shutdownNow方法中,先設(shè)置線程池狀態(tài)為STOP,然后先中斷所有的工作線程,再清空工作隊(duì)列。最后tryTerminate()。這個(gè)方法會(huì)把工作隊(duì)列中的任務(wù)返回給調(diào)用者處理。
?
?public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
? public List<Runnable> shutdownNow() {
??????? List<Runnable> tasks;
??????? final ReentrantLock mainLock = this.mainLock;
??????? mainLock.lock();
??????? try {
??????????? checkShutdownAccess();
??????????? advanceRunState(STOP);
??????????? interruptWorkers();
??????????? tasks = drainQueue();
??????? } finally {
??????????? mainLock.unlock();
??????? }
??????? tryTerminate();
??????? return tasks;
??? }
interruptIdleWorkers方法會(huì)去中斷空閑的工作線程,所謂空閑的工作線程即沒有上鎖的Worker。
而interruptWorkers方法直接去中斷所有的Worker,調(diào)用Worker.interruptIfStarted()方法
?
?private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
?private void interruptWorkers() {
??????? final ReentrantLock mainLock = this.mainLock;
??????? mainLock.lock();
??????? try {
??????????? for (Worker w : workers)
??????????????? w.interruptIfStarted();
??????? } finally {
??????????? mainLock.unlock();
??????? }
??? }
? void interruptIfStarted() {
??????????? Thread t;
??????????? if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
??????????????? try {
??????????????????? t.interrupt();
??????????????? } catch (SecurityException ignore) {
??????????????? }
??????????? }
??????? }
tryTerminate方法會(huì)嘗試終止線程池,根據(jù)線程池的狀態(tài),在相應(yīng)狀態(tài)會(huì)中斷空閑工作線程,調(diào)用terminated()鉤子方法,設(shè)置狀態(tài)為TERMINATED。
?
?final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
最后說明一下,JVM的守護(hù)進(jìn)程只有當(dāng)所有派生出來的線程都結(jié)束后才會(huì)退出,使用ThreadPoolExecutor線程池時(shí),如果有的任務(wù)一直執(zhí)行,并且不響應(yīng)中斷,那么會(huì)一直占用線程,那么JVM也會(huì)一直工作,不會(huì)退出。
總結(jié)
以上是生活随笔為你收集整理的聊聊高并发(四十)解析java.util.concurrent各个组件(十六) ThreadPoolExecutor源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊聊高并发(三十六)Java内存模型那些
- 下一篇: Breeze库API总结(Spark线性