Java并发编程笔记之Semaphore信号量源码分析
JUC 中 Semaphore 的使用與原理分析,Semaphore 也是 Java 中的一個同步器,與 CountDownLatch 和 CycleBarrier 不同在于它內(nèi)部的計數(shù)器是遞增的,那么,Semaphore 的內(nèi)部實現(xiàn)是怎樣的呢?
Semaphore 信號量也是Java 中一個同步容器,與CountDownLatch 和 CyclicBarrier 不同之處在于它內(nèi)部的計數(shù)器是遞增的。為了能夠一覽Semaphore的內(nèi)部結(jié)構(gòu),我們首先要看一下Semaphore的類圖,類圖,如下所示:
?
?如上類圖可以知道Semaphoren內(nèi)部還是使用AQS來實現(xiàn)的,Sync只是對AQS的一個修飾,并且Sync有兩個實現(xiàn)類,分別代表獲取信號量的時候是否采取公平策略。創(chuàng)建Semaphore的時候會有一個變量標(biāo)示是否使用公平策略,源碼如下:
public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}Sync(int permits) {setState(permits);}如上面代碼所示,Semaphore默認(rèn)使用的是非公平策略,如果你需要公平策略,則可以使用帶兩個參數(shù)的構(gòu)造函數(shù)來構(gòu)造Semaphore對象,另外和CountDownLatch一樣,構(gòu)造函數(shù)里面?zhèn)鬟f的初始化信號量個數(shù) permits 被賦值給了AQS 的state狀態(tài)變量,也就是說這里AQS的state值表示當(dāng)前持有的信號量個數(shù)。
?
接下來我們主要看看Semaphore實現(xiàn)的主要方法的源碼,如下:
1.void acquire() 當(dāng)前線程調(diào)用該方法的時候,目的是希望獲取一個信號量資源,如果當(dāng)前信號量計數(shù)個數(shù)大于 0 ,并且當(dāng)前線程獲取到了一個信號量則該方法直接返回,當(dāng)前信號量的計數(shù)會減少 1 。否則會被放入AQS的阻塞隊列,當(dāng)前線程被掛起,直到其他線程調(diào)用了release方法釋放了信號量,并且當(dāng)前線程通過競爭獲取到了改信號量。當(dāng)前線程被其他線程調(diào)用了 interrupte()方法中斷后,當(dāng)前線程會拋出 InterruptedException異常返回。源碼如下:
public void acquire() throws InterruptedException {//傳遞參數(shù)為1,說明要獲取1個信號量資源sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {//(1)如果線程被中斷,則拋出中斷異常if (Thread.interrupted())throw new InterruptedException();//(2)否者調(diào)用sync子類方法嘗試獲取,這里根據(jù)構(gòu)造函數(shù)確定使用公平策略if (tryAcquireShared(arg) < 0)//如果獲取失敗則放入阻塞隊列,然后再次嘗試如果失敗則調(diào)用park方法掛起當(dāng)前線程 doAcquireSharedInterruptibly(arg);}如上代碼可知,acquire()內(nèi)部調(diào)用了sync的acquireSharedInterruptibly? 方法,后者是對中斷響應(yīng)的(如果當(dāng)前線程被中斷,則拋出中斷異常),嘗試獲取信號量資源的AQS的方法tryAcquireShared 是由 sync 的子類實現(xiàn),所以這里就要分公平性了,這里先討論非公平策略 NonfairSync 類的?tryAcquireShared 方法,源碼如下:
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}final int nonfairTryAcquireShared(int acquires) {for (;;) {//獲取當(dāng)前信號量值int available = getState();//計算當(dāng)前剩余值int remaining = available - acquires;//如果當(dāng)前剩余小于0或者CAS設(shè)置成功則返回if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }
如上代碼,先計算當(dāng)前信號量值(available)減去需要獲取的值(acquires) 得到剩余的信號量個數(shù)(remaining),如果剩余值小于 0 說明當(dāng)前信號量個數(shù)滿足不了需求,則直接返回負(fù)數(shù),然后當(dāng)前線程會被放入AQS的阻塞隊列,當(dāng)前線程被掛起。如果剩余值大于 0 則使用CAS操作設(shè)置當(dāng)前信號量值為剩余值,然后返回剩余值。另外可以知道NonFairSync是非公平性獲取的,是說先調(diào)用aquire方法獲取信號量的線程不一定比后來者先獲取鎖。
?
接下來我們要看看公平性的FairSync 類是如何保證公平性的,源碼如下:
protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}}可以知道公平性還是靠 hasQueuedPredecessors 這個方法來做的,以前的隨筆已經(jīng)講過公平性是看當(dāng)前線程節(jié)點是否有前驅(qū)節(jié)點也在等待獲取該資源,如果是則自己放棄獲取的權(quán)力,然后當(dāng)前線程會被放入AQS阻塞隊列,否則就去獲取。hasQueuedPredecessors源碼如下:
public final boolean hasQueuedPredecessors() {Node t = tail; Node h = head;Node s;return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }如上面代碼所示,如果當(dāng)前線程節(jié)點有前驅(qū)節(jié)點則返回true,否則如果當(dāng)前AQS隊列為空 或者 當(dāng)前線程節(jié)點是AQS的第一個節(jié)點則返回 false ,其中,如果 h == t 則說明當(dāng)前隊列為空則直接返回 false,如果 h !=t 并且 s == null 說明有一個元素將要作為AQS的第一個節(jié)點入隊列(回顧下 enq 函數(shù)第一個元素入隊列是兩步操作,首先創(chuàng)建一個哨兵頭節(jié)點,然后第一個元素插入到哨兵節(jié)點后面),那么返回 true,如果? h !=t 并且 s != null 并且??s.thread != Thread.currentThread() 則說明隊列里面的第一個元素不是當(dāng)前線程則返回 true。
?
2.void acquire(int permits) 該方法與 acquire() 不同在與后者只需要獲取一個信號量值,而前者則獲取指定 permits 個,源碼如下:
public void acquire(int permits) throws InterruptedException {if (permits < 0)throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits); }
?
3.void acquireUninterruptibly() 該方法與 acquire() 類似,不同之處在于該方法對中斷不響應(yīng),也就是當(dāng)當(dāng)前線程調(diào)用了 acquireUninterruptibly 獲取資源過程中(包含被阻塞后)其它線程調(diào)用了當(dāng)前線程的 interrupt()方法設(shè)置了當(dāng)前線程的中斷標(biāo)志當(dāng)前線程并不會拋出 InterruptedException 異常而返回。源碼如下:
public void acquireUninterruptibly() {sync.acquireShared(1); }?
4.void acquireUninterruptibly(int permits) 該方法與 acquire(int permits) 不同在于該方法對中斷不響應(yīng)。源碼如如下:
public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);}?
5.void release() 該方法作用是把當(dāng)前 semaphore對象的信號量值增加 1 ,如果當(dāng)前有線程因為調(diào)用 acquire 方法被阻塞放入了 AQS的阻塞隊列,則會根據(jù)公平策略選擇一個線程進(jìn)行激活,激活的線程會嘗試獲取剛增加的信號量,源碼如下:
public void release() {//(1)arg=1sync.releaseShared(1);}public final boolean releaseShared(int arg) {//(2)嘗試釋放資源if (tryReleaseShared(arg)) {//(3)資源釋放成功則調(diào)用park喚醒AQS隊列里面最先掛起的線程 doReleaseShared();return true;}return false;}protected final boolean tryReleaseShared(int releases) {for (;;) {//(4)獲取當(dāng)前信號量值int current = getState();//(5)當(dāng)前信號量值增加releases,這里為增加1int next = current + releases;if (next < current) // 移除處理throw new Error("Maximum permit count exceeded");//(6)使用cas保證更新信號量值的原子性if (compareAndSetState(current, next))return true;}}如上面代碼可以看到 release()方法中對 sync.releaseShared(1),可以知道release方法每次只會對信號量值增加 1 ,tryReleaseShared方法是無限循環(huán),使用CAS保證了 release 方法對信號量遞增 1 的原子性操作。當(dāng)tryReleaseShared 方法增加信號量成功后會執(zhí)行代碼(3),調(diào)用AQS的方法來激活因為調(diào)用acquire方法而被阻塞的線程。
?
6.void release(int permits) 該方法與不帶參數(shù)的不同之處在于前者每次調(diào)用會在信號量值原來基礎(chǔ)上增加 permits,而后者每次增加 1。源碼如下:
public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits); }另外注意到這里調(diào)用的是 sync.releaseShared 是共享方法,這說明該信號量是線程共享的,信號量沒有和固定線程綁定,多個線程可以同時使用CAS去更新信號量的值而不會阻塞。
?
到目前已經(jīng)知道了其原理,接下來用一個例子來加深對Semaphore的理解,例子如下:
package com.hjc;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore;/*** Created by cong on 2018/7/8.*/ public class SemaphoreTest {// 創(chuàng)建一個Semaphore實例private static volatile Semaphore semaphore = new Semaphore(0);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);// 加入線程A到線程池executorService.submit(new Runnable() {public void run() {try {System.out.println(Thread.currentThread() + " over");semaphore.release();} catch (Exception e) {e.printStackTrace();}}});// 加入線程B到線程池executorService.submit(new Runnable() {public void run() {try {System.out.println(Thread.currentThread() + " over");semaphore.release();} catch (Exception e) {e.printStackTrace();}}});// 等待子線程執(zhí)行完畢,返回semaphore.acquire(2);System.out.println("all child thread over!");//關(guān)閉線程池 executorService.shutdown();} }運行結(jié)果如下:
類似于 CountDownLatch,上面我們的例子也是在主線程中開啟兩個子線程進(jìn)行執(zhí)行,等所有子線程執(zhí)行完畢后主線程在繼續(xù)向下運行。
如上代碼首先首先創(chuàng)建了一個信號量實例,構(gòu)造函數(shù)的入?yún)?0,說明當(dāng)前信號量計數(shù)器為 0,然后 main 函數(shù)添加兩個線程任務(wù)到線程池,每個線程內(nèi)部調(diào)用了信號量的 release 方法,相當(dāng)于計數(shù)值遞增一,最后在 main 線程里面調(diào)用信號量的 acquire 方法,參數(shù)傳遞為 2 說明調(diào)用 acquire 方法的線程會一直阻塞,直到信號量的計數(shù)變?yōu)?2 時才會返回。
看到這里也就明白了,如果構(gòu)造 Semaphore 時候傳遞的參數(shù)為 N,在 M 個線程中調(diào)用了該信號量的 release 方法,那么在調(diào)用 acquire 對 M 個線程進(jìn)行同步時候傳遞的參數(shù)應(yīng)該是 M+N;
?
對CountDownLatch,CyclicBarrier,Semaphored這三者之間的比較總結(jié):
1.CountDownLatch 通過計數(shù)器提供了更靈活的控制,只要檢測到計數(shù)器為 0,而不管當(dāng)前線程是否結(jié)束調(diào)用 await 的線程就可以往下執(zhí)行,相比使用 jion 必須等待線程執(zhí)行完畢后主線程才會繼續(xù)向下運行更靈活。
2.CyclicBarrier 也可以達(dá)到 CountDownLatch 的效果,但是后者當(dāng)計數(shù)器變?yōu)?0 后,就不能在被復(fù)用,而前者則使用 reset 方法可以重置后復(fù)用,前者對同一個算法但是輸入?yún)?shù)不同的類似場景下比較適用。
3.而 semaphore 采用了信號量遞增的策略,一開始并不需要關(guān)心需要同步的線程個數(shù),等調(diào)用 aquire 時候在指定需要同步個數(shù),并且提供了獲取信號量的公平性策略。
轉(zhuǎn)載于:https://www.cnblogs.com/huangjuncong/p/9280646.html
總結(jié)
以上是生活随笔為你收集整理的Java并发编程笔记之Semaphore信号量源码分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 数据库多表查询之 where I
- 下一篇: Spring源码下载及安装