Java多线程——FutureTask源码解析
一個(gè)很常見的多線程案例是,我們安排主線程作為分配任務(wù)和匯總的一方,然后將計(jì)算工作切分為多個(gè)子任務(wù),安排多個(gè)線程去計(jì)算,最后所有的計(jì)算結(jié)果由主線程進(jìn)行匯總。比如,歸并排序,字符頻率的統(tǒng)計(jì)等等。
我們知道Runnable是不返回計(jì)算結(jié)果的,如果想利用多線程的話,只能存儲到一個(gè)實(shí)例的內(nèi)部變量里面進(jìn)行交互,但存在一個(gè)問題,如何判斷是否已經(jīng)計(jì)算完成了。用Thread.join是一個(gè)方案,但是我們只能依次等待一個(gè)線程結(jié)束后處理一個(gè)線程,如果線程1恰好特別慢,則后續(xù)已經(jīng)完成的線程不能被及時(shí)處理。我們希望能夠獲知線程的執(zhí)行狀態(tài),發(fā)現(xiàn)哪個(gè)線程處理完就先統(tǒng)計(jì)它的計(jì)算結(jié)果。可以考慮使用Callable和FutureTask來完成。
先說Callable它是一個(gè)功能接口,它只有一個(gè)方法V call(),計(jì)算一個(gè)結(jié)果,失敗的話拋出一個(gè)異常。和Runnable不同的是,它不能直接交給Thread來執(zhí)行,所以需要一個(gè)別的類來封裝它與Runnable,這個(gè)類就是FutureTask。FutureTask是一個(gè)類,繼承了RunnableFuture,而RunnableFuture是一個(gè)多繼承接口,它繼承了Runnable和 Future,所以FutureTask是可以作為實(shí)現(xiàn)了Runnable的實(shí)例交給Thread執(zhí)行。
從內(nèi)部變量來看,含有一個(gè)下層Callable實(shí)例,一個(gè)狀態(tài)表示,一個(gè)返回結(jié)果,以及對運(yùn)行線程的記錄
/*** 任務(wù)的運(yùn)行狀態(tài),最初是NEW。運(yùn)行狀態(tài)只在set, setException和cancel方法中過度到最終狀態(tài)。* 在完成過程中,狀態(tài)可能發(fā)生轉(zhuǎn)移到COMPLETING(在設(shè)置結(jié)果時(shí))或者INTERRUPTING(僅當(dāng)中斷運(yùn)行來滿足cancel(true)時(shí))。* 從這些中間狀態(tài)轉(zhuǎn)移到最終狀態(tài)使用成本更低有序/懶惰寫入,因?yàn)橹凳俏ㄒ坏那抑蟛荒茉傩薷摹?* Possible state transitions:* NEW -> COMPLETING -> NORMAL* NEW -> COMPLETING -> EXCEPTIONAL* NEW -> CANCELLED* NEW -> INTERRUPTING -> INTERRUPTED*/private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;/** 下層的callable,運(yùn)行后為null */private Callable<V> callable;/** get()操作返回的結(jié)果或者拋出的異常*/private Object outcome; // 不是volatile,由reads/writes狀態(tài)來保護(hù)/** 運(yùn)行callable的線程,在run()通過CAS修改*/private volatile Thread runner;/** 等待線程的Treiber堆棧 */private volatile WaitNode waiters;構(gòu)造函數(shù)
構(gòu)造函數(shù)總共有兩種重載,第一種直接給出Callable實(shí)例
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // 確保callable的可見性}第二種,給出Runnable實(shí)例和期望的返回結(jié)果,如果Runnable實(shí)例運(yùn)行成功則返回的是result
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);//創(chuàng)建一個(gè)callablethis.state = NEW; // 確保callable的可見性}run
run方法是Thread運(yùn)行FutureTask內(nèi)任務(wù)的接口。首先,根據(jù)最上方的進(jìn)入條件可以看出,只有成功競爭到修改runnerOffset成功的線程才能執(zhí)行后續(xù)方法,而搜索整個(gè)類文件,可以發(fā)現(xiàn)只有在run結(jié)束后才會重置為null,所以同一時(shí)間只能有一個(gè)線程執(zhí)行run方法成功。然后要檢查state和callable的狀態(tài),因?yàn)閞un會將它們修改。調(diào)用callable.call方法獲取返回結(jié)果,成功的話設(shè)置結(jié)果,失敗的話設(shè)置返回結(jié)果為異常。無論是否執(zhí)行成功,runner會被重置為null。
public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;//狀態(tài)必須是NEW且修改執(zhí)行線程成功,否則直接返回,避免被多個(gè)線程同時(shí)執(zhí)行try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();//調(diào)用callable.call方法獲取返回結(jié)果ran = true;//執(zhí)行成功} catch (Throwable ex) {result = null;ran = false;//執(zhí)行失敗setException(ex);//設(shè)置返回異常并喚醒等待線程解除阻塞}if (ran)set(result);//設(shè)置結(jié)果}} finally {//runner直到狀態(tài)設(shè)置完成不能為null來避免并發(fā)調(diào)用run()runner = null;//在將runner設(shè)置為null后需要重新讀取state避免漏掉中斷int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}set方法先修改state為COMPLETING,然后將outcome設(shè)置為剛才計(jì)算出來的結(jié)果,最后設(shè)置state為NORMAL,并調(diào)用finishCompletion。這個(gè)方法移除并通知所有等待的線程解除阻塞,調(diào)用done(),并將callable設(shè)為null。done方法默認(rèn)是什么也不做。
protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終狀態(tài)finishCompletion();//喚醒等待線程,將callable設(shè)為null}}private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);//如果線程被park阻塞,解除阻塞}WaitNode next = q.next;if (next == null)break;q.next = null; // 取消連接幫助gcq = next;}break;}}done();//未重寫時(shí)什么也不做callable = null; // to reduce footprint減少覆蓋區(qū)}setException跟set邏輯上基本一樣,除了設(shè)置返回結(jié)果是Throwable對象
protected void setException(Throwable t) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {//修改狀態(tài)outcome = t;//結(jié)果為ThrowableUNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最終狀態(tài)finishCompletion();}}get
get方法如果FutureTask已經(jīng)執(zhí)行完成則返回結(jié)果,否則會等待并阻止線程調(diào)度。等待時(shí)長可以輸入,單位為納秒,不輸入為不限時(shí)等待,限時(shí)等待超時(shí)仍然沒有完成會拋出異常。
public V get() throws InterruptedException, ExecutionException {int s = state;if (s <= COMPLETING)s = awaitDone(false, 0L);//等待完成return report(s);//檢查時(shí)返回結(jié)果還是拋出異常}public V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {if (unit == null)throw new NullPointerException();int s = state;if (s <= COMPLETING &&(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)//限時(shí)等待完成throw new TimeoutException();return report(s);}awaitDone這個(gè)方法會阻塞當(dāng)前線程(get方法的調(diào)用線程)的調(diào)度并增加等待結(jié)點(diǎn),阻塞時(shí)長根據(jù)輸入的時(shí)間長度決定。如果執(zhí)行Callable任務(wù)的線程完成了運(yùn)行或者被中斷,則會解除棧中等待結(jié)點(diǎn)對應(yīng)線程的阻塞。然后會根據(jù)執(zhí)行結(jié)果決定是否要拋出異常還是返回執(zhí)行完成的結(jié)果。
private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;//是否完成入棧for (;;) {if (Thread.interrupted()) {//檢查線程是否已經(jīng)被中斷removeWaiter(q);//移除被中斷的等待結(jié)點(diǎn)throw new InterruptedException();}int s = state;if (s > COMPLETING) {//已經(jīng)完成if (q != null)q.thread = null;//移除等待return s;}else if (s == COMPLETING) // cannot time out yet還沒有超時(shí)Thread.yield();//已經(jīng)在賦值,所以只需讓出時(shí)間片等待賦值完成//下方都是還在沒有完成call方法的情況else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);//q加入到棧的最前方else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {//超時(shí)了removeWaiter(q);//移除超時(shí)的等待結(jié)點(diǎn)return state;}LockSupport.parkNanos(this, nanos);//阻塞當(dāng)前線程nanos納秒}elseLockSupport.park(this);//阻塞當(dāng)前線程}}cancel
cancel輸入的參數(shù)表示如果當(dāng)前還在運(yùn)行中是否要中斷執(zhí)行線程,如果輸入?yún)?shù)是false則只有線程已經(jīng)執(zhí)行完成或者拋出異常或者已經(jīng)被中斷時(shí)可以把狀態(tài)修改為CANCELLED,如果是true則會中斷線程并將狀態(tài)改為INTERRUPTED。所以,cancel在該任務(wù)已經(jīng)結(jié)束或者已被取消,或者競爭修改狀態(tài)失敗時(shí)都會失敗。如果中斷成功,會釋放所有被阻塞的等待線程。
public boolean cancel(boolean mayInterruptIfRunning) {if (!(state == NEW &&UNSAFE.compareAndSwapInt(this, stateOffset, NEW,mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))return false;//已經(jīng)完成或者被取消或者競爭取消失敗返回falsetry { // in case call to interrupt throws exceptionif (mayInterruptIfRunning) {try {Thread t = runner;if (t != null)t.interrupt();} finally { // final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);}}} finally {finishCompletion();}return true;}狀態(tài)檢查
非常簡單的兩個(gè)方法。因?yàn)镃ANCELLED是state中最大的,所以只有cancel方法成功才會是這種狀態(tài)。而isDone只要不是還在運(yùn)行或者還沒有被執(zhí)行就是返回true。
public boolean isCancelled() {return state >= CANCELLED;}public boolean isDone() {return state != NEW;}簡單的使用示例
public class CallableTest implements Callable<Integer>{private int start;public CallableTest(int start) {this.start = start;}@Overridepublic Integer call() throws Exception {Thread.sleep(500);return start + 1;}public static void main(String args[]) throws InterruptedException, ExecutionException{long start = System.currentTimeMillis();FutureTask<Integer> task1 = new FutureTask<>(new CallableTest(2));new Thread(task1).start();FutureTask<Integer> task2 = new FutureTask<>(new CallableTest(4));new Thread(task2).start();System.out.println(task1.get() + task2.get());//8long end = System.currentTimeMillis();System.out.println(end - start);//506} }總結(jié)
以上是生活随笔為你收集整理的Java多线程——FutureTask源码解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MugLife app是一款可以将静态照
- 下一篇: Java编译器优化与运行期优化技术浅析