Java Review - 并发编程_读写锁ReentrantReadWriteLock的原理源码剖析
文章目錄
- ReentrantLock VS ReentrantReadWriteLock
- 類圖結(jié)構(gòu)
- 非公平的讀寫鎖實現(xiàn)
- 寫鎖的獲取與釋放
- void lock()
- void lockInterruptibly()
- boolean tryLock()
- boolean tryLock(long timeout, TimeUnit unit)
- void unlock()
- 讀鎖的獲取與釋放
- void lock()
- void lockInterruptibly()
- boolean tryLock()
- boolean tryLock(long timeout, TimeUnit unit)
- void unlock()
- Demo
- 小結(jié)
ReentrantLock VS ReentrantReadWriteLock
解決線程安全問題使用ReentrantLock就可以,但是ReentrantLock是獨占鎖,某時只有一個線程可以獲取該鎖,而實際中會有寫少讀多的場景,顯然ReentrantLock滿足不了這個需求,所以ReentrantReadWriteLock應(yīng)運而生。ReentrantReadWriteLock采用讀寫分離的策略,允許多個線程可以同時獲取讀鎖。
類圖結(jié)構(gòu)
為了了解ReentrantReadWriteLock的內(nèi)部構(gòu)造,我們先看下它的類圖結(jié)構(gòu)
讀寫鎖的內(nèi)部維護了一個ReadLock和一個WriteLock,它們依賴Sync實現(xiàn)具體功能。
Sync繼承自AQS,并且也提供了公平和非公平的實現(xiàn)。
非公平的讀寫鎖實現(xiàn)
下面只介紹非公平的讀寫鎖實現(xiàn)。
我們知道AQS中只維護了一個state狀態(tài),而ReentrantReadWriteLock則需要維護讀狀態(tài)和寫狀態(tài),一個state怎么表示寫和讀兩種狀態(tài)呢?
ReentrantReadWriteLock巧妙地使用state的高16位表示讀狀態(tài),也就是獲取到讀鎖的次數(shù);使用低16位表示獲取到寫鎖的線程的可重入次數(shù)。
/** Read vs write count extraction constants and functions.* Lock state is logically divided into two unsigned shorts:* The lower one representing the exclusive (writer) lock hold count,* and the upper the shared (reader) hold count.*/static final int SHARED_SHIFT = 16;//共享鎖(讀鎖)狀態(tài)單位值65536static final int SHARED_UNIT = (1 << SHARED_SHIFT);//共享鎖線程最大個數(shù)65535static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;//排它鎖(寫鎖)掩碼,二進制,15個1static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;// 返回讀鎖線程數(shù)/** Returns the number of shared holds represented in count */static int sharedCount(int c) { return c >>> SHARED_SHIFT; }// 返回寫鎖線程數(shù)/** Returns the number of exclusive holds represented in count */static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }Sync中的
- firstReader用來記錄第一個獲取到讀鎖的線程
- firstReaderHoldCount則記錄第一個獲取到讀鎖的線程獲取讀鎖的可重入次數(shù)。
- cachedHoldCounter用來記錄最后一個獲取讀鎖的線程獲取讀鎖的可重入次數(shù)
readHolds是ThreadLocal變量,用來存放除去第一個獲取讀鎖線程外的其他線程獲取讀鎖的可重入次數(shù)。
ThreadLocalHoldCounter繼承了ThreadLocal,因而initialValue方法返回一個HoldCounter對象。
/*** ThreadLocal subclass. Easiest to explicitly define for sake* of deserialization mechanics.*/static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();}}寫鎖的獲取與釋放
在ReentrantReadWriteLock中寫鎖使用WriteLock來實現(xiàn)。
void lock()
寫鎖是個獨占鎖,某時只有一個線程可以獲取該鎖。
- 如果當(dāng)前沒有線程獲取到讀鎖和寫鎖,則當(dāng)前線程可以獲取到寫鎖然后返回。
- 如果當(dāng)前已經(jīng)有線程獲取到讀鎖和寫鎖,則當(dāng)前請求寫鎖的線程會被阻塞掛起。
- 另外,寫鎖是可重入鎖,如果當(dāng)前線程已經(jīng)獲取了該鎖,再次獲取只是簡單地把可重入次數(shù)加1后直接返回。
如以上代碼所示,在 lock()內(nèi)部調(diào)用了AQS的acquire方法,其中tryAcquire是ReentrantReadWriteLock內(nèi)部的sync類重寫的,代碼如下。
protected final boolean tryAcquire(int acquires) {/** Walkthrough:* 1. If read count nonzero or write count nonzero* and owner is a different thread, fail.* 2. If count would saturate, fail. (This can only* happen if count is already nonzero.)* 3. Otherwise, this thread is eligible for lock if* it is either a reentrant acquire or* queue policy allows it. If so, update state* and set owner.*/Thread current = Thread.currentThread();int c = getState();int w = exclusiveCount(c);// 1 c!=0 說明讀鎖或者寫鎖已經(jīng)被其他線程獲取 if (c != 0) {// (Note: if c != 0 and w == 0 then shared count != 0)// 2 w = 0 說明有線程已經(jīng)獲取了寫鎖,w!=0 且 當(dāng)前線程不是寫鎖的擁有者,返回falseif (w == 0 || current != getExclusiveOwnerThread())return false;// 3 說明當(dāng)前線程已經(jīng)獲取到了寫鎖,判斷可重入次數(shù)if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");// Reentrant acquire// 設(shè)置可重入次數(shù)(1 )setState(c + acquires);return true;}// 5 第一個寫線程獲取寫鎖 if (writerShouldBlock() ||!compareAndSetState(c, c + acquires))return false;setExclusiveOwnerThread(current);return true;}-
代碼(1)中,如果當(dāng)前AQS狀態(tài)值不為0則說明當(dāng)前已經(jīng)有線程獲取到了讀鎖或者寫鎖。
-
代碼(2)中,如果w==0說明狀態(tài)值的低16位為0,而AQS狀態(tài)值不為0,則說明高16位不為0,這暗示已經(jīng)有線程獲取了讀鎖,所以直接返回false 。 如果w!=0則說明當(dāng)前已經(jīng)有線程獲取了該寫鎖,再看當(dāng)前線程是不是該鎖的持有者,如果不是則返回false。
-
執(zhí)行到代碼(3)說明當(dāng)前線程之前已經(jīng)獲取到了該鎖,所以判斷該線程的可重入次數(shù)是不是超過了最大值,是則拋出異常,否則執(zhí)行代碼 (4)增加當(dāng)前線程的可重入次數(shù),然后返回true.
-
如果AQS的狀態(tài)值等于0則說明目前沒有線程獲取到讀鎖和寫鎖,所以執(zhí)行代碼(5)。其中,對于writerShouldBlock方法,
非公平鎖的實現(xiàn)為
final boolean writerShouldBlock() {return false; // writers can always barge}如果代碼對于非公平鎖來說總是返回false,則說明代碼(5)搶占式執(zhí)行CAS嘗試獲取寫鎖,獲取成功則設(shè)置當(dāng)前鎖的持有者為當(dāng)前線程并返回true,否則返回false。
公平鎖的實現(xiàn)為
final boolean writerShouldBlock() {return hasQueuedPredecessors();}這里還是使用hasQueuedPredecessors來判斷當(dāng)前線程節(jié)點是否有前驅(qū)節(jié)點,如果有則當(dāng)前線程放棄獲取寫鎖的權(quán)限,直接返回false。
void lockInterruptibly()
類似于lock()方法,它的不同之處在于,它會對中斷進行響應(yīng),也就是當(dāng)其他線程調(diào)用了該線程的interrupt()方法中斷了當(dāng)前線程時,當(dāng)前線程會拋出異常InterruptedException異常。
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}boolean tryLock()
嘗試獲取寫鎖,如果當(dāng)前沒有其他線程持有寫鎖或者讀鎖,則當(dāng)前線程獲取寫鎖會成功,然后返回true。 如果當(dāng)前已經(jīng)有其他線程持有寫鎖或者讀鎖則該方法直接返回false,且當(dāng)前線程并不會被阻塞。 如果當(dāng)前線程已經(jīng)持有了該寫鎖則簡單增加AQS的狀態(tài)值后直接返回true。
public boolean tryLock( ) {return sync.tryWriteLock();} /*** Performs tryLock for write, enabling barging in both modes.* This is identical in effect to tryAcquire except for lack* of calls to writerShouldBlock.*/final boolean tryWriteLock() {Thread current = Thread.currentThread();int c = getState();if (c != 0) {int w = exclusiveCount(c);if (w == 0 || current != getExclusiveOwnerThread())return false;if (w == MAX_COUNT)throw new Error("Maximum lock count exceeded");}if (!compareAndSetState(c, c + 1))return false;setExclusiveOwnerThread(current);return true;}如上代碼與tryAcquire方法類似,不同在于這里使用的是非公平策略。
boolean tryLock(long timeout, TimeUnit unit)
與tryAcquire()的不同之處在于,多了超時時間參數(shù),如果嘗試獲取寫鎖失敗則會把當(dāng)前線程掛起指定時間,待超時時間到后當(dāng)前線程被激活,如果還是沒有獲取到寫鎖則返回false。另外,該方法會對中斷進行響應(yīng),也就是當(dāng)其他線程調(diào)用了該線程的interrupt()方法中斷了當(dāng)前線程時,當(dāng)前線程會拋出InterruptedException異常。
public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}void unlock()
嘗試釋放鎖,如果當(dāng)前線程持有該鎖,調(diào)用該方法會讓該線程對該線程持有的AQS狀態(tài)值減1,如果減去1后當(dāng)前狀態(tài)值為0則當(dāng)前線程會釋放該鎖,否則僅僅減1而已。如果當(dāng)前線程沒有持有該鎖而調(diào)用了該方法則會拋出IllegalMonitorStateException異常
public void unlock() {sync.release(1);} public final boolean release(int arg) {// 調(diào)用ReentrantReadWriteLock#Sync重寫的tryReleaseif (tryRelease(arg)) {// 激活阻塞隊列里面的一個線程Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;} protected final boolean tryRelease(int releases) {// 6 看是否是擁有寫鎖的線程調(diào)用的unLockif (!isHeldExclusively())throw new IllegalMonitorStateException();// 7 獲取可重入值,沒有考慮高16位,因為獲取寫鎖時讀鎖的值肯定為0int nextc = getState() - releases;boolean free = exclusiveCount(nextc) == 0;// 如果可重入鎖值為0則釋放鎖,否則只是簡單的更新狀態(tài)值 if (free)setExclusiveOwnerThread(null);setState(nextc);return free;}-
tryRelease首先通過isHeldExclusively判斷是否當(dāng)前線程是該寫鎖的持有者,如果不是則拋出異常
-
否則執(zhí)行代碼(7),這說明當(dāng)前線程持有寫鎖,持有寫鎖說明狀態(tài)值的高16位為0,所以這里nextc值就是當(dāng)前線程寫鎖的剩余可重入次數(shù)。
-
代碼(8)判斷當(dāng)前可重入次數(shù)是否為0,如果free為true則說明可重入次數(shù)為0,所以當(dāng)前線程會釋放寫鎖,將當(dāng)前鎖的持有者設(shè)置為null。如果free為false則簡單地更新可重入次數(shù)。
讀鎖的獲取與釋放
ReentrantReadWriteLock中的讀鎖是使用ReadLock來實現(xiàn)的。
void lock()
獲取讀鎖,如果當(dāng)前沒有其他線程持有寫鎖,則當(dāng)前線程可以獲取讀鎖,AQS的狀態(tài)值state的高16位的值會增加1,然后方法返回。否則如果其他一個線程持有寫鎖,則當(dāng)前線程會被阻塞。
/*** Acquires the read lock.** <p>Acquires the read lock if the write lock is not held by* another thread and returns immediately.** <p>If the write lock is held by another thread then* the current thread becomes disabled for thread scheduling* purposes and lies dormant until the read lock has been acquired.*/public void lock() {sync.acquireShared(1);} public final void acquireShared(int arg) {// 調(diào)用ReentrantReadWriteLock中syn的tryAcquireSharedif (tryAcquireShared(arg) < 0)// 調(diào)用AQS的doAcquireShareddoAcquireShared(arg);} protected final int tryAcquireShared(int unused) {/** Walkthrough:* 1. If write lock held by another thread, fail.* 2. Otherwise, this thread is eligible for* lock wrt state, so ask if it should block* because of queue policy. If not, try* to grant by CASing state and updating count.* Note that step does not check for reentrant* acquires, which is postponed to full version* to avoid having to check hold count in* the more typical non-reentrant case.* 3. If step 2 fails either because thread* apparently not eligible or CAS fails or count* saturated, chain to version with full retry loop.*/Thread current = Thread.currentThread();// 1 獲取當(dāng)前狀態(tài)值int c = getState();// 2 判斷是否寫鎖被占用if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 3 獲取讀鎖計數(shù)int r = sharedCount(c);// 4 嘗試獲取鎖,多個讀線程只有一個會成功,不成功的進入fullTryAcquireShared進行重試if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// 5 第一個線程獲取到鎖if (r == 0) {firstReader = current;firstReaderHoldCount = 1;// 6 如果當(dāng)前線程是第一個獲取讀鎖的線程} else if (firstReader == current) {firstReaderHoldCount++;} else {// 7 記錄最后一個獲取讀鎖的線程或記錄其他線程讀鎖的可重入次數(shù)HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;}return 1;}// 8 類似tryAcquireShared,但是自旋獲取return fullTryAcquireShared(current);}- 首先獲取了當(dāng)前AQS的狀態(tài)值
- 然后代碼(2)查看是否有其他線程獲取到了寫鎖,如果是則直接返回-1,而后調(diào)用AQS的doAcquireShared方法把當(dāng)前線程放入AQS阻塞隊列。
如果當(dāng)前要獲取讀鎖的線程已經(jīng)持有了寫鎖,則也可以獲取讀鎖。但是需要注意,當(dāng)一個線程先獲取了寫鎖,然后獲取了讀鎖處理事情完畢后,要記得把讀鎖和寫鎖都釋放掉,不能只釋放寫鎖。
- 否則執(zhí)行代碼(3),得到獲取到的讀鎖的個數(shù),到這里說明目前沒有線程獲取到寫鎖,但是可能有線程持有讀鎖,然后執(zhí)行代碼(4)
其中非公平鎖的readerShouldBlock實現(xiàn)代碼如下
final boolean readerShouldBlock() {/* As a heuristic to avoid indefinite writer starvation,* block if the thread that momentarily appears to be head* of queue, if one exists, is a waiting writer. This is* only a probabilistic effect since a new reader will not* block if there is a waiting writer behind other enabled* readers that have not yet drained from the queue.*/return apparentlyFirstQueuedIsExclusive();}} final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null &&(s = h.next) != null &&!s.isShared() &&s.thread != null;}如上代碼的作用是,如果隊列里面存在一個元素,則判斷第一個元素是不是正在嘗試獲取寫鎖,如果不是,則當(dāng)前線程判斷當(dāng)前獲取讀鎖的線程是否達(dá)到了最大值。最后執(zhí)行CAS操作將AQS狀態(tài)值的高16位值增加1。
-
代碼(5)(6)記錄第一個獲取讀鎖的線程并統(tǒng)計該線程獲取讀鎖的可重入數(shù)。
-
代碼(7)使用cachedHoldCounter記錄最后一個獲取到讀鎖的線程和該線程獲取讀鎖的可重入數(shù),readHolds記錄了當(dāng)前線程獲取讀鎖的可重入數(shù)。
-
如果readerShouldBlock返回true則說明有線程正在獲取寫鎖,所以執(zhí)行代碼(8)。
-
fullTryAcquireShared的代碼與tryAcquireShared類似,它們的不同之處在于,前者通過循環(huán)自旋獲取。
void lockInterruptibly()
類似于lock()方法,不同之處在于,該方法會對中斷進行響應(yīng),也就是當(dāng)其他線程調(diào)用了該線程的interrupt()方法中斷了當(dāng)前線程時,當(dāng)前線程會拋出InterruptedException異常。
boolean tryLock()
-
嘗試獲取讀鎖,如果當(dāng)前沒有其他線程持有寫鎖,則當(dāng)前線程獲取讀鎖會成功,然后返回true 。
-
如果當(dāng)前已經(jīng)有其他線程持有寫鎖則該方法直接返回false,但當(dāng)前線程并不會被阻塞。
-
如果當(dāng)前線程已經(jīng)持有了該讀鎖則簡單增加AQS的狀態(tài)值高16位后直接返回true。
其代碼類似tryLock的代碼,這里不再講述。
boolean tryLock(long timeout, TimeUnit unit)
與tryLock()的不同之處在于,多了超時時間參數(shù),如果嘗試獲取讀鎖失敗則會把當(dāng)前線程掛起指定時間,待超時時間到后當(dāng)前線程被激活,如果此時還沒有獲取到讀鎖則返回false。
另外,該方法對中斷響應(yīng),也就是當(dāng)其他線程調(diào)用了該線程的interrupt()方法中斷了當(dāng)前線程時,當(dāng)前線程會拋出InterruptedException異常。
void unlock()
public void unlock() {sync.releaseShared(1);}如上代碼具體釋放鎖的操作是委托給Sync類來做的,sync.releaseShared方法的代碼如下:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;} protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}// 循環(huán)直到自己的讀計數(shù)器為-1 ,CAS更新成功 for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}如以上代碼所示,在無限循環(huán)里面,首先獲取當(dāng)前AQS狀態(tài)值并將其保存到變量c,然后變量c被減去一個讀計數(shù)單位后使用CAS操作更新AQS狀態(tài)值,如果更新成功則查看當(dāng)前AQS狀態(tài)值是否為0,為0則說明當(dāng)前已經(jīng)沒有讀線程占用讀鎖,則tryReleaseShared返回true。
然后會調(diào)用doReleaseShared方法釋放一個由于獲取寫鎖而被阻塞的線程,如果當(dāng)前AQS狀態(tài)值不為0,則說明當(dāng)前還有其他線程持有了讀鎖,所以tryReleaseShared返回false。
如果tryReleaseShared中的CAS更新AQS狀態(tài)值失敗,則自旋重試直到成功。
Demo
Java Review - 并發(fā)編程_獨占鎖ReentrantLock原理&源碼剖析
介紹了如何使用ReentrantLock實現(xiàn)線程安全的list,但是由于ReentrantLock是獨占鎖,所以在讀多寫少的情況下性能很差。下面使用ReentrantReadWriteLock來改造它,代碼如下
以上代碼調(diào)用get方法時使用的是讀鎖,這樣運行多個讀線程來同時訪問list的元素,這在讀多寫少的情況下性能會更好。
小結(jié)
我們這里介紹了讀寫鎖ReentrantReadWriteLock的原理,它的底層是使用AQS實現(xiàn)的。ReentrantReadWriteLock巧妙地使用AQS的狀態(tài)值的高16位表示獲取到讀鎖的個數(shù),低16位表示獲取寫鎖的線程的可重入次數(shù),并通過CAS對其進行操作實現(xiàn)了讀寫分離,這在讀多寫少的場景下比較適用。
總結(jié)
以上是生活随笔為你收集整理的Java Review - 并发编程_读写锁ReentrantReadWriteLock的原理源码剖析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_独
- 下一篇: Java Review - 并发编程_抽