聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)
上一篇介紹了AQS的基本設計思路以及兩個內部類Node和ConditionObject的實現?聊聊高并發(二十一)解析java.util.concurrent各個組件(三) 深入理解AQS(一)?這篇說一說AQS的主要方法的實現。AQS和CLHLock的最大區別是,CLHLock是自旋鎖,而AQS使用Unsafe的park操作讓線程進入等待(阻塞)。
?
線程加入同步隊列,和CLHLock一樣,從隊尾入隊列,使用CAS+輪詢的方式實現無鎖化。入隊列后設置節點的prev和next引用,形成雙向鏈表的結構
?
?private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
線程指定獨享還是共享方式加入隊列,先嘗試加入一次,如果失敗再用enq()輪詢地嘗試,比如addWaiter(Node.EXCLUSIVE), addWaiter(Node.SHARED)
?
?
?private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
?
喚醒后繼節點,最典型的情況就是在線程釋放鎖后,會喚醒后繼節點。會從節點的next開始,找到一個后繼節點,如果next是null,就從隊尾開始往head找,直到找到最靠近當前節點的后續節點。 waitStatus <= 0的隱含意思是線程沒有被取消。 然后用LockSupport喚醒這個找到的后繼節點的線程。
這個方法類似于CLHLock里面釋放鎖時,通知后續節點來獲取鎖。AQS使用了阻塞的方式,所以這個方法的后續方法是acquireXXX方法,它負責將后續節點喚醒,后續節點再根據狀態去判斷是否獲得鎖
?
?
?private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
共享模式下的釋放操作,從隊首開始向隊尾擴散,如果節點的waitStatu是SIGNAL,就喚醒后繼節點,如果waitStatus是0,就設置標記成PROPAGATE
?
?
?private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
取消獲取操作,要把節點從同步隊列中去除,通過鏈表操作將它的前置節點的next指向它的后繼節點集合。如果該節點是在隊尾,直接刪除即可,否則要通知后繼節點去獲取鎖
?
?
?private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
?
獨占模式并且不可中斷地獲取隊列鎖的操作,這個方法在ConditionObject.await()中被使用,當線程被Unsafe.unpark喚醒后,需要調用acquireQueued來獲取鎖,從而結束await(). accquireQueued()方法要么獲得鎖,要么被tryAcquire方法拋出的異常打斷,如果拋出異常,最后在finally里面取消獲取
值得注意的是只有節點的前驅節點是head的時候,才能獲得鎖。這里隱含了一個意思,就是head指向當前獲得鎖的節點。當程序進入if(p == head and tryAcquire(arg))這個分支時,表示線程獲得了鎖或者被中斷,將自己設置為head,將next設置為null.
shouldParkAfterFailedAcquired()方法的目的是將節點的前驅節點的waitStatus設置為SIGNAL,表示會通知后續節點,這樣后續節點才能放心去park,而不用擔心被丟失喚醒的通知。
parkAndCheckInterupt()方法會真正執行阻塞,并返回中斷狀態,這個方法有兩種情況返回,一種是park被unpark喚醒,這時候中斷狀態為false。另一種情況是park被中斷了,由于這個accquireQueued方法是不可中斷的版本,所以即使線程被中斷了,也只是設置了中斷標志為true,沒有跑出中斷異常。在支持中斷的獲取版本里,這時會拋出中斷異常。
這個方法可以理解為Lock的lock里沒有獲取鎖的分支,在CLHLock自旋鎖的實現里,是對前驅節點的狀態自旋,而AQS是阻塞,所以這里是在同步隊列里面進入了阻塞狀態,等待被前驅節點釋放鎖時喚醒。
釋放鎖時會根據狀態調用unparkSuccessor()方法來喚醒后續節點,這樣就會在這個方法里面把阻塞的線程喚醒并獲得鎖。
隊列鎖的好處是線程都在多個共享狀態上自旋或阻塞,所以unparkSuccessor()方法只會喚醒它后繼沒有取消的節點。
而取消只有兩種情況,中斷或者超時
?
?
?final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
獨占模式支持中斷的獲取隊列鎖操作,可以看到和不支持中斷版本的區別,這里如果parkAndCheckInterrupt()方法返回時顯示被中斷了,就拋出中斷異常
?
?
?private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
?
獨占模式限時獲取隊列鎖操作, 這個獲取的整體邏輯和前面的類似,區別是它支持限時操作,如果等待時間大于spinForTimeoutThreshold,就使用阻塞的方式等待,否則用自旋等待。使用了LockSupport.parkNanos()方法來實現限時地等待,并支持中斷
這里隱含的一個含義是parkNanos方法退出有3種方式,
1. 限時到了自動退出,這時候會超時
2. 沒有到限時被喚醒了,這時候是不超時的
3. 被中斷
?
?private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
long lastTime = System.nanoTime();
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
if (nanosTimeout <= 0)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
?
?
共享模式獲得隊列鎖操作,獲得操作也是從head的下一個節點開始,和獨占模式只unparkSuccessor一個節點不同,共享模式下,等head的后續節點被喚醒了,它要擴散這種共享的獲取,使用setHeadAndPropagate操作,把自己設置為head,并且把釋放的狀態往下傳遞,這里采用了鏈式喚醒的方法,1個節點負責喚醒1個后續節點,直到不能喚醒。當后繼節點是共享模式isShared,就調用doReleaseShared來喚醒后繼節點
doReleaseShared會從head開始往后檢查狀態,如果節點是SIGNAL狀態,就喚醒它的后繼節點。如果是0就標記為PROPAGATE, 等它釋放鎖的時候會再次喚醒后繼節點。
這里有個隱含的意思:
1. 加入同步隊列并阻塞的節點,它的前驅節點只會是SIGNAL,表示前驅節點釋放鎖時,后繼節點會被喚醒。shouldParkAfterFailedAcquire()方法保證了這點,如果前驅節點不是SIGNAL,它會把它修改成SIGNAL。這里不是SIGNAL就有可能是PROPAGATE
2. 造成前驅節點是PROPAGATE的情況是前驅節點獲得鎖時,會喚醒一次后繼節點,但這時候后繼節點還沒有加入到同步隊列,所以暫時把節點狀態設置為PROPAGATE,當后繼節點加入同步隊列后,會把PROPAGATE設置為SIGNAL,這樣前驅節點釋放鎖時會再次doReleaseShared,這時候它的狀態已經是SIGNAL了,就可以喚醒后續節點了
?
?
?private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
??????? Node h = head; // Record old head for check below
??????? setHead(node);
??????? /*
???????? * Try to signal next queued node if:
???????? *?? Propagation was indicated by caller,
???????? *???? or was recorded (as h.waitStatus) by a previous operation
???????? *???? (note: this uses sign-check of waitStatus because
???????? *????? PROPAGATE status may transition to SIGNAL.)
???????? * and
???????? *?? The next node is waiting in shared mode,
???????? *???? or we don't know, because it appears null
???????? *
???????? * The conservatism in both of these checks may cause
???????? * unnecessary wake-ups, but only when there are multiple
???????? * racing acquires/releases, so most need signals now or soon
???????? * anyway.
???????? */
??????? if (propagate > 0 || h == null || h.waitStatus < 0) {
??????????? Node s = node.next;
??????????? if (s == null || s.isShared())
??????????????? doReleaseShared();
??????? }
??? }
private void doReleaseShared() {
??????? for (;;) {
??????????? Node h = head;
??????????? if (h != null && h != tail) {
??????????????? int ws = h.waitStatus;
??????????????? if (ws == Node.SIGNAL) {
??????????????????? if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
??????????????????????? continue;??????????? // loop to recheck cases
??????????????????? unparkSuccessor(h);
??????????????? }
??????????????? else if (ws == 0 &&
???????????????????????? !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
??????????????????? continue;??????????????? // loop on failed CAS
??????????? }
??????????? if (h == head)?????????????????? // loop if head changed
??????????????? break;
??????? }
??? }
tryXXXX 方法,這幾個方法是給子類重寫的,用來擴展響應的同步器操作
?
?
?protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
??????? throw new UnsupportedOperationException();
??? }
protected int tryAcquireShared(int arg) {
??????? throw new UnsupportedOperationException();
??? }
protected boolean tryReleaseShared(int arg) {
??????? throw new UnsupportedOperationException();
??? }
獨占模式獲取操作的頂層方法,如果沒有tryAcquired,或者沒有獲得隊列鎖,就中斷
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
獨占模式釋放操作的頂層方法,如果tryRelease()成功,那么就喚醒后繼節點去獲取鎖
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
總結
以上是生活随笔為你收集整理的聊聊高并发(二十二)解析java.util.concurrent各个组件(四) 深入理解AQS(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 聊聊高并发(二十一)解析java.uti
- 下一篇: 聊聊高并发(二十三)解析java.uti