深入理解线程通信
前言
開(kāi)發(fā)中不免會(huì)遇到需要所有子線程執(zhí)行完畢通知主線程處理某些邏輯的場(chǎng)景。
或者是線程 A 在執(zhí)行到某個(gè)條件通知線程 B 執(zhí)行某個(gè)操作。
可以通過(guò)以下幾種方式實(shí)現(xiàn):
?
等待通知機(jī)制
等待通知模式是 Java 中比較經(jīng)典的線程通信方式。
兩個(gè)線程通過(guò)對(duì)同一對(duì)象調(diào)用等待 wait() 和通知 notify() 方法來(lái)進(jìn)行通訊。
如兩個(gè)線程交替打印奇偶數(shù):
public class TwoThreadWaitNotify {private int start = 1;private boolean flag = false;public static void main(String[] args) {TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();Thread t1 = new Thread(new OuNum(twoThread));t1.setName("A");Thread t2 = new Thread(new JiNum(twoThread));t2.setName("B");t1.start();t2.start();}/*** 偶數(shù)線程*/public static class OuNum implements Runnable {private TwoThreadWaitNotify number;public OuNum(TwoThreadWaitNotify number) {this.number = number;}@Overridepublic void run() {while (number.start <= 100) {synchronized (TwoThreadWaitNotify.class) {System.out.println("偶數(shù)線程搶到鎖了");if (number.flag) {System.out.println(Thread.currentThread().getName() + "+-+偶數(shù)" + number.start);number.start++;number.flag = false;TwoThreadWaitNotify.class.notify();}else {try {TwoThreadWaitNotify.class.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}}}/*** 奇數(shù)線程*/public static class JiNum implements Runnable {private TwoThreadWaitNotify number;public JiNum(TwoThreadWaitNotify number) {this.number = number;}@Overridepublic void run() {while (number.start <= 100) {synchronized (TwoThreadWaitNotify.class) {System.out.println("奇數(shù)線程搶到鎖了");if (!number.flag) {System.out.println(Thread.currentThread().getName() + "+-+奇數(shù)" + number.start);number.start++;number.flag = true;TwoThreadWaitNotify.class.notify();}else {try {TwoThreadWaitNotify.class.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}}}}輸出結(jié)果:
t2+-+奇數(shù)93 t1+-+偶數(shù)94 t2+-+奇數(shù)95 t1+-+偶數(shù)96 t2+-+奇數(shù)97 t1+-+偶數(shù)98 t2+-+奇數(shù)99 t1+-+偶數(shù)100這里的線程 A 和線程 B 都對(duì)同一個(gè)對(duì)象 TwoThreadWaitNotify.class 獲取鎖,A 線程調(diào)用了同步對(duì)象的 wait() 方法釋放了鎖并進(jìn)入 WAITING 狀態(tài)。
B 線程調(diào)用了 notify() 方法,這樣 A 線程收到通知之后就可以從 wait() 方法中返回。
這里利用了 TwoThreadWaitNotify.class 對(duì)象完成了通信。
有一些需要注意:
-
wait() 、nofify() 、nofityAll() 調(diào)用的前提都是獲得了對(duì)象的鎖(也可稱為對(duì)象監(jiān)視器)。
-
調(diào)用 wait() 方法后線程會(huì)釋放鎖,進(jìn)入?WAITING?狀態(tài),該線程也會(huì)被移動(dòng)到等待隊(duì)列中。
-
調(diào)用 notify() 方法會(huì)將等待隊(duì)列中的線程移動(dòng)到同步隊(duì)列中,線程狀態(tài)也會(huì)更新為?BLOCKED
-
從 wait() 方法返回的前提是調(diào)用 notify() 方法的線程釋放鎖,wait() 方法的線程獲得鎖。
等待通知有著一個(gè)經(jīng)典范式:
線程 A 作為消費(fèi)者:
獲取對(duì)象的鎖。
進(jìn)入 while(判斷條件),并調(diào)用 wait() 方法。
當(dāng)條件滿足跳出循環(huán)執(zhí)行具體處理邏輯。
線程 B 作為生產(chǎn)者:
獲取對(duì)象鎖。
更改與線程 A 共用的判斷條件。
調(diào)用 notify() 方法。
偽代碼如下:
//Thread A synchronized(Object){while(條件){Object.wait();}//do something }//Thread B synchronized(Object){條件=false;//改變條件Object.notify(); }?
join() 方法
private static void join() throws InterruptedException {Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {LOGGER.info("running");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}}) ;Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {LOGGER.info("running2");try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}}}) ;t1.start();t2.start();//等待線程1終止t1.join();//等待線程2終止t2.join();LOGGER.info("main over");}
輸出結(jié)果:
2018-03-16 20:21:30.967 [Thread-1] INFO ?c.c.actual.ThreadCommunication - running2 2018-03-16 20:21:30.967 [Thread-0] INFO ?c.c.actual.ThreadCommunication - running 2018-03-16 20:21:34.972 [main] INFO ?c.c.actual.ThreadCommunication - main over在 t1.join() 時(shí)會(huì)一直阻塞到 t1 執(zhí)行完畢,所以最終主線程會(huì)等待 t1 和 t2 線程執(zhí)行完畢。
其實(shí)從源碼可以看出,join() 也是利用的等待通知機(jī)制:
核心邏輯:
while (isAlive()) {wait(0); }在 join 線程完成后會(huì)調(diào)用 notifyAll() 方法,是在 JVM 實(shí)現(xiàn)中調(diào)用,所以這里看不出來(lái)。
?
volatile 共享內(nèi)存
因?yàn)?Java 是采用共享內(nèi)存的方式進(jìn)行線程通信的,所以可以采用以下方式用主線程關(guān)閉 A 線程:
public class Volatile implements Runnable{private static volatile boolean flag = true ;@Overridepublic void run() {while (flag){System.out.println(Thread.currentThread().getName() + "正在運(yùn)行。。。");}System.out.println(Thread.currentThread().getName() +"執(zhí)行完畢");}public static void main(String[] args) throws InterruptedException {Volatile aVolatile = new Volatile();new Thread(aVolatile,"thread A").start();System.out.println("main 線程正在運(yùn)行") ;TimeUnit.MILLISECONDS.sleep(100) ;aVolatile.stopThread();}private void stopThread(){flag = false ;}}輸出結(jié)果:
thread A正在運(yùn)行。。。 thread A正在運(yùn)行。。。 thread A正在運(yùn)行。。。 thread A正在運(yùn)行。。。 thread A執(zhí)行完畢這里的 flag 存放于主內(nèi)存中,所以主線程和線程 A 都可以看到。
flag 采用 volatile 修飾主要是為了內(nèi)存可見(jiàn)性,更多內(nèi)容可以查看這里。
?
CountDownLatch 并發(fā)工具
CountDownLatch 可以實(shí)現(xiàn) join 相同的功能,但是更加的靈活。
? ?private static void countDownLatch() throws Exception{int thread = 3 ;long start = System.currentTimeMillis();final CountDownLatch countDown = new CountDownLatch(thread);for (int i= 0 ;i<thread ; i++){new Thread(new Runnable() {@Overridepublic void run() {LOGGER.info("thread run");try {Thread.sleep(2000);countDown.countDown();LOGGER.info("thread end");} catch (InterruptedException e) {e.printStackTrace();}}}).start();}countDown.await();long stop = System.currentTimeMillis();LOGGER.info("main over total time={}",stop-start);}輸出結(jié)果:
2018-03-16 20:19:44.126 [Thread-0] INFO ?c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:44.126 [Thread-2] INFO ?c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:44.126 [Thread-1] INFO ?c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:46.136 [Thread-2] INFO ?c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [Thread-1] INFO ?c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [Thread-0] INFO ?c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [main] INFO ?c.c.actual.ThreadCommunication - main over total time=2012CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 實(shí)現(xiàn)的,更多實(shí)現(xiàn)參考 ReentrantLock 實(shí)現(xiàn)原理
-
初始化一個(gè) CountDownLatch 時(shí)告訴并發(fā)的線程,然后在每個(gè)線程處理完畢之后調(diào)用 countDown() 方法。
-
該方法會(huì)將 AQS 內(nèi)置的一個(gè) state 狀態(tài) -1 。
-
最終在主線程調(diào)用 await() 方法,它會(huì)阻塞直到?state==0?的時(shí)候返回。
?
線程響應(yīng)中斷
public class StopThread implements Runnable {@Overridepublic void run() {while ( !Thread.currentThread().isInterrupted()) {// 線程執(zhí)行具體邏輯System.out.println(Thread.currentThread().getName() + "運(yùn)行中。。");}System.out.println(Thread.currentThread().getName() + "退出。。");}public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(new StopThread(), "thread A");thread.start();System.out.println("main 線程正在運(yùn)行") ;TimeUnit.MILLISECONDS.sleep(10) ;thread.interrupt();}}
輸出結(jié)果:
thread A運(yùn)行中。。 thread A運(yùn)行中。。 thread A退出。。可以采用中斷線程的方式來(lái)通信,調(diào)用了 thread.interrupt() 方法其實(shí)就是將 thread 中的一個(gè)標(biāo)志屬性置為了 true。
并不是說(shuō)調(diào)用了該方法就可以中斷線程,如果不對(duì)這個(gè)標(biāo)志進(jìn)行響應(yīng)其實(shí)是沒(méi)有什么作用(這里對(duì)這個(gè)標(biāo)志進(jìn)行了判斷)。
但是如果拋出了 InterruptedException 異常,該標(biāo)志就會(huì)被 JVM 重置為 false。
?
線程池 awaitTermination() 方法
如果是用線程池來(lái)管理線程,可以使用以下方式來(lái)讓主線程等待線程池中所有任務(wù)執(zhí)行完畢:
? ?private static void executorService() throws Exception{BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10) ;ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ;poolExecutor.execute(new Runnable() {@Overridepublic void run() {LOGGER.info("running");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}});poolExecutor.execute(new Runnable() {@Overridepublic void run() {LOGGER.info("running2");try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}});poolExecutor.shutdown();while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){LOGGER.info("線程還在執(zhí)行。。。");}LOGGER.info("main over");}輸出結(jié)果:
2018-03-16 20:18:01.273 [pool-1-thread-2] INFO ?c.c.actual.ThreadCommunication - running2 2018-03-16 20:18:01.273 [pool-1-thread-1] INFO ?c.c.actual.ThreadCommunication - running 2018-03-16 20:18:02.273 [main] INFO ?c.c.actual.ThreadCommunication - 線程還在執(zhí)行。。。 2018-03-16 20:18:03.278 [main] INFO ?c.c.actual.ThreadCommunication - 線程還在執(zhí)行。。。 2018-03-16 20:18:04.278 [main] INFO ?c.c.actual.ThreadCommunication - main over使用這個(gè) awaitTermination() 方法的前提需要關(guān)閉線程池,如調(diào)用了 shutdown() 方法。
調(diào)用了 shutdown() 之后線程池會(huì)停止接受新任務(wù),并且會(huì)平滑的關(guān)閉線程池中現(xiàn)有的任務(wù)。
?
管道通信
? ?public static void piped() throws IOException {//面向于字符 PipedInputStream 面向于字節(jié)PipedWriter writer = new PipedWriter();PipedReader reader = new PipedReader();//輸入輸出流建立連接writer.connect(reader);Thread t1 = new Thread(new Runnable() {@Overridepublic void run() {LOGGER.info("running");try {for (int i = 0; i < 10; i++) {writer.write(i+"");Thread.sleep(10);}} catch (Exception e) {} finally {try {writer.close();} catch (IOException e) {e.printStackTrace();}}}});Thread t2 = new Thread(new Runnable() {@Overridepublic void run() {LOGGER.info("running2");int msg = 0;try {while ((msg = reader.read()) != -1) {LOGGER.info("msg={}", (char) msg);}} catch (Exception e) {}}});t1.start();t2.start();}
輸出結(jié)果:
2018-03-16 19:56:43.014 [Thread-0] INFO ?c.c.actual.ThreadCommunication - running 2018-03-16 19:56:43.014 [Thread-1] INFO ?c.c.actual.ThreadCommunication - running2 2018-03-16 19:56:43.130 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=0 2018-03-16 19:56:43.132 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=1 2018-03-16 19:56:43.132 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=2 2018-03-16 19:56:43.133 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=3 2018-03-16 19:56:43.133 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=4 2018-03-16 19:56:43.133 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=5 2018-03-16 19:56:43.133 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=6 2018-03-16 19:56:43.134 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=7 2018-03-16 19:56:43.134 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=8 2018-03-16 19:56:43.134 [Thread-1] INFO ?c.c.actual.ThreadCommunication - msg=9Java 雖說(shuō)是基于內(nèi)存通信的,但也可以使用管道通信。
需要注意的是,輸入流和輸出流需要首先建立連接。這樣線程 B 就可以收到線程 A 發(fā)出的消息了。
實(shí)際開(kāi)發(fā)中可以靈活根據(jù)需求選擇最適合的線程通信方式。
總結(jié)
- 上一篇: 你应该知道的 volatile 关键字
- 下一篇: 应用限流