Java同步组件之CountDownLatch,Semaphore
Java同步組件概況
- CountDownLatch : 是閉鎖,通過一個計數(shù)來保證線程是否一直阻塞
- Semaphore: 控制同一時間,并發(fā)線程數(shù)量
- CyclicBarrier:字面意思是回環(huán)柵欄,通過它可以實現(xiàn)讓一組線程等待至某個狀態(tài)之后再全部同時執(zhí)行。
- ReentrantLock:是一個重入鎖,一個線程獲得了鎖之后仍然可以反復(fù)加鎖,不會出現(xiàn)自己阻塞自己的情況。
- Condition:配合
ReentrantLock,實現(xiàn)等待/通知模型 - FutureTask:FutureTask實現(xiàn)了接口Future,同F(xiàn)uture一樣,代表異步計算的結(jié)果。
CountDownLatch 同步輔助類
CountDownLatch類位于java.util.concurrent包,利用它可以實現(xiàn)類似計數(shù)器的功能,比如有一個任務(wù)A,它要等待其它4個任務(wù)執(zhí)行完畢后才能執(zhí)行,此時就可以使用CountDownLatch來實現(xiàn)這種功能。
假設(shè)計數(shù)器的值是3,線程A調(diào)用
await()方法后,當前線程就進入了等待狀態(tài),之后其它線程中執(zhí)行CountDownLatch,計數(shù)器就會減1,當計數(shù)器從3變成0,線程A繼續(xù)執(zhí)行,CountDownLatch這個類可以阻塞當前線程,保證線程在某種條件下,繼續(xù)執(zhí)行。
構(gòu)造器中的計數(shù)值(count)實際上就是閉鎖需要等待的線程數(shù)量,這個值只能被設(shè)置一次,而且
CountDownLatch沒有提供任何機會修改這個計數(shù)值。
CountDownLatch代碼案例
package com.rumenz.task;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class CountDownLatchTest {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();CountDownLatch countDownLatch=new CountDownLatch(2);executorService.execute(()->{try{Thread.sleep(3000);System.out.println("任務(wù)一完成");}catch (Exception e){e.printStackTrace();}countDownLatch.countDown();});executorService.execute(()->{try{Thread.sleep(5000);System.out.println("任務(wù)二完成");}catch (Exception e){e.printStackTrace();}countDownLatch.countDown();});countDownLatch.await();//所有子任務(wù)執(zhí)行完后才會執(zhí)行System.out.println("主線程開始工作.....");executorService.shutdown();}
}任務(wù)一完成
任務(wù)二完成
主線程開始工作.....
CountDownlatch指定時間完成任務(wù),如果在規(guī)定時間內(nèi)完成,則等待之前的等待線程(countDownLatch.await())繼續(xù)執(zhí)行
countDownLatch.await(int timeout,TimeUnit timeUnit);設(shè)置,第一個參數(shù)沒超時時間,第二個參數(shù)為時間單位。
package com.rumenz.task;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class CountDownLatchTest {public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();CountDownLatch countDownLatch=new CountDownLatch(2);executorService.execute(()->{try{Thread.sleep(3000);System.out.println("任務(wù)一完成");}catch (Exception e){e.printStackTrace();}countDownLatch.countDown();});executorService.execute(()->{try{Thread.sleep(5000);System.out.println("任務(wù)二完成");}catch (Exception e){e.printStackTrace();}countDownLatch.countDown();});//這里只等3秒countDownLatch.await(3, TimeUnit.SECONDS);//所有子任務(wù)執(zhí)行完后才會執(zhí)行System.out.println("主線程開始工作.....");executorService.shutdown();}
}
//任務(wù)一完成
//主線程開始工作.....
//任務(wù)二完成
Semaphore控制線程數(shù)量
Semaphore經(jīng)常用于限制獲取某種資源的線程數(shù)量,其內(nèi)部是基于AQS的共享模式,AQS的狀態(tài)可以表示許可證的數(shù)量,許可證數(shù)量不夠線程被掛起;而一旦有一個線程釋放資源,那么可喚醒等待隊列中的線程繼續(xù)執(zhí)行。
Semaphore翻譯過來就是信號量,Semaphore可以阻塞進程并控制同時訪問的線程數(shù),通過acquire()獲取一個許可,如果沒有就等待,而release()釋放一個許可,Semaphore有點類似鎖。
CountDownLatch和Semaphore在使用時,通過和線程池配合使用。
Semaphore適合控制并發(fā),CountDownLatch比較適合保證線程執(zhí)行完后再執(zhí)行其它處理,因此模擬并發(fā)兩者結(jié)合最好。
Semaohore應(yīng)用場景
Semaphore適合做流量控制,特別是共享的有限資源,比如數(shù)據(jù)庫連接,假如有一個需求,要讀取幾萬個文件的數(shù)據(jù),因為都是IO密集型任務(wù),我們可以啟動幾十個線程并發(fā)的讀取,但是如果讀到內(nèi)存后,還需要存儲到數(shù)據(jù)庫中,而數(shù)據(jù)庫的連接數(shù)只有10個,這時我們必須控制只有十個線程同時獲取數(shù)據(jù)庫連接保存數(shù)據(jù),否則會報錯無法獲取數(shù)據(jù)庫連接。這個時候,我們就可以使用Semaphore來做流控。
package com.rumenz.task;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;public class SemaphoreExample1 {private static Integer clientTotal=30;private static Integer threadTotal=3;public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();Semaphore semaphore=new Semaphore(threadTotal);for (int i = 0; i < clientTotal; i++) {final Integer j=i;executorService.execute(()->{try{semaphore.acquire(); // 獲取一個許可update(j);semaphore.release(); // 釋放一個許可}catch (Exception e) {e.printStackTrace();}});}executorService.shutdown();}private static void update(Integer j) throws Exception {System.out.println(j);Thread.sleep(2000);}
}
每2秒打印3個數(shù)字。
package com.rumenz.task;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;public class SemaphoreExample1 {private static Integer clientTotal=30;private static Integer threadTotal=3;public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();Semaphore semaphore=new Semaphore(threadTotal);for (int i = 0; i < clientTotal; i++) {final Integer j=i;executorService.execute(()->{try{semaphore.acquire(3); // 獲取多個許可update(j);semaphore.release(3); // 釋放多個許可}catch (Exception e) {e.printStackTrace();}});}executorService.shutdown();}private static void update(Integer j) throws Exception {System.out.println(j);Thread.sleep(2000);}
}
每2秒打印一個數(shù)字。
tryAcquire
嘗試獲取許可,如果獲取不成功,則放棄操作,tryAcquire方法提供幾個重載
- tryAcquire() : boolean
- tryAcquire(int permits) : boolean 嘗試獲取指定數(shù)量的許可
- tryAcquire(int permits,long timeout,TimeUnit timeUnit) : boolean
- tryAcquire(long timeout,TimeUnit timeUnit) : boolean 嘗試獲取許可的時候可以等待一段時間,在指定時間內(nèi)未獲取到許可則放棄
Semaphore源碼分析
Semaphore有兩種模式,公平模式和非公平模式。公平模式就是調(diào)用acquire的順序就是獲取許可證的順序,遵循FIFO;而非公平模式是搶占式的,也就是有可能一個新的獲取線程恰好在一個許可證釋放時得到了這個許可證,而前面還有等待的線程。
// 非公平模式
public Semaphore(int permits) {sync = new NonfairSync(permits);
}
// fair=true為公平模式,false=非公平模式
public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
public class Semaphore implements java.io.Serializable {/** 只指定許可量,構(gòu)造不公平模式*/public Semaphore(int permits) {sync = new NonfairSync(permits);}/** 指定許可量,并指定模式*/public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}//Semaphore內(nèi)部基于AQS的共享模式,所以實現(xiàn)都委托給了Sync類。 abstract static class Sync extends AbstractQueuedSynchronizer {}/*** NonFair version*/static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {// 可以看到調(diào)用了setState方法,也就是說AQS中的資源就是許可證的數(shù)量。super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}/*** Fair version*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {// 可以看到調(diào)用了setState方法,也就是說AQS中的資源就是許可證的數(shù)量。super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}}
關(guān)注微信公眾號:【入門小站】,解鎖更多知識點
總結(jié)
以上是生活随笔為你收集整理的Java同步组件之CountDownLatch,Semaphore的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 什么是java常量
- 下一篇: 转:mbedtls学习2.mbedtls