JUC锁-CountDownLatch(六)
CountDownLatch介紹
CountDownLatch是一個(gè)同步輔助類(lèi),在完成一組正在其他線(xiàn)程中執(zhí)行的操作之前,它允許一個(gè)或多個(gè)線(xiàn)程一直等待。
uml類(lèi)圖
CountDownLatch的數(shù)據(jù)結(jié)構(gòu)很簡(jiǎn)單,它是通過(guò)”共享鎖”實(shí)現(xiàn)的。它包含了sync對(duì)象,sync是Sync類(lèi)型。Sync是實(shí)例類(lèi),它繼承于AQS。
CountDownLatch源碼分析
1. CountDownLatch(int count)
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count); }該函數(shù)是創(chuàng)建一個(gè)Sync對(duì)象,而Sync是繼承于AQS類(lèi)。Sync構(gòu)造函數(shù)如下:
Sync(int count) {setState(count); }setState()在AQS中實(shí)現(xiàn),源碼如下:
protected final void setState(long newState) {state = newState; }說(shuō)明:在AQS中,state是一個(gè)private volatile long類(lèi)型的對(duì)象。對(duì)于CountDownLatch而言,state表示的”鎖計(jì)數(shù)器“。CountDownLatch中的getCount()最終是調(diào)用AQS中的getState(),返回的state對(duì)象,即”鎖計(jì)數(shù)器“。
2.await()
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1); }說(shuō)明:該函數(shù)實(shí)際上是調(diào)用的AQS的acquireSharedInterruptibly(1);
AQS中的acquireSharedInterruptibly()的源碼如下:
public final void acquireSharedInterruptibly(long arg)throws InterruptedException {//如果當(dāng)前線(xiàn)程是中斷狀態(tài),則拋出異常InterruptedExceptionif (Thread.interrupted())throw new InterruptedException();//調(diào)用tryAcquireShared(arg)嘗試獲取共享鎖,嘗試成功則返回//否則就調(diào)用doAcquireSharedInterruptibly()if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }tryAcquireShared()在CountDownLatch.java中被重寫(xiě),它的源碼如下:
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1; }如果”鎖計(jì)數(shù)器=0”,即鎖是可獲取狀態(tài),則返回1;否則,鎖是不可獲取狀態(tài),則返回-1。
doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(long arg)throws InterruptedException {// 創(chuàng)建"當(dāng)前線(xiàn)程"的Node節(jié)點(diǎn),且Node中記錄的鎖是"共享鎖"類(lèi)型;并將該節(jié)點(diǎn)添加到CLH隊(duì)列末尾。final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {// 獲取上一個(gè)節(jié)點(diǎn)。// 如果上一節(jié)點(diǎn)是CLH隊(duì)列的表頭,則"嘗試獲取共享鎖"。final Node p = node.predecessor();if (p == head) {long r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// (上一節(jié)點(diǎn)不是CLH隊(duì)列的表頭) 當(dāng)前線(xiàn)程一直等待,直到獲取到共享鎖。// 如果線(xiàn)程在等待過(guò)程中被中斷過(guò),則再次中斷該線(xiàn)程(還原之前的中斷狀態(tài))。if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }doAcquireSharedInterruptibly()會(huì)使當(dāng)前線(xiàn)程一直等待,直到當(dāng)前線(xiàn)程獲取到共享鎖(或被中斷)才返回。跟共享鎖ReadLock的doAcquireShare()方法非常類(lèi)似,這里就不進(jìn)行贅述了。
3. countDown()
public void countDown() {sync.releaseShared(1); }說(shuō)明:該函數(shù)實(shí)際上調(diào)用releaseShared(1)釋放共享鎖。
releaseShared()在AQS中實(shí)現(xiàn),源碼如下:
說(shuō)明:releaseShared()的目的是讓當(dāng)前線(xiàn)程釋放它所持有的共享鎖。
它首先會(huì)通過(guò)tryReleaseShared()去嘗試釋放共享鎖。嘗試成功,則直接返回;嘗試失敗,則通過(guò)doReleaseShared()去釋放共享鎖。
tryReleaseShared()在CountDownLatch.java中被重寫(xiě),源碼如下:
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {// 獲取“鎖計(jì)數(shù)器”的狀態(tài)int c = getState();if (c == 0)return false;// “鎖計(jì)數(shù)器”-1int nextc = c-1;// 通過(guò)CAS函數(shù)進(jìn)行賦值。if (compareAndSetState(c, nextc))return nextc == 0;} }說(shuō)明:tryReleaseShared()的作用是釋放共享鎖,將“鎖計(jì)數(shù)器”的值-1。
總結(jié):CountDownLatch是通過(guò)“共享鎖”實(shí)現(xiàn)的。在創(chuàng)建CountDownLatch中時(shí),會(huì)傳遞一個(gè)int類(lèi)型參數(shù)count,該參數(shù)是“鎖計(jì)數(shù)器”的初始狀態(tài),表示該“共享鎖”最多能被count給線(xiàn)程同時(shí)獲取。當(dāng)某線(xiàn)程調(diào)用該CountDownLatch對(duì)象的await()方法時(shí),該線(xiàn)程會(huì)等待“共享鎖”可用時(shí),才能獲取“共享鎖”進(jìn)而繼續(xù)運(yùn)行。而“共享鎖”可用的條件,就是“鎖計(jì)數(shù)器”的值為0!而“鎖計(jì)數(shù)器”的初始值為count,每當(dāng)一個(gè)線(xiàn)程調(diào)用該CountDownLatch對(duì)象的countDown()方法時(shí),才將“鎖計(jì)數(shù)器”-1;通過(guò)這種方式,必須有count個(gè)線(xiàn)程調(diào)用countDown()之后,“鎖計(jì)數(shù)器”才為0,而前面提到的等待線(xiàn)程才能繼續(xù)運(yùn)行!
CountDownLatch的使用示例
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier;public class CountDownLatchTest1 {private static int LATCH_SIZE = 5;private static CountDownLatch doneSignal;public static void main(String[] args) {try {doneSignal = new CountDownLatch(LATCH_SIZE);// 新建5個(gè)任務(wù)for(int i=0; i<LATCH_SIZE; i++)new InnerThread().start();System.out.println("main await begin.");// "主線(xiàn)程"等待線(xiàn)程池中5個(gè)任務(wù)的完成doneSignal.await();System.out.println("main await finished.");} catch (InterruptedException e) {e.printStackTrace();}}static class InnerThread extends Thread{public void run() {try {Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + " sleep 1000ms.");// 將CountDownLatch的數(shù)值減1doneSignal.countDown();} catch (InterruptedException e) {e.printStackTrace();}}} }運(yùn)行結(jié)果:
main await begin. Thread-0 sleep 1000ms. Thread-2 sleep 1000ms. Thread-1 sleep 1000ms. Thread-4 sleep 1000ms. Thread-3 sleep 1000ms. main await finished.總結(jié)
以上是生活随笔為你收集整理的JUC锁-CountDownLatch(六)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: JUC锁-ReentrantReadWr
- 下一篇: JUC锁-Semaphore(八)