j.u.c系列(08)---之并发工具类:CountDownLatch
寫在前面
CountDownLatch所描述的是”在完成一組正在其他線程中執(zhí)行的操作之前,它允許一個或多個線程一直等待“:用給定的計數(shù) 初始化 CountDownLatch。由于調(diào)用了 countDown() 方法,所以在當前計數(shù)到達零之前,await 方法會一直受阻塞。之后,會釋放所有等待的線程,await 的所有后續(xù)調(diào)用都將立即返回。CountDownLatch的本質(zhì)也是一個"共享鎖"
\
CountDownLatch(int count) 構(gòu)造一個用給定計數(shù)初始化的 CountDownLatch。// 使當前線程在鎖存器倒計數(shù)至零之前一直等待,除非線程被中斷。 void await() // 使當前線程在鎖存器倒計數(shù)至零之前一直等待,除非線程被中斷或超出了指定的等待時間。 boolean await(long timeout, TimeUnit unit) // 遞減鎖存器的計數(shù),如果計數(shù)到達零,則釋放所有等待的線程。 void countDown() // 返回當前計數(shù)。 long getCount() // 返回標識此鎖存器及其狀態(tài)的字符串。 String toString()CountDownLatch是通過一個計數(shù)器來實現(xiàn)的,當我們在new 一個CountDownLatch對象的時候需要帶入該計數(shù)器值,該值就表示了線程的數(shù)量。每當一個線程完成自己的任務后,計數(shù)器的值就會減1。當計數(shù)器的值變?yōu)?時,就表示所有的線程均已經(jīng)完成了任務,然后就可以恢復等待的線程繼續(xù)執(zhí)行了。
雖然,CountDownlatch與CyclicBarrier(后續(xù)會接受。另外一并發(fā)工具類)區(qū)別:
?
實現(xiàn)分析
通過上面的結(jié)構(gòu)圖我們可以看到,CountDownLatch內(nèi)部依賴Sync實現(xiàn),而Sync繼承AQS。CountDownLatch僅提供了一個構(gòu)造方法:
CountDownLatch(int count) : 構(gòu)造一個用給定計數(shù)初始化的 CountDownLatch
?
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}sync為CountDownLatch的一個內(nèi)部類,其定義如下:
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {setState(count);}//獲取同步狀態(tài)int getCount() {return getState();}//獲取同步狀態(tài)protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}//釋放同步狀態(tài)protected boolean tryReleaseShared(int releases) {for (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}}?
? 通過這個內(nèi)部類Sync我們可以清楚地看到CountDownLatch是采用共享鎖來實現(xiàn)的。
CountDownLatch提供await()方法來使當前線程在鎖存器倒計數(shù)至零之前一直等待,除非線程被中斷,定義如下:
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}?
? await其內(nèi)部使用AQS的acquireSharedInterruptibly(int arg):
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}?
? 在內(nèi)部類Sync中重寫了tryAcquireShared(int arg)方法:
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}getState()獲取同步狀態(tài),其值等于計數(shù)器的值,從這里我們可以看到如果計數(shù)器值不等于0,則會調(diào)用doAcquireSharedInterruptibly(int arg),該方法為一個自旋方法會嘗試一直去獲取同步狀態(tài):
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {/*** 對于CountDownLatch而言,如果計數(shù)器值不等于0,那么r 會一直小于0*/int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//等待if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}?
? CountDownLatch提供countDown() 方法遞減鎖存器的計數(shù),如果計數(shù)到達零,則釋放所有等待的線程。
public void countDown() {sync.releaseShared(1);}?
?內(nèi)部調(diào)用AQS的releaseShared(int arg)方法來釋放共享鎖同步狀態(tài):
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}?
? tryReleaseShared(int arg)方法被CountDownLatch的內(nèi)部類Sync重寫:
protected boolean tryReleaseShared(int releases) {for (;;) {//獲取鎖狀態(tài)int c = getState();//c == 0 直接返回,釋放鎖成功if (c == 0)return false;//計算新“鎖計數(shù)器”int nextc = c-1;//更新鎖狀態(tài)(計數(shù)器)if (compareAndSetState(c, nextc))return nextc == 0;}}?
?
總結(jié)
CountDownLatch內(nèi)部通過共享鎖實現(xiàn)。在創(chuàng)建CountDownLatch實例時,需要傳遞一個int型的參數(shù):count,該參數(shù)為計數(shù)器的初始值,也可以理解為該共享鎖可以獲取的總次數(shù)。當某個線程調(diào)用await()方法,程序首先判斷count的值是否為0,如果不會0的話則會一直等待直到為0為止。當其他線程調(diào)用countDown()方法時,則執(zhí)行釋放共享鎖狀態(tài),使count值 - 1。當在創(chuàng)建CountDownLatch時初始化的count參數(shù),必須要有count線程調(diào)用countDown方法才會使計數(shù)器count等于0,鎖才會釋放,前面等待的線程才會繼續(xù)運行。注意CountDownLatch不能回滾重置。
應用示例
示例仍然使用開會案例。老板進入會議室等待5個人全部到達會議室才會開會。所以這里有兩個線程老板等待開會線程、員工到達會議室:
public class CountDownLatchTest {private volatile static CountDownLatch countDownLatch = new CountDownLatch(5);/*** Boss線程,等待員工到達開會*/static class BossThread extends Thread{BossThread(String name){super(name);}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ":Boss在會議室等待,總共有" + countDownLatch.getCount() + "個人開會...");try {//Boss等待 countDownLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + ":所有人都已經(jīng)到齊了,開會吧...");}}//員工到達會議室static class EmpleoyeeThread extends Thread{@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ",到達會議室....");//員工到達會議室 count - 1 countDownLatch.countDown();}}public static void main(String[] args) throws InterruptedException{//Boss線程啟動new BossThread("張總").start();new BossThread("李總").start();new BossThread("王總").start();Thread.sleep(1000);for(int i = 0 ; i < 5 ; i++){new EmpleoyeeThread().start();}} } 張總:Boss在會議室等待,總共有5個人開會... 李總:Boss在會議室等待,總共有5個人開會... 王總:Boss在會議室等待,總共有5個人開會... Thread-0,到達會議室.... Thread-1,到達會議室.... Thread-2,到達會議室.... Thread-3,到達會議室.... Thread-4,到達會議室.... 張總:所有人都已經(jīng)到齊了,開會吧... 王總:所有人都已經(jīng)到齊了,開會吧... 李總:所有人都已經(jīng)到齊了,開會吧...?
轉(zhuǎn)載于:https://www.cnblogs.com/chihirotan/p/8526692.html
總結(jié)
以上是生活随笔為你收集整理的j.u.c系列(08)---之并发工具类:CountDownLatch的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: php怎么设置浏览器禁止打开新窗口,JS
- 下一篇: web存储机制localStorage和