聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)
AQS是AbstractQueuedSynchronizer的縮寫,AQS是Java并包里大部分同步器的基礎構件,利用AQS可以很方便的創建鎖和同步器。它封裝了一個狀態,提供了一系列的獲取和釋放操作,這些獲取和釋放操作都是基于狀態的。它的基本思想是由AQS負責管理同步器類中的狀態,其他的同步器比如可重入鎖ReentrantLock, 信號量Semaphore基于各自的特點來調用AQS提供了基礎能力進行狀態的同步。
?
在AQS的Javadoc里面提到它是CLHLock的變種,在聊聊高并發(八)實現幾種自旋鎖(三)?這篇文章中我們說了如何利用CLH鎖來構件自旋鎖,回顧一下CLHLock的一些基本特點:
1. CLHLock是一種隊列自旋鎖的實現,提供了FIFO先來先服務的公平性
2. 利用一個原子變量AtomicReference tail的CAS操作來構件一個虛擬的鏈式結構
3. 節點Node維護一個volatile狀態,維護一個prev指針指向前一個節點,獲取鎖時每個線程在prev節點的狀態上自旋
4. 當線程釋放鎖時,只需要修改自身狀態即可,后續節點會觀察到volatile狀態的改動而獲取鎖
?
AQS既然是CLHLock的一種變種,那么
1. 也維護以了一個基本的隊列結構
2. 也是提供了一個Tail指針從隊尾通過CAS操作入隊列。
3. 提供了一個volatile類型的int值來維護狀態
?
?public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
?private transient volatile Node head;
??? private transient volatile Node tail;
??? private volatile int state;
??? protected final int getState() {
??????? return state;
??? }
??
??? protected final void setState(int newState) {
??????? state = newState;
??? }
??? protected final boolean compareAndSetState(int expect, int update) {
??????? // See below for intrinsics setup to support this
??????? return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
??? }
..................
}
?
與標準CLHLock實現不同的是,AQS不是一個自旋鎖,它提供了更加豐富的語意:
1. 提供了獨享(exclusive)方式和共享(share)方式來獲取/釋放,比如鎖是獨占方式的,信號量semaphore是共享方式的,可以有多個線程進入臨界區?
2. 支持可中斷和不可中斷的獲取/釋放
3. 支持普通的和具有時間限制的獲取/釋放
4. 提供了自旋和阻塞的切換,可以先自旋,如果等待時間長,可以阻塞
?
?/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
AQS定義了兩個內部類來輔助它的實現,一個是Node定義了隊列中的節點,另一個是ConditionObject,是Condition接口的實現類,負責管理條件隊列。關于條件隊列更多內容可以看這篇?聊聊高并發(十四)理解Java中的管程,條件隊列,Condition以及實現一個阻塞隊列
?
先看下Node類,它比CLHLock中的Node有更多屬性,除了完成基本的隊列功能,還維護了是獨享還是共享的模式信息
1. 維護了一個Node SHARED引用表示共享模式
2. 維護了一個Node EXCLUSIVE引用表示獨占模式
3. 維護了幾種節點等待的狀態 waitStatus, 其中CANCELLED = 1是正數,表示取消狀態,SIGNAL = -1,CONDITION = -2, PROPAGATE = -3都是負數,表示節點在條件隊列的某個狀態,SIGNAL表示后續節點需要被喚醒
4. 維護了Node prev引用,指向隊列中的前一個節點,通過Tail的CAS操作來創建
5. 維護了Node next引用,指向隊列中的下一個節點,也是在通過Tail入隊列的時候設置的,這樣就維護了一個雙向隊列
6. 維護了一個volatile的Thread引用,把一個節點關聯到一個線程
7. 維護了Node nextWaiter引用,指向在條件隊列中的下一個正在等待的節點,是給條件隊列使用的。值得注意的是條件隊列只有在獨享狀態下才使用
?
?static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
?
再看一下ConditionObject,它是條件Condition接口的具體實現,維護了一個條件隊列,條件隊列是通過Node來構件的一個單向鏈表結構。底層的條件操作(等待和喚醒)使用LockSupport類來實現,在這篇中我們說了LockSupport底層使用sun.misc.Unsafe來提供條件隊列的park和unpark操作。聊聊高并發(十七)解析java.util.concurrent各個組件(一) 了解sun.misc.Unsafe類
1. 維護了一個Node firstWaiter引用指向條件隊列的隊首節點
2. 維護了一個Node lastWaiter引用指向條件隊列的隊尾節點
3. 條件隊列支持節點的取消退出機制,CANCELLED節點來表示這種取消狀態
4. 支持限時等待機制
5. 支持可中斷和不可中斷的等待
我們來看幾個典型的條件隊列的操作實現
往條件隊列里面加入一個等待節點,這個是await()方法的基本操作
1. 判斷尾節點的狀態是不是等待某個條件的狀態(CONDITION),如果不是,就把CANCELLED節點從隊列中踢出,然后把自己標記為尾節點
?
?public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
???????? * Adds a new waiter to wait queue.
???????? * @return its new wait node
???????? */
??????? private Node addConditionWaiter() {
??????????? Node t = lastWaiter;
??????????? // If lastWaiter is cancelled, clean out.
??????????? if (t != null && t.waitStatus != Node.CONDITION) {
??????????????? unlinkCancelledWaiters();
??????????????? t = lastWaiter;
??????????? }
??????????? Node node = new Node(Thread.currentThread(), Node.CONDITION);
??????????? if (t == null)
??????????????? firstWaiter = node;
??????????? else
??????????????? t.nextWaiter = node;
??????????? lastWaiter = node;
??????????? return node;
??????? }
private void unlinkCancelledWaiters() {
??????????? Node t = firstWaiter;
??????????? Node trail = null;
??????????? while (t != null) {
??????????????? Node next = t.nextWaiter;
??????????????? if (t.waitStatus != Node.CONDITION) {
??????????????????? t.nextWaiter = null;
??????????????????? if (trail == null)
??????????????????????? firstWaiter = next;
??????????????????? else
??????????????????????? trail.nextWaiter = next;
??????????????????? if (next == null)
??????????????????????? lastWaiter = trail;
??????????????? }
??????????????? else
??????????????????? trail = t;
??????????????? t = next;
??????????? }
??????? }
.................
}
從條件隊列中喚醒一個節點,實際上doSignal只是把一個節點從條件隊列中移除,然后加入到同步隊列,并設置它在同步隊列的前置節點的waitStatus = SIGNAL, 如果設置失敗或者取消在條件隊列等待,直接把這個節點的線程unpark喚醒,需要注意的是unpark操作只是把線程從等待狀態轉化為可運行狀態,并不直接獲得鎖。
?
??public final void signal() {
??????????? if (!isHeldExclusively())
??????????????? throw new IllegalMonitorStateException();
??????????? Node first = firstWaiter;
??????????? if (first != null)
??????????????? doSignal(first);
??????? }
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
? ?final boolean transferForSignal(Node node) {
??????? /*
???????? * If cannot change waitStatus, the node has been cancelled.
???????? */
??????? if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
??????????? return false;
??????? /*
???????? * Splice onto queue and try to set waitStatus of predecessor to
???????? * indicate that thread is (probably) waiting. If cancelled or
???????? * attempt to set waitStatus fails, wake up to resync (in which
???????? * case the waitStatus can be transiently and harmlessly wrong).
???????? */
??????? Node p = enq(node);
??????? int ws = p.waitStatus;
??????? if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
??????????? LockSupport.unpark(node.thread);
??????? return true;
??? }
Java線程的幾種狀態如下
?
支持中斷的等待操作,?主要做了兩個事情:新建一個Node進入條件隊列等待被喚醒;從同步隊列中移除并釋放鎖。它會相應線程的中斷拋出中斷異常,并且記錄中斷狀態
?
?public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
不可中斷的等待,也是先進入條件隊列等待,并從同步隊列出隊列,釋放鎖。但是它不相應線程中斷狀態
?
?public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
限時等待,也是先進入條件隊列等待,然后釋放鎖。輪詢等待時間,當超時后再次進入同步隊列,等待獲得鎖。如果獲得了鎖,就返回false. 如果在等待時被喚醒,就進入同步隊列,等待獲得鎖,如果獲得鎖就返回true
?
?public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
if (unit == null)
throw new NullPointerException();
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
AQS使用了Unsafe直接操作內存來對字段進行CAS操作和設置值。
?
?private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
總結
以上是生活随笔為你收集整理的聊聊高并发(二十一)解析java.util.concurrent各个组件(三) 深入理解AQS(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊聊高并发(二十)解析java.util
- 下一篇: 聊聊高并发(二十二)解析java.uti