转:AbstractQueuedSynchronizer的介绍和原理分析
引自:http://ifeve.com/introduce-abstractqueuedsynchronizer/
簡介
提供了一個基于FIFO隊列,可以用于構(gòu)建鎖或者其他相關(guān)同步裝置的基礎(chǔ)框架。該同步器(以下簡稱同步器)利用了一個int來表示狀態(tài),期望它能夠成為實現(xiàn)大部分同步需求的基礎(chǔ)。使用的方法是繼承,子類通過繼承同步器并需要實現(xiàn)它的方法來管理其狀態(tài),管理的方式就是通過類似acquire和release的方式來操縱狀態(tài)。然而多線程環(huán)境中對狀態(tài)的操縱必須確保原子性,因此子類對于狀態(tài)的把握,需要使用這個同步器提供的以下三個方法對狀態(tài)進行操作:
- java.util.concurrent.locks.AbstractQueuedSynchronizer.getState()
- java.util.concurrent.locks.AbstractQueuedSynchronizer.setState(int)
- java.util.concurrent.locks.AbstractQueuedSynchronizer.compareAndSetState(int, int)
子類推薦被定義為自定義同步裝置的內(nèi)部類,同步器自身沒有實現(xiàn)任何同步接口,它僅僅是定義了若干acquire之類的方法來供使用。該同步器即可以作為排他模式也可以作為共享模式,當它被定義為一個排他模式時,其他線程對其的獲取就被阻止,而共享模式對于多個線程獲取都可以成功。
同步器是實現(xiàn)鎖的關(guān)鍵,利用同步器將鎖的語義實現(xiàn),然后在鎖的實現(xiàn)中聚合同步器。可以這樣理解:鎖的API是面向使用者的,它定義了與鎖交互的公共行為,而每個鎖需要完成特定的操作也是透過這些行為來完成的(比如:可以允許兩個線程進行加鎖,排除兩個以上的線程),但是實現(xiàn)是依托給同步器來完成;同步器面向的是線程訪問和資源控制,它定義了線程對資源是否能夠獲取以及線程的排隊等操作。鎖和同步器很好的隔離了二者所需要關(guān)注的領(lǐng)域,嚴格意義上講,同步器可以適用于除了鎖以外的其他同步設(shè)施上(包括鎖)。
同步器的開始提到了其實現(xiàn)依賴于一個FIFO隊列,那么隊列中的元素Node就是保存著線程引用和線程狀態(tài)的容器,每個線程對同步器的訪問,都可以看做是隊列中的一個節(jié)點。Node的主要包含以下成員變量:
Node {int waitStatus;Node prev;Node next;Node nextWaiter;Thread thread; }
?
以上五個成員變量主要負責(zé)保存該節(jié)點的線程引用,同步等待隊列(以下簡稱sync隊列)的前驅(qū)和后繼節(jié)點,同時也包括了同步狀態(tài)。
| 屬性名稱 | 描述 |
| int waitStatus | 表示節(jié)點的狀態(tài)。其中包含的狀態(tài)有: ?
|
| Node prev | 前驅(qū)節(jié)點,比如當前節(jié)點被取消,那就需要前驅(qū)節(jié)點和后繼節(jié)點來完成連接。 |
| Node next | 后繼節(jié)點。 |
| Node nextWaiter | 存儲condition隊列中的后繼節(jié)點。 |
| Thread thread | 入隊列時的當前線程。 |
節(jié)點成為sync隊列和condition隊列構(gòu)建的基礎(chǔ),在同步器中就包含了sync隊列。同步器擁有三個成員變量:sync隊列的頭結(jié)點head、sync隊列的尾節(jié)點tail和狀態(tài)state。對于鎖的獲取,請求形成節(jié)點,將其掛載在尾部,而鎖資源的轉(zhuǎn)移(釋放再獲取)是從頭部開始向后進行。對于同步器維護的狀態(tài)state,多個線程對其的獲取將會產(chǎn)生一個鏈式的結(jié)構(gòu)。
API說明
實現(xiàn)自定義同步器時,需要使用同步器提供的getState()、setState()和compareAndSetState()方法來操縱狀態(tài)的變遷。
| 方法名稱 | 描述 |
| protected boolean tryAcquire(int arg) | 排它的獲取這個狀態(tài)。這個方法的實現(xiàn)需要查詢當前狀態(tài)是否允許獲取,然后再進行獲取(使用compareAndSetState來做)狀態(tài)。 |
| protected boolean tryRelease(int arg)? | 釋放狀態(tài)。 |
| protected int tryAcquireShared(int arg) | 共享的模式下獲取狀態(tài)。 |
| protected boolean tryReleaseShared(int arg) | 共享的模式下釋放狀態(tài)。 |
| protected boolean isHeldExclusively() | 在排它模式下,狀態(tài)是否被占用。 |
實現(xiàn)這些方法必須是非阻塞而且是線程安全的,推薦使用該同步器的父類java.util.concurrent.locks.AbstractOwnableSynchronizer來設(shè)置當前的線程。
開始提到同步器內(nèi)部基于一個FIFO隊列,對于一個獨占鎖的獲取和釋放有以下偽碼可以表示。
獲取一個排他鎖。
while(獲取鎖) {if (獲取到) {退出while循環(huán)} else {if(當前線程沒有入隊列) {那么入隊列}阻塞當前線程} }
?
釋放一個排他鎖。
if (釋放成功) {刪除頭結(jié)點激活原頭結(jié)點的后繼節(jié)點 }
?
示例
下面通過一個排它鎖的例子來深入理解一下同步器的工作原理,而只有掌握同步器的工作原理才能夠更加深入了解其他的并發(fā)組件。
排他鎖的實現(xiàn),一次只能一個線程獲取到鎖。
class Mutex implements Lock, java.io.Serializable {// 內(nèi)部類,自定義同步器private static class Sync extends AbstractQueuedSynchronizer {// 是否處于占用狀態(tài)protected boolean isHeldExclusively() {return getState() == 1;}// 當狀態(tài)為0的時候獲取鎖public boolean tryAcquire(int acquires) {assert acquires == 1; // Otherwise unusedif (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}// 釋放鎖,將狀態(tài)設(shè)置為0protected boolean tryRelease(int releases) {assert releases == 1; // Otherwise unusedif (getState() == 0) throw new IllegalMonitorStateException();setExclusiveOwnerThread(null);setState(0);return true;}// 返回一個Condition,每個condition都包含了一個condition隊列Condition newCondition() { return new ConditionObject(); }}// 僅需要將操作代理到Sync上即可private final Sync sync = new Sync();public void lock() { sync.acquire(1); }public boolean tryLock() { return sync.tryAcquire(1); }public void unlock() { sync.release(1); }public Condition newCondition() { return sync.newCondition(); }public boolean isLocked() { return sync.isHeldExclusively(); }public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}}
?
可以看到Mutex將Lock接口均代理給了同步器的實現(xiàn)。
使用方將Mutex構(gòu)造出來之后,調(diào)用lock獲取鎖,調(diào)用unlock進行解鎖。下面以Mutex為例子,詳細分析以下同步器的實現(xiàn)邏輯。
實現(xiàn)分析
public final void acquire(int arg)
該方法以排他的方式獲取鎖,對中斷不敏感,完成synchronized語義。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }
?
上述邏輯主要包括:
1. 嘗試獲取(調(diào)用tryAcquire更改狀態(tài),需要保證原子性);
在tryAcquire方法中使用了同步器提供的對state操作的方法,利用compareAndSet保證只有一個線程能夠?qū)顟B(tài)進行成功修改,而沒有成功修改的線程將進入sync隊列排隊。
2. 如果獲取不到,將當前線程構(gòu)造成節(jié)點Node并加入sync隊列;
進入隊列的每個線程都是一個節(jié)點Node,從而形成了一個雙向隊列,類似CLH隊列,這樣做的目的是線程間的通信會被限制在較小規(guī)模(也就是兩個節(jié)點左右)。
3. 再次嘗試獲取,如果沒有獲取到那么將當前線程從線程調(diào)度器上摘下,進入等待狀態(tài)。
使用LockSupport將當前線程unpark,關(guān)于LockSupport后續(xù)會詳細介紹。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// 快速嘗試在尾部添加Node pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node; }private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}} }
?
上述邏輯主要包括:
1. 使用當前線程構(gòu)造Node;
對于一個節(jié)點需要做的是將當節(jié)點前驅(qū)節(jié)點指向尾節(jié)點(current.prev = tail),尾節(jié)點指向它(tail = current),原有的尾節(jié)點的后繼節(jié)點指向它(t.next = current)而這些操作要求是原子的。上面的操作是利用尾節(jié)點的設(shè)置來保證的,也就是compareAndSetTail來完成的。
2. 先行嘗試在隊尾添加;
如果尾節(jié)點已經(jīng)有了,然后做如下操作:
(1)分配引用T指向尾節(jié)點;
(2)將節(jié)點的前驅(qū)節(jié)點更新為尾節(jié)點(current.prev = tail);
(3)如果尾節(jié)點是T,那么將當尾節(jié)點設(shè)置為該節(jié)點(tail = current,原子更新);
(4)T的后繼節(jié)點指向當前節(jié)點(T.next = current)。
注意第3點是要求原子的。
這樣可以以最短路徑O(1)的效果來完成線程入隊,是最大化減少開銷的一種方式。
3. 如果隊尾添加失敗或者是第一個入隊的節(jié)點。
如果是第1個節(jié)點,也就是sync隊列沒有初始化,那么會進入到enq這個方法,進入的線程可能有多個,或者說在addWaiter中沒有成功入隊的線程都將進入enq這個方法。
可以看到enq的邏輯是確保進入的Node都會有機會順序的添加到sync隊列中,而加入的步驟如下:
(1)如果尾節(jié)點為空,那么原子化的分配一個頭節(jié)點,并將尾節(jié)點指向頭節(jié)點,這一步是初始化;
(2)然后是重復(fù)在addWaiter中做的工作,但是在一個while(true)的循環(huán)中,直到當前節(jié)點入隊為止。
進入sync隊列之后,接下來就是要進行鎖的獲取,或者說是訪問控制了,只有一個線程能夠在同一時刻繼續(xù)的運行,而其他的進入等待狀態(tài)。而每個線程都是一個獨立的個體,它們自省的觀察,當條件滿足的時候(自己的前驅(qū)是頭結(jié)點并且原子性的獲取了狀態(tài)),那么這個線程能夠繼續(xù)運行。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }
?
上述邏輯主要包括:
1. 獲取當前節(jié)點的前驅(qū)節(jié)點;
需要獲取當前節(jié)點的前驅(qū)節(jié)點,而頭結(jié)點所對應(yīng)的含義是當前站有鎖且正在運行。
2. 當前驅(qū)節(jié)點是頭結(jié)點并且能夠獲取狀態(tài),代表該當前節(jié)點占有鎖;
如果滿足上述條件,那么代表能夠占有鎖,根據(jù)節(jié)點對鎖占有的含義,設(shè)置頭結(jié)點為當前節(jié)點。
3. 否則進入等待狀態(tài)。
如果沒有輪到當前節(jié)點運行,那么將當前線程從線程調(diào)度器上摘下,也就是進入等待狀態(tài)。
這里針對acquire做一下總結(jié):
1. 狀態(tài)的維護;
需要在鎖定時,需要維護一個狀態(tài)(int類型),而對狀態(tài)的操作是原子和非阻塞的,通過同步器提供的對狀態(tài)訪問的方法對狀態(tài)進行操縱,并且利用compareAndSet來確保原子性的修改。
2. 狀態(tài)的獲取;
一旦成功的修改了狀態(tài),當前線程或者說節(jié)點,就被設(shè)置為頭節(jié)點。
3. sync隊列的維護。
在獲取資源未果的過程中條件不符合的情況下(不該自己,前驅(qū)節(jié)點不是頭節(jié)點或者沒有獲取到資源)進入睡眠狀態(tài),停止線程調(diào)度器對當前節(jié)點線程的調(diào)度。
這時引入的一個釋放的問題,也就是說使睡眠中的Node或者說線程獲得通知的關(guān)鍵,就是前驅(qū)節(jié)點的通知,而這一個過程就是釋放,釋放會通知它的后繼節(jié)點從睡眠中返回準備運行。
下面的流程圖基本描述了一次acquire所需要經(jīng)歷的過程:
如上圖所示,其中的判定退出隊列的條件,判定條件是否滿足和休眠當前線程就是完成了自旋spin的過程。
public final boolean release(int arg)
在unlock方法的實現(xiàn)中,使用了同步器的release方法。相對于在之前的acquire方法中可以得出調(diào)用acquire,保證能夠獲取到鎖(成功獲取狀態(tài)),而release則表示將狀態(tài)設(shè)置回去,也就是將資源釋放,或者說將鎖釋放。
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false; }
?
上述邏輯主要包括:
1. 嘗試釋放狀態(tài);
tryRelease能夠保證原子化的將狀態(tài)設(shè)置回去,當然需要使用compareAndSet來保證。如果釋放狀態(tài)成功過之后,將會進入后繼節(jié)點的喚醒過程。
2. 喚醒當前節(jié)點的后繼節(jié)點所包含的線程。
通過LockSupport的unpark方法將休眠中的線程喚醒,讓其繼續(xù)acquire狀態(tài)。
private void unparkSuccessor(Node node) {// 將狀態(tài)設(shè)置為同步狀態(tài)int ws = node.waitStatus;if (ws < 0) compareAndSetWaitStatus(node, ws, 0); // 獲取當前節(jié)點的后繼節(jié)點,如果滿足狀態(tài),那么進行喚醒操作 // 如果沒有滿足狀態(tài),從尾部開始找尋符合要求的節(jié)點并將其喚醒 Node s = node.next; if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread); }
?
上述邏輯主要包括,該方法取出了當前節(jié)點的next引用,然后對其線程(Node)進行了喚醒,這時就只有一個或合理個數(shù)的線程被喚醒,被喚醒的線程繼續(xù)進行對資源的獲取與爭奪。
回顧整個資源的獲取和釋放過程:
在獲取時,維護了一個sync隊列,每個節(jié)點都是一個線程在進行自旋,而依據(jù)就是自己是否是首節(jié)點的后繼并且能夠獲取資源;
在釋放時,僅僅需要將資源還回去,然后通知一下后繼節(jié)點并將其喚醒。
這里需要注意,隊列的維護(首節(jié)點的更換)是依靠消費者(獲取時)來完成的,也就是說在滿足了自旋退出的條件時的一刻,這個節(jié)點就會被設(shè)置成為首節(jié)點。
protected boolean tryAcquire(int arg)
tryAcquire是自定義同步器需要實現(xiàn)的方法,也就是自定義同步器非阻塞原子化的獲取狀態(tài),如果鎖該方法一般用于Lock的tryLock實現(xiàn)中,這個特性是synchronized無法提供的。
public final void acquireInterruptibly(int arg)
該方法提供獲取狀態(tài)能力,當然在無法獲取狀態(tài)的情況下會進入sync隊列進行排隊,這類似acquire,但是和acquire不同的地方在于它能夠在外界對當前線程進行中斷的時候提前結(jié)束獲取狀態(tài)的操作,換句話說,就是在類似synchronized獲取鎖時,外界能夠?qū)Ξ斍熬€程進行中斷,并且獲取鎖的這個操作能夠響應(yīng)中斷并提前返回。一個線程處于synchronized塊中或者進行同步I/O操作時,對該線程進行中斷操作,這時該線程的中斷標識位被設(shè)置為true,但是線程依舊繼續(xù)運行。
如果在獲取一個通過網(wǎng)絡(luò)交互實現(xiàn)的鎖時,這個鎖資源突然進行了銷毀,那么使用acquireInterruptibly的獲取方式就能夠讓該時刻嘗試獲取鎖的線程提前返回。而同步器的這個特性被實現(xiàn)Lock接口中的lockInterruptibly方法。根據(jù)Lock的語義,在被中斷時,lockInterruptibly將會拋出InterruptedException來告知使用者。
public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg); }private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}// 檢測中斷標志位if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }
?
上述邏輯主要包括:
1. 檢測當前線程是否被中斷;
判斷當前線程的中斷標志位,如果已經(jīng)被中斷了,那么直接拋出異常并將中斷標志位設(shè)置為false。
2. 嘗試獲取狀態(tài);
調(diào)用tryAcquire獲取狀態(tài),如果順利會獲取成功并返回。
3. 構(gòu)造節(jié)點并加入sync隊列;
獲取狀態(tài)失敗后,將當前線程引用構(gòu)造為節(jié)點并加入到sync隊列中。退出隊列的方式在沒有中斷的場景下和acquireQueued類似,當頭結(jié)點是自己的前驅(qū)節(jié)點并且能夠獲取到狀態(tài)時,即可以運行,當然要將本節(jié)點設(shè)置為頭結(jié)點,表示正在運行。
4. 中斷檢測。
在每次被喚醒時,進行中斷檢測,如果發(fā)現(xiàn)當前線程被中斷,那么拋出InterruptedException并退出循環(huán)。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
該方法提供了具備有超時功能的獲取狀態(tài)的調(diào)用,如果在指定的nanosTimeout內(nèi)沒有獲取到狀態(tài),那么返回false,反之返回true。可以將該方法看做acquireInterruptibly的升級版,也就是在判斷是否被中斷的基礎(chǔ)上增加了超時控制。
針對超時控制這部分的實現(xiàn),主要需要計算出睡眠的delta,也就是間隔值。間隔可以表示為nanosTimeout = 原有nanosTimeout – now(當前時間)+ lastTime(睡眠之前記錄的時間)。如果nanosTimeout大于0,那么還需要使當前線程睡眠,反之則返回false。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {long lastTime = System.nanoTime();final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return true;}if (nanosTimeout <= 0) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);long now = System.nanoTime();//計算時間,當前時間減去睡眠之前的時間得到睡眠的時間,然后被//原有超時時間減去,得到了還應(yīng)該睡眠的時間nanosTimeout -= now - lastTime;lastTime = now;if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }
?
上述邏輯主要包括:
1. 加入sync隊列;
將當前線程構(gòu)造成為節(jié)點Node加入到sync隊列中。
2. 條件滿足直接返回;
退出條件判斷,如果前驅(qū)節(jié)點是頭結(jié)點并且成功獲取到狀態(tài),那么設(shè)置自己為頭結(jié)點并退出,返回true,也就是在指定的nanosTimeout之前獲取了鎖。
3. 獲取狀態(tài)失敗休眠一段時間;
通過LockSupport.unpark來指定當前線程休眠一段時間。
4. 計算再次休眠的時間;
喚醒后的線程,計算仍需要休眠的時間,該時間表示為nanosTimeout = 原有nanosTimeout – now(當前時間)+ lastTime(睡眠之前記錄的時間)。其中now – lastTime表示這次睡眠所持續(xù)的時間。
5. 休眠時間的判定。
喚醒后的線程,計算仍需要休眠的時間,并無阻塞的嘗試再獲取狀態(tài),如果失敗后查看其nanosTimeout是否大于0,如果小于0,那么返回完全超時,沒有獲取到鎖。 如果nanosTimeout小于等于1000L納秒,則進入快速的自旋過程。那么快速自旋會造成處理器資源緊張嗎?結(jié)果是不會,經(jīng)過測算,開銷看起來很小,幾乎微乎其微。Doug Lea應(yīng)該測算了在線程調(diào)度器上的切換造成的額外開銷,因此在短時1000納秒內(nèi)就讓當前線程進入快速自旋狀態(tài),如果這時再休眠相反會讓nanosTimeout的獲取時間變得更加不精確。
上述過程可以如下圖所示:
上述這個圖中可以理解為在類似獲取狀態(tài)需要排隊的基礎(chǔ)上增加了一個超時控制的邏輯。每次超時的時間就是當前超時剩余的時間減去睡眠的時間,而在這個超時時間的基礎(chǔ)上進行了判斷,如果大于0那么繼續(xù)睡眠(等待),可以看出這個超時版本的獲取狀態(tài)只是一個近似超時的獲取狀態(tài),因此任何含有超時的調(diào)用基本結(jié)果就是近似于給定超時。
public final void acquireShared(int arg)
調(diào)用該方法能夠以共享模式獲取狀態(tài),共享模式和之前的獨占模式有所區(qū)別。以文件的查看為例,如果一個程序在對其進行讀取操作,那么這一時刻,對這個文件的寫操作就被阻塞,相反,這一時刻另一個程序?qū)ζ溥M行同樣的讀操作是可以進行的。如果一個程序在對其進行寫操作,那么所有的讀與寫操作在這一時刻就被阻塞,直到這個程序完成寫操作。
以讀寫場景為例,描述共享和獨占的訪問模式,如下圖所示:
上圖中,紅色代表被阻塞,綠色代表可以通過。
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }
?
上述邏輯主要包括:
1. 嘗試獲取共享狀態(tài);
調(diào)用tryAcquireShared來獲取共享狀態(tài),該方法是非阻塞的,如果獲取成功則立刻返回,也就表示獲取共享鎖成功。
2. 獲取失敗進入sync隊列;
在獲取共享狀態(tài)失敗后,當前時刻有可能是獨占鎖被其他線程所把持,那么將當前線程構(gòu)造成為節(jié)點(共享模式)加入到sync隊列中。
3. 循環(huán)內(nèi)判斷退出隊列條件;
如果當前節(jié)點的前驅(qū)節(jié)點是頭結(jié)點并且獲取共享狀態(tài)成功,這里和獨占鎖acquire的退出隊列條件類似。
4. 獲取共享狀態(tài)成功;
在退出隊列的條件上,和獨占鎖之間的主要區(qū)別在于獲取共享狀態(tài)成功之后的行為,而如果共享狀態(tài)獲取成功之后會判斷后繼節(jié)點是否是共享模式,如果是共享模式,那么就直接對其進行喚醒操作,也就是同時激發(fā)多個線程并發(fā)的運行。
5. 獲取共享狀態(tài)失敗。
通過使用LockSupport將當前線程從線程調(diào)度器上摘下,進入休眠狀態(tài)。
對于上述邏輯中,節(jié)點之間的通知過程如下圖所示:
上圖中,綠色表示共享節(jié)點,它們之間的通知和喚醒操作是在前驅(qū)節(jié)點獲取狀態(tài)時就進行的,紅色表示獨占節(jié)點,它的被喚醒必須取決于前驅(qū)節(jié)點的釋放,也就是release操作,可以看出來圖中的獨占節(jié)點如果要運行,必須等待前面的共享節(jié)點均釋放了狀態(tài)才可以。而獨占節(jié)點如果獲取了狀態(tài),那么后續(xù)的獨占式獲取和共享式獲取均被阻塞。
public final boolean releaseShared(int arg)
調(diào)用該方法釋放共享狀態(tài),每次獲取共享狀態(tài)acquireShared都會操作狀態(tài),同樣在共享鎖釋放的時候,也需要將狀態(tài)釋放。比如說,一個限定一定數(shù)量訪問的同步工具,每次獲取都是共享的,但是如果超過了一定的數(shù)量,將會阻塞后續(xù)的獲取操作,只有當之前獲取的消費者將狀態(tài)釋放才可以使阻塞的獲取操作得以運行。
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }
?
上述邏輯主要就是調(diào)用同步器的tryReleaseShared方法來釋放狀態(tài),并同時在doReleaseShared方法中喚醒其后繼節(jié)點。
一個例子
在上述對同步器AbstractQueuedSynchronizer進行了實現(xiàn)層面的分析之后,我們通過一個例子來加深對同步器的理解:
設(shè)計一個同步工具,該工具在同一時刻,只能有兩個線程能夠并行訪問,超過限制的其他線程進入阻塞狀態(tài)。
對于這個需求,可以利用同步器完成一個這樣的設(shè)定,定義一個初始狀態(tài),為2,一個線程進行獲取那么減1,一個線程釋放那么加1,狀態(tài)正確的范圍在[0,1,2]三個之間,當在0時,代表再有新的線程對資源進行獲取時只能進入阻塞狀態(tài)(注意在任何時候進行狀態(tài)變更的時候均需要以CAS作為原子性保障)。由于資源的數(shù)量多于1個,同時可以有兩個線程占有資源,因此需要實現(xiàn)tryAcquireShared和tryReleaseShared方法,這里謝謝luoyuyou和同事小明指正,已經(jīng)修改了實現(xiàn)。
public class TwinsLock implements Lock {private final Sync sync = new Sync(2);private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -7889272986162341211L;Sync(int count) {if (count <= 0) {throw new IllegalArgumentException("count must large than zero.");}setState(count);}public int tryAcquireShared(int reduceCount) {for (;;) {int current = getState();int newCount = current - reduceCount;if (newCount < 0 || compareAndSetState(current, newCount)) {return newCount;}}}public boolean tryReleaseShared(int returnCount) {for (;;) {int current = getState();int newCount = current + returnCount;if (compareAndSetState(current, newCount)) {return true;}}}}public void lock() {sync.acquireShared(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean tryLock() {return sync.tryAcquireShared(1) >= 0;}public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(time));}public void unlock() {sync.releaseShared(1);}@Overridepublic Condition newCondition() {return null;} }
?
這里我們編寫一個測試來驗證TwinsLock是否能夠正常工作并達到預(yù)期。
public class TwinsLockTest {@Testpublic void test() {final Lock lock = new TwinsLock();class Worker extends Thread {public void run() {while (true) {lock.lock();try {Thread.sleep(1000L);System.out.println(Thread.currentThread());Thread.sleep(1000L);} catch (Exception ex) {} finally {lock.unlock();}}}}for (int i = 0; i < 10; i++) {Worker w = new Worker();w.start();}new Thread() {public void run() {while (true) {try {Thread.sleep(200L);System.out.println();} catch (Exception ex) {}}}}.start();try {Thread.sleep(20000L);} catch (InterruptedException e) {e.printStackTrace();}} }
?
上述測試用例的邏輯主要包括:
?1. 打印線程
Worker在兩次睡眠之間打印自身線程,如果一個時刻只能有兩個線程同時訪問,那么打印出來的內(nèi)容將是成對出現(xiàn)。
?2. 分隔線程
不停的打印換行,能讓W(xué)orker的輸出看起來更加直觀。
該測試的結(jié)果是在一個時刻,僅有兩個線程能夠獲得到鎖,并完成打印,而表象就是打印的內(nèi)容成對出現(xiàn)。
轉(zhuǎn)載于:https://www.cnblogs.com/x-jingxin/p/10671913.html
總結(jié)
以上是生活随笔為你收集整理的转:AbstractQueuedSynchronizer的介绍和原理分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 求一个好听的舞蹈队名字!
- 下一篇: 求一个有内涵个性签名