AQS(CountdownLatch、CyclicBarrier、Semaphore)、FutureTask、BlockingQueue、ForkJoin
1. J.U.C - AQS (AbstractQueuedSynchronizer)
java.util.concurrent(J.U.C) 大大提高了并發(fā)性能,AQS 被認(rèn)為是 J.U.C 的核心。
1.1?CountdownLatch
用來控制一個(gè)線程等待多個(gè)線程。
維護(hù)了一個(gè)計(jì)數(shù)器 cnt,每次調(diào)用 countDown() 方法會讓計(jì)數(shù)器的值減 1,減到 0的時(shí)候,那些因為調(diào)用 await() 方法而在等待的線程就會被喚醒。
public class CountdownLatchExample {public static void main(String[] args) throws InterruptedException {final int totalThread = 10;CountDownLatch countDownLatch = new CountDownLatch(totalThread);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalThread; i++) {executorService.execute(() -> {System.out.print("run..");countDownLatch.countDown();});} countDownLatch.await();System.out.println("end");executorService.shutdown();} }運(yùn)行結(jié)果:
1.2 CyclicBarrier
用來控制多個(gè)線程互相等待,只有當(dāng)多個(gè)線程都到達(dá)時(shí),這些線程才會繼續(xù)執(zhí)行。
和 CountdownLatch 相似,都是通過維護(hù)計(jì)數(shù)器來實(shí)現(xiàn)的。
- 線程執(zhí)行 await() 方法之后計(jì)數(shù)器會減 1,并進(jìn)行等待,直到計(jì)數(shù)器為 0,所有調(diào)用 awati() 方法而在等待的線程才能繼續(xù)執(zhí)行。
CyclicBarrier 和 CountdownLatch 的一個(gè)區(qū)別是:
- CyclicBarrier 的計(jì)數(shù)器通過調(diào)用reset() 方法可以循環(huán)使用,所以它才叫做循環(huán)屏障。
CyclicBarrier 有兩個(gè)構(gòu)造函數(shù),其中 parties 指示計(jì)數(shù)器的初始值,barrierAction在所有線程都到達(dá)屏障的時(shí)候會執(zhí)行一次。
public CyclicBarrier(int parties, Runnable barrierAction) {if (parties <= 0)throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) {this(parties, null); } public class CyclicBarrierExample {public static void main(String[] args) {final int totalThread = 10;CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalThread; i++) {executorService.execute(() -> {System.out.print("before..");try {cyclicBarrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();} System.out.print("after..");});} executorService.shutdown();} }運(yùn)行結(jié)果:
1.3?Semaphore
Semaphore 就是操作系統(tǒng)中的信號量,可以控制對互斥資源的訪問線程數(shù)。
以下代碼模擬了對某個(gè)服務(wù)的并發(fā)請求,每次只能有 3 個(gè)客戶端同時(shí)訪問,請求總數(shù)為 10。
public class SemaphoreExample {public static void main(String[] args) {final int clientCount = 3;final int totalRequestCount = 10;Semaphore semaphore = new Semaphore(clientCount);ExecutorService executorService = Executors.newCachedThreadPool();for (int i = 0; i < totalRequestCount; i++) {executorService.execute(()->{try {semaphore.acquire();System.out.print(semaphore.availablePermits() + " ");} catch (InterruptedException e) {e.printStackTrace();} finally {semaphore.release();}});} executorService.shutdown();} }運(yùn)行結(jié)果:
2. J.U.C - 其它組件
2.1?FutureTask
在介紹 Callable 時(shí)我們知道它可以有返回值,返回值通過 Future 進(jìn)行封裝。
FutureTask 實(shí)現(xiàn)了 RunnableFuture 接口,該接口繼承自 Runnable 和 Future 接口,這使得 FutureTask 既可以當(dāng)做一個(gè)任務(wù)執(zhí)行,也可以有返回值。
public class FutureTask<V> implements RunnableFuture<V>public interface RunnableFuture<V> extends Runnable, Future<V>FutureTask 可用于異步獲取執(zhí)行結(jié)果或取消執(zhí)行任務(wù)的場景。
當(dāng)一個(gè)計(jì)算任務(wù)需要執(zhí)行很長時(shí)間,那么就可以用 FutureTask 來封裝這個(gè)任務(wù),主線程在完成自己的任務(wù)之后再去獲取結(jié)果。
public class FutureTaskExample {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {int result = 0;for (int i = 0; i < 100; i++) {Thread.sleep(10);result += i;} return result;}});Thread computeThread = new Thread(futureTask);computeThread.start();Thread otherThread = new Thread(() -> {System.out.println("other task is running...");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}});otherThread.start();System.out.println(futureTask.get());} }運(yùn)行結(jié)果:
2.2?BlockingQueue
java.util.concurrent.BlockingQueue 接口有以下阻塞隊(duì)列的實(shí)現(xiàn):
- FIFO 隊(duì)列 :LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
- 優(yōu)先級隊(duì)列 :PriorityBlockingQueue提供了阻塞的 take() 和 put() 方法:
- 如果隊(duì)列為空 take() 將阻塞,直到隊(duì)列中有內(nèi)容;
- 如果隊(duì)列為滿 put() 將阻塞,直到隊(duì)列有空閑位置。
使用 BlockingQueue 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題:
public class ProducerConsumer {private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);private static class Producer extends Thread {@Overridepublic void run() {try {queue.put("product");} catch (InterruptedException e) {e.printStackTrace();} System.out.print("produce..");}}private static class Consumer extends Thread {@Overridepublic void run() {try {String product = queue.take();} catch (InterruptedException e) {e.printStackTrace();} System.out.print("consume..");}} }public static void main(String[] args) {for (int i = 0; i < 2; i++) {Producer producer = new Producer();producer.start();} for (int i = 0; i < 5; i++) {Consumer consumer = new Consumer();consumer.start();} for (int i = 0; i < 3; i++) {Producer producer = new Producer();producer.start();} }運(yùn)行結(jié)果:
2.3?ForkJoin
主要用于并行計(jì)算中,和 MapReduce 原理類似,都是把大的計(jì)算任務(wù)拆分成多個(gè)小任務(wù)并行計(jì)算。
public class ForkJoinExample extends RecursiveTask<Integer> {private final int threshold = 5;private int first;private int last;public ForkJoinExample(int first, int last) {this.first = first;this.last = last;}@Overrideprotected Integer compute() {int result = 0;if (last - first <= threshold) {// 任務(wù)足夠小則直接計(jì)算for (int i = first; i <= last; i++) {result += i;}} else {// 拆分成小任務(wù)int middle = first + (last - first) / 2;ForkJoinExample leftTask = new ForkJoinExample(first, middle);ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);leftTask.fork();rightTask.fork();result = leftTask.join() + rightTask.join();}return result;} }public static void main(String[] args) throws ExecutionException, InterruptedException {ForkJoinExample example = new ForkJoinExample(1, 10000);ForkJoinPool forkJoinPool = new ForkJoinPool();Future result = forkJoinPool.submit(example);System.out.println(result.get()); }ForkJoin 使用 ForkJoinPool 來啟動,它是一個(gè)特殊的線程池,線程數(shù)量取決于CPU 核數(shù)。
public class ForkJoinPool extends AbstractExecutorServiceForkJoinPool 實(shí)現(xiàn)了工作竊取算法來提高 CPU 的利用率。
- 每個(gè)線程都維護(hù)了一個(gè)雙端隊(duì)列,用來存儲需要執(zhí)行的任務(wù)。
- 工作竊取算法允許空閑的線程從其它線程的雙端隊(duì)列中竊取一個(gè)任務(wù)來執(zhí)行。
- 竊取的任務(wù)必須是最晚的任務(wù),避免和隊(duì)列所屬線程發(fā)生競爭。但是如果隊(duì)列中只有一個(gè)任務(wù)時(shí)還是會發(fā)生競爭。
?
?
總結(jié)
以上是生活随笔為你收集整理的AQS(CountdownLatch、CyclicBarrier、Semaphore)、FutureTask、BlockingQueue、ForkJoin的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 互斥同步(synchronized、Lo
- 下一篇: 线程安全的实现方法