java aqs源码_Java-AQS源码详解(细节很多!)
ReentrantLock調用lock()時時序圖:
addWaiter方法:
enq方法:自旋
它維護了一個volatile int state(代表共享資源)和一個FIFO線程等待隊列(多線程爭用資源被阻塞時會進入此隊列)。這里volatile是核心關鍵詞,具體volatile的語義,在此不述。state的訪問方式有三種:
getState()
setState()
compareAndSetState()
aqs有兩種資源訪問模式:獨占(ReentrantLock)和共享(CountDownLatch和Semaphore、CyclicBarrier)
不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了!接下來開始擼吧。。至于這里雙向鏈表是怎么樣的一個結構,這里就不做多于的描述了,大家可以自行去補充。
首先我們由一張圖開頭,我們要知道AQS其實主要實現的是一個FIFO的雙向鏈表的維護,每個Node其實就是一個等待被釋放的線程,在競爭鎖失敗后,會封裝成Node的形式進入到鏈表尾部。。在了解了最基本的概念后,我們先來看看AQS最經典的應用ReentrantLock的lock方法:
public voidlock() {
sync.lock();// sync主要兩種實現類
}
// 第一種非公平鎖
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**非公平鎖實現的lock方法
*/
final void lock() {
if (compareAndSetState(0, 1))// CAS操作去嘗試將state變為1,也就是獨占狀態
setExclusiveOwnerThread(Thread.currentThread());// 非公平鎖并不會老老實實去排隊,而是一上來就插隊,插不了就只能去排隊了。。
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 第二種公平鎖
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);// 相比于非公平鎖,就比較守規矩了
}
」
因為非公平和公平就只有這么一個差別,那我就以非公平鎖為切入點了,可以看到在嘗試搶占失敗后,調用acquire方法,ok進入到該方法:
// 此方法是AQS的,但是注意里面的tryAcquire是需要我們的自定義AQS實現的,直接調用AQS的會直接拋出異常UnsupportedOperationException
public final void acquire(intarg) {
if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire是NonfairSync實現的,而他內部又直接調用Sync父類的nonfairTryAcquire方法:
final boolean nonfairTryAcquire(intacquires) {final Thread current =Thread.currentThread();int c =getState();if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);// 直接CAS獨占return true;
}
}else if (current ==getExclusiveOwnerThread()) {int nextc = c +acquires;// 這里很確切的說明了ReentrantLock是一個可重入的鎖if (nextc < 0) //overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);return true;
}return false;
}
上面的方法相信大家應該很快就理解了,在嘗試獨占失敗后,tryAcquire操作返回false,然后這個時候要做的操作相信大家也可以猜到,就是插入到雙向鏈表中,看上面的代碼,第一個操作是addWaiter,于是我們貼出這個方法涉及的源碼:
privateNode addWaiter(Node mode) {
Node node= newNode(Thread.currentThread(), mode);// 在這里首先根據當前線程創建出一個節點
Node pred =tail;// 既然要插入節點,肯定是要插入到最尾部的,先獲取到tail節點if (pred != null) {
node.prev=pred;// 將當前節點的prev和尾部節點關聯 --第五行if(compareAndSetTail(pred, node)) {
pred.next=node;// node和老tail關聯完成returnnode;
}
}
enq(node);returnnode;
}
如果某個線程在插入隊列沒有其他線程干擾的話,enq都不會進去的,直接在CAS設置成tail之后直接返回了,但是實際上,總是會有那么幾個“不長眼”的線程來和你對著干。。。來假設這么一個場景:A線程是tail節點,此時B和C進來,他們都同時進入到第五行那里,也就是你會發現A會有B和C兩個節點的prev指向它,但是下一行的CAS操作是一個原子性操作,所以B和C只能一個成為tail,那么又假設B成功CAS了,也就是B可以直接返回,但是C就比較“悲催”了,它得進入到下一個方法enq,因為此時的鏈表結構很是奇怪,C的prev指向了old tail:A,所以得做一個“修復”結構操作,將C的prev指向B,接下來看enq代碼:
private Node enq(finalNode node) {// 此時沒有成功CAS的C節點“失魂落魄”的走了進來for(;;) {
Node t=tail;if (t == null) { //
if (compareAndSetHead(newNode()))// 如果此時隊列完全為空(第一個線程進來),需要弄一個冗余head節點,之后你會看到作用的。。別急
tail=head;
}else{
node.prev=t;// 此時的C節點要和B節點綁上關系if(compareAndSetTail(t, node)) {
t.next=node;// 關聯完成returnt;
}
}
}
}
此時的C應該是可以回到正軌的,就算此時又一個線程打擾了C的關聯操作而導致CAS失敗,但是因為代碼在for循環里,可以重試,基本上很快就可以回到隊列正軌!!于是我們又可以愉快的進行下一個步驟了,再回到我們熟悉的acquire(有點繞,忘記的往上翻),可以看到addWaiter之后,會將當前節點返回給一個“新面孔”-acquireQueued方法作為參數,我們再看看這個方法是怎么做的:
final boolean acquireQueued(final Node node, intarg) {boolean failed = true;try{boolean interrupted = false;for(;;) {final Node p =node.predecessor();// 當前節點的前置節點if (p == head &&tryAcquire(arg)) {
// 當前置節點為head,那么可以去嘗試獲取鎖,成功的話就調用setHead方法將自己設置為head節點
setHead(node);
p.next= null; //help GC
failed = false;returninterrupted;
}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
// 判斷當前節點是否可以被阻塞,shouldParkAfterFailedAcquire方法為核心
interrupted= true;
}
}finally{if(failed)
cancelAcquire(node);
}
}
可以看到,如果當前節點的上一個節點就是head的話,說明當前可以競爭到鎖的概率會很大,一旦head節點的線程執行完unlock后,當前的state變為0,當前節點就可以進入到setHead方法,但是如果頭節點還在執行中,那么當前節點只能老老實實的進入到shouldParkAfterFailedAcquire方法內部,來決定當前節點是否應該能被阻塞:
private static booleanshouldParkAfterFailedAcquire(Node pred, Node node) {
//static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;static final int PROPAGATE = -3;
int ws =pred.waitStatus;// 在這里終于有用上這個變量了if (ws ==Node.SIGNAL)/*在這里我打算用一個很易于理解的方式來講述這個SIGNAL值有什么用:
相信大家都有過排隊的經歷,在這里服務窗口相當于鎖,每個人過來時,發現窗口有其他人在,所以此時只能去隊尾排隊,也就是addWaiter操作,在隊尾后,waitStatus的值默認是0,但是此時剛排進隊的小伙伴,因為隊伍太長,
而且比較累,需要低頭打個盹,但是怕如果瞌睡打過頭了,就不知道什么時候窗口沒人了可以被服務,所以此時小伙伴為了保險,他需要一個可靠的“前置隊友”,也就是他前面的人如果業務辦完了,可以順便回頭來叫醒他,在這里可
以把“委托前面的人,如果結束了麻煩叫醒我,謝謝!”這個操作理解為將prev節點的waitStatus設置為SIGNAL,如果前置節點的waitStatus不是0,需要嘗試設置為SIGNAL,但如果前面的小伙伴已經是SIGNAL了,直接返回,
說明當前小伙伴可以安心的打盹了(被阻塞)!!*/
return true;if (ws > 0) {/** 如果是CANCELLED,代表當前節點已經不需要處理業務了,可以在隊列里直接清除出去,然后隊列重新規整*/
do{
node.prev= pred =pred.prev;
}while (pred.waitStatus > 0);
pred.next=node;
}else{/* 到這一步,就會嘗試去將前置節點設置為SIGNAL,但是有可能會設置失敗或者設置成功,但是不論成功還是失敗,都會返回false,也就是在上面的acquireQueued中,返回false后會繼續for循環里去嘗試獲取鎖,因為小伙伴必須要確定前面的伙伴要靠譜,也就是必須要是SIGNAL
*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}return false;
}
我們再來看看unlock方法,他有直接調用AQS的release方法,而tryRelease方法由自定義的AQS類實現:
public final boolean release(intarg) {if(tryRelease(arg)) {
Node h=head;if (h != null && h.waitStatus != 0)
unparkSuccessor(h);// 關鍵操作,如何去喚醒后面的小伙伴return true;
}return false;
}
protected final boolean tryRelease(intreleases) {int c = getState() -releases;if (Thread.currentThread() !=getExclusiveOwnerThread())throw newIllegalMonitorStateException();boolean free = false;if (c == 0) {
free= true;
setExclusiveOwnerThread(null);// 徹底釋放鎖后,將ownerThread設置為null,重置state
}
setState(c);returnfree;
}
tryRelease操作其實很好理解,主要是unparkSuccessor方法:
private voidunparkSuccessor(Node node) {/** 在釋放完鎖后,此時的節點他已經不需要SIGNAL這個狀態了,因為他覺得自己辦完業務了,就可以嘗試去給自己“放個假”,當變成0的時候,后面的小伙伴在shouldPark里就會返回false,代表當前前置節點很有可能不是剛剛初始化
導致的waitStatus == 0,而是前置節點剛釋放完鎖,所以就是head:“我此時已經釋放完鎖了,后面的,你現在就別打盹了,趕緊再去嘗試搶鎖吧!”,于是此時心急的小伙伴就趕緊再進入for循環里嘗試tryAcquire*/
int ws =node.waitStatus;if (ws < 0)
compareAndSetWaitStatus(node, ws,0);/***/Node s=node.next;if (s == null || s.waitStatus > 0) {
s= null;for (Node t = tail; t != null && t != node; t =t.prev)
// 從后往前,找到第一個需要被喚醒的小伙伴,狀態也是必須>=0,至于為什么會==0,因為最后一個節點的status一定是0if (t.waitStatus <= 0)
s=t;
}if (s != null)
LockSupport.unpark(s.thread);// 此時的s就是下一個需要被喚醒的,于是unpark
}
不知道你們有沒有發現,為什么上面的代碼里要從后往前掃描呢,雙向鏈表不是兩邊都可以掃嗎,這個就很有趣了,不知道你們有沒有看到在addWaiter和enq方法里,在將當前節點CAS成tail的前一步,有一個先將node的prev設置為前一個節點,也就是雙向表的建立關系是先后節點連接前節點開始的,但是因為設置兩個節點的關系時不是原子操作,那么就會導致可能prev關系存在,但是next關系不存在的時候,unpark操作就開始需要去遍歷鏈表了,而這個時候,用next操作就很可能會遺漏掉哪個“小伙伴”而導致出現誤“喚醒”!!
總結
以上是生活随笔為你收集整理的java aqs源码_Java-AQS源码详解(细节很多!)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: centos设置mysql为系统服务_C
- 下一篇: 电影(《我,机器人》,《全民公敌》)的内