ThreadPoolExecutor(二)——execute
?1.execute方法
/*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of* {@code RejectedExecutionHandler}, if the task* cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}看方法注釋:
執行給定的任務在將來的某個時間。該任務可能會用一個新的線程來執行,也有可能用線程池中一個已有的線程來執行。
如果該executor已經被shutdown了,或者因為容量已滿,任務不會被執行,通過RejectedExecutionHandler來處理剩下流程。
將來的某個時間執行:說的是任務會入隊列,然后根據線程池目前的各項指標狀況來決定何時執行。
新的線程或已有線程:根據線程池的各項指標狀況來決定是喚醒線程池中一個已有的阻塞線程來執行還是new一個Thread來執行任務。
2.execute方法的三個步驟
看方法內部注釋:
1.如果當前正在run的線程數小于corePoolSize,那么就調用addWorker方法來new一個線程用來執行傳入的任務。
對應代碼片:
int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}2.如果addWorker方法執行失敗了,任務要入隊列,如果成功入隊列了,需要做double check來處理一些極端情況,比如線程池是否shutdown了等等。
對應代碼片:
if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}3.如果任務無法入隊列,再次嘗試addWorker,這次是用正在run的線程數和maximumPoolSize比,如果超過了maximumPoolSize則reject任務,說明線程池已經飽和了。
對應代碼片:
else if (!addWorker(command, false))reject(command);3.addWorker
/** Methods for creating, running and cleaning up after workers*//*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked, which requires a* backout of workerCount, and a recheck for termination, in case* the existence of this worker was holding up termination.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}Worker w = new Worker(firstTask);Thread t = w.thread;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;} finally {mainLock.unlock();}t.start();// It is possible (but unlikely) for a thread to have been// added to workers, but not yet started, during transition to// STOP, which could result in a rare missed interrupt,// because Thread.interrupt is not guaranteed to have any effect// on a non-yet-started Thread (see Thread#interrupt).if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())t.interrupt();return true;}先看注釋:
檢查根據當前線程池的狀態是否允許添加一個新的Worker,如果可以,調整wc(workerCount,以后都用wc表示),代碼塊:
if (compareAndIncrementWorkerCount(c))break retry;如果線程被stop了或者可以shutdown,addWorker方法返回false。
如果thread工廠創建線程失敗,需要a?backout of workerCount(這是個啥?!),代碼塊:
// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}3.1.addWorker局部分析(一)
// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;這個代碼塊的判斷,如果是STOP,TIDYING和TERMINATED這三種狀態,都會返回false。
如果是SHUTDOWN,還要判斷firstTask和workQueue的狀況,如果firstTask不是null,返回false。
如果firstTask是null,判斷workQueue的狀況,workQueue是空的時候,返回false。
這個還要再看看,SHUTDOWN狀態下為什么要判斷firstTask和隊列,是要保證在SHUTDOWN的時候,新添加進來和隊列中剩余的task要正常執行完嗎?
3.2.addWorker局部分析(二)
for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}這個代碼塊比上一個代碼塊更容易理解,是正常流程,當線程池沒有處于RUNNING之外的幾種狀態的時候。
這時候的處理流程就是線程池是否創建線程的正常語義,依次進行下列比較:
1.和CAPACITY比較
如果當前wc超過CAPACITY(這個基本上不可能),返回false。
2.入參core為true,表示
轉存失敗重新上傳取消addWorker的時候,wc還沒到達corePoolSize,和corePoolSize比較
如果超過corePoolSize,返回false。否則原子操作compareAndIncrementWorkerCount修改wc值。
3.入參core為false,表示addWorker的時候隊列已滿,wc和maximumPoolSize比較
如果超過maximumPoolSize,返回false,否則原子操作compareAndIncrementWorkerCount修改wc值。
3.3.addWorker局部分析(三)
final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int c = ctl.get();int rs = runStateOf(c);if (t == null ||(rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null))) {decrementWorkerCount();tryTerminate();return false;}workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;} finally {mainLock.unlock();}這個代碼塊,首先加鎖,整個類用到這個鎖的地方,除了獲取該線程池的一些關鍵參數之外,就是shutdown和terminate等相關操作。
注釋里說,Back out on ThreadFactory failure or if?shut down before lock acquired,需要再看看。
如果這個if判斷沒有走,用該task構建的Worker就可以正常添加到workers這個HashSet中。
4.ctl
ctl是控制線程池狀態的變量,由兩部分組成,runState(高位)和workerCount(低28位),
private static int CAPACITY = (1 << COUNT_BITS) - 1;CAPACITY是1左移COUNT_BITS,然后減一。
?
COUNT_BITS是Integer的位數減去3,即29。
所以CAPACITY是100000000000000000000000000000-1=11111111111111111111111111111,表示低28位。這28位是表示運行的worker個數的。
private static final int RUNNING = -1 << COUNT_BITS;二進制,1111111111111111111111111111111111100000000000000000000000000000,和CAPACITY互補,所以~CAPACITY也是這個值。
?
二進制,100000000000000000000000000000000
?
二進制,1000000000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;二進制,1100000000000000000000000000000000
放在一起對比看,狀態位置看標紅三位:
1111111111111111111111111111111111100000000000000000000000000000,RUNNING
0000000000000000000000000000000000000000000000000000000000000000,SHUTDOWN
0000000000000000000000000000000100000000000000000000000000000000,STOP
0000000000000000000000000000001000000000000000000000000000000000,TIDYING
0000000000000000000000000000001100000000000000000000000000000000,TERMINATED
5.和ctl相關的幾組方法
private static int runStateOf(int c) { return c & ~CAPACITY; }把低28位都清掉了,只拿高位的運行狀態。
private static int workerCountOf(int c) { return c & CAPACITY; }只取低28位,即拿workerCount。
?
?
總結
以上是生活随笔為你收集整理的ThreadPoolExecutor(二)——execute的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 左神---基础提升笔记
- 下一篇: Dear小弟×××,给你们的一封信「社区