从源码理解ReentrantLock
在了解ReenTrantLock之前,我們首先需要理解一下為什么出現了這個工具。我們知道java官方給了我們一個Synchronized工具,但是在1.6之前,如果需要加鎖的話就需要調用Linux系統的metex()函數,也就是切換到內核態,從用戶態切換到內核態是比較費時間的一個事情。所以Doug Lea就寫了ReentrantLock這個函數,不過其實java.util.concurrent這個包大部分都是他寫的。這個函數可以在JVM的層面上去解決并發沖突的問題,而不用切換到內核態。不過隨著JAVA版本的升級Synchronized關鍵字的性能也得到了很大的提升,在線程沖突比較嚴重的時候,反而Synchronized也會擁有不錯的性能。
如果要理解ReenTrantLock,我們必須對它和AQS的結構有一個大概的認識:
ReenTrantLock:
ReenTrantLock有公平鎖和非公平鎖兩種,在代碼中它是這樣實現的:
private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer{....}static final class NonfairSync extends Sync{....} static final class FairSync extends Sync{....}public ReentrantLock() {sync = new NonfairSync();} public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}public void lock() {sync.lock();} public void unlock() {sync.release(1);}從這幾個類和變量中可以看出,首先在ReenTrantLock中是使用sync來操作的,而根據構造參數的不同,選擇不同的具體實現類。
AbstractQueuedSynchronizer:
AbstractQueuedSynchronizer中有幾個主要的內容:
private transient volatile Node head; private transient volatile Node tail; private volatile int state; static final class Node{ volatile Node prev;volatile Node next;volatile Thread thread;int ws; }他們分別代表了隊頭,隊尾,鎖狀態(加鎖狀態則為1,重入+1,解鎖狀態則為0),Node中的ws意為waitStatus,是一個狀態標識;ws是一個過渡狀態,在不同方法里面判斷ws的狀態做不同的處理。他們看起來像這樣:
如果要分析ReentrantLock的話,我們來帶入場景,以公平鎖為例。首先只有一個線程t1,它調用了ReentrantLock對象的lock方法,這個時候我們來看看發生了什么:
1.首先調用sync的lock方法,
public void lock() {sync.lock();}2.sync的lock方法如下
final void lock() {acquire(1); }3.acquire()是AQS實現的方法:
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }acquire的意思類似于將,AQS中的state變為傳入的參數。tryAcquire函數將嘗試去修改,如果修改失敗則執行入隊操作(acquireQueued()),tryAcquire()是AQS要求子類實現的方法,我們看看公平鎖中是如何實現的:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; }首先我們直接查看state狀態,這個時候只有t1來進行lock,所以state為0,hasQueuedPredecessors方法判斷隊列是否被初始化,如果沒有初始化顯然不需要排隊,我們可以來看看它的源碼:
public final boolean hasQueuedPredecessors() {Node t = tail; Node h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread()); }如果隊列沒有被初始化過,tail=head=null,hasQueuedPredecessors返回false。之后tryAcquire將使用CAS設置state為傳入的參數,如果設置成功,則將AQS中的持有鎖線程設置為本線程并返回true,那么acquire也將得以返回,之后lock正常執行。這就是只有一個線程或者多個線程交替執行的情況,所以你可以看到在線程沖突不嚴重的情況下,lock的過程對性能影響非常小。如果是1.6之前版本的Synchronized每次加鎖都要進入內核態,對性能影響非常大。現在我們來總結一下這第一個獲得鎖的線程都干了什么,首先設置了state,并將AQS中的持有鎖線程設置為本線程,AQS中的隊列依舊沒有初始化。
如果t1沒有釋放鎖,這個時候有來了t2,再來看看lock將如何執行(公平鎖):
從上面的代碼可以看到,一直到tryAcquire()都是和t1一樣的,但是在tryAcquire()中將會執行:
else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true; }它判斷持有鎖線程是否是當前線程,如果是則將state設置為state+acquires,這也體現了ReentrantLock是可重入鎖。如果不是tryAcquire返回false;
這個時候再來看acquire將會執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg));
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt(); }它首先執行addWaiter:
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node; }他首先實例化一個node,這個時候tail和head都是null,所以pred為null,執行enq(node):
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}} }這個時候tail還是為null的,所以t == null;之后執行compareAndSetHead(new Node())將Head指向一個空節點,并且將tail也指向這個空節點。他看起來像這樣:
完成之后再次循環,這一次 t != null執行else內容,之后變成:
請務必記住一個原則在AQS的隊列中第一個節點的值永遠是null,之后enq返回,addWaiter返回,執行acquire中的acquireQueued():
這個函數將會取出node的前一個節點賦值給p,如果p==head,則表示當前節點是第一個排隊的線程,他會再次嘗試獲取鎖,這個時候,如果t1還是沒有釋放鎖,則獲取鎖失敗,t2執行shouldParkAfterFailedAcquire,注意看這個函數的名字,可能停止在獲取鎖失敗之后。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;if (ws == Node.SIGNAL)return true;if (ws > 0) {do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false; }當前節點的前一個節點的waitStatus被賦值給ws,它至始至終都沒有被賦值,所以為初始化的時候的值0,而Node.SIGNAL為-1;所以執行compareAndSetWaitStatus(pred, ws, Node.SIGNAL);它把前一個節點的waitStatus設置為-1;之后返回false;回到acquireQueued,再次執行其中的for(;;)這個線程又一次去獲取鎖,如果又獲取失敗,再次進入shouldParkAfterFailedAcquire這一次返回的是true。所以回到acquireQueued之后執行:parkAndCheckInterrupt();
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); }LockSupport.park(this);將會阻塞,直到被喚醒,繼續執行。
現在總結一下,t2獲取AQS狀態值,發現不等于0,則直接入隊,這時候隊里有一個值為空的節點,和值為t2的節點。之后t2兩次嘗試去獲取鎖,失敗后被park;
這個時候又來了一個t3,直到addWaiter()之前t3的執行邏輯和t2一樣,在addWaiter()中pred != null;執行其中的:
if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;} }將t3直接入隊,返回執行acquireQueued:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }這個時候t3節點的前一個節點不等于head。直接執行shouldParkAfterFailedAcquire,由于前一個t2節點的waitStatus也從來都沒有被設置過,所以初始值為0,所以執行shouldParkAfterFailedAcquire函數中的compareAndSetWaitStatus(pred, ws, Node.SIGNAL);并返回false,再次循環,這次返回true,進入休眠。
這里需要注意waitStatus的狀態,可以想象,在AQS的隊列中,第一個值為空節點的waitStatus = -1;t2節點的waitStatus = -1 ;t3節點的waitStatus = 0;
這里我們來復習一下t3入隊做了什么,獲取鎖失敗,直接入隊,然后設置前面一個節點waitStatus = -1,休眠。
但是以上是t1一直持有鎖的情況,下面我們來看看在t2,t3獲取鎖的時候,t1釋放了鎖的情況,讓我們回到tryAcquire代碼:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; }無論t1是否釋放鎖,其他線程想要獲得鎖,必須執行tryAcquire,如果持有鎖線程釋放了鎖,那么c == 0,我們會去執行hasQueuedPredecessors:
public final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread()); }之前只有t2調用lock的時候,我們才會去執行hasQueuedPredecessors,那個時候隊列還沒有被初始化,直接返回false,但是現在不同了,任何情況,只要c == 0,就有可能執行hasQueuedPredecessors。
我們先來回憶一下,當什么時候tryAcquire代碼會被調用呢?首先是每一個線程執行lock函數的時候,調用1次。第一個需要排隊的線程入隊后需要調用2次,第一個排隊的線程被喚醒的時候(持有鎖的線程unlock的時候,unpack第一個在排隊的線程,第一個排隊的線程得以被喚醒),繼續調用tryAcquire,關于這一點你可以回憶一下下面這個函數:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }第一個排隊的線程在parkAndCheckInterrupt())這里被阻塞,喚醒后繼續執行 if (p == head && tryAcquire(arg))中的tryAcquire(arg)。
每次調用tryAcquire(arg)的時候,都有可能鎖被釋放,也就是c == 0從而進入hasQueuedPredecessors,從注釋中我們可以看到,這個方法返回false,在隊列沒有被初始化或者當前線程是隊列中第一個在等待的線程的情況下。Predecessors意味前驅,他實際上是判斷有沒有線程比當前線程等待的久的線程。注釋中也給出了這個解釋:
Queries whether any threads have been waiting to acquire longer than the current thread.實際上你也可想象的到,這個函數是為公平鎖所設計的,非公平鎖看到鎖是釋放的,直接加鎖,根本不管前面有沒有線程等的比它久,你可在ReentrantLock中看到非公平鎖的TryAcquire實現如下,印證了這個說法:
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false; }最后我想說說ReentrantLock中的interrupted
在acquireQueued函數中有很多interrupted:
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }不過在了解interrupted之前,我們先來看看interrupted怎么用,
- interrupt():中斷當前線程,實際上它沒有任何動作,只是設置一下一個線程標記
- interrupted():查詢當前線程中斷狀態,中斷狀態則返回true,否則返回false,并且清除這個標記,也就是無論之前是什么,現在置為false;
在線程執行parkAndCheckInterrupt()休眠的過程中,被中斷,醒來后Thread.interrupted()返回為true,并且將線程中斷置為false,注意這個置為false的行為,這意味著用戶設置的線程中斷狀態將不再生效,如下:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted(); }返回后acquireQueued函數中的interrupted被置為true,線程獲取鎖后將返回給acquire這個true,acquire將調用selfInterrupt()函數。將線程中斷標志位再次設置為true。
static void selfInterrupt() {Thread.currentThread().interrupt(); }到現在看來,你肯定很迷惑如果parkAndCheckInterrupt中不調用Thread.interrupted()不就可以沒有其余的這些步驟了嗎了嗎?這也是迷惑很多人的一點,但是如果你知道lock的lockInterruptibly();函數你就會知道Doug Lea為什么這樣做,我們來對比一下lockInterruptibly()和lock()調用流程的區別:
lock() -> sync.lock() -> acquire() -> acquireQueued() -> parkAndCheckInterrupt()
lockInterruptibly() -> sync.acquireInterruptibly() -> doAcquireInterruptibly() -> parkAndCheckInterrupt()
可以看到最后這兩個方法都調用了 parkAndCheckInterrupt()這個方法,而在lockInterruptibly()的調用中parkAndCheckInterrupt()是有意義的,
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();你可以看到,在doAcquireInterruptibly中如果parkAndCheckInterrupt顯示線程在隊列等待的過程中被中斷過,則會拋出InterruptedException();只是在lock()中parkAndCheckInterrupt返回值是沒有意義的,反而有副作用(將用戶設置的中斷狀態清除)。所以現在你也知道了,如果在執行lock()和lockInterruptibly() 過程中如果線程被中斷了會如何。
以上就是ReentrantLock的部分內容,如果讀者有興趣,可以繼續分析ReentrantLock()和AQS源碼中的內容,相信你一定會敬佩Doug Lea這位大神強大的編碼技術。
參考內容:https://www.bilibili.com/video/av67194367?p=3
總結
以上是生活随笔為你收集整理的从源码理解ReentrantLock的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Boot与消息
- 下一篇: InnoDB多版本控制实现