Java并发编程—线程同步类
原文作者:洲洋1984
原文地址:Java 并發包中的高級同步工具
Java 中的并發包指的是 java.util.concurrent(簡稱 JUC)包和其子包下的類和接口,它為 Java 的并發提供了各種功能支持,比如:
- 提供了線程池的創建類 ThreadPoolExecutor、Executors 等;
- 提供了各種鎖,如 Lock、ReentrantLock 等;
- 提供了各種線程安全的數據結構,如 ConcurrentHashMap、LinkedBlockingQueue、DelayQueue 等;
- 提供了更加高級的線程同步結構,如 CountDownLatch、CyclicBarrier、Semaphore 、Phaser等。
在前面的章節中我們已經詳細地介紹了線程池的使用、線程安全的數據結構等,本文我們就重點學習一下 Java 并發包中更高級的線程同步類:CountDownLatch、CyclicBarrier、Semaphore 和 Phaser 等。
一、CountDownLatch 介紹和使用
CountDownLatch(閉鎖)可以看作一個只能做減法的計數器,可以讓一個或多個線程等待排隊執行。CountDownLatch 有兩個重要的方法:
- countDown():使計數器減 1;
- await():當計數器不為 0 時,則調用該方法的線程阻塞,當計數器為 0 時,可以喚醒等待的一個或者全部線程。
CountDownLatch 使用場景:以生活中的情景為例,比如去醫院體檢,通常人們會提前去醫院排隊,但只有等到醫生開始上班,才能正式開始體檢,醫生也要給所有人體檢完才能下班,這種情況就要使用 CountDownLatch,流程為:患者排隊 → 醫生上班 → 體檢完成 → 醫生下班。
CountDownLatch 示例代碼如下:
// 醫院閉鎖 CountDownLatch hospitalLatch = new CountDownLatch(1); // 患者閉鎖 CountDownLatch patientLatch = new CountDownLatch(5); System.out.println("患者排隊"); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 5; i++) {final int j = i;executorService.execute(() -> {try {hospitalLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("體檢:" + j);patientLatch.countDown();}); } System.out.println("醫生上班"); hospitalLatch.countDown(); patientLatch.await(); System.out.println("醫生下班"); executorService.shutdown();以上程序執行結果如下:
患者排隊
醫生上班
體檢:4
體檢:0
體檢:1
體檢:3
體檢:2
醫生下班
二、CyclicBarrier 介紹和使用
CyclicBarrier(循環屏障),通過它可以實現讓一組線程等待滿足某個條件后同時執行。CyclicBarrier 經典使用場景是公交發車,為了簡化理解我們這里定義,每輛公交車只要上滿 4 個人就發車,后面來的人都會排隊依次遵循相應的標準。
它的構造方法為?CyclicBarrier(int parties,Runnable barrierAction)?其中,parties 表示有幾個線程來參與等待,barrierAction 表示滿足條件之后觸發的方法。CyclicBarrier 使用 await() 方法來標識當前線程已到達屏障點,然后被阻塞。
CyclicBarrier 示例代碼如下:
import java.util.concurrent.*; public class CyclicBarrierTest {public static void main(String[] args) throws InterruptedException {CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {@Overridepublic void run() {System.out.println("發車了");}});for (int i = 0; i < 4; i++) {new Thread(new CyclicWorker(cyclicBarrier)).start();}}static class CyclicWorker implements Runnable {private CyclicBarrier cyclicBarrier;CyclicWorker(CyclicBarrier cyclicBarrier) {this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {for (int i = 0; i < 2; i++) {System.out.println("乘客:" + i);try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}} }以上程序執行結果如下:
乘客:0
乘客:0
乘客:0
乘客:0
發車了
乘客:1
乘客:1
乘客:1
乘客:1
發車了
三、Semaphore 介紹和使用
Semaphore(信號量)用于管理多線程中控制資源的訪問與使用。Semaphore 就好比停車場的門衛,可以控制車位的使用資源。比如來了 5 輛車,只有 2 個車位,門衛可以先放兩輛車進去,等有車出來之后,再讓后面的車進入。
Semaphore 示例代碼如下:
Semaphore semaphore = new Semaphore(2); ThreadPoolExecutor semaphoreThread = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); for (int i = 0; i < 5; i++) {semaphoreThread.execute(() -> {try {// 堵塞獲取許可semaphore.acquire();System.out.println("Thread:" + Thread.currentThread().getName() + " 時間:" + LocalDateTime.now());TimeUnit.SECONDS.sleep(2);// 釋放許可semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}}); }以上程序執行結果如下:
Thread:pool-1-thread-1 時間:2019-07-10 21:18:42
Thread:pool-1-thread-2 時間:2019-07-10 21:18:42
Thread:pool-1-thread-3 時間:2019-07-10 21:18:44
Thread:pool-1-thread-4 時間:2019-07-10 21:18:44
Thread:pool-1-thread-5 時間:2019-07-10 21:18:46
四、Phaser(移相器)
是 JDK 7 提供的,它的功能是等待所有線程到達之后,才繼續或者開始進行新的一組任務。比如有一個旅行團,我們規定所有成員必須都到達指定地點之后,才能發車去往景點一,到達景點之后可以各自游玩,之后必須全部到達指定地點之后,才能繼續發車去往下一個景點,類似這種場景就非常適合使用 Phaser。
Phaser 示例代碼如下:
public class Lesson5\_6 {public static void main(String[] args) throws InterruptedException {Phaser phaser = new MyPhaser();PhaserWorker[] phaserWorkers = new PhaserWorker[5];for (int i = 0; i < phaserWorkers.length; i++) {phaserWorkers[i] = new PhaserWorker(phaser);// 注冊 Phaser 等待的線程數,執行一次等待線程數 +1phaser.register();}for (int i = 0; i < phaserWorkers.length; i++) {// 執行任務new Thread(new PhaserWorker(phaser)).start();}}static class PhaserWorker implements Runnable {private final Phaser phaser;public PhaserWorker(Phaser phaser) {this.phaser = phaser;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " | 到達" );phaser.arriveAndAwaitAdvance(); // 集合完畢發車try {Thread.sleep(new Random().nextInt(5) * 1000);System.out.println(Thread.currentThread().getName() + " | 到達" );phaser.arriveAndAwaitAdvance(); // 景點 1 集合完畢發車Thread.sleep(new Random().nextInt(5) * 1000);System.out.println(Thread.currentThread().getName() + " | 到達" );phaser.arriveAndAwaitAdvance(); // 景點 2 集合完畢發車} catch (InterruptedException e) {e.printStackTrace();}}}// Phaser 每個階段完成之后的事件通知static class MyPhaser extends Phaser{@Overrideprotected boolean onAdvance(int phase, int registeredParties) { // 每個階段執行完之后的回調switch (phase) {case 0:System.out.println("==== 集合完畢發車 ====");return false;case 1:System.out.println("==== 景點1集合完畢,發車去下一個景點 ====");return false;case 2:System.out.println("==== 景點2集合完畢,發車回家 ====");return false;default:return true;}}} }以上程序執行結果如下:
Thread-0 | 到達
Thread-4 | 到達
Thread-3 | 到達
Thread-1 | 到達
Thread-2 | 到達
==== 集合完畢發車 ====
Thread-0 | 到達
Thread-4 | 到達
Thread-1 | 到達
Thread-3 | 到達
Thread-2 | 到達
==== 景點1集合完畢,發車去下一個景點 ====
Thread-4 | 到達
Thread-3 | 到達
Thread-2 | 到達
Thread-1 | 到達
Thread-0 | 到達
==== 景點2集合完畢,發車回家 ====
相關面試題
1.以下哪個類用于控制某組資源的訪問權限?
- A:Phaser
- B:Semaphore
- C:CountDownLatch
- D:CyclicBarrier
答:B
2.以下哪個類不能被重用?
- A:Phaser
- B:Semaphore
- C:CountDownLatch
- D:CyclicBarrier
答:C
3.以下哪個方法不屬于 CountDownLatch 類?
- A:await()
- B:countDown()
- C:getCount()
- D:release()
答:D。release() 是 Semaphore 的釋放許可的方法,CountDownLatch 類并不包含此方法。
4.CyclicBarrier 與 CountDownLatch 有什么區別?
答:CyclicBarrier 與 CountDownLatch 本質上都是依賴 volatile 和 CAS 實現的,它們區別如下:
- CountDownLatch 只能使用一次,而 CyclicBarrier 可以使用多次。
- CountDownLatch 是手動指定等待一個或多個線程執行完成再執行,而 CyclicBarrier 是 n 個線程相互等待,任何一個線程完成之前,所有的線程都必須等待。
5.以下哪個類不包含 await() 方法?
- A:Semaphore
- B:CountDownLatch
- C:CyclicBarrier
答:A
6.以下程序執行花費了多長時間?
Semaphore semaphore = new Semaphore(2); ThreadPoolExecutor semaphoreThread = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); for (int i = 0; i < 3; i++) {semaphoreThread.execute(() -> {try {semaphore.release();System.out.println("Hello");TimeUnit.SECONDS.sleep(2);semaphore.acquire();} catch (InterruptedException e) {e.printStackTrace();}}); }- A:1s 以內
- B:2s 以上
答:A。循環先執行了 release() 也就是釋放許可的方法,因此程序可以一次性執行 3 個線程,同時會在 1s 以內執行完。
7.Semaphore 有哪些常用的方法?
答:常用方法如下:
- acquire():獲取一個許可。
- release():釋放一個許可。
- availablePermits():當前可用的許可數。
- acquire(int n):獲取并使用 n 個許可。
- release(int n):釋放 n 個許可。
8.Phaser 常用方法有哪些?
答:常用方法如下:
- register():注冊新的參與者到 Phaser
- arriveAndAwaitAdvance():等待其他線程執行
- arriveAndDeregister():注銷此線程
- forceTermination():強制 Phaser 進入終止態
- isTerminated():判斷 Phaser 是否終止
9.以下程序是否可以正常執行?“發車了”打印了多少次?
import java.util.concurrent.*; public class TestMain {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() {@Overridepublic void run() {System.out.println("發車了");}});for (int i = 0; i < 4; i++) {new Thread(new CyclicWorker(cyclicBarrier)).start();}}static class CyclicWorker implements Runnable {private CyclicBarrier cyclicBarrier;CyclicWorker(CyclicBarrier cyclicBarrier) {this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {for (int i = 0; i < 2; i++) {System.out.println("乘客:" + i);try {cyclicBarrier.await();System.out.println("乘客 II:" + i);cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}} }答:可以正常執行,因為執行了兩次 await(),所以“發車了”打印了 4 次。
總結
本文我們介紹了四種比 synchronized 更高級的線程同步類,其中 CountDownLatch、CyclicBarrier、Phaser 功能比較類似都是實現線程間的等待,只是它們的側重點有所不同,其中 CountDownLatch 一般用于等待一個或多個線程執行完,才執行當前線程,并且 CountDownLatch 不能重復使用;CyclicBarrier 用于等待一組線程資源都進入屏障點再共同執行;Phaser 是 JDK 7 提供的功能更加強大和更加靈活的線程輔助工具,等待所有線程達到之后,繼續或開始新的一組任務,Phaser 提供了動態增加和消除線程同步個數功能。而 Semaphore 提供的功能更像鎖,用于控制一組資源的訪問權限。
總結
以上是生活随笔為你收集整理的Java并发编程—线程同步类的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JVM—内存模型JMM
- 下一篇: 分布式实时计算—实时数据质量如何保障?