多线程(三)之ReentrantLock源码解析
2019獨角獸企業重金招聘Python工程師標準>>>
?
今天分析ReentrantLock類的源碼,在看源碼之前,先學習AQS(AbstractQueuedSynchronizer)類。
一、AQS
什么是AQS?
AQS是一個類,要實現多線程鎖,可以繼承該類,并重寫其中的方法即可。該類也叫同步器。
同步器的主要使用方式是繼承,子類通過繼承同步器并實現它的抽象方法來管理同步狀態(state,標記線程的狀態)。
同步器的設計是基于模板方法模式, 使用者需要繼承同步器并重寫指定的方法,隨后將同步器組合在自定義同步組件的實現中,并調用同步器提供的模板方法,而這些模板方法將會調用使用者重寫的方法。
同步器需要重寫的重要方法:
tryAcquire? 獨占鎖獲取
tryRelease?? 獨占鎖釋放
tryAcquireShared? 共享鎖獲取
tryReleaseShared? 共享鎖釋放
isHeldExclusively 快速判斷被線程獨占
對同步狀態進行更改,這時就需要使用同步器提供的3個方法(這三個方法不需要重寫,只要子類進行調用,完成相應的操作)
getState?獲取同步狀態
setState 設置同步狀態
compareAndSetState 原子的設置同步狀態來進行操作。
?
二、結合ReentrantLock類來了解AQS的源碼
ReentrantLock的結構
解釋下,Sync是ReentrantLock類的內部類,且是實現AQS的類。ReentrantLock有兩種鎖,公平鎖和非公平鎖,分別繼承了Sync類。下面以非公平鎖為例,說明下AQS的源碼。(確實AQS的源碼比較難理解,我理解的可能也不是很深,畢竟才看了四五遍,我先把我的理解用圖文并茂的形式展現出來)
ReentrantLock源碼解析
final void lock() {if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}非公平鎖調用lock方法,是通過同步方法compareAndSetState(最終調用了unsafe類的方法)直接嘗試設置state為1,意思就是如果沒有鎖,這個方法就會返回true,執行setExclusiveOwnerThread方法,將當前線程設置為鎖的持有者。如果之前有鎖,就執行acquire這個方法。(state:0表示沒有鎖,1表示已經有鎖)
假設,這時有個thread1調用非公平鎖的lock方法,因為state的初始狀態位0,很明顯,執行了setExclusiveOwnerThread,將state設置為了1,并將當前線程設置為鎖的持有者。然后又來了一個線程thread2調用lock獲取鎖,設這時候的thread1并沒有釋放鎖。明顯compareAndSetState設置不成功,返回false,然后執行acquire方法。這個方法是AQS類里。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}?這個if寫的,比較高大上,好多源碼都是這么寫的。可以看成:
if(!tryAcquire(arg)) {Node node = addWaiter(Node.EXCLUSIVE);if(acquierQueued(node,arg))selfInterrupt(); }這時候,thread2先執行tryAcquire方法。那tryAcquire在哪呢?上面說了,子類要實現tryAcquire方法。所以執行的是在NonFairSync里面。
protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}我們現在看下nonfairTryAcquire方法
final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();//如果沒有鎖,就嘗試獲取鎖if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}//判斷當前線程是否為鎖的持有者,支持鎖的重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}thread2調用該方法,這時候state為1(因為thread1在持有鎖),并且thread2<>thread1,所以返回false。
然后再回到acquire方法。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}因為tryAcquire返回false,即!tryAcquire為true。這時候thread2執行addWaiter方法。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {/***和enq中的else中的方法一致,都是將節點插入到同步隊列末尾**/node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}首先,將thread2封裝成Node類(這個Node類定義在AQS里面,是同步隊列中的節點),因為thread1并沒有創建隊列,所以tail為null,即空的同步隊列。所以thread2執行enq方法。
private Node enq(final Node node) {for (;;) {Node t = tail;if (t == null) { // Must initializeif (compareAndSetHead(new Node()))tail = head;} else {node.prev = t;if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}這是一個自旋循環。。。。
通過上面分析,同步隊列為空,即t=null。所以,thread2新建了一個空的Node節點,該節點是“假”節點,然后通過CAS操作設置了線程的阻塞隊列的head節點就是我們剛才new出來的那個空的node節點,并將tail和head指向這個“假”節點,由此可知,該同步隊列是一個帶頭節點的同步隊列。至此,第一次循環結束,此時同步隊列的狀態為:
然后進行第二循環,t這時候不為空了,所以執行else分支。
一句一句的來:
node.prev = t;將尾部tail節點賦值給我們傳遞進來的節點Node的前驅節點,此時的結構如下:
compareAndSetTail(t, node),通過同步方法,將node設置為尾節點。
t.next=node;將原來的tail節點的后繼指向node節點。
此時跳出自旋,返回當前節點。
public final void acquire(int arg) {if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}在看這個acquire方法,執行完addWaiter方法后,然后thread2執行acquireQueued方法。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}哎,頭疼,又是一個自旋,不過沒有關系,把同步隊列的結構搞清楚,就很容易分析。
首先,獲取thread2對應的節點的前驅節點,這時,就是head節點,即“假”節點。如果前驅節點是head節點,就調用之前的tryAcquire方法,嘗試獲取鎖。因為thread1沒有釋放鎖,很明顯tryAcquire方法返回false。因此thread2方法執行下面的if方法。看下shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {int ws = pred.waitStatus;//Node.SIGNAL表示-1if (ws == Node.SIGNAL)/** This node has already set status asking a release* to signal it, so it can safely park.*/return true;if (ws > 0) {/** Predecessor was cancelled. Skip over predecessors and* indicate retry.*/do {node.prev = pred = pred.prev;} while (pred.waitStatus > 0);pred.next = node;} else {/** waitStatus must be 0 or PROPAGATE. Indicate that we* need a signal, but don't park yet. Caller will need to* retry to make sure it cannot acquire before parking.*/compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}獲取前驅節點的waitStatus值。-1:表示當前節點正處于等待signal喚醒。>0:表示前驅節點應該被移除。其他情況:將調用同步方法將前驅的waitStatus設置為-1
因為head節點的waitStatus=0,所以設置head的waitStatus為-1,然后返回false。此時,結束了acquireQueued的第一次循環。現在執行第二次循環,和第一循環一樣,沒有執行第一個if,繼續執行shouldParkAfterFailedAcquire方法。這時ws不再是0了,而是-1了。因此返回true。然后調用acquireQueued中的parkAndCheckInterrupt方法。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}由代碼可以看出,調用了LockSupport的park方法。
這里介紹下LockSuppor類。
LockSupport定義了一組的公共靜態方法,這些方法提供了最基本的線程阻塞和喚醒功能,而LockSupport也成為構建同步組件的基礎工具。LockSupport定義了一組以park開頭的方法用來阻塞當前線程,以及unpark(Thread thread)方法來喚醒一個被阻塞的線程。
由此可知,thread2被阻塞起來,不在執行。。。。
假設這時,thread1還沒有釋放鎖(感覺時間過了半個世紀,其實這些操作,以計算機的速度,我們看了還沒有這些操作差不多),此時thread3線程調用lock方法。因為thread1還有沒有釋放鎖,所以調用AQS的acquire方法。進而調用tryAcquire方法,同樣返回false,所以繼續調用addWaiter方法。
private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}enq(node);return node;}因為此時同步隊列中有thread2方法,所以執行if里面的方法,直接將thread3對應的節點插入到同步隊列的末尾,然后返回。繼續調用acquireQueued方法。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}進入自旋,第一次循環,很明顯thread3節點的前驅節點是thread2,不是head,所以執行shouldParkAfterFailedAcquire方法。因為thread2結點的waitStatus為0,所以通過同步方法設置waitStatus為-1,然后返回false。此時,第一次循環結束。執行第二次循環,hread3的前繼任然不是thread2,所以繼續執行shouldParkAfterFailedAcquire方法。此時thread2的waitStatus為-1,所以直接返回true。然后thread3執行parkAndCheckInterrupt方法。然后將thread3線程進行阻塞。
此時thread1終于結束了,調用unlock釋放鎖。
public void unlock() {sync.release(1);}進而調用AQS的release放釋放鎖。
public final boolean release(int arg) {if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}首先,調用NonFairSync中的tryRelease方法。
protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}現將當前的state-1,如果為0,表示沒有線程持有鎖了,將持有鎖的線程設置為?null,設置state為0,放回true。如果state不等0,表示有重入鎖,設置state(不為0),然后返回false。設thread1沒有重入鎖,所以該方法返回true。然后進行if判斷,如果同步隊列有結點,并且頭結點的waitStatus不為0,則調用unparkSuccessor方法。
private void unparkSuccessor(Node node) {/** If status is negative (i.e., possibly needing signal) try* to clear in anticipation of signalling. It is OK if this* fails or if status is changed by waiting thread.*/int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);/** Thread to unpark is held in successor, which is normally* just the next node. But if cancelled or apparently null,* traverse backwards from tail to find the actual* non-cancelled successor.*/Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}第一步,將頭結點的waitStatus設為0,然后判斷head的下一個結點,即thead2結點是否為null,如果不為空,thread2的waitStatus值是否>0。很明顯,不執行這個if,那就執行下一個if,將thread2進行喚醒。我們知道,thread2在acquireQueued方法里被阻塞,然后喚醒后,繼續執行。進行第下一次的循環。
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}然后調用第一if,執行if里面的代碼。即將thread2結點設為頭結點,讓原來的head結點從同步隊列中清除。然后返回false。至此,thread2處于運行狀態。。。。
公平鎖和非公平鎖的區別:
在公平鎖中FairSync中,tryAcquire方法:
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}對比非公平鎖發現,
if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {這個判斷多了一個!hasQueuedPredecessors方法,判斷同步隊列中是否還有等待的線程,如果沒有,當前線程就獲取state,如果有,就返回false,意味著將當前線程插入到同步隊列末尾。而非公平鎖就不看有沒有隊列,直接嘗試獲取鎖,如果沒有獲取到才插入到隊列的末尾。
可中斷的獲取鎖
當調用lockInterruptibly時:
public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);}然后調用AQS中的acquireInterruuptibly方法。
public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (!tryAcquire(arg))doAcquireInterruptibly(arg);}首先查看當前線程的中斷標記為,如果為true,就拋異常。
private void doAcquireInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.EXCLUSIVE);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}} final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}對比lock獲取鎖的acquireQueued方法和lockInterruptibly獲取鎖的doAcquireInterruptibly方法,發現當線程處于等待鎖的阻塞中,如果被中斷,doAcquireInterruptibly直接拋出異常,而acquireQueued只是將中斷標記位設為true,具體中斷不中斷看線程的心情。。。。。。。
轉載于:https://my.oschina.net/littlestyle/blog/1605294
總結
以上是生活随笔為你收集整理的多线程(三)之ReentrantLock源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: BIM机器人来袭、你害怕了吗
- 下一篇: oracle之 SYSAUX表空间维护