ReentrantLock之公平锁源码分析
本文分析的ReentrantLock所對應的Java版本為JDK8。
在閱讀本文前,讀者應該知道什么是CAS、自旋。
本文大綱
1.ReentrantLock公平鎖簡介
2.AQS
3.lock方法
4.unlock方法
1. ReentrantLock公平鎖簡介
ReentrantLock是JUC(java.util.concurrent)包中Lock接口的一個實現類,它是基于AbstractQueuedSynchronizer(下文簡稱AQS)來實現鎖的功能。ReentrantLock的內部類Sync繼承了AbstractQueuedSynchronizer,Sync又有FairSync和NonFairSync兩個子類。FairSync實現了公平鎖相關的操作,NonFairSync實現了非公平鎖相關的操作。它們之間的關系如下:
?
公平鎖的公平之處主要體現在,對于一個新來的線程,如果鎖沒有被占用,它會判斷等待隊列中是否還有其它的等待線程,如果有的話,就加入等待隊列隊尾,否則就去搶占鎖。
下面這段代碼展示了公平鎖的使用方法:
private final Lock lock = new ReentrantLock(true); // 參數true代表創建公平鎖public void method() {lock.lock(); // block until condition holdstry {// ... method body} finally {lock.unlock();} }2. AQS
下面簡單介紹一下AQS中的Node內部類和幾個重要的成員變量。
2.1 Node
AQS中,維護了一個Node內部類,用于包裝我們的線程。我們需要關注Node中的如下屬性:
- pre:當前節點的前驅節點。
- next:當前節點的后繼節點。
- thread:thread表示被包裝的線程。
- waitStatus:waitStatus是一個int整型,可以被賦予如下幾種值:
另外,當一個新的Node被創建時,waitStatus是0。
2.2 head
head指向隊列中的隊首元素,可以理解為當前持有鎖的線程。
2.3 tail
tail指向隊列中的隊尾元素。
2.4 state
state表示在ReentrantLock中可以理解為鎖的狀態,0表示當前鎖沒有被占用,大于0的數表示鎖被當前線程重入的次數。例如,當state等于2時,表示當前線程在這把鎖上進入了兩次。
2.5 exclusiveOwnerThread
表示當前占用鎖的線程。
2.6 等待隊列
下圖簡單展示了AQS中的等待隊列:
3. lock方法
有了上面的AQS的基礎知識后,我們就可以展開對ReentrantLock公平鎖的分析了,先從lock方法入手。
ReentrantLock中的lock方法很簡單,只是調用了Sync類(本文研究公平鎖,所以應該是FairSync類)中的lock方法:
public void lock() {sync.lock(); }我們跟到FairSync的lock方法,代碼也很簡單,調用了AQS中的acquire方法:
final void lock() {acquire(1); }acquire方法:
public final void acquire(int arg) {if (!tryAcquire(arg) && // 調用tryAcquire嘗試去獲取鎖,如果獲取成功,則方法結束acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 如果獲取鎖失敗,執行acquireQueued方法,將把當前線程排入隊尾 selfInterrupt(); }tryAcquire方法:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState(); // 獲取鎖的狀態if (c == 0) { // 如果狀態是0,表示鎖沒有被占用if (!hasQueuedPredecessors() && // 判斷是隊列中是否有排隊中的線程compareAndSetState(0, acquires)) { // 隊列中沒有排隊的線程,則嘗試用CAS去獲取一下鎖setExclusiveOwnerThread(current); // 獲取鎖成功,則將當前占有鎖的線程設置為當前線程return true;}}// 鎖被占用、隊列中有排隊的線程或者當前線程在獲取鎖的時候失敗將執行下面的代碼else if (current == getExclusiveOwnerThread()) { // 當前線程是否是占有鎖的線程int nextc = c + acquires; // 是的話,表示當前線程是重入這把鎖,將鎖的狀態進行加1if (nextc < 0)throw new Error("Maximum lock count exceeded"); // 鎖的重入次數超過int能夠表示最大的值,拋出異常setState(nextc); // 設置鎖的狀態return true;}return false; // 沒有獲取到鎖 }hasQueuedPredecessors方法:
public final boolean hasQueuedPredecessors() {Node t = tail;Node h = head;Node s;return h != t && // 隊列中的隊首和隊尾元素不相同((s = h.next) == null || s.thread != Thread.currentThread()); // 隊列中的第二個元素不為null,且第二個元素中的線程不是當前線程。這里如果返回true,說明隊列中至少存在tail、head兩個節點,就會執行acquireQueued將當前線程加入隊尾 }如果tryAcquire沒有獲取到鎖,將執行:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)我們先分析addWaiter方法:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode); // 將當前線程包裝成Node,mode參數值為null,表示獨占模式// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred; // 如果隊列中的尾節點不為空,將當前node的前驅節點設置為之前隊列中的tailif (compareAndSetTail(pred, node)) { // 用CAS把當前node設置為隊尾元素pred.next = node; // 成功的話,則將之前隊尾元素的后繼節點設置為當前節點。如果這里不清楚的話,請結合前面講等待隊列的那張圖進行理解。return node;}}enq(node); // 隊尾節點為空,或者用CAS設置隊尾元素失敗,則用自旋的方式入隊return node; }enq方法:
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) {if (compareAndSetHead(new Node())) // 隊尾元素為空,創建一個空的Node,并設置為隊首tail = head; // 設置隊首和隊尾為同一個空Node,進入下一次循環} else {node.prev = t; // 如果隊列中的尾節點不為空,將當前node的前驅節點設置為之前隊列中的tailif (compareAndSetTail(t, node)) { // 用CAS把當前node設置為隊尾元素t.next = node; // 成功的話,則將之前隊尾元素的后繼節點設置為當前節點return t; }}} }? 下面這張圖反應了上面enq方法的處理流程:
經過上面的方法,當前node已經加入等待隊列的隊尾,接下來將執行acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor(); // 獲取node的前驅節點if (p == head && tryAcquire(arg)) { // 如果node的前驅是head,它將去嘗試獲取鎖(tryAcquire方法在前面已經分析過)setHead(node); // 獲取成功,則將node設置為headp.next = null; // 將之前的head的后繼節點置空failed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) && // 當前node的前驅不是head,將為當前node找到一個能夠將其喚醒的前驅節點;或者當前node的前驅是head,但是獲取鎖失敗parkAndCheckInterrupt()) // 將當前線程掛起interrupted = true;}} finally {if (failed)cancelAcquire(node);} }shouldParkAfterFailedAcquire方法的作用就是找到一個能夠喚醒當前node的節點:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus; // 開始時是0if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true; // 前驅節點的狀態是-1,會喚醒后繼節點,可以將線程掛起if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev; // 前驅節點中的線程被取消,那就需要一直循環直到找到一個沒有被設置為取消狀態的前驅節點} while (pred.waitStatus > 0);pred.next = node; // 從后向前找,將第一個非取消狀態的節點,設置這個節點的后繼節點設置為當前node} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // waitStatus是0或者-3的時候,這時waitStatus都將被設置為-1// 即后繼節點需要前驅節點喚醒 }return false; // 上層代碼再進行一次循環,下次進入此方法時,將進入第一個if條件 }找到了合適的前驅節點,parkAndCheckInterrupt方法當前線程掛起:
private final boolean parkAndCheckInterrupt() { // 將線程掛起,等待前驅節點的喚醒LockSupport.park(this);return Thread.interrupted(); }?4. unlock方法
ReentrantLock的unlock方法調用AQS中的release方法:
public void unlock() {sync.release(1); // 調用AQS的release方法 }release方法:
public final boolean release(int arg) {if (tryRelease(arg)) { // 嘗試去釋放鎖Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h); // 釋放鎖成功,head不為空,并且head的waitStatus不為0的情況下,將喚醒后繼節點return true;}return false; }tryRelease方法:
protected final boolean tryRelease(int releases) {int c = getState() - releases; // 將鎖的狀態減1if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException(); // 準備釋放鎖的線程不是持有鎖的線程,拋出異常boolean free = false;if (c == 0) {free = true; // 鎖的狀態是0,說明不存在重入的情況了,可以直接釋放了setExclusiveOwnerThread(null);}setState(c);return free; }鎖釋放成功,將喚醒后繼節點,unparkSuccessor方法:
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus; // 注意,這個node是head節點if (ws < 0)compareAndSetWaitStatus(node, ws, 0); // 當前node的狀態是小于0,將其狀態設置為0/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next; // head節點的后繼節點if (s == null || s.waitStatus > 0) {s = null; // 執行到這表示head的后繼節點是1,處于取消的狀態for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t; // 從等待隊列的隊尾向前找,找到倒序的最后一個處于非取消狀態的節點 }if (s != null)LockSupport.unpark(s.thread); // 喚醒head后面的處于非取消狀態的第一個(正序)節點 }到此,全文結束,大家看代碼的時候結合圖來理解會容易很多。
轉載于:https://www.cnblogs.com/pedlar/p/10731516.html
總結
以上是生活随笔為你收集整理的ReentrantLock之公平锁源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: vue与node和npm关系
- 下一篇: 21-matlab 迷宫题