吐血总结:AQS到底是什么?
文章目錄
- 1、概述
- 2、基本框架
- 2.1、AQS框架介紹
- 2.2、AQS核心成員變量和方法
- 3、源碼分析
- 3.1、CLH隊列(FIFO)
- 3.2、獨占模式獲取資源
- 3.2.1、acquire(int)
- 3.2.1.1、tryAcquire(int arg)
- 3.2.1.2、addWaiter(Node mode)
- 3.2.1.3、acquireQueued(final Node node, int arg)
- 3.2.1.3.1、shouldParkAfterFailedAcquire(p, node)
- 3.2.1.3.2、parkAndCheckInterrupt()
- 3.2.1.4、selfInterrupt()
- 3.2.2、獨占式獲取資源小結
- 3.3、獨占模式釋放資源
- 3.3.1、release(int arg)
- 3.3.1.1、tryRelease(int arg)
- 3.3.1.2、unparkSuccessor(Node node)
- 3.3.2、獨占式釋放資源小結
- 3.4、共享模式獲取資源
- 3.4.1、acquireShared(int arg)
- 3.4.1.1、tryAcquireShared(int arg)
- 3.4.1.2、doAcquireShared(int arg)
- 3.4.1.2.1、setHeadAndPropagate(Node node, int propagate)
- 3.4.2、共享模式獲取資源小結
- 3.5、共享模式釋放資源
- 3.5.1、共享模式釋放資源
- 3.5.1.1、releaseShared(int arg)
- 3.5.1.1.1、doReleaseShared()
- 3.5.2、共享模式釋放資源小結
- 4、總結
1、概述
AQS,即AbstractQueuedSynchronizer,抽象的隊列式同步器。AQS定義了一套多線程訪問共享資源的同步器框架,許多我們使用的同步器都是基于它來實現的,如常用的ReentrantLock、Semaphore、CountDownLatch、CyclicBarrie并發類都是通過實現AQS里面的模板方法來實現內部的組件。
2、基本框架
2.1、AQS框架介紹
原圖地址:https://www.processon.com/view/link/5ef89c477d9c08442039b8c5
AQS實現原理依賴內部state(同步狀態)和CHL隊列(FIFO雙向隊列),如果當前線程獲取state同步狀態失敗AQS會將該線程以及狀態等信息構造一個Node節點,并將這個Node節點添加到隊尾,同時阻塞當前線程,當同步狀態釋放時,喚醒隊列頭節點。
2.2、AQS核心成員變量和方法
AQS核心的三個成員變量如下:
private transient volatile Node head;//CHL隊列的頭部節點,延遲初始化。除了初始化,它只通過setHead()方法進行修改。如果head節點存在,head節點的waitStatus保證不會被CANCELLEDprivate transient volatile Node tail;//CHL隊列的尾部節點,延遲初始化。僅通過enq()方法新增等待的節點。private volatile int state; //同步狀態我們可以看出來這三個成員變量都是使用volatile關鍵字來修飾的,volatile代表變量內存可見。
state有以下三種訪問方式:
- getState():獲取同步狀態。
- setState(int newState):設置同步狀態。
- compareAndSetState(int expect, int update):通過CAS方式修改同步狀態。
三種方法源碼實現如下:
資源共享方式分為兩種:
- 獨占式(Exclusive):只有單個線程能夠成功呢獲取資源并執行,如ReentrantLock。
- 共享式(Shared):多個線程可成功獲取資源并執行,如Semaphore、CountDownLatch等。
AQS將大部分的同步邏輯均已經實現好了,繼承的自定義同步器只需要實現state的獲取(acquire)和釋放(release)的邏輯代碼就可以了。
獨占
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續等待結點返回true,否則返回false。
- isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
AQS需要子類復寫的方法均沒有聲明為abstract,目的是避免子類需要強制性覆寫多個方法,因為一般自定義同步器要么是獨占要么是共享方式,只需實現tryAcquire-tryRelease或tryAcquireShared-tryReleaseShared中的一種組合即可。當然,AQS也支持子類同時實現獨占和共享兩種模式,如ReentrantReadWriteLock。
3、源碼分析
3.1、CLH隊列(FIFO)
AQS是通過內部類Node來實現FIFO隊列的,源碼如下:
static final class Node {//表明節點在共享模式下等待的標記static final Node SHARED = new Node();//表情節點在獨占模式下等待的標記static final Node EXCLUSIVE = null;//指示等待線程已取消static final int CANCELLED = 1;//指示需要喚醒后續線程static final int SIGNAL = -1;//指示線程在等待觸發條件(condition)static final int CONDITION = -2;//指示下一個acquireShared應無條件傳播的waitStatus值static final int PROPAGATE = -3;/*** CANCELLED(1) :表示當前節點因timeout和interrupt而放棄競爭state,進入該狀態后的節點將不會再變化。* SIGNAL(-1) :表示后繼節點等待當前節點喚醒。后繼節點入隊列是,會將前繼節點的狀態變更為SIGNAL。 * CONDITION(-2):表示節點等待在Condition上,當其他線程調用了Condition的signal()方法后,CONDITION狀態的節點將從等待隊列轉移到同步隊列中,等待獲取同步鎖* PROPAGATE(-3):共享模式下,前繼節點不僅會喚醒其后繼節點,同時也可能喚醒后繼的后繼節點。* 0 :以上情況都不是。*/volatile int waitStatus;//前繼節點volatile Node prev;//后繼節點volatile Node next;//持有的線程volatile Thread thread;//下一個等待條件出發的節點Node nextWaiter;//返回節點是否處于Shared狀態下final boolean isShared() {return nextWaiter == SHARED;}//返回前繼節點final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}//Shared模式下的Node構造方法Node() { }//用于addWaiter的構造方法Node(Thread thread, Node mode) {this.nextWaiter = mode;this.thread = thread;}//用于Condition下的構造方法Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}當waitStatus非負的時候,表示不可用,正數代表處于等待狀態,所以waitStatus只需要檢查其正負符號即可,不用太多關注特定值。
3.2、獨占模式獲取資源
3.2.1、acquire(int)
獨占模式(Exclusive)獲取資源的入口方法為:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//獲取失敗,則加入等待隊列selfInterrupt();}從上面的源代碼我們可以看出執行方法的順序依次為:
(1)tryAcquire(arg)嘗試獲取資源,如果獲取成功返回true則acquire()直接返回。如果返回fasle,則進入(2);
(2)addWaiter(Node.EXCLUSIVE), arg)將該線程加入CHL等待隊列的尾部,并標記為獨占模式,完成后進入(3);
(3)acquireQueued()以獨占模式不間斷獲取隊列中已存在的線程直到獲取元素。獲取元素成功后若線程未中斷過則返回false然后acquire()直接返回,如果等待過程中被中斷過則返回true,然后進入(4);
(4)selfInterrupt()這個方法翻譯過來就是自我在中斷,注意這個中斷方法必須是在獲取元素成功之后才會執行的,就是說獲取資源成功了才會執行的,不是立即響應中斷的。
下面就詳細介紹一下上面的這4個方法。
3.2.1.1、tryAcquire(int arg)
這個目的是嘗試獲取獨占資源的方法,成功直接返回true,失敗直接返回false,這個地方體現了非公平鎖,因為調用的線程直接獲取,完全不考慮CHL隊列中還有可能有線程在等待獲取資源。源碼如下:
//(1)嘗試獲取資源protected boolean tryAcquire(int arg) {throw new UnsupportedOperationException();}注意,這里的tryAcquire()是一個由protected修飾的空方法,AQS只是一個框架具體的資源獲取-釋放則是由自定義的同步器去實現的。這里本可以定義成為abstract方法,前面我們說過如果獨占模式下只用實現tryAcquire-tryRelease而共享模式下只用實現tryAcquireShared-tryReleaseShared,如果這里我們定義成abstract方法的話,我們在實現獨占模式的情況下還要去考慮實現共享模式的兩個方法。作者Doug Lea使用這種方式可以讓我們該去實現獨占模式的時候不去考慮共享模式的方法,如果未自己實現就用則會拋UnsupportedOperationException異常。
3.2.1.2、addWaiter(Node mode)
這個方法就是在上面獲取資源失敗的情況下,將當前線程加入到CHL隊列的隊尾,并返回當前線程所在的Node節點。源碼如下:
//(2)將獲取資源失敗的線程放入隊尾private Node addWaiter(Node mode) {//(2.1)用給定模式構造Node節點。mode取值有兩種:EXCLUSIVE(獨占)、SHARED(共享)Node node = new Node(Thread.currentThread(), mode);//(2.2)嘗試快速插入等待隊列,如果失敗則執行常規插入操作enq(node);Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//(2.3)上面插入失敗,則使用此方法插入enq(node);return node;}其中enq(node)方法如下:
private Node enq(final Node node) {//(2.3.1)CAS自旋,直到成功加入到隊尾for (;;) {Node t = tail;if (t == null) { //(2.3.2)如果隊列為空,則創建一個空的Node節點作為head節點,并將tail指向headif (compareAndSetHead(new Node()))tail = head;} else {//(2.3.3)正常流程,放入隊尾node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}其中compareAndSetHead和compareAndSetTail執行的是unsafe里面的compareAndSwapObject方法,這個方法是native方法,屬于原子操作。想了解這個方法的可以查閱CAS相關的知識。
//CAS自旋賦值head節點private final boolean compareAndSetHead(Node update) {return unsafe.compareAndSwapObject(this, headOffset, null, update);}//CAS自旋賦值tail節點private final boolean compareAndSetTail(Node expect, Node update) {return unsafe.compareAndSwapObject(this, tailOffset, expect, update);}3.2.1.3、acquireQueued(final Node node, int arg)
通過上面的tryAcquire()和addWaiter()這個 線程已經獲取資源失敗了,并且已經被放到等待隊列的尾部了。acquireQueued()方法是以自旋方式獲取獨占模式獲取隊列中已存在的線程。舉個例子:例如我們去12306網站買票,剛開頁面顯示無票,我們就一直刷新頁面直到有車票資源。
//(3)進入等待狀態直到head節點線程釋放資源,當前線程獲取資源并返回是否被中斷標識final boolean acquireQueued(final Node node, int arg) {//(3.1)標記是否成功拿到資源boolean failed = true;try {//(3.2)標記等待過程中是否被中斷過boolean interrupted = false;//自旋for (;;) {//(3.3)拿到前驅節點final Node p = node.predecessor();//(3.4)如果前驅節點是head,即當前是第2個節點,那么符合條件嘗試獲取資源。可能是head節點釋放完資源釋放了當前節點,也有可能被interrupt中斷了。if (p == head && tryAcquire(arg)) {//(3.4.1)剩下的這兩步就是把當前節點設置為head節點,并且釋放原來的head節點setHead(node);//(3.4.2)這里p.next指向null就是交給GC回收了p.next = null;//(3.4.3)成功獲取資源failed = false;//(3.4.4)返回等待過程中是否被中斷過return interrupted;}//(3.5)走到這里代表當前元素不是第2個節點則繼續判斷是否滿足下面2個條件。//1.shouldParkAfterFailedAcquire方法檢查線程是否應該阻塞//2.parkAndCheckInterrupt方法調用park()當前線程,直到unpack()被喚醒,判斷當前線程是否被中斷了if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//如果等待過程中被紅斷過,就將interrupted設置為trueinterrupted = true;}} finally {//(3.6)代表等待過程中沒有成功獲取到資源(timeout,或者被中斷),則放棄爭搶資源if (failed)cancelAcquire(node);}}3.2.1.3.1、shouldParkAfterFailedAcquire(p, node)
上面的shouldParkAfterFailedAcquire方法的實現如下:
//(3.5.1)shouldParkAfterFailedAcquire方法檢查線程是否應該阻塞private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//(3.5.1.1)獲取前繼節點的waitStatus值int ws = pred.waitStatus;//(3.5.1.2)如果ws值為SIGNAL(-1),代表前繼節點完成資源釋放或者中斷后,會通知當前節點,因此當前節點可以安全的parkif (ws == Node.SIGNAL)return true;//(3.5.1.3)如果ws>0,其實就是CANCELLED(1)代表前繼節點處于放棄狀態,//那就繼續遍歷直到前繼節點的ws為0或者為-1if (ws > 0) {do {//這一句的意思就是節點指針向前移動,直到前繼節點滿足條件node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {//(3.5.1.4)如果當前ws<=0則設置當前節點為SIGNAL(-1),以保證外層方法自旋的時候返回truecompareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}3.2.1.3.2、parkAndCheckInterrupt()
parkAndCheckInterrupt主要是調用LockSupport類的park()方法阻塞當前線程,并返回線程是否被中斷過。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}3.2.1.4、selfInterrupt()
通過上面的分析,能走到這一步代表此線程在等待過程中被中斷了。
//(4)中斷當前線程static void selfInterrupt() {Thread.currentThread().interrupt();}3.2.2、獨占式獲取資源小結
再一次,拿出獨占模式(Exclusive)獲取資源的入口方法:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//獲取失敗,則加入等待隊列selfInterrupt();}從上面的源代碼我們可以看出執行方法的順序依次為:
(1)tryAcquire(arg)嘗試獲取資源,如果獲取成功返回true則acquire()直接返回。如果返回fasle,則進入(2);
(2)addWaiter(Node.EXCLUSIVE), arg)將該線程加入CHL等待隊列的尾部,并標記為獨占模式,完成后進入(3);
(3)acquireQueued()以獨占模式不間斷獲取隊列中已存在的線程直到獲取元素。獲取元素成功后若線程未中斷過則返回false然后acquire()直接返回,如果等待過程中被中斷過則返回true,然后進入(4);
(4)selfInterrupt()這個方法翻譯過來就是自我在中斷,注意這個中斷方法必須是在獲取元素成功之后才會執行的,就是說獲取資源成功了才會執行的,不是立即響應中斷的。
我們再補一個流程圖便于理解:
這也就是ReentrantLock.lock()的流程,其整個函數就是一條acquire(1)
3.3、獨占模式釋放資源
3.3.1、release(int arg)
獨占模式釋放資源的過程其也就是unlock()過程,其實就是賦值state=0,此時線程AQS會喚醒隊列其他線程獲取資源。
public final boolean release(int arg) {//(1)嘗試釋放資源if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)//(2)喚醒隊列里其他線程unparkSuccessor(h);return true;}return false;}3.3.1.1、tryRelease(int arg)
嘗試釋放資源,根據他的返回值判斷是否釋放資源成功。
protected boolean tryRelease(int arg) {throw new UnsupportedOperationException();}這是個空方法,這個需要根據自己的情況自定義同步器去實現。如果徹底釋放資源返回true,否則返回false。
3.3.1.2、unparkSuccessor(Node node)
此方法是用來喚醒等待隊列中的下一個線程:
//(2)喚醒隊列里其他線程private void unparkSuccessor(Node node) {//(2.1)獲取當前Node節點的waitStateint ws = node.waitStatus;//(2.2)如果當前的狀態為SIGNAL(-1),則嘗試置為0if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//(2.3)找到下一個節點Node s = node.next;//(2.4)如果節點為空或者CANCELLED(1)if (s == null || s.waitStatus > 0) {s = null;//(2.5)這就是從尾部tail節點遍歷隊列,直到獲取狀態<=0的節點for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//(2.6)如果s不為空喚醒線程LockSupport.unpark(s.thread);}后繼節點的阻塞線程被喚醒后,就進入到acquireQueued()的if (p == head && tryAcquire(arg))的判斷中,此時被喚醒的線程將嘗試獲取資源。如果被喚醒的線程所在節點的前繼節點不是頭結點,經過shouldParkAfterFailedAcquire的調整,也會移動到等待隊列的前面,直到其前繼節點為頭結點。
3.3.2、獨占式釋放資源小結
release()是獨占模式下釋放共享資源的入口方法,它會釋放指定量的資源,如果徹底釋放了(即state=0),此時它將喚醒等待隊列的線程來獲取資源。一共分為兩個步驟:
3.4、共享模式獲取資源
3.4.1、acquireShared(int arg)
共享模式獲取共享資源的入口就是acquireShared方法,方法的代碼如下:
public final void acquireShared(int arg) {//(1)嘗試獲取共享資源if (tryAcquireShared(arg) < 0)//(2)獲取資源失敗,進入等待隊列doAcquireShared(arg);}共享模式獲取鎖分為以下兩步:
(1)tryAcquireShared()方法嘗試獲取共享資源,如果獲取成功了就是返回的結果大于等于0,那恭喜你,直接返回了。如果返回值小于0,代表獲取共享資源失敗了,則進入(2);
(2)通過doAcquireShared()方法將獲取鎖失敗的線程放入到隊列中。
這里tryAcquireShared返回值負值代表獲取失敗;0代表獲取成功,但沒有剩余資源;正數表示獲取成功,還有剩余資源,其他線程還可以去獲取。
3.4.1.1、tryAcquireShared(int arg)
這個tryAcquireShared()也是一個空方法跟我們之前的獨占式的一樣,需要我們自定義的同步器去實現。
//(1)嘗試獲取共享資源protected int tryAcquireShared(int arg) {throw new UnsupportedOperationException();}3.4.1.2、doAcquireShared(int arg)
將線程加入等待隊尾,直到其他線程釋放資源釋放資源,并成功大道相應的共享資源才 返回。
//(2)獲取資源失敗,進入等待隊列private void doAcquireShared(int arg) {//(2.1)將SHARED模式的節點添加到隊尾final Node node = addWaiter(Node.SHARED);boolean failed = true;//是否失敗標識默認truetry {boolean interrupted = false;//是否被中斷標識,默認fasle//(2.2)使用自旋方式獲取資源for (;;) {final Node p = node.predecessor();//獲取前驅節點if (p == head) {//如果當前線程的前驅節點是首節點,此時當前節點就是第2個節點,head執行完就該喚醒自己了//(2.2)嘗試獲取資源int r = tryAcquireShared(arg);if (r >= 0) {//代表獲取資源成功了//(2.2.1)獲取資源成功后重新設置head節點并且釋放;老的head節點setHeadAndPropagate(node, r);p.next = null; //釋放老head節點,交給GC//(2.2.2)如果此時發現線程已經被中斷,則中斷自己,這個跟獨占方式一樣if (interrupted)//(2.2.2)中斷自己selfInterrupt();failed = false;//成功標識return;}}//(2.3)走到這里代表獲取資源失敗了,判斷是否可以park,如果可以調用給你park()方法,然后等待unpark或者interrupt。如果線程被中斷過,則將中斷標識修改為trueif (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {//(2.4)代表等待過程中沒有成功獲取到資源(timeout,或者被中斷),則放棄爭搶資源if (failed)cancelAcquire(node);}}我們可以發現doAcquireShared()的實現和acquireQueued(),十分相似,流程沒有太大差別。只是把selfInterrupt()方法是在doAcquireShared()內部,獨占模式是在acquireQueued()外面,其實結果都差不多。
3.4.1.2.1、setHeadAndPropagate(Node node, int propagate)
//(2.2.1)獲取資源成功后重新設置head節點private void setHeadAndPropagate(Node node, int propagate) {Node h = head;//記錄老節點下面會檢查setHead(node);//將頭結點指向當前節點//如果共享資源還有剩余量,則繼續喚醒下一個線程if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}這里面其實和獨占模式基本一樣的,就是多了一步如果有剩余共享資源則主動喚醒下一個節點這一步。下面釋放資源的時候分析doReleaseShared()。
3.4.2、共享模式獲取資源小結
其實共享模式獲取資源和獨占模式獲取資源很相似,它的流程如下:
(1)tryAcquireShared()方法嘗試獲取共享資源,如果獲取成功了就是返回的結果大于等于0,那恭喜你,直接返回了。如果返回值小于0,代表獲取共享資源失敗了,則進入(2);
(2)通過doAcquireShared()方法將獲取鎖失敗的線程放入隊列,并調用park()方法,直到被unpark()或者interrupt()。這里多了一步,在當先線程拿到資源后,還會去喚醒后繼線程的操作。
3.5、共享模式釋放資源
3.5.1、共享模式釋放資源
3.5.1.1、releaseShared(int arg)
releaseShared方法是釋放資源的入口方法。這個方法會釋放定量的資源,如果成功釋放且允許喚醒等待線程,則會喚醒等待隊列里的其他線程來獲取資源。。
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}//這是一個空方法等待自定義同步器去實現protected boolean tryReleaseShared(int arg) {throw new UnsupportedOperationException();}這個流程原理概括起來就是:釋放資源后,喚醒后繼。舉個例子理解以下這個場景:
假如一共有10個共享資源,線程A、B、C分別需要5、4、3個資源
這點表達跟獨占鎖不同的地方是獨占方式必須是資源釋放掉(state=0)才返回true,但是共享模式下根據我們上面的例子可以看出沒有這種要求。
3.5.1.1.1、doReleaseShared()
這個方法主要是喚醒后繼線程。
private void doReleaseShared() {//自旋for (;;) {//儲存head節點,后續檢查Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck cases//喚醒后繼線程unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}//head節點發生變化就跳出循環if (h == head) // loop if head changedbreak;}}3.5.2、共享模式釋放資源小結
上面的我們已經把共享模式下釋放資源的情況分析了。一句話總結:釋放指定量的資源,如果成功釋放且允許喚醒等待線程,它會喚醒等待隊列里的其他線程來獲取資源。
4、總結
以上我們分析了獨占模式、共享模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,他們在獲取共享資源的時候都是忽略中斷的直到獲取資源。其實AQS也支持響應中斷的,acquireInterruptibly()和acquireSharedInterruptibly(),有興趣可以自行研究。
最后牢記這個獲取資源流程圖:
原圖地址:https://www.processon.com/view/link/5efa08dbe0b34d4dba5dccc8
總結
以上是生活随笔為你收集整理的吐血总结:AQS到底是什么?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java中如何使用Thread和Runn
- 下一篇: Java中的锁的概念大汇总