AQS 源码流程分析
導讀:?
我們日常開發中,經常會碰到并發的場景,在?Java 中語言體系里,我們會想到 ReentrantLock、CountDownLatch、Semaphore 等工具,但你是否清楚它們內部的實現原理?這些工具都很類似,底層都是基于AbstractQueuedSynchronizer(AQS)來實現的。今天我們就來一起學習 AQS 內部原理。俗話說知己知彼百戰百勝,如果我們了解了其中的原理,用該類工具開發就可以做到事半功倍。
文|夏福利 網易智企資深開發工程師
一、AQS 執行框架
下圖是 AQS 大體執行框架:
通過上面這張圖能夠了解到 AQS 大概的執行過程。ReentrantLock、CountDownLatch、Semaphore 都是在這個流程上封裝的。
拿 ReentrantLock 來說,ReentrantLock 是獨占鎖,可以是公平鎖,也可以是非公平鎖,通過這張圖可以很好理解了。
ReentrantLock 是獨占鎖體現在同一時刻只有一個線程能夠嘗試獲取資源成功,其他獲取失敗的都會加入阻塞隊列的隊尾進行排隊。
公平鎖就是線程嚴格按照阻塞隊列的排列順序獲取資源,先到先得,不得插隊。如下圖所示:
而非公平鎖就可能存在插隊的可能。例如,如果上面頭節點被喚醒,正準備嘗試獲取資源,這時來了一個線程也嘗試獲取資源,有可能新來的線程獲取資源成功,而頭結點獲取資源失敗。這就是非公平鎖。
在 ReentrantLock 源碼中可以看到非公平鎖會嘗試獲取資源時,不會考慮阻塞隊列是否為空,如果能夠獲取資源成功則直接占用了資源。獲取失敗才會加入阻塞隊列。代碼如下:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false;}而對于公平鎖嘗試獲取資源時,會判斷阻塞隊列是否為空(和非公平鎖關鍵差別所在),如下:
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ @ReservedStackAccess protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }}二、AQS 實現原理詳解
AQS 從名字上就知道是抽象類,通過模板方法定義了上面那張圖的流程。對于“資源占用”和“資源釋放”的定義,則是交給具體的子類去定義去實現。
對于共享鎖,子類需要實現如下兩個方法:
protected int tryAcquireShared(int arg);protected boolean tryReleaseShared(int arg);-
tryAcquireShared:共享方式。arg 為獲取鎖的次數,嘗試獲取資源;返回值為:
負數表示失敗;
0表示成功,但沒有剩余可用資源;
正數表示成功,且有剩余資源;
-
tryReleaseShared:共享方式。arg 為釋放鎖的次數,嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回 True,否則返回 False;
而對于獨占鎖,子類需要實現三個方法:
protected boolean tryAcquire(int arg)protected boolean tryRelease(int arg)protected boolean isHeldExclusively()-
isHeldExclusively:該線程是否正在獨占資源。只有用到 Condition 才需要去實現它;
-
tryAcquire:獨占方式。arg 為獲取鎖的次數,嘗試獲取資源,成功則返回 True,失敗則返回 False;
-
tryRelease:獨占方式。arg 為釋放鎖的次數,嘗試釋放資源,成功則返回 True,失敗則返回 False;
?獲取資源?
首先,我們看一下獨占鎖獲取資源的過程。
在 AQS 中,獨占鎖的獲取資源的核心代碼如下:
public final void acquire(int arg) { // 當 tryAcquire 返回 true 就說明獲取到鎖了,直接結束。 // 反之,返回 false 的話,就需要執行后面的方法。 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}如果子類的 tryAcquire 返回 true, 則表示獲取鎖成功,直接結束。
只要子類的 tryAcquire 方法返回 false,那么就說明獲取鎖失敗,就需要將自己加入隊列。
private Node addWaiter(Node mode) { // 創建一個獨占類型的節點 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 如果 tail 節點不是 null,就將新節點的 pred 節點設置為 tail 節點。 // 并且將新節點設置成 tail 節點。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果 tail 節點是 null,或者 CAS 設置 tail 失敗。 // 在 enq 方法中處理 enq(node); return node;}如果 tail 節點為 null, 或者 CAS 設置 tail 失敗,則通過自旋的方式加入尾結點。
private Node enq(final Node node) { for (;;) { Node t = tail; // 如果 tail 是 null,就創建一個虛擬節點,同時指向 head 和 tail,稱為 初始化。 if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else {// 如果不是 null // 和 上個方法邏輯一樣,將新節點追加到 tail 節點后面,并更新隊列的 tail 為新節點。 // 只不過這里是死循環的,失敗了還可以再來 。 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } }}enq 方法的邏輯是什么呢?當 tail 是 null(沒有初始化隊列),就需要初始化隊列了。CAS 設置 tail 失敗,也會走這里,需要在 enq 方法中循環設置 tail。直到成功。
上面的過程用一張圖表示如下:
將自己加入到阻塞隊列后(注意 addWaiter 方法返回的是當前節點),執行 acquireQueued() 方法,將當前節點對應的線程掛起,源碼如下:
// 這里返回的節點是新創建的節點,arg 是請求的數量final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 找上一個節點 final Node p = node.predecessor(); // 如果上一個節點是 head ,就嘗試獲取鎖 // 如果 獲取成功,就將當前節點設置為 head,注意 head 節點是永遠不會喚醒的。 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 在獲取鎖失敗后,就需要阻塞了。 // shouldParkAfterFailedAcquire ---> 檢查上一個節點的狀態,如果是 SIGNAL 就阻塞,否則就改成 SIGNAL。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); }}這個方法有兩個邏輯:
-
如何將自己掛起?
-
被喚醒之后做什么?
先回答第二個問題:被喚醒之后做什么?
嘗試拿鎖,成功之后,將自己設置為 head,斷開和 next 的連接。
再看第一個問題:如何將自己掛起?
具體邏輯在 shouldParkAfterFailedAcquire 方法中:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 如果他的上一個節點的 ws 是 SIGNAL,他就需要阻塞。 if (ws == Node.SIGNAL) // 阻塞 return true; // 前任被取消。跳過前任并重試。 if (ws > 0) { do { // 將前任的前任 賦值給 當前的前任 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); // 將前任的前任的 next 賦值為 當前節點 pred.next = node; } else { // 如果沒有取消 || 0 || CONDITION || PROPAGATE,那么就將前任的 ws 設置成 SIGNAL. // 為什么必須是 SIGNAL 呢? // 答:希望自己的上一個節點在釋放鎖的時候,通知自己(讓自己獲取鎖) compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } // 重來 return false;}該方法的主要邏輯就是將前置節點的狀態修改成 SIGNAL,告訴他:你釋放鎖的時候記得喚醒我。其中如果前置節點被取消了,就跳過他。那么在前置節點釋放鎖的時候,肯定會喚醒這個節點。
上面是獨占鎖獲取資源過程,共享鎖獲取資源的過程類似,會有稍微的不同,核心代碼如下:
private void doAcquireShared(int arg) { // 將自己加入阻塞隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { // 將自己掛起前,嘗試再次獲取鎖,如果獲取成功, 則將自己設置為頭結點,并通知喚醒下一節點 final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { // 這里是和獨占鎖有區別的地方。這里不但會將自己設置為頭結點, 而且會喚醒下一個節點,通過這種方式將所有等待共享鎖的節點喚醒 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // 獲取鎖失敗,則掛起。同獨占鎖邏輯 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }這個方法同樣是包含兩個邏輯:
-
如何將自己掛起?
-
被喚醒之后做什么?
如何將自己掛起和獨占鎖沒有區別。喚醒之后做什么是和獨占鎖區別的關鍵:如果當前節點喚醒后獲取到了鎖后,會喚醒下一個節點。下一個節點喚醒后會繼續喚醒下下一個節點,從而將所有等待共享鎖的線程喚醒。核心代碼如下:
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below // 將自己設置為頭結點 setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 喚醒下一個節點 doReleaseShared(); } }釋放資源?
上面講了獲取資源的邏輯,那如何釋放資源呢?
同樣,還是先看一下獨占鎖的釋放邏輯:
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; // 所有的節點在將自己掛起之前,都會將前置節點設置成 SIGNAL,希望前置節點釋放的時候,喚醒自己。 // 如果前置節點是 0 ,說明前置節點已經釋放過了。不能重復釋放了,后面將會看到釋放后會將 ws 修改成0. if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}從這個方法的判斷就可以看出,head 必須不等于 0。為什么呢?上面獲取資源過程中說到:當一個節點嘗試掛起自己之前,都會將前置節點設置成 SIGNAL -1,就算是第一個加入隊列的節點,在獲取鎖失敗后,也會將初始化節點設置的 ws 設置成 SIGNAL。
而這個判斷也是防止多線程重復釋放,那么在釋放鎖之后,肯定會將 ws 狀態設置成 0。防止重復操作。代碼如下:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) // 將 head 節點的 ws 改成 0,清除信號。表示,他已經釋放過了。不能重復釋放。 compareAndSetWaitStatus(node, ws, 0); Node s = node.next; // 如果 next 是 null,或者 next 被取消了。就從 tail 開始向上找節點。 if (s == null || s.waitStatus > 0) { s = null; // 從尾部開始,向前尋找最靠前的那個未被取消的節點 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 喚醒這個節點。 if (s != null) LockSupport.unpark(s.thread);}喚醒之后的邏輯是什么樣子的還記得嗎?在上面講解資源獲取時有講到:
線程喚醒后嘗試拿鎖,拿鎖成功則設置自己為 head,斷開前任 head 和自己的連接。
final boolean acquireQueued(final Node node, long arg) { boolean failed = true; try { boolean interrupted = false; //喚醒之后再次進行for循環,嘗試獲取鎖,獲取成功則將自己設置為頭結點 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } }再來看一下共享鎖的釋放邏輯,代碼如下:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }doReleaseShared() 代碼如下:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 將 head 節點的 ws 改成 0,清除信號。表示,他已經釋放過了。不能重復釋放。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 喚醒下一個節點 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }同樣,喚醒后做什么呢?
線程喚醒后嘗試拿鎖,拿鎖成功則設置自己為 head,斷開前任 head 和自己的連接, 并喚醒下一個節點,代碼如下:
private void doAcquireShared(long arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; //喚醒后再次for循環,嘗試獲取鎖,獲取鎖成功,則設置自己為頭結點, //并喚醒下一個結點,從而一直傳播下去,將所有等待共享鎖的線程喚醒 for (;;) { final Node p = node.predecessor(); if (p == head) { long r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } }}為了證明共享鎖喚醒時是一個接一個被喚醒,我們用一個 demo 來驗證下,示例代碼如下:
CountDownLatch countDownLatch = new CountDownLatch(1); Thread t1 = new Thread(() -> { try { TimeUnit.SECONDS.sleep(1); countDownLatch.await(); System.out.println("線程1被喚醒了"); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread t2 = new Thread(() -> { try { TimeUnit.SECONDS.sleep(2); countDownLatch.await(); System.out.println("線程2被喚醒了"); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread t3 = new Thread(() -> { try { TimeUnit.SECONDS.sleep(3); countDownLatch.await(); System.out.println("線程3被喚醒了"); } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); t2.start(); t3.start(); TimeUnit.SECONDS.sleep(4); countDownLatch.countDown(); }上面示例代碼中,線程 1、2、3 按順序加入阻塞隊列,當主線程調用 countDown() 時,此時會喚醒線程 1,線程 1 喚醒后會喚醒線程 2,線程 2 會喚醒線程 3。從運行結果可以看出確實如此:
線程1被喚醒了線程2被喚醒了線程3被喚醒了從而證明了上面的推斷。
三、總結
獨占鎖和共享鎖在獲取資源失敗時,都會將自己加入阻塞隊列的尾部,并將前一節點的 ws 設置為 SINGAL,告訴他:釋放鎖的時候記得喚醒我。
獨占鎖和共享鎖的不同之處在于:節點被喚醒后,獨占鎖線程不會喚醒下一個節點(要喚醒必須主動釋放鎖,比如使用 ReentrantLock 最后要調用 release() 方法主動釋放鎖)。
而對于共享鎖來說,只要一個節點被喚醒了,那就會繼續喚醒下一個節點,下一個節點又會去喚醒下下一個節點,從而將所有等待共享鎖的線程喚醒。
作者介紹?
夏福利,網易智企資深開發工程師,主要負責網易七魚在線智能客服的研發
總結
以上是生活随笔為你收集整理的AQS 源码流程分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Gitlab-ci 替代 webhook
- 下一篇: “易+”开源 | 网易会议开源之移动端篇