Java Review - 并发编程_独占锁ReentrantLock原理源码剖析
文章目錄
- Synchronized vs ReentrantLock
- ReentrantLock概述
- 獲取鎖
- void lock()
- 非公平鎖的實現代碼
- 非公平鎖是如何體現的?
- 公平鎖是怎么實現公平的
- void lockInterruptibly() 方法
- boolean tryLock() 方法
- boolean tryLock(long timeout, TimeUnit unit)
- 釋放鎖
- void unlock() 方法
- Demo : 使用ReentrantLock來實現一個簡單的線程安全的list
- 小結
Synchronized vs ReentrantLock
ReentrantLock概述
ReentrantLock是可重入的獨占鎖,同時只能有一個線程可以獲取該鎖,其他獲取該鎖的線程會被阻塞而被放入該鎖的AQS阻塞隊列里面。
類圖結構如下
-
底層基于AQS實現,ReentrantLock的lock等方法,委托給其依賴sync的lock方法
-
AQS 是典型的模板方法設計模式,父類(AQS)定義好骨架和內部操作細節,具體規則由子類去實現
從類圖可以看出,ReentrantLock最終還是使用AQS來實現的,并且根據參數來決定其內部是一個公平還是非公平鎖,默認是非公平鎖。
/*** Creates an instance of {@code ReentrantLock}.* This is equivalent to using {@code ReentrantLock(false)}.*/public ReentrantLock() {sync = new NonfairSync();}/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*/public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}其中Sync類直接繼承自AQS,它的子類NonfairSync和FairSync分別實現了獲取鎖的非公平與公平策略。
在這里,AQS的state狀態值表示線程獲取該鎖的可重入次數。
-
在默認情況下,state的值為0表示當前鎖沒有被任何線程持有。
-
當一個線程第一次獲取該鎖時會嘗試使用CAS設置state的值為1,如果CAS成功則當前線程獲取了該鎖,然后記錄該鎖的持有者為當前線程。
-
在該線程沒有釋放鎖的情況下第二次獲取該鎖后,狀態值被設置為2,這就是可重入次數。
-
在該線程釋放該鎖時,會嘗試使用CAS讓狀態值減1,如果減1后狀態值為0,則當前線程釋放該鎖。
獲取鎖
void lock()
.
當一個線程調用該方法時,說明該線程希望獲取該鎖。
-
如果鎖當前沒有被其他線程占用并且當前線程之前沒有獲取過該鎖,則當前線程會獲取到該鎖,然后設置當前鎖的擁有者為當前線程,并設置AQS的狀態值為1,然后直接返回。
-
如果當前線程之前已經獲取過該鎖,則這次只是簡單地把AQS的狀態值加1后返回。
-
如果該鎖已經被其他線程持有,則調用該方法的線程會被放入AQS隊列后阻塞掛起。
加鎖過程如下
/*** Acquires the lock.** <p>Acquires the lock if it is not held by another thread and returns* immediately, setting the lock hold count to one.** <p>If the current thread already holds the lock then the hold* count is incremented by one and the method returns immediately.** <p>If the lock is held by another thread then the* current thread becomes disabled for thread scheduling* purposes and lies dormant until the lock has been acquired,* at which time the lock hold count is set to one.*/public void lock() {sync.lock();}在如上代碼中 ,ReentrantLock的lock()委托給了sync類,根據創建ReentrantLock構造函數選擇sync的實現是NonfairSync還是FairSync,這個鎖是一個非公平鎖或者公平鎖。
非公平鎖的實現代碼
先看sync的子類NonfairSync的情況,也就是非公平鎖時。
/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {// 1 cas 設置狀態值if (compareAndSetState(0, 1))// 設置線程為獨占鎖線程setExclusiveOwnerThread(Thread.currentThread());else// 2 調用AQS的acquire方法acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}在代碼(1)中,因為默認AQS的狀態值為0,所以第一個調用Lock的線程會通過CAS設置狀態值為1,CAS成功則表示當前線程獲取到了鎖,然后setExclusiveOwnerThread設置該鎖持有者是當前線程.
如果這時候有其他線程調用lock方法企圖獲取該鎖,CAS會失敗,然后會調用AQS的acquire方法。注意,傳遞參數為1,這里再貼下AQS的acquire的核心代碼。
/*** Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.*/public final void acquire(int arg) {// 3 調用ReentrantLock重寫的tryAcquire方法if (!tryAcquire(arg) &&// tryAcquire返回false會把當前線程放入AQS阻塞隊列acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}AQS并沒有提供可用的tryAcquire方法,tryAcquire方法需要子類自己定制化,所以這里代碼(3)會調用ReentrantLock重寫的tryAcquire方法。我們先看下非公平鎖的代碼。
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);} /*** Performs non-fair tryLock. tryAcquire is implemented in* subclasses, but both need nonfair try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 4 當前AQS狀態值state為0if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {// 5 當前線程是該鎖的持有者int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}-
代碼(4)會查看當前鎖的狀態值是否為0,為0則說明當前該鎖空閑,那么就嘗試CAS獲取該鎖,將AQS的狀態值從0設置為1,并設置當前鎖的持有者為當前線程然后返回,true。
-
如果當前狀態值不為0則說明該鎖已經被某個線程持有,所以代碼(5)查看當前線程是否是該鎖的持有者,如果當前線程是該鎖的持有者,則狀態值加1,然后返回true,這里需要注意,nextc<0說明可重入次數溢出了。
-
如果當前線程不是鎖的持有者則返回false,然后其會被放入AQS阻塞隊列。
非公平鎖是如何體現的?
上面看完了非公平鎖的實現代碼,回過頭來看看非公平在這里是怎么體現的。
首先非公平是說先嘗試獲取鎖的線程并不一定比后嘗試獲取鎖的線程優先獲取鎖。
這里假設線程A調用lock()方法時執行到nonfairTryAcquire的代碼(4),發現當前狀態值不為0,所以執行代碼(5),發現當前線程不是線程持有者,則執行代碼(6)返回false,然后當前線程被放入AQS阻塞隊列。
這時候線程B也調用了lock()方法執行到nonfairTryAcquire的代碼(4),發現當前狀態值為0了(假設占有該鎖的其他線程釋放了該鎖),所以通過CAS設置獲取到了該鎖。明明是線程A先請求獲取該鎖呀,這就是非公平的體現。
這里線程B在獲取鎖前并沒有查看當前AQS隊列里面是否有比自己更早請求該鎖的線程,而是使用了搶奪策略。
公平鎖是怎么實現公平的
那么下面看看公平鎖是怎么實現公平的。公平鎖的話只需要看FairSync重寫的tryAcquire方法。
/*** Sync object for fair locks*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();// 7 當前state的狀態為0 if (c == 0) {// 8 公平策略if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 9 當前線程是鎖的持有者 else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}如以上代碼所示,公平的tryAcquire策略與非公平的類似,不同之處在于,代碼(8)在設置CAS前添加了hasQueuedPredecessors方法,該方法是實現公平性的核心代碼,
public final boolean hasQueuedPredecessors() {// The correctness of this depends on head being initialized// before tail and on head.next being accurate if the current// thread is first in queue.Node t = tail; // Read fields in reverse initialization orderNode h = head;Node s;return h != t &&((s = h.next) == null || s.thread != Thread.currentThread());}在如上代碼中,如果當前線程節點有前驅節點則返回true,否則如果當前AQS隊列為空或者當前線程節點是AQS的第一個節點則返回false。
- 其中如果h==t則說明當前隊列為空,直接返回false;
- 如果h!=t并且s==null則說明有一個元素將要作為AQS的第一個節點入隊列( 回想一下 enq函數的第一個元素入隊列是兩步操作:首先創建一個哨兵頭節點,然后將第一個元素插入哨兵節點后面),那么返回true
- 如果h!=t并且s!=null和s.thread != Thread.currentThread()則說明隊列里面的第一個元素不是當前線程,那么返回true。
void lockInterruptibly() 方法
該方法與lock()方法類似,它的不同在于,它對中斷進行響應,就是當前線程在調用該方法時,如果其他線程調用了當前線程的interrupt()方法,則當前線程會拋出InterruptedException異常,然后返回。
/*** Acquires the lock unless the current thread is* {@linkplain Thread#interrupt interrupted}.** <p>Acquires the lock if it is not held by another thread and returns* immediately, setting the lock hold count to one.** <p>If the current thread already holds this lock then the hold count* is incremented by one and the method returns immediately.** <p>If the lock is held by another thread then the* current thread becomes disabled for thread scheduling* purposes and lies dormant until one of two things happens:** <ul>** <li>The lock is acquired by the current thread; or** <li>Some other thread {@linkplain Thread#interrupt interrupts} the* current thread.** </ul>** <p>If the lock is acquired by the current thread then the lock hold* count is set to one.** <p>If the current thread:** <ul>** <li>has its interrupted status set on entry to this method; or** <li>is {@linkplain Thread#interrupt interrupted} while acquiring* the lock,** </ul>** then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** <p>In this implementation, as this method is an explicit* interruption point, preference is given to responding to the* interrupt over normal or reentrant acquisition of the lock.** @throws InterruptedException if the current thread is interrupted*/public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);} /*** Acquires in exclusive mode, aborting if interrupted.* Implemented by first checking interrupt status, then invoking* at least once {@link #tryAcquire}, returning on* success. Otherwise the thread is queued, possibly repeatedly* blocking and unblocking, invoking {@link #tryAcquire}* until success or the thread is interrupted. This method can be* used to implement method {@link Lock#lockInterruptibly}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.* @throws InterruptedException if the current thread is interrupted*/public final void acquireInterruptibly(int arg)throws InterruptedException {// 若果當前線程被打斷,直接拋出異常if (Thread.interrupted())throw new InterruptedException();// 嘗試獲取資源if (!tryAcquire(arg))// 調用AQS可被中斷的方法doAcquireInterruptibly(arg);}boolean tryLock() 方法
嘗試獲取鎖,如果當前該鎖沒有被其他線程持有,則當前線程獲取該鎖并返回true,否則返回false。注意,該方法不會引起當前線程阻塞。
/*** Acquires the lock only if it is not held by another thread at the time* of invocation.** <p>Acquires the lock if it is not held by another thread and* returns immediately with the value {@code true}, setting the* lock hold count to one. Even when this lock has been set to use a* fair ordering policy, a call to {@code tryLock()} <em>will</em>* immediately acquire the lock if it is available, whether or not* other threads are currently waiting for the lock.* This "barging" behavior can be useful in certain* circumstances, even though it breaks fairness. If you want to honor* the fairness setting for this lock, then use* {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) }* which is almost equivalent (it also detects interruption).** <p>If the current thread already holds this lock then the hold* count is incremented by one and the method returns {@code true}.** <p>If the lock is held by another thread then this method will return* immediately with the value {@code false}.** @return {@code true} if the lock was free and was acquired by the* current thread, or the lock was already held by the current* thread; and {@code false} otherwise*/public boolean tryLock() {return sync.nonfairTryAcquire(1);} /*** Performs non-fair tryLock. tryAcquire is implemented in* subclasses, but both need nonfair try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}如上代碼與非公平鎖的tryAcquire()方法代碼類似,所以tryLock()使用的是非公平策略。
boolean tryLock(long timeout, TimeUnit unit)
嘗試獲取鎖,與tryLock()的不同之處在于,它設置了超時時間,如果超時時間到沒有獲取到該鎖則返回false。
public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {// 調用AQS的tryAcquireNanosreturn sync.tryAcquireNanos(1, unit.toNanos(timeout));}釋放鎖
void unlock() 方法
-
嘗試釋放鎖,如果當前線程持有該鎖,則調用該方法會讓該線程對該線程持有的AQS狀態值減1,如果減去1后當前狀態值為0,則當前線程會釋放該鎖,否則僅僅減1而已。
-
如果當前線程沒有持有該鎖而調用了該方法則會拋出IllegalMonitorStateException異常
- 代碼(11)所示,如果當前線程不是該鎖持有者則直接拋出異常
- 否則查看狀態值是否為0,為0則說明當前線程要放棄對該鎖的持有權,則執行代碼(12)把當前鎖持有者設置為null。
- 如果狀態值不為0,則僅僅讓當前線程對該鎖的可重入次數減1。
Demo : 使用ReentrantLock來實現一個簡單的線程安全的list
import java.util.ArrayList; import java.util.concurrent.locks.ReentrantLock;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/4 22:05* @mark: show me the code , change the world*/ public class ReentrantLockList {//線程不安全的Listprivate ArrayList<String> list = new ArrayList<String>();//獨占鎖,默認是非公平鎖,傳入true可以是公平鎖private volatile ReentrantLock lock = new ReentrantLock();//往集合中添加元素public void add(String str) {lock.lock();try {list.add(str);} finally {lock.unlock();}}//刪除集合中的元素public void remove(String str) {lock.lock();try {list.remove(str);} finally {lock.unlock();}}//根據索引獲取集合中某個元素public String get(int index) {lock.lock();try {return list.get(index);} finally {lock.unlock();}} }如上代碼通過在操作array元素前進行加鎖保證同一時間只有一個線程可以對array數組進行修改,但是也只能有一個線程對array元素進行訪問。
同樣最后使用圖 來加深理解。
假如線程Thread1、Thread2和Thread3同時嘗試獲取獨占鎖ReentrantLock,假設Thread1獲取到了,則Thread2和Thread3就會被轉換為Node節點并被放入ReentrantLock對應的AQS阻塞隊列,而后被阻塞掛起。 如上圖。
假設Thread1獲取鎖后調用了對應的鎖創建的條件變量1,那么Thread1就會釋放獲取到的鎖,然后當前線程就會被轉換為Node節點插入條件變量1的條件隊列。
由于Thread1釋放了鎖,所以阻塞到AQS隊列里面的Thread2和Thread3就有機會獲取到該鎖,假如使用的是公平策略,那么這時候Thread2會獲取到該鎖,從而從AQS隊列里面移除Thread2對應的Node節點。 如下圖
小結
我們梳理了ReentrantLock的實現原理,ReentrantLock的底層是使用AQS實現的可重入獨占鎖。
在這里AQS狀態State值為0表示當前鎖空閑,為大于等于1的值則說明該鎖已經被占用。該鎖內部有公平與非公平實現,默認情況下是非公平的實現。
另外,由于該鎖是獨占鎖,所以某時只有一個線程可以獲取該鎖。
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Java Review - 并发编程_独占锁ReentrantLock原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_L
- 下一篇: Java Review - 并发编程_读