条件队列java_Java并发系列(4)AbstractQueuedSynchronizer源码分析之条件队列
AbstractQueuedSynchronizer內部維護了一個同步狀態和兩個排隊區,這兩個排隊區分別是同步隊列和條件隊列。
我們還是拿公共廁所做比喻,同步隊列是主要的排隊區,如果公共廁所沒開放,所有想要進入廁所的人都得在這里排隊。而條件隊列主要是為條件等待設置的,我們想象一下如果一個人通過排隊終于成功獲取鎖進入了廁所,但在方便之前發現自己沒帶手紙,碰到這種情況雖然很無奈,但是它也必須接受這個事實,這時它只好乖乖的出去先準備好手紙(進入條件隊列等待),當然在出去之前還得把鎖給釋放了好讓其他人能夠進來,在準備好了手紙(條件滿足)之后它又得重新回到同步隊列中去排隊。
當然進入房間的人并不都是因為沒帶手紙,可能還有其他一些原因必須中斷操作先去條件隊列中去排隊,所以條件隊列可以有多個,依不同的等待條件而設置不同的條件隊列。條件隊列是一條單向鏈表,Condition接口定義了條件隊列中的所有操作,AbstractQueuedSynchronizer內部的ConditionObject類實現了Condition接口。
下面我們看看Condition接口都定義了哪些操作。
public interfaceCondition {//響應線程中斷的條件等待
void await() throwsInterruptedException;//不響應線程中斷的條件等待
voidawaitUninterruptibly();//設置相對時間的條件等待(不進行自旋)
long awaitNanos(long nanosTimeout) throwsInterruptedException;//設置相對時間的條件等待(進行自旋)
boolean await(long time, TimeUnit unit) throwsInterruptedException;//設置絕對時間的條件等待
boolean awaitUntil(Date deadline) throwsInterruptedException;//喚醒條件隊列中的頭結點
voidsignal();//喚醒條件隊列的所有結點
voidsignalAll();
}
Condition接口雖然定義了這么多方法,但總共就分為兩類,以await開頭的是線程進入條件隊列等待的方法,以signal開頭的是將條件隊列中的線程“喚醒”的方法。這里要注意的是,調用signal方法可能喚醒線程也可能不會喚醒線程,什么時候會喚醒線程這得看情況,后面會講到,但是調用signal方法一定會將線程從條件隊列中移到同步隊列尾部。
這里為了敘述方便,我們先暫時不糾結這么多,統一稱signal方法為喚醒條件隊列線程的操作。大家注意看一下,await方法分為5種,分別是響應線程中斷等待,不響應線程中斷等待,設置相對時間不自旋等待,設置相對時間自旋等待,設置絕對時間等待;signal方法只有2種,分別是只喚醒條件隊列頭結點和喚醒條件隊列所有結點的操作。
同一類的方法基本上是相通的,由于篇幅所限,我們不可能也不需要將這些方法全部仔細的講到,只需要將一個代表方法搞懂了再看其他方法就能夠觸類旁通。所以在本文中我只會細講await方法和signal方法,其他方法不細講但會貼出源碼來以供大家參考。
1. 響應線程中斷的條件等待
//響應線程中斷的條件等待
public final void await() throwsInterruptedException {if(Thread.interrupted())throw newInterruptedException();//1.將當前線程添加到條件隊列尾部
Node node =addConditionWaiter();//2.在進入條件等待之前先完全釋放鎖
int savedState =fullyRelease(node);int interruptMode = 0;//3.線程一直在while循環里進行條件等待
while (!isOnSyncQueue(node)) {//進行條件等待的線程都在這里被掛起, 線程被喚醒的情況有以下幾種://1.同步隊列的前繼結點已取消//2.設置同步隊列的前繼結點的狀態為SIGNAL失敗//3.前繼結點釋放鎖后喚醒當前結點
LockSupport.park(this);//4.檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件隊列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
}//4結點移出條件隊列后的操作//1.線程醒來后就會以獨占模式獲取鎖并檢查是否被中斷
if (acquireQueued(node, savedState) && interruptMode !=THROW_IE)
interruptMode=REINTERRUPT;//2.clean up if cancelled
if (node.nextWaiter != null)
unlinkCancelledWaiters();//3.根據中斷模式進行響應的中斷處理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
當線程調用await方法的時候,首先會將當前線程包裝成node結點放入條件隊列尾部。在addConditionWaiter方法中,如果發現條件隊列尾結點已取消就會調用unlinkCancelledWaiters方法將條件隊列所有的已取消結點清空。
第一步:將當前線程添加到條件隊列尾部
privateNode addConditionWaiter() {
Node t=lastWaiter;//If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus !=Node.CONDITION) {
unlinkCancelledWaiters();
t=lastWaiter;
}
Node node= newNode(Thread.currentThread(), Node.CONDITION);if (t == null)
firstWaiter=node;elset.nextWaiter=node;
lastWaiter=node;returnnode;
}
這步操作是插入結點的準備工作,那么確保了尾結點的狀態也是CONDITION之后,就會新建一個node結點將當前線程包裝起來然后放入條件隊列尾部。注意,這個過程只是將結點添加到同步隊列尾部而沒有掛起線程哦。
第二步:完全將鎖釋放
//完全釋放鎖
final intfullyRelease(Node node) {boolean failed = true;try{//獲取當前的同步狀態
int savedState =getState();//使用當前的同步狀態去釋放鎖
if(release(savedState)) {
failed= false;//如果釋放鎖成功就返回當前同步狀態
returnsavedState;
}else{throw newIllegalMonitorStateException();
}
}finally{//保證沒有成功釋放鎖就將該結點設置為取消狀態
if(failed)
node.waitStatus=Node.CANCELLED;
}
}
//釋放鎖的操作(獨占模式)
public final boolean release(intarg) {//撥動密碼鎖, 看看是否能夠開鎖
if(tryRelease(arg)) {//獲取head結點
Node h =head;//如果head結點不為空并且等待狀態不等于0就去喚醒后繼結點
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);return true;
}return false;
}
//喚醒后繼結點
private voidunparkSuccessor(Node node) {//獲取給定結點的等待狀態
int ws =node.waitStatus;//將等待狀態更新為0
if (ws < 0)
compareAndSetWaitStatus(node, ws,0);//獲取給定結點的后繼結點
Node s =node.next;//后繼結點為空或者等待狀態為取消狀態
if (s == null || s.waitStatus > 0) {
s= null;//從后向前遍歷隊列找到第一個不是取消狀態的結點
for (Node t = tail; t != null && t != node; t =t.prev)if (t.waitStatus <= 0)
s=t;
}//喚醒給定結點后面首個不是取消狀態的結點
if (s != null)
LockSupport.unpark(s.thread);
}
將當前線程包裝成結點添加到條件隊列尾部后,緊接著就調用fullyRelease方法釋放鎖。注意,方法名為fullyRelease也就這步操作會完全的釋放鎖,因為鎖是可重入的,所以在進行條件等待前需要將鎖全部釋放了,不然的話別人就獲取不了鎖了。如果釋放鎖失敗的話就會拋出一個運行時異常,如果成功釋放了鎖的話就返回之前的同步狀態。
第三步:進行條件等待
//線程一直在while循環里進行條件等待
final booleanisOnSyncQueue(Node node) {if (node.waitStatus == Node.CONDITION || node.prev == null)return false;if (node.next != null) //If has successor, it must be on queue
return true;returnfindNodeFromTail(node);
}
//線程一直在while循環里進行條件等待
while (!isOnSyncQueue(node)) {//進行條件等待的線程都在這里被掛起, 線程被喚醒的情況有以下幾種://1.同步隊列的前繼結點已取消//2.設置同步隊列的前繼結點的狀態為SIGNAL失敗//3.前繼結點釋放鎖后喚醒當前結點
LockSupport.park(this);//當前線程醒來后立馬檢查是否被中斷, 如果是則代表結點取消條件等待, 此時需要將結點移出條件隊列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {break;
}
}//檢查條件等待時的線程中斷情況
private intcheckInterruptWhileWaiting(Node node) {//中斷請求在signal操作之前:THROW_IE//中斷請求在signal操作之后:REINTERRUPT//期間沒有收到任何中斷請求:0
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
//將取消條件等待的結點從條件隊列轉移到同步隊列中
final booleantransferAfterCancelledWait(Node node) {//如果這步CAS操作成功的話就表明中斷發生在signal方法之前
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//狀態修改成功后就將該結點放入同步隊列尾部
enq(node);return true;
}//到這里表明CAS操作失敗, 說明中斷發生在signal方法之后
while (!isOnSyncQueue(node)) {//如果sinal方法還沒有將結點轉移到同步隊列, 就通過自旋等待一下
Thread.yield();
}return false;
}
在以上兩個操作完成了之后就會進入while循環,可以看到while循環里面首先調用LockSupport.park(this)將線程掛起了,所以線程就會一直在這里阻塞。在調用signal方法后僅僅只是將結點從條件隊列轉移到同步隊列中去,至于會不會喚醒線程需要看情況。
如果轉移結點時發現同步隊列中的前繼結點已取消,或者是更新前繼結點的狀態為SIGNAL失敗,這兩種情況都會立即喚醒線程,否則的話在signal方法結束時就不會去喚醒已在同步隊列中的線程,而是等到它的前繼結點來喚醒。當然,線程阻塞在這里除了可以調用signal方法喚醒之外,線程還可以響應中斷,如果線程在這里收到中斷請求就會繼續往下執行。
可以看到線程醒來后會馬上檢查是否是由于中斷喚醒的還是通過signal方法喚醒的,如果是因為中斷喚醒的同樣會將這個結點轉移到同步隊列中去,只不過是通過調用transferAfterCancelledWait方法來實現的。最后執行完這一步之后就會返回中斷情況并跳出while循環。
第四步:結點移出條件隊列后的操作
//1.線程醒來后就會以獨占模式獲取鎖并檢查是否被中斷
if (acquireQueued(node, savedState) && interruptMode !=THROW_IE)
interruptMode=REINTERRUPT;//2.clean up if cancelled
if (node.nextWaiter != null)//這步操作主要為防止線程在signal之前中斷而導致沒與條件隊列斷絕聯系
unlinkCancelledWaiters();//3.根據中斷模式進行響應的中斷處理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//結束條件等待后根據中斷情況做出相應處理
private void reportInterruptAfterWait(intinterruptMode)throwsInterruptedException {//如果中斷模式是THROW_IE就拋出異常
if (interruptMode ==THROW_IE)throw newInterruptedException();//如果中斷模式是REINTERRUPT就自己掛起
else if (interruptMode ==REINTERRUPT)
selfInterrupt();
}
當線程終止了while循環也就是條件等待后,就會回到同步隊列中。不管是因為調用signal方法回去的還是因為線程中斷導致的,結點最終都會在同步隊列中。這時就會調用acquireQueued方法執行在同步隊列中獲取鎖的操作,這個方法我們在獨占模式這一篇已經詳細的講過。
也就是說,結點從條件隊列出來后又是乖乖的走獨占模式下獲取鎖的那一套,等這個結點再次獲得鎖之后,就會調用reportInterruptAfterWait方法來根據這期間的中斷情況做出相應的響應。如果中斷發生在signal方法之前,interruptMode就為THROW_IE,再次獲得鎖后就拋出異常;如果中斷發生在signal方法之后,interruptMode就為REINTERRUPT,再次獲得鎖后就重新中斷。
2.不響應線程中斷的條件等待
//不響應線程中斷的條件等待
public final voidawaitUninterruptibly() {//將當前線程添加到條件隊列尾部
Node node =addConditionWaiter();//完全釋放鎖并返回當前同步狀態
int savedState =fullyRelease(node);boolean interrupted = false;//結點一直在while循環里進行條件等待
while (!isOnSyncQueue(node)) {//條件隊列中所有的線程都在這里被掛起
LockSupport.park(this);//線程醒來發現中斷并不會馬上去響應
if(Thread.interrupted())
interrupted= true;
}if (acquireQueued(node, savedState) ||interrupted)//在這里響應所有中斷請求, 滿足以下兩個條件之一就會將自己掛起//1.線程在條件等待時收到中斷請求//2.線程在acquireQueued方法里收到中斷請求
selfInterrupt();
}
//以不可中斷方式獲取鎖(獨占模式)
final boolean acquireQueued(final Node node, intarg) {boolean failed = true;try{boolean interrupted = false;for(;;) {//獲取給定結點的前繼結點的引用
final Node p =node.predecessor();//如果當前結點是同步隊列的第一個結點, 就嘗試去獲取鎖
if (p == head &&tryAcquire(arg)) {//將給定結點設置為head結點
setHead(node);//為了幫助垃圾收集, 將上一個head結點的后繼清空
p.next = null; //help GC//設置獲取成功狀態
failed = false;//返回中斷的狀態, 整個循環執行到這里才是出口
returninterrupted;
}//否則說明鎖的狀態還是不可獲取, 這時判斷是否可以掛起當前線程//如果判斷結果為真則掛起當前線程, 否則繼續循環,//在這期間線程不響應中斷
if(shouldParkAfterFailedAcquire(p, node)//掛起當前線程
&&parkAndCheckInterrupt())
interrupted= true;
}
}finally{//在最后確保如果獲取失敗就取消獲取
if(failed)
cancelAcquire(node);
}
}
3.設置相對時間的條件等待(進行自旋)
//設置定時條件等待(相對時間), 進行自旋等待
public final long awaitNanos(longnanosTimeout)throwsInterruptedException {//如果線程被中斷則拋出異常
if(Thread.interrupted())throw newInterruptedException();//將當前線程添加到條件隊列尾部
Node node =addConditionWaiter();//在進入條件等待之前先完全釋放鎖
int savedState =fullyRelease(node);final long deadline = System.nanoTime() +nanosTimeout;int interruptMode = 0;while (!isOnSyncQueue(node)) {//判斷超時時間是否用完了
if (nanosTimeout <= 0L) {//如果已超時就需要執行取消條件等待操作
transferAfterCancelledWait(node);break;
}//將當前線程掛起一段時間, 線程在這期間可能被喚醒, 也可能自己醒來
if (nanosTimeout >=spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);//線程醒來后先檢查中斷信息
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
nanosTimeout= deadline -System.nanoTime();
}//線程醒來后就會以獨占模式獲取鎖
if (acquireQueued(node, savedState) && interruptMode !=THROW_IE)
interruptMode=REINTERRUPT;if (node.nextWaiter != null)
unlinkCancelledWaiters();if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);//返回剩余時間
return deadline -System.nanoTime();
}
//將取消條件等待的結點從條件隊列轉移到同步隊列中
final booleantransferAfterCancelledWait(Node node) {//如果這步CAS操作成功的話就表明中斷發生在signal方法之前
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//狀態修改成功后就將該結點放入同步隊列尾部
enq(node);return true;
}//到這里表明CAS操作失敗, 說明中斷發生在signal方法之后
while (!isOnSyncQueue(node))//如果sinal方法還沒有將結點轉移到同步隊列, 就通過自旋等待一下
Thread.yield();return false;
}
4.設置相對時間的條件等待(進行自旋)
//設置定時條件等待(相對時間), 進行自旋等待
public final boolean await(longtime, TimeUnit unit)throwsInterruptedException {//獲取超時時間的毫秒數
long nanosTimeout =unit.toNanos(time);//如果線程被中斷則拋出異常
if(Thread.interrupted())throw newInterruptedException();//將當前線程添加條件隊列尾部
Node node =addConditionWaiter();//在進入條件等待之前先完全釋放鎖
int savedState =fullyRelease(node);final long deadline = System.nanoTime() +nanosTimeout;boolean timedout = false;int interruptMode = 0;while (!isOnSyncQueue(node)) {//如果超時就需要執行取消條件等待操作
if (nanosTimeout <= 0L) {
timedout=transferAfterCancelledWait(node);break;
}//如果超時時間大于自旋時間, 就將線程掛起一段時間
if (nanosTimeout >=spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);//線程醒來后先檢查中斷信息
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
nanosTimeout= deadline -System.nanoTime();
}if (acquireQueued(node, savedState) && interruptMode !=THROW_IE)
interruptMode=REINTERRUPT;if (node.nextWaiter != null)
unlinkCancelledWaiters();if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);//返回是否超時標志
return !timedout;
}
5.設置絕對時間的條件等待
//設置定時條件等待(絕對時間)
public final booleanawaitUntil(Date deadline)throwsInterruptedException {//獲取絕對時間的毫秒數
long abstime =deadline.getTime();//如果線程被中斷則拋出異常
if(Thread.interrupted())throw newInterruptedException();//將當前線程添加到條件隊列尾部
Node node =addConditionWaiter();//在進入條件等待之前先完全釋放鎖
int savedState =fullyRelease(node);boolean timedout = false;int interruptMode = 0;while (!isOnSyncQueue(node)) {//如果超時就需要執行取消條件等待操作
if (System.currentTimeMillis() >abstime) {
timedout=transferAfterCancelledWait(node);break;
}//將線程掛起一段時間, 期間線程可能被喚醒, 也可能到了點自己醒來
LockSupport.parkUntil(this, abstime);//線程醒來后先檢查中斷信息
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;
}//線程醒來后就會以獨占模式獲取鎖
if (acquireQueued(node, savedState) && interruptMode !=THROW_IE)
interruptMode=REINTERRUPT;//由于transferAfterCancelledWait方法沒有把nextWaiter置空, 所有這里要再清理一遍
if (node.nextWaiter != null)
unlinkCancelledWaiters();//根據中斷模式進行響應的中斷處理
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);//返回是否超時標志
return !timedout;
}
6.喚醒條件隊列中的頭結點
//喚醒條件隊列中的下一個結點
public final voidsignal() {//判斷當前線程是否持有鎖
if (!isHeldExclusively())throw newIllegalMonitorStateException();
Node first=firstWaiter;if (first != null)//喚醒條件隊列中的頭結點
doSignal(first);
}
//喚醒條件隊列中的頭結點
private voiddoSignal(Node first) {do{//1.將firstWaiter引用向后移動一位
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter= null;//2.將頭結點的后繼結點引用置空
first.nextWaiter = null;//3.將頭結點轉移到同步隊列, 轉移完成后有可能喚醒線程//4.如果transferForSignal操作失敗就去喚醒下一個結點
} while (!transferForSignal(first) &&(first= firstWaiter) != null);
}
//將指定結點從條件隊列轉移到同步隊列中
final booleantransferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/
//將等待狀態從CONDITION設置為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//如果更新狀態的操作失敗就直接返回false//可能是transferAfterCancelledWait方法先將狀態改變了, 導致這步CAS操作失敗
return false;//將該結點添加到同步隊列尾部
Node p =enq(node);int ws =p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//出現以下情況就會喚醒當前線程//1.前繼結點是取消狀態//2.更新前繼結點的狀態為SIGNAL操作失敗
LockSupport.unpark(node.thread);return true;
}
可以看到signal方法最終的核心就是去調用transferForSignal方法,在transferForSignal方法中首先會用CAS操作將結點的狀態從CONDITION設置為0,然后再調用enq方法將該結點添加到同步隊列尾部。
我們再看到接下來的if判斷語句,這個判斷語句主要是用來判斷什么時候會去喚醒線程,出現這兩種情況就會立即喚醒線程,一種是當發現前繼結點的狀態是取消狀態時,還有一種是更新前繼結點的狀態失敗時。
這兩種情況都會馬上去喚醒線程,否則的話就僅僅只是將結點從條件隊列中轉移到同步隊列中就完了,而不會立馬去喚醒結點中的線程。signalAll方法也大致類似,只不過它是去循環遍歷條件隊列中的所有結點,并將它們轉移到同步隊列,轉移結點的方法也還是調用transferForSignal方法。
7.喚醒條件隊列的所有結點
//喚醒條件隊列后面的全部結點
public final voidsignalAll() {//判斷當前線程是否持有鎖
if (!isHeldExclusively())throw newIllegalMonitorStateException();
Node first=firstWaiter;if (first != null)//喚醒條件隊列的所有結點
doSignalAll(first);
}
//喚醒條件隊列的所有結點
private voiddoSignalAll(Node first) {//先把頭結點和尾結點的引用置空
lastWaiter = firstWaiter = null;do{//先獲取后繼結點的引用
Node next =first.nextWaiter;//把即將轉移的結點的后繼引用置空
first.nextWaiter = null;//將結點從條件隊列轉移到同步隊列
transferForSignal(first);//將引用指向下一個結點
first =next;
}while (first != null);
}
至此,我們整個的AbstractQueuedSynchronizer源碼分析就結束了,相信通過這四篇的分析,大家能更好的掌握并理解AQS。
這個類確實很重要,因為它是其他很多同步類的基石,由于筆者水平和表達能力有限,如果哪些地方沒有表述清楚的,或者理解不到位的,還請廣大讀者們能夠及時指正,共同探討學習。
總結
以上是生活随笔為你收集整理的条件队列java_Java并发系列(4)AbstractQueuedSynchronizer源码分析之条件队列的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php本地文件打包代码,PHP实战:几行
- 下一篇: 上传附件_留学落户|上传附件预审时一定一