Java Review - 并发编程_ 信号量Semaphore原理源码剖析
文章目錄
- 概述
- 小Demo
- 類關系概述
- 核心方法源碼解讀
- void acquire()
- 非公平策略NonfairSync類的`tryAcquireShared`方法
- 公平策略`FairSync`類的`tryAcquireShared`方法
- void acquire(int permits)
- void acquireUninterruptibly()
- void acquireUninterruptibly(int permits)
- void release()
- void release(int permits)
- 小結
概述
Semaphore信號量也是Java中的一個同步器,與CountDownLatch和CycleBarrier不同的是,它內部的計數器是遞增的,并且在一開始初始化Semaphore時可以指定一個初始值,但是并不需要知道需要同步的線程個數,而是在需要同步的地方調用acquire方法時指定需要同步的線程個數。
小Demo
同樣下面的例子也是在主線程中開啟兩個子線程讓它們執行,等所有子線程執行完畢后主線程再繼續向下運行。
import java.time.LocalTime; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/14 23:59* @mark: show me the code , change the world*/ public class SemphoreTest {// 1 創建Sempaphore實例 當前信號量計數器的值為0private static Semaphore semaphore = new Semaphore(0);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);// 線程1 提交到線程池executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();});// 線程2 提交到線程池executorService.submit(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}});// 1 等待子線程執行任務完成后返回semaphore.acquire(2);System.out.println(Thread.currentThread().getName() + "任務執行結束 " + LocalTime.now()) ;// 關閉線程池executorService.shutdown();} }-
首先創建了一個信號量實例,構造函數的入參為0,說明當前信號量計數器的值為0
-
然后main函數向線程池添加兩個線程任務,在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1
-
最后在main線程里面調用信號量的acquire方法,傳參為2說明調用acquire方法的線程會一直阻塞,直到信號量的計數變為2才會返回
看到這里也就明白了,如果構造Semaphore時傳遞的參數為N,并在M個線程中調用了該信號量的release方法,那么在調用acquire使M個線程同步時傳遞的參數應該是M+N。
下面舉個例子來模擬【CyclicBarrier復用】的功能,代碼如下
import java.time.LocalTime; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/14 23:59* @mark: show me the code , change the world*/ public class SemphoreTest2 {// 1 創建Sempaphore實例private static Semaphore semaphore = new Semaphore(0);public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newFixedThreadPool(2);// 線程1 提交到線程池executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();});// 線程2 提交到線程池executorService.submit(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}});// 1 等待子線程執行任務完成后返回semaphore.acquire(2);// 線程3 提交到線程池executorService.submit(() -> {System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();});// 線程4 提交到線程池executorService.submit(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + " 執行結束 " + LocalTime.now());// 在每個線程內部調用信號量的release方法,這相當于讓計數器值遞增1semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}});// 2等待子線程執行任務完成后返回semaphore.acquire(2);System.out.println(Thread.currentThread().getName() + "任務執行結束 " + LocalTime.now()) ;// 關閉線程池executorService.shutdown();} }-
首先將線程1和線程2加入到線程池。主線程執行代碼(1)后被阻塞。線程1和線程2調用release方法后信號量的值變為了2,這時候主線程的aquire方法會在獲取到2個信號量后返回(返回后當前信號量值為0)。
-
然后主線程添加線程3和線程4到線程池,之后主線程執行代碼(2)后被阻塞(因為主線程要獲取2個信號量,而當前信號量個數為0)。當線程3和線程4執行完release方法后,主線程才返回。
從本例子可以看出,Semaphore在某種程度上實現了CyclicBarrier的復用功能。
類關系概述
由該類圖可知,Semaphore還是使用AQS實現的。Sync只是對AQS的一個修飾,并且Sync有兩個實現類,用來指定獲取信號量時是否采用公平策略。
例如,下面的代碼在創建Semaphore時會使用一個變量指定是否使用公平策略。
public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}Sync(int permits) {setState(permits);}Semaphore默認采用非公平策略,如果需要使用公平策略則可以使用帶兩個參數的構造函數來構造Semaphore對象。
另外,如CountDownLatch構造函數傳遞的初始化信號量個數permits被賦給了AQS的state狀態變量一樣,這里AQS的state值也表示當前持有的信號量個數。
核心方法源碼解讀
void acquire()
public void acquire() throws InterruptedException {// 傳遞參數為1 ,說明要獲取一個信號量資源sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 1 . 如果線程被中斷,拋出被中斷異常 if (Thread.interrupted())throw new InterruptedException();// 2 否則調用Syn子類方法嘗試重新獲取 if (tryAcquireShared(arg) < 0)// 如果獲取失敗,則放入阻塞隊列,然后再次嘗試,如果失敗則調用park方法掛起當前線程doAcquireSharedInterruptibly(arg);}acquire()在內部調用了Sync的acquireSharedInterruptibly方法,后者會對中斷進行響應(如果當前線程被中斷,則拋出中斷異常)。
嘗試獲取信號量資源的AQS的方法tryAcquireShared是由Sync的子類實現的,所以這里分別從兩方面來討論。
非公平策略NonfairSync類的tryAcquireShared方法
繼續看下 nonfairTryAcquireShared
-
先獲取當前信號量值(available),然后減去需要獲取的值(acquires),得到剩余的信號量個數(remaining)
-
如果剩余值小于0則說明當前信號量個數滿足不了需求,那么直接返回負數,這時當前線程會被放入AQS的阻塞隊列而被掛起。
-
如果剩余值大于0,則使用CAS操作設置當前信號量值為剩余值,然后返回剩余值。
另外,由于NonFairSync是非公平獲取的,也就是說先調用aquire方法獲取信號量的線程不一定比后來者先獲取到信號量。
舉個例子:
如果采用非公平策略,由nonfairTryAcquireShared的代碼可知,線程C完全可以在線程A被激活前,或者激活后先于線程A獲取到該信號量,也就是在這種模式下阻塞線程和當前請求的線程是競爭關系,而不遵循先來先得的策略。
公平策略FairSync類的tryAcquireShared方法
/*** Fair version*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}可見公平性還是靠hasQueuedPredecessors這個函數來保證的。前幾篇博文里重點介紹了hasQueuedPredecessors。 公平策略是看當前線程節點的前驅節點是否也在等待獲取該資源,如果是則自己放棄獲取的權限,然后當前線程會被放入AQS阻塞隊列,否則就去獲取。
void acquire(int permits)
該方法與acquire()方法不同,后者只需要獲取一個信號量值,而前者則獲取permits個。
public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}void acquireUninterruptibly()
該方法與acquire()類似,不同之處在于該方法對中斷不響應,也就是當當前線程調用了acquireUninterruptibly獲取資源時(包含被阻塞后),其他線程調用了當前線程的interrupt()方法設置了當前線程的中斷標志,此時當前線程并不會拋出InterruptedException異常而返回。
public void acquireUninterruptibly() {sync.acquireShared(1);}看看響應中斷的
void acquireUninterruptibly(int permits)
該方法與acquire(int permits)方法的不同之處在于,該方法對中斷不響應。
public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);}void release()
該方法的作用是把當前Semaphore對象的信號量值增加1,如果當前有線程因為調用aquire方法被阻塞而被放入了AQS的阻塞隊列,則會根據公平策略選擇一個信號量個數能被滿足的線程進行激活,激活的線程會嘗試獲取剛增加的信號量。
public void release() {// 默認釋放1個信號量 sync.releaseShared(1);} /*** Releases in shared mode. Implemented by unblocking one or more* threads if {@link #tryReleaseShared} returns true.** @param arg the release argument. This value is conveyed to* {@link #tryReleaseShared} but is otherwise uninterpreted* and can represent anything you like.* @return the value returned from {@link #tryReleaseShared}*/public final boolean releaseShared(int arg) {// 2嘗試釋放資源if (tryReleaseShared(arg)) {// 3 資源釋放成功,則調用park方法喚醒AQS 隊列里最先掛起的線程 doReleaseShared();return true;}return false;} protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next)) // cas return true;}由代碼release()->sync.releaseShared(1)可知,release方法每次只會對信號量值增加1,tryReleaseShared方法是無限循環,使用CAS保證了release方法對信號量遞增1的原子性操作。tryReleaseShared方法增加信號量值成功后會執行代碼(3)doReleaseShared();,即調用AQS的方法來激活因為調用aquire方法而被阻塞的線程。
private void doReleaseShared() {/** Ensure that a release propagates, even if there are other* in-progress acquires/releases. This proceeds in the usual* way of trying to unparkSuccessor of head if it needs* signal. But if it does not, status is set to PROPAGATE to* ensure that upon release, propagation continues.* Additionally, we must loop in case a new node is added* while we are doing this. Also, unlike other uses of* unparkSuccessor, we need to know if CAS to reset status* fails, if so rechecking.*/for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue; // loop to recheck casesunparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}void release(int permits)
該方法與不帶參數的release方法的不同之處在于,前者每次調用會在信號量值原來的基礎上增加permits,而后者每次增加1。
public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}另外可以看到,這里的sync.releaseShared是共享方法,這說明該信號量是線程共享的,信號量沒有和固定線程綁定,多個線程可以同時使用CAS去更新信號量的值而不會被阻塞。
小結
Semaphore也是使用AQS實現的,并且獲取信號量時有公平策略和非公平策略之分。
總結
以上是生活随笔為你收集整理的Java Review - 并发编程_ 信号量Semaphore原理源码剖析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_
- 下一篇: Java Review - Java进程