JUC锁-ReentrantReadWrite(五)
ReadWriteLock 和 ReentrantReadWrite介紹
ReadWriteLock,顧名思義,是讀寫鎖。
它維護了一對相關的鎖 — — “讀取鎖”和“寫入鎖”,一個用于讀取操作,另一個用于寫入操作。
“讀取鎖”用于只讀操作,它是“共享鎖”,能同時被多個線程獲取。
“寫入鎖”用于寫入操作,它是“獨占鎖”,寫入鎖只能被一個線程鎖獲取。
這是它的函數列表:
// 返回用于讀取操作的鎖。 Lock readLock() // 返回用于寫入操作的鎖。 Lock writeLock()而對于ReentrantReadWriteLock,是否覺得ReentrantReadWriteLock會實現Lock接口呢?
答案是否定的,ReentrantReadWriterLock采用組合的方式,采用兩個內部類實現Lock接口,分別是ReadLock,WriterLock類。
與ReentrantLock一樣,ReentrantReadWriterLock同樣使用自己的內部類Sync(繼承AbstractQueuedSynchronizer)實現CLH算法,同時讓sysn可以指向它的子類FairSync和NonFaireSync。
這是它的uml圖:
ReentrantReadWrite的屬性
為了更好地對讀寫鎖機制的了解和下面源碼的解讀,先介紹一下它的組件Sync的成員:
static final int SHARED_SHIFT = 16; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** Returns the number of shared holds represented in count */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }首先ReentrantReadWriterLock使用一個32位的int類型來表示鎖被占用的線程數(ReentrantLock中的state),用所以,采取的辦法是,高16位用來表示讀鎖占有的線程數量,用低16位表示寫鎖被同一個線程申請的次數。
- SHARED_SHIFT,表示讀鎖占用的位數,常量16。
- SHARED_UNIT, 增加一個讀鎖,按照上述設計,就相當于增加 SHARED_UNIT;
- MAX_COUNT ,表示申請讀鎖最大的線程數量,為65535
EXCLUSIVE_MASK :表示計算寫鎖的具體值時,該值為 15個1,用 getState & EXCLUSIVE_MASK算出寫鎖的線程數,大于1表示重入。
static int sharedCount(int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
舉例說明,比如,現在當前,申請讀鎖的線程數為13個,寫鎖一個,那state怎么表示?上文說過,用一個32位的int類型的高16位表示讀鎖線程數,13的二進制為 1101,那state的二進制表示為
00000000 00001101 00000000 00000001,十進制數為851969。
sharedCount方法獲取的是共享鎖,也就是讀鎖,只需要將state 無符號向左移位16位置,得出00000000 00001101,就出13。
exclusiveCount根據851969要算成低16位置,只需要用該00000000 00001101 00000000 00000001 & 111111111111111(15位),就可以得出00000001。
這幾個參數看懂了,下面來幾個有意思的成員:
private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null; private transient int firstReaderHoldCount;ThreadLocalHoldCounter和HoldCounter是什么?
/*** A counter for per-thread read hold counts.* Maintained as a ThreadLocal; cached in cachedHoldCounter*/static final class HoldCounter {int count = 0;// Use id, not reference, to avoid garbage retentionfinal long tid = getThreadId(Thread.currentThread());}/*** ThreadLocal subclass. Easiest to explicitly define for sake* of deserialization mechanics.*/static final class ThreadLocalHoldCounterextends ThreadLocal<HoldCounter> {public HoldCounter initialValue() {return new HoldCounter();}}HoldCounter存著讀鎖的數量。而ThreadLocalHoldCounter繼承了線程本地類ThreadLocal,持有一個HoldCounter對象,也就是說ThreadLocalHoldCounter持有當前線程的讀鎖的數量。
firstReader與firstReadHoldCount保存第一個獲取讀鎖的線程,也就是readHolds中并不會保存第一個獲取讀鎖的線程;cachedHoldCounter 緩存的是最后一個獲取線程的HolderCount信息,該變量主要是在如果當前線程多次獲取讀鎖時,減少從readHolds中獲取HoldCounter的次數。為什么要把第一個讀的線程的信息和最后一個的信息單獨拿出來呢,這是一種優化方法,緩存使用。看到后面就清楚了。
獲取共享鎖的過程
獲取共享鎖的思想(即lock函數的步驟),是先通過tryAcquireShared()嘗試獲取共享鎖。嘗試成功的話,則直接返回;嘗試失敗的話,則通過doAcquireShared()不斷的循環并嘗試獲取鎖,若有需要,則阻塞等待。doAcquireShared()在循環中每次嘗試獲取鎖時,都是通過tryAcquireShared()來進行嘗試的。下面看看“獲取共享鎖”的詳細流程。
1.lock方法
lock()在ReadLock中,切記共享鎖在ReadLock中,WriteLock持有的是獨占鎖。
public void lock() {sync.acquireShared(1); }2.acquireShared方法
Sync繼承于AQS,acquireShared()定義在AQS中。源碼如下:
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg); }與此類似地,我們可以對比一下同在AQS的獨占鎖的acquire()
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}acquireShared()首先會通過tryAcquireShared()來嘗試獲取鎖。
- 嘗試成功的話,則不再做任何動作(因為已經成功獲取到鎖了)。
- 嘗試失敗的話,則通過doAcquireShared()來獲取鎖。doAcquireShared()會獲取到鎖了才返回。
3. tryAcquire
tryAcquire定義在AQS中,由Sync類重寫了:
protected final int tryAcquireShared(int unused) {Thread current = Thread.currentThread();// 獲取“鎖”的狀態int c = getState();// 如果“鎖”是“寫鎖”,并且獲取鎖的線程不是current線程;則返回-1。if (exclusiveCount(c) != 0 &&getExclusiveOwnerThread() != current)return -1;// 獲取“讀取鎖”的共享計數int r = sharedCount(c);// 如果“不需要阻塞等待”,并且“讀取鎖”的共享計數小于MAX_COUNT;// 則通過CAS函數更新“鎖的狀態”,將“讀取鎖”的共享計數+1。if (!readerShouldBlock() &&r < MAX_COUNT &&compareAndSetState(c, c + SHARED_UNIT)) {// 第1次獲取“讀取鎖”。if (r == 0) { firstReader = current;firstReaderHoldCount = 1;// 如果想要獲取鎖的線程(current)是第1個獲取鎖(firstReader)的線程} else if (firstReader == current) { firstReaderHoldCount++;} else {// 獲得最后一個獲取讀線程的信息HoldCounter rh = cachedHoldCounter;//如果不存在或者不是當前線程,就從readHolds獲得當前線程信息 //加一并添加到緩存cachedHoldCounter中if (rh == null || rh.tid != current.getId())cachedHoldCounter = rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);// 將該線程獲取“讀取鎖”的次數+1。rh.count++;}return 1;}return fullTryAcquireShared(current); }說明:tryAcquireShared()的作用是嘗試獲取“共享鎖”。
如果在嘗試獲取鎖時,“不需要阻塞等待”并且“讀取鎖的共享計數小于MAX_COUNT”,則直接通過CAS函數更新“讀取鎖的共享計數”,然后更新下面的Sync類的成員:
從代碼中,我們可以清楚地看到下面的三個都是在為readHolds服務,如果沒有這三個,那么獲取當前線程的讀鎖數量的操作只能是readHolds.get(),所以通過這三個成員相當于優化,緩存。
獲取不到鎖,就要用fullTryAcquireShared獲得。
4. fullTryAcquireShared()
final int fullTryAcquireShared(Thread current) {HoldCounter rh = null;for (;;) {// 獲取“鎖”的狀態int c = getState();// 如果“鎖”是“互斥鎖”,并且獲取鎖的線程不是current線程;則返回-1。if (exclusiveCount(c) != 0) {if (getExclusiveOwnerThread() != current)return -1;// 如果“需要阻塞等待”。// (01) 當“需要阻塞等待”的線程是第1個獲取鎖的線程的話,則繼續往下執行。// (02) 當“需要阻塞等待”的線程獲取鎖的次數=0時,則返回-1。} else if (readerShouldBlock()) {// 如果想要獲取鎖的線程(current)是第1個獲取鎖(firstReader)的線程if (firstReader == current) {} else {if (rh == null) {rh = cachedHoldCounter;if (rh == null || rh.tid != current.getId()) {rh = readHolds.get();if (rh.count == 0)readHolds.remove();}}// 如果當前線程獲取鎖的計數=0,則返回-1。if (rh.count == 0)return -1;}}// 如果“不需要阻塞等待”,則獲取“讀取鎖”的共享統計數;// 如果共享統計數超過MAX_COUNT,則拋出異常。if (sharedCount(c) == MAX_COUNT)throw new Error("Maximum lock count exceeded");// 將線程獲取“讀取鎖”的次數+1。if (compareAndSetState(c, c + SHARED_UNIT)) {// 如果是第1次獲取“讀取鎖”,則更新firstReader和firstReaderHoldCount。if (sharedCount(c) == 0) {firstReader = current;firstReaderHoldCount = 1;// 如果想要獲取鎖的線程(current)是第1個獲取鎖(firstReader)的線程,// 則將firstReaderHoldCount+1。} else if (firstReader == current) {firstReaderHoldCount++;} else {if (rh == null)rh = cachedHoldCounter;if (rh == null || rh.tid != current.getId())rh = readHolds.get();else if (rh.count == 0)readHolds.set(rh);// 更新線程的獲取“讀取鎖”的共享計數rh.count++;cachedHoldCounter = rh; // cache for release}return 1;}} }說明:fullTryAcquireShared()會根據“是否需要阻塞等待”,“讀取鎖的共享計數是否超過限制”等等進行處理。如果不需要阻塞等待,并且鎖的共享計數沒有超過限制,則通過CAS嘗試獲取鎖,并返回1。
doAcquireShared方法
private void doAcquireShared(int arg) {// addWaiter(Node.SHARED)的作用是,創建“當前線程”對應的節點,并將該線程添加到CLH隊列中。final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {// 獲取“node”的前一節點final Node p = node.predecessor();// 如果“當前線程”是CLH隊列的表頭,則嘗試獲取共享鎖。if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}// 如果“當前線程”不是CLH隊列的表頭,則通過shouldParkAfterFailedAcquire()判斷是否需要等待,// 需要的話,則通過parkAndCheckInterrupt()進行阻塞等待。若阻塞等待過程中,線程被中斷過,則設置interrupted為true。if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);} }doAcquireShared()的作用是獲取共享鎖。
它會首先創建線程對應的CLH隊列的節點,然后將該節點添加到CLH隊列中。CLH隊列是管理獲取鎖的等待線程的隊列。
如果“當前線程”是CLH隊列的表頭,則嘗試獲取共享鎖;否則,則需要通過shouldParkAfterFailedAcquire()判斷是否阻塞等待,需要的話,則通過parkAndCheckInterrupt()進行阻塞等待。
doAcquireShared()會通過for循環,不斷的進行上面的操作;目的就是獲取共享鎖。
需要注意的是:doAcquireShared()在每一次嘗試獲取鎖時,是通過tryAcquireShared()來執行的!
釋放共享鎖的過程
釋放共享鎖的思想,是先通過tryReleaseShared()嘗試釋放共享鎖。嘗試成功的話,則通過doReleaseShared()喚醒“其他等待獲取共享鎖的線程”,并返回true;否則的話,返回flase。
1. unlock()
public void unlock() {sync.releaseShared(1); }2. releaseShared()
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false; }說明:releaseShared()的目的是讓當前線程釋放它所持有的共享鎖。
它首先會通過tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過doReleaseShared()去釋放共享鎖。這個過程實際上跟lock的操作相似。
3. tryReleaseShared()
protected final boolean tryReleaseShared(int unused) {// 獲取當前線程,即釋放共享鎖的線程。Thread current = Thread.currentThread();// 如果想要釋放鎖的線程(current)是第1個獲取鎖(firstReader)的線程,// 并且“第1個獲取鎖的線程獲取鎖的次數”=1,則設置firstReader為null;// 否則,將“第1個獲取鎖的線程的獲取次數”-1。if (firstReader == current) {// assert firstReaderHoldCount > 0;if (firstReaderHoldCount == 1)firstReader = null;elsefirstReaderHoldCount--;// 獲取rh對象,并更新“當前線程獲取鎖的信息”。} else {HoldCounter rh = cachedHoldCounter;if (rh == null || rh.tid != current.getId())rh = readHolds.get();int count = rh.count;if (count <= 1) {readHolds.remove();if (count <= 0)throw unmatchedUnlockException();}--rh.count;}for (;;) {// 獲取鎖的狀態int c = getState();// 將鎖的獲取次數-1。int nextc = c - SHARED_UNIT;// 通過CAS更新鎖的狀態。if (compareAndSetState(c, nextc))return nextc == 0;} }4. doReleaseShared()
private void doReleaseShared() {for (;;) {// 獲取CLH隊列的頭節點Node h = head;// 如果頭節點不為null,并且頭節點不等于tail節點。if (h != null && h != tail) {// 獲取頭節點對應的線程的狀態int ws = h.waitStatus;// 如果頭節點對應的線程是SIGNAL狀態,則意味著“頭節點的下一個節點所對應的線程”需要被unpark喚醒。if (ws == Node.SIGNAL) {// 設置“頭節點對應的線程狀態”為空狀態。失敗的話,則繼續循環。if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;// 喚醒“頭節點的下一個節點所對應的線程”。unparkSuccessor(h);}// 如果頭節點對應的線程是空狀態,則設置“文件點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態。else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}// 如果頭節點發生變化,則繼續循環。否則,退出循環。if (h == head) // loop if head changedbreak;} }doReleaseShared()會釋放“共享鎖”。它會從前往后的遍歷CLH隊列,依次“喚醒”然后“執行”隊列中每個節點對應的線程;最終的目的是讓這些線程釋放它們所持有的鎖。
公平共享鎖和非公平共享鎖
與互斥鎖一樣ReenTrantLock一樣,共享鎖ReadLock也分為公平鎖和非公平鎖。
我們回顧一下ReenTrantLock,公平鎖和非公平鎖的區別在于lock()和tryAcquire()方法允許了插隊。
我們來看一下ReenTrantReadWriteLock中的FairSync和NonSync的區別:

很容易看出來,ReadLock公平鎖和非公平鎖的區別在于readerShouldBlock方法:
公平鎖:
final boolean readerShouldBlock() {return hasQueuedPredecessors(); }在公平共享鎖中,如果在當前線程的前面有其他線程在等待獲取共享鎖,則返回true;否則,返回false。
非公平鎖:
final boolean readerShouldBlock() {return apparentlyFirstQueuedIsExclusive(); }使用ReentrantReadWriteLock的例子:
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadWriteLockTest1 { public static void main(String[] args) { // 創建賬戶MyCount myCount = new MyCount("4238920615242830", 10000); // 創建用戶,并指定賬戶User user = new User("Tommy", myCount); // 分別啟動3個“讀取賬戶金錢”的線程 和 3個“設置賬戶金錢”的線程for (int i=0; i<3; i++) {user.getCash();user.setCash((i+1)*1000);}} } class User {private String name; //用戶名 private MyCount myCount; //所要操作的賬戶 private ReadWriteLock myLock; //執行操作所需的鎖對象 User(String name, MyCount myCount) {this.name = name; this.myCount = myCount; this.myLock = new ReentrantReadWriteLock();}public void getCash() {new Thread() {public void run() {myLock.readLock().lock(); try {System.out.println(Thread.currentThread().getName() +" getCash start"); myCount.getCash();Thread.sleep(1);System.out.println(Thread.currentThread().getName() +" getCash end"); } catch (InterruptedException e) {} finally {myLock.readLock().unlock(); }}}.start();}public void setCash(final int cash) {new Thread() {public void run() {myLock.writeLock().lock(); try {System.out.println(Thread.currentThread().getName() +" setCash start"); myCount.setCash(cash);Thread.sleep(1);System.out.println(Thread.currentThread().getName() +" setCash end"); } catch (InterruptedException e) {} finally {myLock.writeLock().unlock(); }}}.start();} }class MyCount {private String id; //賬號 private int cash; //賬戶余額 MyCount(String id, int cash) { this.id = id; this.cash = cash; } public String getId() { return id; } public void setId(String id) { this.id = id; } public int getCash() { System.out.println(Thread.currentThread().getName() +" getCash cash="+ cash); return cash; } public void setCash(int cash) { System.out.println(Thread.currentThread().getName() +" setCash cash="+ cash); this.cash = cash; } }運行結果:
Thread-0 getCash start Thread-2 getCash start Thread-0 getCash cash=10000 Thread-2 getCash cash=10000 Thread-0 getCash end Thread-2 getCash end Thread-1 setCash start Thread-1 setCash cash=1000 Thread-1 setCash end Thread-3 setCash start Thread-3 setCash cash=2000 Thread-3 setCash end Thread-4 getCash start Thread-4 getCash cash=2000 Thread-4 getCash end Thread-5 setCash start Thread-5 setCash cash=3000 Thread-5 setCash end說明:
(01) 觀察Thread0和Thread-2的運行結果,我們發現,Thread-0啟動并獲取到“讀取鎖”,在它還沒運行完畢的時候,Thread-2也啟動了并且也成功獲取到“讀取鎖”。
因此,“讀取鎖”支持被多個線程同時獲取。
(02) 觀察Thread-1,Thread-3,Thread-5這三個“寫入鎖”的線程。“寫入鎖”不支持被多個線程同時獲取。
(03) 觀察Thread-3,Thread-4,Thread-5這三個“寫入鎖”的線程,讀寫互斥,只要“寫入鎖”被某線程獲取,則該線程運行完畢了,才釋放該鎖。
簡單來說: “讀-讀”不互斥,”讀-寫”互斥,”寫-寫”互斥
可能令人比較好奇的是,為什么readLock,writeLock兩個不同的對象,為什么他們之間是互斥的呢?
原因很簡單:
因為readLock,writeLock的構造方法都要傳入ReentrantReadWriteLock中的Sysn鎖,Sysn鎖也是ReentrantReadWriteLock的組件,是唯一的。
總結
以上是生活随笔為你收集整理的JUC锁-ReentrantReadWrite(五)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JUC锁-LockSupport(四)
- 下一篇: JUC锁-CountDownLatch(