深入理解ReentrantLock
在Java中通常實現(xiàn)鎖有兩種方式,一種是synchronized關鍵字,另一種是Lock。二者其實并沒有什么必然聯(lián)系,但是各有各的特點,在使用中可以進行取舍的使用。首先我們先對比下兩者。
實現(xiàn):
首先最大的不同:synchronized是基于JVM層面實現(xiàn)的,而Lock是基于JDK層面實現(xiàn)的。曾經(jīng)反復的找過synchronized的實現(xiàn),可惜最終無果。但Lock卻是基于JDK實現(xiàn)的,我們可以通過閱讀JDK的源碼來理解Lock的實現(xiàn)。
使用:
對于使用者的直觀體驗上Lock是比較復雜的,需要lock和realse,如果忘記釋放鎖就會產(chǎn)生死鎖的問題,所以,通常需要在finally中進行鎖的釋放。但是synchronized的使用十分簡單,只需要對自己的方法或者關注的同步對象或類使用synchronized關鍵字即可。但是對于鎖的粒度控制比較粗,同時對于實現(xiàn)一些鎖的狀態(tài)的轉移比較困難。例如:
特點:
| 鎖獲取超時 | 不支持 | 支持 |
| 獲取鎖響應中斷 | 不支持 | 支持 |
優(yōu)化:
在JDK1.5之后synchronized引入了偏向鎖,輕量級鎖和重量級鎖,從而大大的提高了synchronized的性能,同時對于synchronized的優(yōu)化也在繼續(xù)進行。期待有一天能更簡單的使用java的鎖。
在以前不了解Lock的時候,感覺Lock使用實在是太復雜,但是了解了它的實現(xiàn)之后就被深深吸引了。
Lock的實現(xiàn)主要有ReentrantLock、ReadLock和WriteLock,后兩者接觸的不多,所以簡單分析一下ReentrantLock的實現(xiàn)和運行機制。
ReentrantLock類在java.util.concurrent.locks包中,它的上一級的包java.util.concurrent主要是常用的并發(fā)控制類.
下面是ReentrantLock的UML圖,從圖中可以看出,ReentrantLock實現(xiàn)Lock接口,在ReentrantLock中引用了AbstractQueuedSynchronizer的子類,所有的同步操作都是依靠AbstractQueuedSynchronizer(隊列同步器)實現(xiàn)。
研究一個類,需要從一個類的靜態(tài)域,靜態(tài)類,靜態(tài)方法和成員變量開始。
private static final long serialVersionUID = 7373984872572414699L;/** Synchronizer providing all implementation mechanics */private final Sync sync;/*** Base of synchronization control for this lock. Subclassed* into fair and nonfair versions below. Uses AQS state to* represent the number of holds on the lock.*/abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;/*** Performs {@link Lock#lock}. The main reason for subclassing* is to allow fast path for nonfair version.*/abstract void lock();/*** Performs non-fair tryLock. tryAcquire is* implemented in subclasses, but both need nonfair* try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}protected final boolean isHeldExclusively() {// While we must in general read state before owner,// we don't need to do so to check if current thread is ownerreturn getExclusiveOwnerThread() == Thread.currentThread();}final ConditionObject newCondition() {return new ConditionObject();}// Methods relayed from outer classfinal Thread getOwner() {return getState() == 0 ? null : getExclusiveOwnerThread();}final int getHoldCount() {return isHeldExclusively() ? getState() : 0;}final boolean isLocked() {return getState() != 0;}/*** Reconstitutes this lock instance from a stream.* @param s the stream*/private void readObject(java.io.ObjectInputStream s)throws java.io.IOException, ClassNotFoundException {s.defaultReadObject();setState(0); // reset to unlocked state}}從上面的代碼可以看出來首先ReentrantLock是可序列化的,其次是ReentrantLock里有一個對AbstractQueuedSynchronizer的引用。
看完了成員變量和靜態(tài)域,我們需要了解下構造方法:
/*** Creates an instance of {@code ReentrantLock}.* This is equivalent to using {@code ReentrantLock(false)}.*/public ReentrantLock() {sync = new NonfairSync();}/*** Creates an instance of {@code ReentrantLock} with the* given fairness policy.** @param fair {@code true} if this lock should use a fair ordering policy*/public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}從上面代碼可以看出,ReentrantLock支持兩種鎖模式,公平鎖和非公平鎖。默認的實現(xiàn)是非公平的。公平和非公平鎖的實現(xiàn)如下:
/*** Sync object for non-fair locks*/static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}/*** Sync object for fair locks*/static final class FairSync extends Sync {private static final long serialVersionUID = -3000897897090466540L;final void lock() {acquire(1);}/*** Fair version of tryAcquire. Don't grant access unless* recursive call or no waiters or is first.*/protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}}AbstractQueuedSynchronizer 是一個抽象類,所以在使用這個同步器的時候,需要通過自己實現(xiàn)預期的邏輯,Sync、FairSync和NonfairSync都是ReentrantLock為了實現(xiàn)自己的需求而實現(xiàn)的內(nèi)部類,之所以做成內(nèi)部類,我認為是只在ReentrantLock使用上述幾個類,在外部沒有使用到。
我們著重關注默認的非公平鎖的實現(xiàn):
在ReentrantLock調(diào)用lock()的時候,調(diào)用的是下面的代碼:
sync的實現(xiàn)是NonfairSync,所以調(diào)用的是NonfairSync的lock方法:
/*** Sync object for non-fair locks* tips:調(diào)用Lock的時候,嘗試獲取鎖,這里采用的CAS去嘗試獲取鎖,如果獲取鎖成功* 那么,當前線程獲取到鎖,如果失敗,調(diào)用acquire處理。* */static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}接下來看看compareAndSetState方法是怎么進行鎖的獲取操作的:
/*** Atomically sets synchronization state to the given updated* value if the current state value equals the expected value.* This operation has memory semantics of a <tt>volatile</tt> read* and write.** @param expect the expected value* @param update the new value* @return true if successful. False return indicates that the actual* value was not equal to the expected value.* * tips: 1.compareAndSetState的實現(xiàn)主要是通過Unsafe類實現(xiàn)的。* 2.之所以命名為Unsafe,是因為這個類對于JVM來說是不安全的,我們平時也是使用不了這個類的。* 3.Unsafe類內(nèi)封裝了一些可以直接操作指定內(nèi)存位置的接口,是不是感覺和C有點像了?* 4.Unsafe類封裝了CAS操作,來達到樂觀的鎖的爭搶的效果*/protected final boolean compareAndSetState(int expect, int update) {// See below for intrinsics setup to support thisreturn unsafe.compareAndSwapInt(this, stateOffset, expect, update);}主要的說明都在方法的注釋中,接下來簡單的看一下 compareAndSwapInt的實現(xiàn):
/*** Atomically update Java variable to <tt>x</tt> if it is currently* holding <tt>expected</tt>.* @return <tt>true</tt> if successful*/public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x);一個native方法,沮喪.....但是從注釋看意思是,以CAS的方式將制定字段設置為指定的值。同時我們也明白了這個方法可能是用java實現(xiàn)不了,只能依賴JVm底層的C代碼實現(xiàn)。下面看看操作的stateOffset:
private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long stateOffset;private static final long headOffset;private static final long tailOffset;private static final long waitStatusOffset;private static final long nextOffset;static {try {//這個方法很有意思,主要的意思是獲取AbstractQueuedSynchronizer的state成員的偏移量//通過這個偏移量來更新state成員,另外state是volatile的來保證可見性。stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));} catch (Exception ex) { throw new Error(ex); }}stateOffset 是AbstractQueuedSynchronizer內(nèi)部定義的一個狀態(tài)量,AbstractQueuedSynchronizer是線程的競態(tài)條件,所以只要某一個線程CAS改變狀態(tài)成功,同時在沒有釋放的情況下,其他線程必然失敗(對于Unsafe類還不是很熟悉,后面還需要系統(tǒng)的學習)。
對于競爭成功的線程會調(diào)用 setExclusiveOwnerThread方法:
這個實現(xiàn)是比較簡單的,只是獲取當前線程的引用,令AbstractOwnableSynchronizer中的exclusiveOwnerThread引用到當前線程。競爭失敗的線程,會調(diào)用acquire方法,這個方法也是ReentrantLock設計的精華之處:
/*** Acquires in exclusive mode, ignoring interrupts. Implemented* by invoking at least once {@link #tryAcquire},* returning on success. Otherwise the thread is queued, possibly* repeatedly blocking and unblocking, invoking {@link* #tryAcquire} until success. This method can be used* to implement method {@link Lock#lock}.** @param arg the acquire argument. This value is conveyed to* {@link #tryAcquire} but is otherwise uninterpreted and* can represent anything you like.* tips:此處主要是處理沒有獲取到鎖的線程* tryAcquire:重新進行一次鎖獲取和進行鎖重入的處理。* addWaiter:將線程添加到等待隊列中。* acquireQueued:自旋獲取鎖。 * selfInterrupt:中斷線程。* 三個條件的關系為and,如果 acquireQueued返回true,那么線程被中斷selfInterrupt會中斷線程*/public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}AbstractQueuedSynchronizer為抽象方法,調(diào)用tryAcquire時,調(diào)用的為NonfairSync的tryAcquire。
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);} /*** Performs non-fair tryLock. tryAcquire is* implemented in subclasses, but both need nonfair* try for trylock method.*/final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}nonfairTryAcquire方法主要是做重入鎖的實現(xiàn),synchronized本身支持鎖的重入,而ReentrantLock則是通過此處實現(xiàn)。在鎖狀態(tài)為0時,重新嘗試獲取鎖。如果已經(jīng)被占用,那么做一次是否當前線程為占用鎖的線程的判斷,如果是一樣的那么進行計數(shù),當然在鎖的relase過程中會進行遞減,保證鎖的正常釋放。
如果沒有重新獲取到鎖或者鎖的占用線程和當前線程是一個線程,方法返回false。那么把線程添加到等待隊列中,調(diào)用addWaiter:
這里主要是用當前線程構建一個Node的等待隊列雙向鏈表,這里addWaiter中和enq中的部分邏輯是重復的,個人感覺可能是如果能一次成功就避免了enq中的死循環(huán)。因為tail節(jié)點是volatile的同時node也是不會發(fā)生競爭的所以node.prev = pred;是安全的。但是tail的next是不斷競爭的,所以利用compareAndSetTail保證操作的串行化。接下來調(diào)用acquireQueued方法:
/*** Acquires in exclusive uninterruptible mode for thread already in* queue. Used by condition wait methods as well as acquire.** @param node the node* @param arg the acquire argument* @return {@code true} if interrupted while waiting*/final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}此處是做Node節(jié)點線程的自旋過程,自旋過程主要檢查當前節(jié)點是不是head節(jié)點的next節(jié)點,如果是,則嘗試獲取鎖,如果獲取成功,那么釋放當前節(jié)點,同時返回。至此一個非公平鎖的鎖獲取過程結束。
如果這里一直不斷的循環(huán)檢查,其實是很耗費性能的,JDK的實現(xiàn)肯定不會這么“弱智”,所以有了shouldParkAfterFailedAcquire和parkAndCheckInterrupt,這兩個方法就實現(xiàn)了線程的等待從而避免無限的輪詢:
首先,檢查一下當前Node的前置節(jié)點pred是否是SIGNAL,如果是SIGNAL,那么證明前置Node的線程已經(jīng)Park了,如果waitStatus>0,那么當前節(jié)點已經(jīng)Concel或者中斷。那么不斷調(diào)整當前節(jié)點的前置節(jié)點,將已經(jīng)Concel的和已經(jīng)中斷的線程移除隊列。如果waitStatus<0,那么設置waitStatus為SIGNAL,因為調(diào)用shouldParkAfterFailedAcquire的方法為死循環(huán)調(diào)用,所以終將返回true。接下來看parkAndCheckInterrupt方法,當shouldParkAfterFailedAcquire返回True的時候執(zhí)行parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}此方法比較簡單,其實就是使當前的線程park,即暫停了線程的輪詢。當Unlock時會做后續(xù)節(jié)點的Unpark喚醒線程繼續(xù)爭搶鎖。
接下來看一下鎖的釋放過程,鎖釋放主要是通過unlock方法實現(xiàn):
主要是調(diào)用AbstractQueuedSynchronizer同步器的release方法:
/*** Releases in exclusive mode. Implemented by unblocking one or* more threads if {@link #tryRelease} returns true.* This method can be used to implement method {@link Lock#unlock}.** @param arg the release argument. This value is conveyed to* {@link #tryRelease} but is otherwise uninterpreted and* can represent anything you like.* @return the value returned from {@link #tryRelease}*/public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}tryRelease方法為ReentrantLock中的Sync的tryRelease方法:
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}tryRelease方法主要是做了一個釋放鎖的過程,將同步狀態(tài)state -1,直到減到0為止,這主要是兼容重入鎖設計的,同時setExclusiveOwnerThread(null)清除當前占用的線程。這些head節(jié)點后的線程和新進的線程就可以開始爭搶。這里需要注意的是對于同步隊列中的線程來說在setState(c),且c為0的時候,同步隊列中的線程是沒有競爭鎖的,因為線程被park了還沒有喚醒。但是此時對于新進入的線程是有機會獲取到鎖的。
下面代碼是進行線程的喚醒:
因為在setState(c)釋放了鎖之后,是沒有線程競爭的,所以head是當前的head節(jié)點,先檢查當前的Node是否合法,如果合法則unpark it。開始鎖的獲取。就回到了上面的for循環(huán)執(zhí)行獲取鎖邏輯:
至此鎖的釋放就結束了,可以看到ReentrantLock是一個不斷的循環(huán)的狀態(tài)模型,里面有很多東西值得我們學習和思考。
ReentrantLock具有公平和非公平兩種模式,也各有優(yōu)缺點:
公平鎖是嚴格的以FIFO的方式進行鎖的競爭,但是非公平鎖是無序的鎖競爭,剛釋放鎖的線程很大程度上能比較快的獲取到鎖,隊列中的線程只能等待,所以非公平鎖可能會有“饑餓”的問題。但是重復的鎖獲取能減小線程之間的切換,而公平鎖則是嚴格的線程切換,這樣對操作系統(tǒng)的影響是比較大的,所以非公平鎖的吞吐量是大于公平鎖的,這也是為什么JDK將非公平鎖作為默認的實現(xiàn)。
最后:
關于并發(fā)和Lock還有很多的點還是比較模糊,我也會繼續(xù)學習,繼續(xù)總結,如果文章中有什么問題,還請各位看客及時指出,共同學習。
from:?https://www.cnblogs.com/zhimingyang/p/5702752.html?
總結
以上是生活随笔為你收集整理的深入理解ReentrantLock的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 重入锁:ReentrantLock 详解
- 下一篇: ReentrantLock实现原理深入探