并发编程之 Semaphore 源码分析
前言
并發 JUC 包提供了很多工具類,比如之前說的 CountDownLatch,CyclicBarrier ,今天說說這個 Semaphore——信號量,關于他的使用請查看往期文章并發編程之 線程協作工具類,今天的任務就是從源碼層面分析一下他的原理。
源碼分析
如果先不看源碼,根據以往我們看過的 CountDownLatch CyclicBarrier 的源碼經驗來看,Semaphore 會怎么設計呢?
首先,他要實現多個線程線程同時訪問一個資源,類似于共享鎖,并且,要控制進入資源的線程的數量。
如果根據 JDK 現有的資源,我們是否可以使用 AQS 的 state 變量來控制呢?類似 CountDownLatch 一樣,有幾個線程我們就為這個 state 變量設置為幾,當 state 達到了閾值,其他線程就不能獲取鎖了,就需要等待。當 Semaphore 調用 release 方法的時候,就釋放鎖,將 state 減一,并喚醒 AQS 上的線程。
以上,就是我們的猜想,那我們看看 JDK 是不是和我們想的一樣。
首先看看 Semaphore 的 UML 結構:
內部有 3 個類,繼承了 AQS。一個公平鎖,一個非公平鎖,這點和 ReentrantLock 一摸一樣。
看看他的構造器:
public Semaphore(int permits) {sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }兩個構造器,兩個參數,一個是許可線程數量,一個是是否公平鎖,默認非公平。
而 Semaphore 有 2 個重要的方法,也是我們經常使用的 2 個方法:
semaphore.acquire(); // doSomeing..... semaphore.release();acquire 和 release 方法,我們今天重點看這兩個方法的源碼,一窺 Semaphore 的全貌。
acquire 方法源碼分析
代碼如下:
public void acquire() throws InterruptedException {// 嘗試獲取一個鎖sync.acquireSharedInterruptibly(1); }// 這是抽象類 AQS 的方法 public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 如果小于0,就獲取鎖失敗了。加入到AQS 等待隊列中。// 如果大于0,就直接執行下面的邏輯了。不用進行阻塞等待。if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); } // 這是抽象父類 Sync 的方法,默認是非公平的 protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); } // 非公平鎖的釋放鎖的方法 final int nonfairTryAcquireShared(int acquires) {// 死循環for (;;) {// 獲取鎖的狀態int available = getState();int remaining = available - acquires;// state 變量是否還足夠當前獲取的// 如果小于 0,獲取鎖就失敗了。// 如果大于 0,就循環嘗試使用 CAS 將 state 變量更新成減去輸入參數之后的。if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }這里的釋放就是對 state 變量減一(或者更多)的。
返回了剩余的 state 大小。
當返回值小于 0 的時候,說明獲取鎖失敗了,那么就需要進入 AQS 的等待隊列了。代碼入下:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 添加一個節點 AQS 隊列尾部final Node node = addWaiter(Node.SHARED);boolean failed = true;try {// 死循環for (;;) {// 找到新節點的上一個節點final Node p = node.predecessor();// 如果這個節點是 head,就嘗試獲取鎖if (p == head) {// 繼續嘗試獲取鎖,這個方法是子類實現的int r = tryAcquireShared(arg);// 如果大于0,說明拿到鎖了。if (r >= 0) {// 將 node 設置為 head 節點// 如果大于0,就說明還有機會獲取鎖,那就喚醒后面的線程,稱之為傳播setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 如果他的上一個節點不是 head,就不能獲取鎖// 對節點進行檢查和更新狀態,如果線程應該阻塞,返回 true。if (shouldParkAfterFailedAcquire(p, node) &&// 阻塞 park,并返回是否中斷,中斷則拋出異常parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)// 取消節點cancelAcquire(node);} }總的邏輯就是:
創建一個分享類型的 node 節點包裝當前線程追加到 AQS 隊列的尾部。
如果這個節點的上一個節點是 head ,就是嘗試獲取鎖,獲取鎖的方法就是子類重寫的方法。如果獲取成功了,就將剛剛的那個節點設置成 head。
如果沒搶到鎖,就阻塞等待。
release 方法源碼分析
該方法用于釋放鎖,代碼如下:
public void release() {sync.releaseShared(1); }public final boolean releaseShared(int arg) {// 死循環釋放成功if (tryReleaseShared(arg)) {// 喚醒 AQS 等待對列中的節點,從 head 開始 doReleaseShared();return true;}return false; } // Sync extends AbstractQueuedSynchronizer protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();// 對 state 變量 + 1int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;} }這里釋放鎖的邏輯寫在了抽象類 Sync 中。邏輯簡單,就是對 state 變量做加法。
在加法成功后,執行 doReleaseShared方法,這個方法是 AQS 的。
private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {// 設置 head 的等待狀態為 0 ,并喚醒 head 上的線程if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}// 成功設置成 0 之后,將 head 狀態設置成傳播狀態else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;} }該方法的主要作用就是從 AQS 的 head 節點開始喚醒線程,注意,這里喚醒是 head 節點的下一個節點,需要和 doAcquireSharedInterruptibly方法對應,因為 doAcquireSharedInterruptibly 方法喚醒的當前節點的上一個節點,也就是 head 節點。
至此,釋放 state 變量,喚醒 AQS 頭節點結束。
總結
總結一下 Semaphore 的原理吧。
總的來說,Semaphore 就是一個共享鎖,通過設置 state 變量來實現對這個變量的共享。當調用 acquire 方法的時候,state 變量就減去一,當調用 release 方法的時候,state 變量就加一。當 state 變量為 0 的時候,別的線程就不能進入代碼塊了,就會在 AQS 中阻塞等待。
轉載于:https://www.cnblogs.com/stateis0/p/9062042.html
總結
以上是生活随笔為你收集整理的并发编程之 Semaphore 源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: TensorFlow Java+ecli
- 下一篇: 区块链学习之区块链思想的诞生(一)