ReentrantReadWriteLock源码解析
概述
ReentrantReadWriteLock是Lock的另一種實現方式,我們已經知道了ReentrantLock是一個排他鎖,同一時間只允許一個線程訪問,而ReentrantReadWriteLock允許多個讀線程同時訪問,但不允許寫線程和讀線程、寫線程和寫線程同時訪問。相對于排他鎖,提高了并發性。在實際應用中,大部分情況下對共享數據(如緩存)的訪問都是讀操作遠多于寫操作,這時ReentrantReadWriteLock能夠提供比排他鎖更好的并發性和吞吐量。
讀寫鎖內部維護了兩個鎖,一個用于讀操作,一個用于寫操作。所有 ReadWriteLock實現都必須保證 writeLock操作的內存同步效果也要保持與相關 readLock的聯系。也就是說,成功獲取讀鎖的線程會看到寫入鎖之前版本所做的所有更新。
ReentrantReadWriteLock支持以下功能:
1)支持公平和非公平的獲取鎖的方式;
2)支持可重入。讀線程在獲取了讀鎖后還可以獲取讀鎖;寫線程在獲取了寫鎖之后既可以再次獲取寫鎖又可以獲取讀鎖;
3)還允許從寫入鎖降級為讀取鎖,其實現方式是:先獲取寫入鎖,然后獲取讀取鎖,最后釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不允許的;
4)讀取鎖和寫入鎖都支持鎖獲取期間的中斷;
5)Condition支持。僅寫入鎖提供了一個 Conditon 實現;讀取鎖不支持 Conditon?,readLock().newCondition() 會拋出 UnsupportedOperationException。?
示例(JDK自帶示例)
示例1:利用重入來執行升級緩存后的鎖降級
class CachedData {
??? Object data;
??? volatile boolean cacheValid;??? //緩存是否有效
??? ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
??? void processCachedData() {
??????? rwl.readLock().lock();??? //獲取讀鎖
??????? //如果緩存無效,更新cache;否則直接使用data
??????? if (!cacheValid) {
??????????? // Must release read lock before acquiring write lock
??????????? //獲取寫鎖前須釋放讀鎖
??????????? rwl.readLock().unlock();
??????????? rwl.writeLock().lock();?? ?
??????????? // Recheck state because another thread might have acquired
??????????? //?? write lock and changed state before we did.
??????????? if (!cacheValid) {
??????????????? data = ...
??????????????? cacheValid = true;
??????????? }
??????????? // Downgrade by acquiring read lock before releasing write lock
??????????? //鎖降級,在釋放寫鎖前獲取讀鎖
??????????? rwl.readLock().lock();
??????????? rwl.writeLock().unlock(); // Unlock write, still hold read
??????? }
??????? use(data);
??????? rwl.readLock().unlock();??? //釋放讀鎖
??? }
}
示例2:使用 ReentrantReadWriteLock 來提高 Collection 的并發性
class RWDictionary {
??? private final Map<String, Data> m = new TreeMap<String, Data>();
??? private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
??? private final Lock r = rwl.readLock();??? //讀鎖
??? private final Lock w = rwl.writeLock();??? //寫鎖
??? public Data get(String key) {
??????? r.lock();
??????? try { return m.get(key); }
??????? finally { r.unlock(); }
??? }
??? public String[] allKeys() {
??????? r.lock();
??????? try { return m.keySet().toArray(); }
??????? finally { r.unlock(); }
??? }
??? public Data put(String key, Data value) {
??????? w.lock();
??????? try { return m.put(key, value); }
??????? finally { w.unlock(); }
??? }
??? public void clear() {
??????? w.lock();
??????? try { m.clear(); }
??????? finally { w.unlock(); }
??? }
}
?
源碼解析
ReentrantReadWriteLock 是基于AQS實現的(參考AbstractQueuedSynchronizer源碼解析),它的自定義同步器(繼承AQS)需要在同步狀態(一個整型變量state)上維護多個讀線程和一個寫線程的狀態,使得該狀態的設計成為讀寫鎖實現的關鍵。如果在一個整型變量上維護多種狀態,就一定需要“按位切割使用”這個變量,讀寫鎖將變量切分成了兩個部分,高16位表示讀,低16位表示寫。
public class ReentrantReadWriteLockimplements ReadWriteLock, java.io.Serializable {} { /** Inner class providing readlock */private final ReentrantReadWriteLock.ReadLock readerLock;/** Inner class providing writelock */private final ReentrantReadWriteLock.WriteLock writerLock;/** Performs all synchronization mechanics */final Sync sync; }ReadLock與WriteLock
public static class ReadLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -5992448646407690164L;private final Sync sync;protected ReadLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() {sync.acquireShared(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public boolean tryLock() {return sync.tryReadLock();}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public void unlock() {sync.releaseShared(1);}public Condition newCondition() {throw new UnsupportedOperationException();}} public static class WriteLock implements Lock, java.io.Serializable {private static final long serialVersionUID = -4992448646407690164L;private final Sync sync;protected WriteLock(ReentrantReadWriteLock lock) {sync = lock.sync;}public void lock() {sync.acquire(1);}public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}public boolean tryLock( ) {return sync.tryWriteLock();}public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));}public void unlock() {sync.release(1);}public Condition newCondition() {return sync.newCondition();}public boolean isHeldByCurrentThread() {return sync.isHeldExclusively();}public int getHoldCount() {return sync.getWriteHoldCount();}}?ReadLock與WriteLock比較:
- ReadLock使用共享模式,WriteLock使用獨占模式
- ReadLock與WriteLock使用同一個Sync
?
Sync
數據結構
?? abstract static class Sync extends AbstractQueuedSynchronizer {
??????? private static final long serialVersionUID = 6317671515068378041L;
??????? /*
?? ??? ?SHARED_SHIFT相關變量與方法為鎖狀態訪問邏輯。
???????? */
?? ??? ?//低16位為寫鎖狀態,高16位為讀鎖狀態。
??????? static final int SHARED_SHIFT?? = 16;
?? ??? ?//讀鎖每次增加的單位(因為在高16位,所以加1,即相當于整個int加 1個SHARED_UNIT??? )
??????? static final int SHARED_UNIT??? = (1 << SHARED_SHIFT);
?? ??? ?//讀鎖最大數(2^16 -1 )
??????? static final int MAX_COUNT????? = (1 << SHARED_SHIFT) - 1;
?? ??? ?//寫鎖狀態掩碼(2^16-1)
??????? static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
??????? /**返回讀鎖數量 ,高16位(無符號右移16位) */
??????? static int sharedCount(int c)??? { return c >>> SHARED_SHIFT; }
??????? /** 返回寫鎖數量
? ? ? ? *EXCLUSIVE_MASK,高位全是0,與操作返回寫狀態數量
??????? */
??????? static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
??????? /**
???????? * A counter for per-thread read hold counts.
???????? * Maintained as a ThreadLocal; cached in cachedHoldCounter
???????? */
?????? //當前讀線程的計數器
??????? static final class HoldCounter {
??????????? int count = 0;??? //當前讀線程總的重入次數。
??????????? //線程ID
??????????? final long tid = getThreadId(Thread.currentThread());
??????? }
??????? /**
???????? * ThreadLocal 子類
???????? */
??????? static final class ThreadLocalHoldCounter
??????????? extends ThreadLocal<HoldCounter> {
??????????? public HoldCounter initialValue() {
??????????????? return new HoldCounter();
??????????? }
??????? }
??????? /**
???????? */
??????? private transient ThreadLocalHoldCounter readHolds;
?????? //當前線程緩存的HoldCounter ,
??????? private transient HoldCounter cachedHoldCounter;
??????? /**
???????? */
??????? private transient Thread firstReader = null;
??????? private transient int firstReaderHoldCount;
??????? Sync() {
??????????? readHolds = new ThreadLocalHoldCounter();
??????????? setState(getState()); // ensures visibility of readHolds
??????? }
}
讀鎖的獲取與釋放
讀鎖的lock和unlock的實際實現對應Sync的 tryAcquireShared 和 tryReleaseShared方法。
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();int c = getState();如果寫鎖線程數 != 0 ,且獨占鎖不是當前線程則返回失敗,因為存在鎖降級(先write,后read),if (exclusiveCount(c) != 0 && //獲取寫鎖數量getExclusiveOwnerThread() != current)return -1;int r = sharedCount(c); //獲取讀鎖數量。if (!readerShouldBlock() && //讀操作不被阻塞。r < MAX_COUNT && //讀鎖數量未到最大值compareAndSetState(c, c + SHARED_UNIT) //競爭成功) {//r == 0,表示第一個讀鎖線程,第一個讀鎖firstRead是不會加入到readHolds中if (r == 0) {firstReader = current; // 設置第一個讀線程firstReaderHoldCount = 1; // 讀線程占用的資源數為1} else if (firstReader == current) {// 當前線程為第一個讀線程,表示第一個讀鎖線程重入,則第一個線程占用資源+1firstReaderHoldCount++;} else {// 獲取計數器HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))// 計數器為空或者計數器的tid不為當前正在運行的線程的tid,則獲取當前 線程對應的計數器cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)//計數為0,則表示未加入到緩存,加入到readHolds中readHolds.set(rh);//把當前線程的計數加1rh.count++;}//返回1,則表示acquire成功,return 1;}return fullTryAcquireShared(current);}final int fullTryAcquireShared(Thread current) {/** This code is in part redundant with that in* tryAcquireShared but is simpler overall by not* complicating tryAcquireShared with interactions between* retries and lazily reading hold counts.*/HoldCounter rh = null;for (;;) {int c = getState();if (exclusiveCount(c) != 0) {//寫線程不是本線程,則返回-1if (getExclusiveOwnerThread() != current)return -1;// else we hold the exclusive lock; blocking here// would cause deadlock.} else if (readerShouldBlock()) { // 寫線程數量為0并且讀線程被阻塞// Make sure we're not acquiring read lock reentrantlyif (firstReader == current) { //第一個讀線程是本線程。// assert firstReaderHoldCount > 0;} else {//第一個讀線程不是本線程。if (rh == null) {//獲取緩存holdrh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current)) {//獲取本線程的Holdrh = readHolds.get();if (rh.count == 0)readHolds.remove(); //如果為0,則需要移出。}}if (rh.count == 0)return -1;}}if (sharedCount(c) == MAX_COUNT) // 讀鎖數量為最大值,拋出異常throw new Error("Maximum lock count exceeded");if (compareAndSetState(c, c + SHARED_UNIT)) { // 比較并且設置成功if (sharedCount(c) == 0) { // 讀線程數量為0,設置第一個讀線程firstReader = current;firstReaderHoldCount = 1;} else if (firstReader == current) {//第一個讀線程計數加1.firstReaderHoldCount++;} else {//不是第一個線程,則獲取計數器。if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}}}?
讀鎖acquire流程圖?
注意:
更新成功后會在firstReaderHoldCount中或readHolds(ThreadLocal類型的)的本線程副本中記錄當前線程重入數(23行至43行代碼),這是為了實現jdk1.6中加入的getReadHoldCount()方法的,這個方法能獲取當前線程重入共享鎖的次數(state中記錄的是多個線程的總重入次數),加入了這個方法讓代碼復雜了不少,但是其原理還是很簡單的:如果當前只有一個線程的話,還不需要動用ThreadLocal,直接往firstReaderHoldCount這個成員變量里存重入數,當有第二個線程來的時候,就要動用ThreadLocal變量readHolds了,每個線程擁有自己的副本,用來保存自己的重入數。
?
protected final boolean tryReleaseShared(int unused) {Thread current = Thread.currentThread();if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != getThreadId(current))rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {int c = getState();int nextc = c - SHARED_UNIT;if (compareAndSetState(c, nextc))// Releasing the read lock has no effect on readers,// but it may allow waiting writers to proceed if// both read and write locks are now free.return nextc == 0;}}?讀鎖的釋放工作,主要是包括更新計數器的值(重入次數),以及更新state(總的讀鎖重入次數)
計數器:HoldCounter
在讀鎖的獲取、釋放過程中,總是會有一個對象存在著,同時該對象在獲取線程獲取讀鎖是+1,釋放讀鎖時-1,該對象就是HoldCounter。
要明白HoldCounter就要先明白讀鎖。前面提過讀鎖的內在實現機制就是共享鎖,對于共享鎖其實我們可以稍微的認為它不是一個鎖的概念,它更加像一個計數器的概念。一次共享鎖操作就相當于一次計數器的操作,獲取共享鎖計數器+1,釋放共享鎖計數器-1。只有當線程獲取共享鎖后才能對共享鎖進行釋放、重入操作。所以HoldCounter的作用就是當前線程持有共享鎖的數量,這個數量必須要與線程綁定在一起,否則操作其他線程鎖就會拋出異常。
static final class HoldCounter {int count = 0;final long tid = Thread.currentThread().getId(); }static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();} }private transient ThreadLocalHoldCounter readHolds;private transient HoldCounter cachedHoldCounter;在HoldCounter中僅有count和tid兩個變量,其中count代表著計數器,tid是線程的id。但是如果要將一個對象和線程綁定起來僅記錄tid肯定不夠的,而且HoldCounter根本不能起到綁定對象的作用,只是記錄線程tid而已。
誠然,在java中,我們知道如果要將一個線程和對象綁定在一起只有ThreadLocal才能實現 。ThreadLocalHoldCounter繼承ThreadLocal,并且重寫了initialValue方法。
故而,HoldCounter應該就是綁定線程上的一個計數器,而ThradLocalHoldCounter則是線程綁定的ThreadLocal。從上面我們可以看到ThreadLocal將HoldCounter綁定到當前線程上,同時HoldCounter也持有線程Id,這樣在釋放鎖的時候才能知道ReadWriteLock里面緩存的上一個讀取線程(cachedHoldCounter)是否是當前線程。這樣做的好處是可以減少ThreadLocal.get()的次數,因為這也是一個耗時操作。需要說明的是這樣HoldCounter綁定線程id而不綁定線程對象的原因是避免HoldCounter和ThreadLocal互相綁定而GC難以釋放它們(盡管GC能夠智能的發現這種引用而回收它們,但是這需要一定的代價),所以其實這樣做只是為了幫助GC快速回收對象而已。
寫鎖的獲取與釋放
寫鎖的lock和unlock調用的獨占式同步狀態的獲取與釋放,因此真實的實現就是Sync的 tryAcquire和 tryRelease。
protected final boolean tryAcquire(int acquires) {Thread current = Thread.currentThread();int c = getState();//寫線程數量(即獲取獨占鎖的重入數)int w = exclusiveCount(c);//當前同步狀態state != 0,說明已經有其他線程獲取了讀鎖或寫鎖if (c != 0) {// 當前state不為0,此時:如果寫鎖狀態為0說明讀鎖此時被占用返回false;// 如果寫鎖狀態不為0且寫鎖沒有被當前線程持有返回falseif (w == 0 // w==0 ,說明寫鎖為0,c!=0 說明讀鎖正在操作||current != getExclusiveOwnerThread() // w != 0 說明有寫鎖在操作,但是不是本線程,則返回false。)return false;//判斷同一線程獲取寫鎖是否超過最大次數(65535),支持可重入if (w + exclusiveCount(acquires) > MAX_COUNT)throw new Error("Maximum lock count exceeded");//此時當前線程已持有寫鎖,現在是重入,所以只需要修改鎖的數量即可。setState(c + acquires);return true;}//到這里說明此時c=0,讀鎖和寫鎖都沒有被獲取if (writerShouldBlock() //判斷寫是否阻塞||!compareAndSetState(c, c + acquires) //競爭失敗,)return false;//設置寫鎖為當前線程所有setExclusiveOwnerThread(current);return true; }更新寫鎖重入次數以及state
protected final boolean tryRelease(int releases) {//若鎖的持有者不是當前線程,拋出異常if (!isHeldExclusively())throw new IllegalMonitorStateException();//寫鎖的新線程數int nextc = getState() - releases;//如果獨占模式重入數為0了,說明獨占模式被釋放boolean free = exclusiveCount(nextc) == 0;if (free)//若寫鎖的新線程數為0,則將鎖的持有者設置為nullsetExclusiveOwnerThread(null);//設置寫鎖的新線程數//不管獨占模式是否被釋放,更新獨占重入數setState(nextc);return free; }公平鎖與非公平鎖
NonfairSync
static final class NonfairSync extends Sync {private static final long serialVersionUID = -8159625535654395037L;final boolean writerShouldBlock() {return false; // writers can always barge}final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive();}}FairSync
static final class FairSync extends Sync {private static final long serialVersionUID = -2274990926593161451L;final boolean writerShouldBlock() {return hasQueuedPredecessors();}final boolean readerShouldBlock() {return hasQueuedPredecessors();}}NonFairSync與FairSync的區別:writerShouldBlock()與readerShouldBlock()方法的實現不同
readerShouldBlock
- NonfairSync.readerShouldBlock
?????? 調用:apparentlyFirstQueuedIsExclusive,如果隊列中第一個節點是獨占式,則返回true,堵塞讀鎖。
//判斷是否:隊列中第一個等待節點是獨占模式的final boolean apparentlyFirstQueuedIsExclusive() {Node h, s;return (h = head) != null && //head節點不為空。(s = h.next) != null && //head的后置節點不為空!s.isShared() && //head的后置節點不是共享模式s.thread != null; //head的后置節點關聯了線程。}調用apparentlyFirstQueuedIsExclusive方法, 為了防止寫線程饑餓等待,如果同步隊列中的第一個線程是以獨占模式獲取鎖(寫鎖),那么當前獲取讀鎖的線程需要阻塞,讓隊列中的第一個線程先執行。
- FairSync.readerShouldBlock
?????? 調用:hasQueuedPredecessors,判斷是否是等待隊列中是否有前置節點,有則返回true
writerShouldBlock
- NonfairSync.writerShouldBlock
?????? 直接返回false,說明非公平鎖不堵塞寫鎖。
- FairSync.writerShouldBlock
?????? 調用:hasQueuedPredecessors,判斷是否是等待隊列中是否有前置節點,有則返回true
?
?
?
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
以上是生活随笔為你收集整理的ReentrantReadWriteLock源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ReentrantLock源码
- 下一篇: Java并发包--阻塞队列(Blocki