java 可重入读写锁 ReentrantReadWriteLock 详解
詳見:http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt206
讀寫鎖 ReadWriteLock讀寫鎖維護了一對相關的鎖,一個用于只讀操作,一個用于寫入操作。只要沒有writer,讀取鎖可以由多個reader線程同時保持。寫入鎖是獨占的。
互斥鎖一次只允許一個線程訪問共享數(shù)據(jù),哪怕進行的是只讀操作;讀寫鎖允許對共享數(shù)據(jù)進行更高級別的并發(fā)訪問:對于寫操作,一次只有一個線程(write線程)可以修改共享數(shù)據(jù),對于讀操作,允許任意數(shù)量的線程同時進行讀取。
與互斥鎖相比,使用讀寫鎖能否提升性能則取決于讀寫操作期間讀取數(shù)據(jù)相對于修改數(shù)據(jù)的頻率,以及數(shù)據(jù)的爭用——即在同一時間試圖對該數(shù)據(jù)執(zhí)行讀取或寫入操作的線程數(shù)。
讀寫鎖適用于讀多寫少的情況。
可重入讀寫鎖 ReentrantReadWriteLock
屬性ReentrantReadWriteLock 也是基于 AbstractQueuedSynchronizer 實現(xiàn)的,它具有下面這些屬性(來自Java doc文檔):
???? * 獲取順序:此類不會將讀取者優(yōu)先或寫入者優(yōu)先強加給鎖訪問的排序。
????????? * 非公平模式(默認):連續(xù)競爭的非公平鎖可能無限期地推遲一個或多個reader或writer線程,但吞吐量通常要高于公平鎖。
????????? * 公平模式:線程利用一個近似到達順序的策略來爭奪進入。當釋放當前保持的鎖時,可以為等待時間最長的單個writer線程分配寫入鎖,如果有一組等待時間大于所有正在等待的writer線程的reader,將為該組分配讀者鎖。
????????? * 試圖獲得公平寫入鎖的非重入的線程將會阻塞,除非讀取鎖和寫入鎖都自由(這意味著沒有等待線程)。
???? * 重入:此鎖允許reader和writer按照 ReentrantLock 的樣式重新獲取讀取鎖或寫入鎖。在寫入線程保持的所有寫入鎖都已經(jīng)釋放后,才允許重入reader使用讀取鎖。
writer可以獲取讀取鎖,但reader不能獲取寫入鎖。
???? * 鎖降級:重入還允許從寫入鎖降級為讀取鎖,實現(xiàn)方式是:先獲取寫入鎖,然后獲取讀取鎖,最后釋放寫入鎖。但是,從讀取鎖升級到寫入鎖是不可能的。
???? * 鎖獲取的中斷:讀取鎖和寫入鎖都支持鎖獲取期間的中斷。
???? * Condition 支持:寫入鎖提供了一個 Condition 實現(xiàn),對于寫入鎖來說,該實現(xiàn)的行為與 ReentrantLock.newCondition() 提供的Condition 實現(xiàn)對 ReentrantLock 所做的行為相同。當然,此 Condition 只能用于寫入鎖。
讀取鎖不支持 Condition,readLock().newCondition() 會拋出 UnsupportedOperationException。
???? * 監(jiān)測:此類支持一些確定是讀取鎖還是寫入鎖的方法。這些方法設計用于監(jiān)視系統(tǒng)狀態(tài),而不是同步控制。
實現(xiàn)AQS 回顧在之前的文章已經(jīng)提到,AQS以單個 int 類型的原子變量來表示其狀態(tài),定義了4個抽象方法( tryAcquire(int)、tryRelease(int)、tryAcquireShared(int)、tryReleaseShared(int),前兩個方法用于獨占/排他模式,后兩個用于共享模式 )留給子類實現(xiàn),用于自定義同步器的行為以實現(xiàn)特定的功能。
對于 ReentrantLock,它是可重入的獨占鎖,內(nèi)部的 Sync 類實現(xiàn)了 tryAcquire(int)、tryRelease(int) 方法,并用狀態(tài)的值來表示重入次數(shù),加鎖或重入鎖時狀態(tài)加 1,釋放鎖時狀態(tài)減 1,狀態(tài)值等于 0 表示鎖空閑。
對于 CountDownLatch,它是一個關卡,在條件滿足前阻塞所有等待線程,條件滿足后允許所有線程通過。內(nèi)部類 Sync 把狀態(tài)初始化為大于 0 的某個值,當狀態(tài)大于 0 時所有wait線程阻塞,每調(diào)用一次 countDown 方法就把狀態(tài)值減 1,減為 0 時允許所有線程通過。利用了AQS的共享模式。
現(xiàn)在,要用AQS來實現(xiàn) ReentrantReadWriteLock。
一點思考問題
???? * AQS只有一個狀態(tài),那么如何表示 多個讀鎖 與 單個寫鎖 呢?
???? * ReentrantLock 里,狀態(tài)值表示重入計數(shù),現(xiàn)在如何在AQS里表示每個讀鎖、寫鎖的重入次數(shù)呢?
???? * 如何實現(xiàn)讀鎖、寫鎖的公平性呢?
一點提示
???? * 一個狀態(tài)是沒法既表示讀鎖,又表示寫鎖的,不夠用啊,那就辦成兩份用了,客家話說一個飯粒咬成兩半吃,狀態(tài)的高位部分表示讀鎖,低位表示寫鎖,由于寫鎖只有一個,所以寫鎖的重入計數(shù)也解決了,這也會導致寫鎖可重入的次數(shù)減小。
???? * 由于讀鎖可以同時有多個,肯定不能再用辦成兩份用的方法來處理了,但我們有 ThreadLocal,可以把線程重入讀鎖的次數(shù)作為值存在 ThreadLocal 里。
???? * 對于公平性的實現(xiàn),可以通過AQS的等待隊列和它的抽象方法來控制,在狀態(tài)值的另一半里存儲當前持有讀鎖的線程數(shù)。如果讀線程申請讀鎖,當前寫鎖重入次數(shù)不為 0 時,則等待,否則可以馬上分配;如果是寫線程申請寫鎖,當前狀態(tài)為 0 則可以馬上分配,否則等待。
源碼分析現(xiàn)在來看看具體的實現(xiàn)源碼。
辦成兩份AQS 的狀態(tài)是32位(int 類型)的,辦成兩份,讀鎖用高16位,表示持有讀鎖的線程數(shù)(sharedCount),寫鎖低16位,表示寫鎖的重入次數(shù) (exclusiveCount)。狀態(tài)值為 0 表示鎖空閑,sharedCount不為 0 表示分配了讀鎖,exclusiveCount 不為 0 表示分配了寫鎖,sharedCount和exclusiveCount 肯定不會同時不為 0。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{ ?? ???????static?final?int?SHARED_SHIFT???=?16; ???????//?由于讀鎖用高位部分,所以讀鎖個數(shù)加1,其實是狀態(tài)值加?2^16 ???????static?final?int?SHARED_UNIT????=?(1?<<?SHARED_SHIFT); ???????//?寫鎖的可重入的最大次數(shù)、讀鎖允許的最大數(shù)量 ???????static?final?int?MAX_COUNT??????=?(1?<<?SHARED_SHIFT)?-?1; ???????//?寫鎖的掩碼,用于狀態(tài)的低16位有效值 ???????static?final?int?EXCLUSIVE_MASK?=?(1?<<?SHARED_SHIFT)?-?1; ???????//?讀鎖計數(shù),當前持有讀鎖的線程數(shù) ????static?int?sharedCount(int?c)????{?return?c?>>>?SHARED_SHIFT;?} ????//?寫鎖的計數(shù),也就是它的重入次數(shù) ????static?int?exclusiveCount(int?c)?{?return?c?&?EXCLUSIVE_MASK;?} } |
讀鎖重入計數(shù)
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 | abstract?static?class?Sync?extends?AbstractQueuedSynchronizer?{ ????/** ?????*?每個線程特定的?read?持有計數(shù)。存放在ThreadLocal,不需要是線程安全的。 ?????*/ ????static?final?class?HoldCounter?{ ????????int?count?=?0; ????????//?使用id而不是引用是為了避免保留垃圾。注意這是個常量。 ????????final?long?tid?=?Thread.currentThread().getId(); ????} ????/** ?????*?采用繼承是為了重寫?initialValue?方法,這樣就不用進行這樣的處理: ?????*?如果ThreadLocal沒有當前線程的計數(shù),則new一個,再放進ThreadLocal里。 ?????*?可以直接調(diào)用?get。 ?????*?*/ ????static?final?class?ThreadLocalHoldCounter ????????extends?ThreadLocal<HoldCounter>?{ ????????public?HoldCounter?initialValue()?{ ????????????return?new?HoldCounter(); ????????} ????} ????/** ?????*?保存當前線程重入讀鎖的次數(shù)的容器。在讀鎖重入次數(shù)為?0?時移除。 ?????*/ ????private?transient?ThreadLocalHoldCounter?readHolds; ????/** ?????*?最近一個成功獲取讀鎖的線程的計數(shù)。這省卻了ThreadLocal查找, ?????*?通常情況下,下一個釋放線程是最后一個獲取線程。這不是?volatile?的, ?????*?因為它僅用于試探的,線程進行緩存也是可以的 ?????*?(因為判斷是否是當前線程是通過線程id來比較的)。 ?????*/ ????private?transient?HoldCounter?cachedHoldCounter; ????/** ?????*?firstReader是這樣一個特殊線程:它是最后一個把?共享計數(shù)?從?0?改為?1?的 ?????*?(在鎖空閑的時候),而且從那之后還沒有釋放讀鎖的。如果不存在則為null。 ?????*?firstReaderHoldCount?是?firstReader?的重入計數(shù)。 ?????* ?????*?firstReader?不能導致保留垃圾,因此在?tryReleaseShared?里設置為null, ?????*?除非線程異常終止,沒有釋放讀鎖。 ?????* ?????*?作用是在跟蹤無競爭的讀鎖計數(shù)時非常便宜。 ?????* ?????*?firstReader及其計數(shù)firstReaderHoldCount是不會放入?readHolds?的。 ?????*/ ????private?transient?Thread?firstReader?=?null; ????private?transient?int?firstReaderHoldCount; ????Sync()?{ ????????readHolds?=?new?ThreadLocalHoldCounter(); ????????setState(getState());?//?確保?readHolds?的內(nèi)存可見性,利用?volatile?寫的內(nèi)存語義。 ????} } |
寫鎖獲取與釋放寫鎖的獲取與釋放通過 tryAcquire 和 tryRelease 方法實現(xiàn),源碼文件里有這么一段說明:tryRelease 和 tryAcquire 可能被Conditions 調(diào)用。因此可能出現(xiàn)參數(shù)里包含在條件等待和用 tryAcquire 重新獲取到鎖的期間內(nèi)已經(jīng)釋放的 讀和寫 計數(shù)。
這說明看起來像是在 tryAcquire 里設置狀態(tài)時要考慮方法參數(shù)(acquires)的高位部分,其實是不需要的。由于寫鎖是獨占的,acquires 表示的只能是寫鎖的計數(shù),如果當前線程成功獲取寫鎖,只需要簡單地把當前狀態(tài)加上 acquires 的值即可,tryRelease 里直接減去其參數(shù)值即可。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | protected?final?boolean?tryAcquire(int?acquires)?{ ????Thread?current?=?Thread.currentThread(); ????int?c?=?getState(); ????int?w?=?exclusiveCount(c); ????if?(c?!=?0)?{?//?狀態(tài)不為0,表示鎖被分配出去了。 ????????//?(Note:?if?c?!=?0?and?w?==?0?then?shared?count?!=?0) ??????//?c?!=?0?and?w?==?0?表示分配了讀鎖 ??????//?w?!=?0?&&?current?!=?getExclusiveOwnerThread()?表示其他線程獲取了寫鎖。 ????????if?(w?==?0?||?current?!=?getExclusiveOwnerThread()) ????????????return?false?; ????????//?寫鎖重入 ????????//?檢測是否超過最大重入次數(shù)。 ????????if?(w?+?exclusiveCount(acquires)?>?MAX_COUNT) ????????????throw?new?Error("Maximum?lock?count?exceeded"); ????????//?更新寫鎖重入次數(shù),寫鎖在低位,直接加上?acquire?即可。 ????????//?Reentrant?acquire ????????setState(c?+?acquires); ????????return?true?; ????} ????//?writerShouldBlock?留給子類實現(xiàn),用于實現(xiàn)公平性策略。 ????//?如果允許獲取寫鎖,則用?CAS?更新狀態(tài)。 ????if?(writerShouldBlock()?|| ????????!compareAndSetState(c,?c?+?acquires)) ????????return?false?;?//?不允許獲取鎖?或?CAS?失敗。 ????//?獲取寫鎖超過,設置獨占線程。 ????setExclusiveOwnerThread(current); ????return?true; } protected?final?boolean?tryRelease(int?releases)?{ ????if?(!isHeldExclusively())?//?是否是當前線程持有寫鎖 ????????throw?new?IllegalMonitorStateException(); ????//?這里不考慮高16位是因為高16位肯定是?0。 ????int?nextc?=?getState()?-?releases; ????boolean?free?=?exclusiveCount(nextc)?==?0; ????if?(free) ????????setExclusiveOwnerThread(?null);?//?寫鎖完全釋放,設置獨占線程為null。 ????setState(nextc); ????return?free; } |
讀鎖獲取與釋放// 參數(shù)變?yōu)?unused 是因為讀鎖的重入計數(shù)是內(nèi)部維護的。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 | protected?final?int?tryAcquireShared(int?unused)?{ ????Thread?current?=?Thread.currentThread(); ????int?c?=?getState(); ????//?這個if語句是說:持有寫鎖的線程可以獲取讀鎖。 ????if?(exclusiveCount(c)?!=?0?&&?//?已分配了寫鎖 ????????getExclusiveOwnerThread()?!=?current)?//?且當前線程不是持有寫鎖的線程 ????????return?-1; ????int?r?=?sharedCount(c);?//?取讀鎖計數(shù) ????if?(!readerShouldBlock()?&&?//?由子類根據(jù)其公平策略決定是否允許獲取讀鎖 ????????r?<?MAX_COUNT?&&???????????//?讀鎖數(shù)量還沒達到最大值 ????????//?嘗試獲取讀鎖。注意讀線程計數(shù)的單位是??2^16 ????????compareAndSetState(c,?c?+?SHARED_UNIT))?{ ?????????//?成功獲取讀鎖 ?????//?注意下面對firstReader的處理:firstReader是不會放到readHolds里的 ?????//?這樣,在讀鎖只有一個的情況下,就避免了查找readHolds。 ????????if?(r?==?0)?{?//?是?firstReader,計數(shù)不會放入??readHolds。 ????????????firstReader?=?current; ????????????firstReaderHoldCount?=?1; ????????}?else?if?(firstReader?==?current)?{?//?firstReader?重入 ????????????firstReaderHoldCount++; ????????}?else?{ ?????????????//?非?firstReader?讀鎖重入計數(shù)更新 ????????????HoldCounter?rh?=?cachedHoldCounter;?//?首先訪問緩存 ????????????if?(rh?==?null?||?rh.tid?!=?current.getId()) ????????????????cachedHoldCounter?=?rh?=?readHolds.get(); ????????????else?if?(rh.count?==?0) ????????????????readHolds.set(rh); ????????????rh.count++; ????????} ????????return?1; ????} ????//?獲取讀鎖失敗,放到循環(huán)里重試。 ????return?fullTryAcquireShared(current); } final?int?fullTryAcquireShared(Thread?current)?{ ????HoldCounter?rh?=?null; ????for?(;;)?{ ????????int?c?=?getState(); ????????if?(exclusiveCount(c)?!=?0)?{ ????????????if?(getExclusiveOwnerThread()?!=?current) ???????????//?寫鎖被分配,非寫鎖線程獲取讀鎖,失敗 ????????????????return?-1; ????????????//?否則,當前線程持有寫鎖,在這里阻塞將導致死鎖。 ????????}?else?if?(readerShouldBlock())?{ ????????????//?寫鎖空閑??且??公平策略決定?線程應當被阻塞 ????????????//?下面的處理是說,如果是已獲取讀鎖的線程重入讀鎖時, ????????????//?即使公平策略指示應當阻塞也不會阻塞。 ????????????//?否則,這也會導致死鎖的。 ????????????if?(firstReader?==?current)?{ ????????????????//?assert?firstReaderHoldCount?>?0; ????????????}?else?{ ????????????????if?(rh?==?null)?{ ????????????????????rh?=?cachedHoldCounter; ????????????????????if?(rh?==?null?||?rh.tid?!=?current.getId())?{ ????????????????????????rh?=?readHolds.get(); ????????????????????????if?(rh.count?==?0) ????????????????????????????readHolds.remove(); ????????????????????} ????????????????} ????????????????//?需要阻塞且是非重入(還未獲取讀鎖的),獲取失敗。 ????????????????if?(rh.count?==?0) ????????????????????return?-1; ????????????} ????????} ????????//?寫鎖空閑??且??公平策略決定線程可以獲取讀鎖 ????????if?(sharedCount(c)?==?MAX_COUNT)?//?讀鎖數(shù)量達到最多 ????????????throw?new?Error(?"Maximum?lock?count?exceeded"); ????????if?(compareAndSetState(c,?c?+?SHARED_UNIT))?{ ????????????//?申請讀鎖成功,下面的處理跟tryAcquireShared是類似的。 ????????????if?(sharedCount(c)?==?0)?{ ????????????????firstReader?=?current; ????????????????firstReaderHoldCount?=?1; ????????????}?else?if?(firstReader?==?current)?{ ????????????????firstReaderHoldCount++; ????????????}?else?{ ???????????//?設定最后一次獲取讀鎖的緩存 ????????????????if?(rh?==?null) ????????????????????rh?=?cachedHoldCounter; ????????????????if?(rh?==?null?||?rh.tid?!=?current.getId()) ????????????????????rh?=?readHolds.get(); ????????????????else?if?(rh.count?==?0) ????????????????????readHolds.set(rh); ????????????????rh.count++; ????????????????cachedHoldCounter?=?rh;?//?緩存起來用于釋放 ????????????} ????????????return?1; ????????} ????} } protected?final?boolean?tryReleaseShared(int?unused)?{ ????Thread?current?=?Thread.currentThread(); ????//?清理firstReader緩存?或?readHolds里的重入計數(shù) ????if?(firstReader?==?current)?{ ????????//?assert?firstReaderHoldCount?>?0; ????????if?(firstReaderHoldCount?==?1) ????????????firstReader?=?null; ????????else ????????????firstReaderHoldCount--; ????}?else?{ ????????HoldCounter?rh?=?cachedHoldCounter; ????????if?(rh?==?null?||?rh.tid?!=?current.getId()) ????????????rh?=?readHolds.get(); ????????int?count?=?rh.count; ????????if?(count?<=?1)?{ ????????????//?完全釋放讀鎖 ????????????readHolds.remove(); ????????????if?(count?<=?0) ????????????????throw?unmatchedUnlockException(); ????????} ????????--rh.count;?//?主要用于重入退出 ????} ????//?循環(huán)在CAS更新狀態(tài)值,主要是把讀鎖數(shù)量減?1 ????for?(;;)?{ ????????int?c?=?getState(); ????????int?nextc?=?c?-?SHARED_UNIT; ????????if?(compareAndSetState(c,?nextc)) ????????????//?釋放讀鎖對其他讀線程沒有任何影響, ????????????//?但可以允許等待的寫線程繼續(xù),如果讀鎖、寫鎖都空閑。 ????????????return?nextc?==?0; ????} } 公平性策略公平與非公平策略是由?Sync?的子類?FairSync?和?NonfairSync?實現(xiàn)的。 /** *?這個非公平策略的同步器是寫鎖優(yōu)先的,申請寫鎖時總是不阻塞。 */ static?final?class?NonfairSync?extends?Sync?{ ????private?static?final?long?serialVersionUID?=?-8159625535654395037L; ????final?boolean?writerShouldBlock()?{ ????????return?false;?//?寫線程總是可以突入 ????} ????final?boolean?readerShouldBlock()?{ ????????/*?作為一個啟發(fā)用于避免寫線程饑餓,如果線程臨時出現(xiàn)在等待隊列的頭部則阻塞, ?????????*?如果存在這樣的,則是寫線程。 ?????????*/ ????????return?apparentlyFirstQueuedIsExclusive(); ????} } /** *?公平的?Sync,它的策略是:如果線程準備獲取鎖時, *?同步隊列里有等待線程,則阻塞獲取鎖,不管是否是重入 *?這也就需要tryAcqire、tryAcquireShared方法進行處理。 */ static?final?class?FairSync?extends?Sync?{ ????private?static?final?long?serialVersionUID?=?-2274990926593161451L; ????final?boolean?writerShouldBlock()?{ ????????return?hasQueuedPredecessors(); ????} ????final?boolean?readerShouldBlock()?{ ????????return?hasQueuedPredecessors(); ????} } |
現(xiàn)在用奇數(shù)表示申請讀鎖的讀線程,偶數(shù)表示申請寫鎖的寫線程,每個數(shù)都表示一個不同的線程,存在下面這樣的申請隊列,假設開始時鎖空閑:
1? 3? 5? 0? 7? 9? 2? 4
讀線程1申請讀鎖時,鎖是空閑的,馬上分配,讀線程3、5申請時,由于已分配讀鎖,它們也可以馬上獲取讀鎖。
假設此時有線程11申請讀鎖,由于它不是讀鎖重入,只能等待。而線程1再次申請讀鎖是可以的,因為它的重入。
寫線程0申請寫鎖時,由于分配了讀鎖,只能等待,當讀線程1、3、5都釋放讀鎖后,線程0可以獲取寫鎖。
線程0釋放后,線程7、9獲取讀鎖,它們釋放后,線程2獲取寫鎖,此時線程4必須等待線程2釋放。
線程4在線程2釋放寫鎖后獲取寫鎖,它釋放寫鎖后,鎖恢復空閑。
轉載于:https://www.cnblogs.com/grefr/p/6094922.html
總結
以上是生活随笔為你收集整理的java 可重入读写锁 ReentrantReadWriteLock 详解的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [原创软件]体验组批量加分工具
- 下一篇: php 跨二级域名 设置cookie