多线程:AQS源码分析
AQS 源碼分析
?
概述
Java的內置鎖一直都是備受爭議的,在JDK 1.6之前,synchronized這個重量級鎖其性能一直都是較為低下,雖然在1.6后,進行大量的鎖優化策略,但是與Lock相比synchronized還是存在一些缺陷的:雖然synchronized提供了便捷性的隱式獲取鎖釋放鎖機制(基于JVM機制),但是它卻缺少了獲取鎖與釋放鎖的可操作性,可中斷、超時獲取鎖,且它為獨占式在高并發場景下性能大打折扣。
AQS,AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其他同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并發包的作者(Doug Lea)期望它能夠成為實現大部分同步需求的基礎。它是JUC并發包中的核心基礎組件。
AQS解決了子類實現同步器時涉及當的大量細節問題,例如獲取同步狀態、FIFO同步隊列?;贏QS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。
AQS的主要使用方式是繼承,子類通過繼承同步器并實現它的抽象方法來管理同步狀態。
AQS使用一個int類型的成員變量state來表示同步狀態,當state>0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操作,當然AQS可以確保對state的操作是安全的。
AQS通過內置的FIFO同步隊列來完成資源獲取線程的排隊工作,如果當前線程獲取同步狀態失敗(鎖)時,AQS則會將當前線程以及等待狀態等信息構造成一個節點(Node)并將其加入同步隊列,同時會阻塞當前線程,當同步狀態釋放時,則會把節點中的線程喚醒,使其再次嘗試獲取同步狀態。
AQS可以實現獨占鎖和共享鎖,RenntrantLock實現的是獨占鎖,ReentrantReadWriteLock實現的是獨占鎖和共享鎖,CountDownLatch實現的是共享鎖。
下面我們通過源碼來分析下AQS的實現原理
AbstractQueuedSynchronizer類結構
public?abstract?class AbstractQueuedSynchronizer
????extends AbstractOwnableSynchronizer
????implements java.io.Serializable {
????protected AbstractQueuedSynchronizer() { }
????//同步器隊列頭結點
????private?transient?volatile?Node head;
????//同步器隊列尾結點
????private?transient?volatile?Node tail;
????//同步狀態(打的那個state為0時,無鎖,當state>0時說明有鎖。)
????private?volatile?int?state;
????//獲取鎖狀態
????protected final int getState() {
????????return?state;
????}
????//設置鎖狀態
????protected final void setState(int newState) {
????????state = newState;
????}
????......
通過AQS的類結構我們可以看到它內部有一個隊列和一個state的int變量。
隊列:通過一個雙向鏈表實現的隊列來存儲等待獲取鎖的線程。
state:鎖的狀態。
head、tail和state 都是volatile類型的變量,volatile可以保證多線程的內存可見性。
同步隊列的基本結構如下:
?
同步隊列
同步器隊列Node元素的類結構如下:
static?final?class Node {
????static?final?Node SHARED = new?Node();
????static?final?Node EXCLUSIVE = null;
????//表示當前的線程被取消;
????static?final?int?CANCELLED = ?1;
????//表示當前節點的后繼節點包含的線程需要運行,也就是unpark;
????static?final?int?SIGNAL ???= -1;
????//表示當前節點在等待condition,也就是在condition隊列中;
????static?final?int?CONDITION = -2;
????//表示當前場景下后續的acquireShared能夠得以執行;
????static?final?int?PROPAGATE = -3;
????//表示節點的狀態。默認為0,表示當前節點在sync隊列中,等待著獲取鎖。
????//其它幾個狀態為:CANCELLED、SIGNAL、CONDITION、PROPAGATE
????volatile?int?waitStatus;
????//前驅節點
????volatile?Node prev;
????//后繼節點
????volatile?Node next;
????//獲取鎖的線程
????volatile?Thread thread;
????//存儲condition隊列中的后繼節點。
????Node nextWaiter;
????......
}
從Node結構prev和next節點可以看出它是一個雙向鏈表,waitStatus存儲了當前線程的狀態信息
waitStatus
下面我們通過以下五個方面來介紹AQS是怎么實現的鎖的獲取和釋放的
5.獨占超時獲得鎖
1.獨占式獲得鎖
acquire方法代碼如下:
public final void acquire(int arg) {
????????//嘗試獲得鎖,獲取不到則加入到隊列中等待獲取
????????if?(!tryAcquire(arg) &&
????????????acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
????????????selfInterrupt();
????}
addWaiter方法代碼如下:
private Node addWaiter(Node mode) {
????Node node = new?Node(Thread.currentThread(), mode);
????// Try the fast path of enq; backup to full enq on failure
????Node pred = tail;
????if?(pred != null) {
????????node.prev = pred;
????????//將該節點添加到隊列尾部
????????if?(compareAndSetTail(pred, node)) {
????????????pred.next = node;
????????????return?node;
????????}
????}
????//如果前驅節點為null,則進入enq方法通過自旋方式入隊列
????enq(node);
????return?node;
}
將構造的同步節點加入到同步隊列中
enq方法代碼如下:
????private Node enq(final Node node) {
????????for?(;;) {
????????????Node t = tail;
????????????if?(t == null) { // Must initialize
????????????????//如果隊列為空,則通過CAS把當前Node設置成頭節點
????????????????if?(compareAndSetHead(new?Node()))
????????????????????tail = head;
????????????} else?{
????????????????node.prev = t;
????????????????//如果隊列不為空,則向隊列尾部添加Node
????????????????if?(compareAndSetTail(t, node)) {
????????????????????t.next = node;
????????????????????return?t;
????????????????}
????????????}
????????}
????}
該方法使用CAS自旋的方式來保證向隊列中添加Node(同步節點簡寫Node)
acquireQueued方法代碼如下:
final boolean acquireQueued(final Node node, int arg) { ?
????boolean?failed = true; ?
????try?{ ?
????????boolean?interrupted = false; ?
????????for?(;;) { ?
????????????//找到當前節點的前驅節點
????????????final?Node p = node.predecessor(); ?
????????????//檢測p是否為頭節點,如果是,再次調用tryAcquire方法
????????????if?(p == head && tryAcquire(arg)) { ?
????????????????//如果p節點是頭節點且tryAcquire方法返回true。那么將當前節點設置為頭節點。
????????????????setHead(node); ?
????????????????p.next = null; // help GC ?
????????????????failed = false; ?
????????????????return?interrupted; ?
????????????} ?
????????????//如果p節點不是頭節點,或者tryAcquire返回false,說明請求失敗。 ?
????????????//那么首先需要判斷請求失敗后node節點是否應該被阻塞,如果應該 ?
????????????//被阻塞,那么阻塞node節點,并檢測中斷狀態。 ?
????????????if?(shouldParkAfterFailedAcquire(p, node) && ?
????????????????parkAndCheckInterrupt()) ?
????????????????//如果有中斷,設置中斷狀態。 ?
????????????????interrupted = true; ?
????????} ?
????} finally?{ ?
????????if?(failed) //最后檢測一下如果請求失敗(異常退出),取消請求。 ?
????????????cancelAcquire(node); ?
????} ?
}
在acquireQueued方法中,當前線程通過自旋的方式來嘗試獲取同步狀態,
通過上面的代碼我們可以發現AQS內部的同步隊列是FIFO的方式存取的。節點自旋獲取同步狀態的行為如下圖所示
?
節點自旋獲取同步狀態
shouldParkAfterFailedAcquire方法代碼如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
????????//獲得前驅節點狀態
????????int?ws = pred.waitStatus;
????????if?(ws == Node.SIGNAL)
???????????//如果前驅節點狀態為SIGNAL,當前線程則可以阻塞。
???????????return?true;
????????if?(ws > 0) {
????????????do?{
????????????????//判斷如果前驅節點狀態為CANCELLED,那就一直往前找,直到找到最近一個正常等待的狀態
????????????????node.prev = pred = pred.prev;
????????????} while?(pred.waitStatus > 0);
????????????//并將當前Node排在它的后邊。
????????????pred.next = node;
????????} else?{
????????????//如果前驅節點正常,則修改前驅節點狀態為SIGNAL
????????????compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
????????}
????????return?false;
????}
節點的狀態如下表:
| 狀態 | 值 | 說明 |
| CANCELLED | 1 | 等待超時或者中斷,需要從同步隊列中取消 |
| SIGNAL | -1 | 后繼節點出于等待狀態,當前節點釋放鎖后將會喚醒后繼節點 |
| CONDITION | -2 | 節點在等待隊列中,節點線程等待在Condition上,其它線程對Condition調用signal()方法后,該節點將會從等待同步隊列中移到同步隊列中,然后等待獲取鎖。 |
| PROPAGATE | -3 | 表示下一次共享式同步狀態獲取將會無條件地傳播下去 |
| INITIAL | 0 | 初始狀態 |
parkAndCheckInterrupt方法代碼如下:
private final boolean parkAndCheckInterrupt() {
????//阻塞當前線程
????LockSupport.park(this);
????//判斷是否中斷來喚醒的
????return?Thread.interrupted();
}
2. 獨占式釋放鎖
release方法代碼如下:
????public final boolean release(int arg) {
????????//嘗試釋放鎖
????????if?(tryRelease(arg)) {
????????????Node h = head;
????????????if?(h != null?&& h.waitStatus != 0)
????????????????//喚醒后繼節點
????????????????unparkSuccessor(h);
????????????return?true;
????????}
????????return?false;
????}
tryRelease(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。
3. 共享式獲得鎖
acquireShared方法代碼如下:
public final void acquireShared(int arg) {
????//嘗試獲取的鎖,如果獲取失敗執行doAcquireShared方法。
????if?(tryAcquireShared(arg) < 0)
????????doAcquireShared(arg);
}
tryAcquireShared()嘗試獲取鎖,如果獲取失敗則通過doAcquireShared()進入等待隊列,直到獲取到資源為止才返回。
這里tryAcquireShared()需要自定義同步器去實現。
AQS中規定:負值代表獲取失敗,非負數標識獲取成功。
doAcquireShared方法代碼如下:
private void doAcquireShared(int arg) {
????//構建共享Node
????final?Node node = addWaiter(Node.SHARED);
????boolean?failed = true;
????try?{
????????boolean?interrupted = false;
????????for?(;;) {
????????????//獲取前驅節點
????????????final?Node p = node.predecessor();
????????????//如果是頭節點進行嘗試獲得鎖
????????????if?(p == head) {
????????????????//如果返回值大于等于0,則說明獲得鎖
????????????????int?r = tryAcquireShared(arg);
????????????????if?(r >= 0) {
????????????????????//當前節點設置為隊列頭,并
????????????????????setHeadAndPropagate(node, r);
????????????????????p.next = null; // help GC
????????????????????if?(interrupted)
????????????????????????selfInterrupt();
????????????????????failed = false;
????????????????????return;
????????????????}
????????????}
????????????if?(shouldParkAfterFailedAcquire(p, node) &&
????????????????parkAndCheckInterrupt())
????????????????interrupted = true;
????????}
????} finally?{
????????if?(failed)
????????????cancelAcquire(node);
????}
}
在acquireQueued方法中,當前線程也通過自旋的方式來嘗試獲取同步狀態,同獨享式獲得鎖一樣
setHeadAndPropagate方法代碼如下:
private void setHeadAndPropagate(Node node, int propagate) {
????????Node h = head; // Record old head for check below
????????setHead(node);
????????//如果propagate >0,說明共享鎖還有可以進行獲得鎖,繼續喚醒下一個節點
????????if?(propagate > 0?|| h == null?|| h.waitStatus < 0?||
????????????(h = head) == null?|| h.waitStatus < 0) {
????????????Node s = node.next;
????????????if?(s == null?|| s.isShared())
????????????????doReleaseShared();
????????}
????}
設置當前節點為頭結點,并調用了doReleaseShared()方法,acquireShared方法最終調用了release方法,得看下為什么。原因其實也很簡單,shared模式下是允許多個線程持有一把鎖的,其中tryAcquire的返回值標志了是否允許其他線程繼續進入。如果允許的話,需要喚醒隊列中等待的線程。其中doReleaseShared方法的邏輯很簡單,就是喚醒后繼線程。
因此acquireShared的主要邏輯就是嘗試加鎖,如果允許其他線程繼續加鎖,那么喚醒后繼線程,如果失敗,那么入隊阻塞等待。
4. 共享式釋放鎖
releaseShared方法代碼如下:
public final boolean releaseShared(int arg) {
????if?(tryReleaseShared(arg)) {
????????doReleaseShared();
????????return?true;
????}
????return?false;
}
tryReleaseShared(int arg) 方法應該由實現AQS的子類來實現具體的邏輯。
doReleaseShared方法代碼如下:
private void doReleaseShared() {
????for?(;;) {
????????// 獲取隊列的頭節點
????????Node h = head;
????????// 如果頭節點不為null,并且頭節點不等于tail節點。
????????if?(h != null?&& h != tail) {
????????????// 獲取頭節點對應的線程的狀態
????????????int?ws = h.waitStatus;
????????????// 如果頭節點對應的線程是SIGNAL狀態,則意味著“頭節點的下一個節點所對應的線程”需要被unpark喚醒。
????????????if?(ws == Node.SIGNAL) {
????????????????// 設置“頭節點對應的線程狀態”為空狀態。失敗的話,則繼續循環。
????????????????if?(!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
????????????????????continue;
????????????????// 喚醒“頭節點的下一個節點所對應的線程”。
????????????????unparkSuccessor(h);
????????????}
????????????// 如果頭節點對應的線程是空狀態,則設置“尾節點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態。
????????????else?if?(ws == 0?&&
?????????????????????!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
????????????????continue; ???????????????// loop on failed CAS
????????}
????????// 如果頭節點發生變化,則繼續循環。否則,退出循環。
????????if?(h == head) ??????????????????// loop if head changed
????????????break;
????}
}
該方法主要是喚醒后繼節點。對于能夠支持多個線程同時訪問的并發組件(比如Semaphore),它和獨占式主要區別在于tryReleaseShared(int arg)方法必須確保同步狀態(或者資源數)線程安全釋放,一般是通過循環和CAS來保證的,因為釋放同步狀態的操作會同時來自多個線程。
5. 獨占超時獲得鎖
doAcquireNanos方法代碼如下:
private boolean doAcquireNanos(int arg, long nanosTimeout)
????????throws InterruptedException {
????if?(nanosTimeout <= 0L)
????????return?false;
????//計算出超時時間點
????final?long?deadline = System.nanoTime() + nanosTimeout;
????final?Node node = addWaiter(Node.EXCLUSIVE);
????boolean?failed = true;
????try?{
????????for?(;;) {
????????????final?Node p = node.predecessor();
????????????if?(p == head && tryAcquire(arg)) {
????????????????setHead(node);
????????????????p.next = null; // help GC
????????????????failed = false;
????????????????return?true;
????????????}
????????????//計算剩余超時時間,超時時間點deadline減去當前時間點System.nanoTime()得到還應該睡眠的時間
????????????nanosTimeout = deadline - System.nanoTime();
????????????//如果超時,返回false,獲取鎖失敗
????????????if?(nanosTimeout <= 0L)
????????????????return?false;
????????????//判斷是否需要阻塞當前線程
????????????//如果需要,在判斷當前剩余納秒數是否大于1000
????????????if?(shouldParkAfterFailedAcquire(p, node) &&
????????????????nanosTimeout > spinForTimeoutThreshold)
????????????????//阻塞 nanosTimeout納秒數
????????????????LockSupport.parkNanos(this, nanosTimeout);
????????????if?(Thread.interrupted())
????????????????throw?new?InterruptedException();
????????}
????} finally?{
????????if?(failed)
????????????cancelAcquire(node);
????}
}
該方法在自旋過程中,當節點的前驅節點為頭節點時嘗試獲取同步狀態,如果獲取成功則從該方法返回,這個過程和獨占式同步獲取的過程類似,但是在同步狀態獲取失敗的處理上有所不同。如果當前線程獲取同步狀態失敗,則首先重新計算超時間隔nanosTimeout,則判斷是否超時(nanosTimeout小于等于0表示已經超時),如果沒有超時,則使當前線程等待nanosTimeout納秒(當已到設置的超時時間,該線程會從LockSupport.parkNanos(Object blocker,long nanos)方法返回)。
如果nanosTimeout小于等于spinForTimeoutThreshold(1000納秒)時,將不會使該線程進行
超時等待,而是進入快速的自旋過程。原因在于,非常短的超時等待無法做到十分精確,如果
這時再進行超時等待,相反會讓nanosTimeout的超時從整體上表現得反而不精確。因此,在超
時非常短的場景下,同步器會進入無條件的快速自旋。
總結
以上是生活随笔為你收集整理的多线程:AQS源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java面向对象基础整理
- 下一篇: 多线程:AQS的一些心得