AQS实现原理分析
AQS
隊列同步器(AbstractQueuedSynchronizer)簡稱AQS,是J.U.C同步構件的基礎,包括ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphor都是基于AQS實現的。
理解AQS是理解這些同步工具的基礎,基于AQS提供的同步語義可以定制各種功能的同步工具。
AQS原理
-
用一個int類型的狀態變量(volatile)state記錄同步狀態,默認值是0
用一個雙向鏈表實現的隊列對線程進行排隊和調度 -
A線程使用compareAndSet(state,0,1)原子設置state的值,設置成功說明state當前無其他線程爭用,A線程取鎖的使用權。
-
設置不成功,說明B線程對state的值進行了設置,并且沒有復位(state!=0),B線程持有鎖的使用權(B線程還沒有釋放鎖)。A線程會構造成一個Node節點加入隊列尾部并掛起。
-
當B線程執行完同步操作后,對state進行復位(state==0),即釋放鎖,然后從隊列頭開始尋找,發現正在沉睡的A線程,將其喚醒。
AQS同步隊列
static final class Node {/**共享模式 */static final Node SHARED = new Node();/**獨占模式 */static final Node EXCLUSIVE = null;/**取消狀態,由于在同步隊列中等待的線程等待超時或被中斷,* 需要從同步隊列中取消等待。 */static final int CANCELLED = 1;/**通知狀態,當前節點的后繼節點包含的線程需要運行(unpark)* 當前節點的線程如果釋放了同步狀態或者被取消,將通知后續節點。*/static final int SIGNAL = -1;/**條件阻塞狀態,節點線程等待在Condition上,* 當其他線程對Condition調用了signal()方法后,* 該節點將會從等待隊列中轉移到同步隊列中,* 加入到對同步狀態的獲取中。 */static final int CONDITION = -2;/**傳播狀態,表示當前場景下后續的acquireShared能夠得以執行。*/static final int PROPAGATE = -3;/**節點的的狀態 * 初始狀態為0 表示當前節點在sync隊列中,等待著獲取狀態。*/volatile int waitStatus;/** 前驅節點 */volatile Node prev;/** 后繼節點 */volatile Node next;/**節點對應的線程,等待獲取同步狀態的線程。 */volatile Thread thread;/**下一等待節點*/Node nextWaiter;/**是否共享模式 */final boolean isShared() {return nextWaiter == SHARED;}/**獲取前驅節點 */final Node predecessor() throws NullPointerException {}Node() {}Node(Thread thread, Node mode) {}Node(Thread thread, int waitStatus) {} }AQS基于一個FIFO雙向隊列實現,被設計給那些依賴一個代表狀態的原子int值的同步器使用。我們都知道,既然叫同步器,那個肯定有個代表同步狀態(臨界資源)的東西,在AQS中即為一個叫state的int值,該值通過CAS進行原子修改。
在AQS中存在一個FIFO隊列,隊列中的節點表示被阻塞的線程,隊列節點元素有4種類型, 每種類型表示線程被阻塞的原因,這四種類型分別是:
- CANCELLED : 表示該線程是因為超時或者中斷原因而被放到隊列中
- CONDITION : 表示該線程是因為某個條件不滿足而被放到隊列中,需要等待一個條件,直到條件成立后才會出隊
- SIGNAL : 表示該線程需要被喚醒
- PROPAGATE : 表示在共享模式下,當前節點執行釋放release操作后,當前結點需要傳播通知給后面所有節點
由于一個共享資源同一時間只能由一條線程持有,也可以被多個線程持有,因此AQS中存在兩種模式,如下:
-
1、獨占模式
獨占模式表示共享狀態值state每次能由一條線程持有,其他線程如果需要獲取,則需要阻塞,如JUC中的ReentrantLock
-
2、共享模式
共享模式表示共享狀態值state每次可以由多個線程持有,如JUC中的CountDownLatch
AQS中的共享狀態值
之前提到,AQS是基于一個共享的int類型的state值來實現同步器同步的,其聲明如下:
/*** 同步狀態值*/ private volatile int state;/*** 獲取同步狀態值*/ protected final int getState() {return state; }/*** 修改同步狀態值*/ protected final void setState(int newState) {state = newState; }由源碼我們可以看出,AQS聲明了一個int類型的state值,為了達到多線程同步的功能,必然對該值的修改必須多線程可見,因此,state采用volatile修飾,而且getState()和setState()方法采用final進行修飾,目的是限制AQS的子類只能調用這兩個方法對state的值進行設置和獲取,而不能對其進行重寫自定義設置/獲取邏輯。
AQS中提供對state值修改的方法不僅僅只有setState()和getState(),還有諸如采用CAS機制進行設置的compareAndSetState()方法,同樣,該方法也是采用final修飾的,不允許子類重寫,只能調用。
AQS中的tryXXX方法
一般基于AQS實現的同步器,如ReentrantLock,CountDownLatch等,對于state的獲取操作,子類只需重寫其tryAcquire()和tryAcquireShared()方法即可,這兩個方法分別對應獨占模式和共享模式下對state的獲取操作;而對于釋放操作,子類只需重寫tryRelease()和tryReleaseShared()方法即可。
至于如何維護隊列的出隊、入隊操作,子類不用管,AQS已經幫你做好了。
AQS 設計妙處
CAS自旋鎖
當我們執行一個有確定結果的操作,同時又需要并發正確執行,通常可以采用自旋鎖實現。在AQS中,自旋鎖采用 死循環 + CAS 實現。針對AQS中的enq()進行講解:
private Node enq(final Node node) {// 死循環 + CAS ,解決入隊并發問題/*** 假設有三個線程同時都需要入隊操作,那么使用死循環和CAS可保證并發安全,同一時間只有一個節點安全入隊,入隊失敗的線程則循環重試* * 1、如果不要死循環可以嗎?只用CAS.* 不可以,因為如果其他線程修改了tail的值,導致1處代碼返回false,那么方法enq方法將退出,導致該入隊的節點卻沒能入隊* * 2、如果只用死循環,不需要CAS可以嗎?* 不可以,首先不需要使用CAS,那就沒必要再使用死循環了,再者,如果不使用CAS,那么當執行1處代碼時,將會改變隊列的結構*/for (;;) {// 獲取尾部節點Node t = tail;// 如果還沒有初始化,那么就初始化if (t == null) { // Must initializeif (compareAndSetHead(new Node()))// 剛開始肯定是頭指針和尾指針相等tail = head;} else {// 當前結點的前驅節點等于尾部節點node.prev = t;// 如果當前尾結點仍然是t,那么執行入隊并返回true,否則返回false,然后重試if (compareAndSetTail(t, node)) { // 1t.next = node;return t;}}} }首先入隊操作要求的最終結果必須是一個節點插入到隊列中去,只能成功,不能失敗!然而這個入隊的操作是需要并發執行的,有可能同時有很多的線程需要執行入隊操作,因此我們需要采取相關的線程同步機制。自旋鎖采取樂觀策略,即使用了CAS中的compareAndSet()操作,如果某次執行返回fasle,那么當前操作必須重試,因此,采用for死循環直到成功為止,成功,則break跳出for循環或者直接return操作退出方法。
模板方法
在AQS中,模板方法設計模式體現在其acquire()、release()方法上,我們先來看下源碼:
public final void acquire(int arg) {// 首先嘗試獲取共享狀態,如果獲取成功,則tryAcquire()返回trueif (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}其中調用tryAcquire()方法的默認實現是拋出一個異常,也就是說tryAcquire()方法留給子類去實現,acquire()方法定義了一個模板,一套處理邏輯,相關具體執行方法留給子類去實現。
自定義AQS并發同步器
下邊以JDK文檔的一個實例進行介紹:
class Mutex implements Lock, java.io.Serializable {// 自定義同步器private static class Sync extends AbstractQueuedSynchronizer {// 判斷是否鎖定狀態protected boolean isHeldExclusively() {return getState() == 1;}// 嘗試獲取資源,立即返回。成功則返回true,否則false。public boolean tryAcquire(int acquires) {assert acquires == 1; // 這里限定只能為1個量if (compareAndSetState(0, 1)) {//state為0才設置為1,不可重入!setExclusiveOwnerThread(Thread.currentThread());//設置為當前線程獨占資源return true;}return false;}// 嘗試釋放資源,立即返回。成功則為true,否則false。protected boolean tryRelease(int releases) {assert releases == 1; // 限定為1個量if (getState() == 0)//既然來釋放,那肯定就是已占有狀態了。只是為了保險,多層判斷!throw new IllegalMonitorStateException();setExclusiveOwnerThread(null);setState(0);//釋放資源,放棄占有狀態return true;}}// 真正同步類的實現都依賴繼承于AQS的自定義同步器!private final Sync sync = new Sync();//lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。public void lock() {sync.acquire(1);}//tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。public boolean tryLock() {return sync.tryAcquire(1);}//unlock<-->release。兩者語文一樣:釋放資源。public void unlock() {sync.release(1);}//鎖是否占有狀態public boolean isLocked() {return sync.isHeldExclusively();} }實現自己的同步類一般都會自定義同步器(sync),并且將該類定義為內部類,供自己使用;而同步類自己(Mutex)則實現某個接口,對外服務。當然,接口的實現要直接依賴sync,它們在語義上也存在某種對應關系!!而sync只用實現資源state的獲取-釋放方式tryAcquire-tryRelelase,至于線程的排隊、等待、喚醒等,上層的AQS都已經實現好了,我們不用關心。
除了Mutex,ReentrantLock/CountDownLatch/Semphore這些同步類的實現方式都差不多,不同的地方就在獲取-釋放資源的方式tryAcquire-tryRelelase。掌握了這點,AQS的核心便被攻破了!
總結
- 上一篇: 【完整代码】使用Semaphore实现线
- 下一篇: AQS源码