Java多线程编程模式实战指南(三):Two-phase Termination模式--转载
本文由本人首次發(fā)布在infoq中文站上:http://www.infoq.com/cn/articles/java-multithreaded-programming-mode-two-phase-termination。轉(zhuǎn)載請注明作者: 黃文海 出處:http://viscent.iteye.com。
?
停止線程是一個目標(biāo)簡單而實現(xiàn)卻不那么簡單的任務(wù)。首先,Java沒有提供直接的API用于停止線程。此外,停止線程時還有一些額外的細(xì)節(jié)需要考慮,如待停止的線程處于阻塞(等待鎖)或者等待狀態(tài)(等待其它線程)、尚有未處理完的任務(wù)等。本文介紹的Two-phase Termination模式提供了一種通用的用于優(yōu)雅地停止線程的方法。
Two-phase Termination模式簡介
Java并沒有提供直接的API用于停止線程。Two-phase Termination模式通過將停止線程這個動作分解為準(zhǔn)備階段和執(zhí)行階段這兩個階段,以應(yīng)對停止線程過程中可能存在的問題。
準(zhǔn)備階段。該階段主要動作是“通知”目標(biāo)線程(欲停止的線程)準(zhǔn)備進(jìn)行停止。這一步會設(shè)置一個標(biāo)志變量用于指示目標(biāo)線程可以準(zhǔn)備停止了。但是,由于目標(biāo)線程可能正處于阻塞狀態(tài)(等待鎖的獲得)、等待狀態(tài)(如調(diào)用Object.wait)或者I/O(如InputStream.read)等待等狀態(tài),即便設(shè)置了這個標(biāo)志,目標(biāo)線程也無法立即“看到”這個標(biāo)志而做出相應(yīng)動作。因此,這一階段還需要通過調(diào)用目標(biāo)線程的interrupt方法,以期望目標(biāo)線程能夠通過捕獲相關(guān)的異常偵測到該方法調(diào)用,從而中斷其阻塞狀態(tài)、等待狀態(tài)。對于能夠?qū)nterrupt方法調(diào)用作出響應(yīng)的方法(參見表1),目標(biāo)線程代碼可以通過捕獲這些方法拋出的InterruptedException來偵測線程停止信號。但也有一些方法(如InputStream.read)并不對interrupt調(diào)用作出響應(yīng),此時需要我們手工處理,如同步的Socket I/O操作中通過關(guān)閉socket,使處于I/O等待的socket拋出java.net.SocketException。
?
表 1. 能夠?qū)hread.interrupt作出響應(yīng)的一些方法
| 方法 | 響應(yīng)interrupt調(diào)用拋出的異常 |
| Object.wait() 、 Object.wait(long timeout) 、Object.wait(long timeout, int nanos) | InterruptedException |
| Thread.sleep(long millis) 、Thread.sleep(long millis, int nanos) | InterruptedException |
| Thread.join()、Thread.join(long millis) 、Thread.join(long millis, int nanos) | InterruptedException |
| java.util.concurrent.BlockingQueue.take() | InterruptedException |
| java.util.concurrent.locks.Lock.lockInterruptibly() | InterruptedException |
| java.nio.channels.InterruptibleChannel | java.nio.channels.ClosedByInterruptException |
執(zhí)行階段。該階段的主要動作是檢查準(zhǔn)備階段所設(shè)置的線程停止標(biāo)志和信號,在此基礎(chǔ)上決定線程停止的時機(jī),并進(jìn)行適當(dāng)?shù)摹扒謇怼辈僮鳌?/p>
Two-phase Termination模式的架構(gòu)
Two-phase Termination模式的主要參與者有以下幾種。其類圖如圖1所示。
圖 1. Two-phase Termination模式的類圖
- ThreadOwner:目標(biāo)線程的擁有者。Java語言中,并沒有線程的擁有者的概念,但是線程的背后是其要處理的任務(wù)或者其所提供的服務(wù),因此我們不能在不清楚某個線程具體是做什么的情況下貿(mào)然將其停止。一般地,我們可以將目標(biāo)線程的創(chuàng)建者視為該線程的擁有者,并假定其“知道”目標(biāo)線程的工作內(nèi)容,可以安全地停止目標(biāo)線程。
- TerminatableThread:可停止的線程。其主要方法及職責(zé)如下:
- terminate:設(shè)置線程停止標(biāo)志,并發(fā)送停止“信號”給目標(biāo)線程。
- doTerminate:留給子類實現(xiàn)線程停止時所需的一些額外操作,如目標(biāo)線程代碼中包含Socket I/O,子類可以在該方法中關(guān)閉Socket以達(dá)到快速停止線程,而不會使目標(biāo)線程等待I/O完成才能偵測到線程停止標(biāo)記。
- doRun:留給子類實現(xiàn)線程的處理邏輯。相當(dāng)于Thread.run,只不過該方法中無需關(guān)心停止線程的邏輯,因為這個邏輯已經(jīng)被封裝在TerminatableThread的run方法中了。
- doCleanup:留給子類實現(xiàn)線程停止后可能需要的一些清理動作。
- TerminationToken:線程停止標(biāo)志。toShutdown用于指示目標(biāo)線程可以停止了。reservations可用于反映目標(biāo)線程還有多少數(shù)量未完成的任務(wù),以支持等目標(biāo)線程處理完其任務(wù)后再行停止。
準(zhǔn)備階段的序列圖如圖2所示:
圖 2. 準(zhǔn)備階段的序列圖
1、客戶端代碼調(diào)用線程擁有者的shutdown方法。
2、shutdown方法調(diào)用目標(biāo)線程的terminate方法。
3~4、terminate方法將terminationToken的toShutdown標(biāo)志設(shè)置為true。
5、terminate方法調(diào)用由TerminatableThread子類實現(xiàn)的doTerminate方法,使得子類可以為停止目標(biāo)線程做一些其它必要的操作。
6、若terminationToken的reservations屬性值為0,則表示目標(biāo)線程沒有未處理完的任務(wù)或者ThreadOwner在停止線程時不關(guān)心其是否有未處理的任務(wù)。此時,terminate方法會調(diào)用目標(biāo)線程的interrupt方法。
7、terminate方法調(diào)用結(jié)束。
8、shutdown調(diào)用返回,此時目標(biāo)線程可能還仍然在運(yùn)行。
執(zhí)行階段由目標(biāo)線程的代碼去檢查terminationToken的toShutdown屬性、reservations屬性的值,并捕獲由interrupt方法調(diào)用拋出的相關(guān)異常以決定是否停止線程。在線程停止前由TerminatableThread子類實現(xiàn)的doCleanup方法會被調(diào)用。
Two-phase Termination模式實戰(zhàn)案例
某系統(tǒng)需要對接告警系統(tǒng)以實現(xiàn)告警功能。告警系統(tǒng)是一個C/S結(jié)構(gòu)的系統(tǒng),它提供了一套客戶端API(AlarmAgent)用于與其對接的系統(tǒng)給其發(fā)送告警。該系統(tǒng)將告警功能封裝在一個名為AlarmMgr的單件類(Singleton)中,系統(tǒng)中其它代碼需要發(fā)送告警的只需要調(diào)用該類的sendAlarm方法。該方法將告警信息緩存入隊列,由專門的告警發(fā)送線程負(fù)責(zé)調(diào)用AlarmAgent的相關(guān)方法將告警信息發(fā)送至告警服務(wù)器。
告警發(fā)送線程是一個用戶線程(User Thread),因此在系統(tǒng)的停止過程中,該線程若未停止則會阻止JVM正常關(guān)閉。所以,在系統(tǒng)停止過程中我們必須主動去停止告警發(fā)送線程,而非依賴JVM。為了能夠盡可能快的以優(yōu)雅的方式將告警發(fā)送線程停止,我們需要處理以下兩個問題:
上述問題可以通過使用Two-phase Termination模式來解決。
AlarmMgr相當(dāng)于圖1中的ThreadOwner參與者實例,它是告警發(fā)送線程的擁有者。系統(tǒng)停止過程中調(diào)用其shutdown方法(AlarmMgr.getInstance().shutdown())即可請求告警發(fā)送線程停止。其代碼如清單1所示:
清單 1. AlarmMgr源碼
public class AlarmMgr {private final BlockingQueue<AlarmInfo> alarms = new LinkedBlockingQueue<AlarmInfo>();//告警系統(tǒng)客戶端APIprivate final AlarmAgent alarmAgent = new AlarmAgent();//告警發(fā)送線程private final AbstractTerminatableThread alarmSendingThread;private boolean shutdownRequested = false;private static final AlarmMgr INSTANCE = new AlarmMgr();private AlarmMgr() {alarmSendingThread = new AbstractTerminatableThread() {@Overrideprotected void doRun() throws Exception {if (alarmAgent.waitUntilConnected()) {AlarmInfo alarm;alarm = alarms.take();terminationToken.reservations.decrementAndGet();try {alarmAgent.sendAlarm(alarm);} catch (Exception e) {e.printStackTrace();}}}@Overrideprotected void doCleanup(Exception exp) {if (null != exp) {exp.printStackTrace();}alarmAgent.disconnect();}};alarmAgent.init();}public static AlarmMgr getInstance() {return INSTANCE;}public void sendAlarm(AlarmType type, String id, String extraInfo) {final TerminationToken terminationToken = alarmSendingThread.terminationToken;if (terminationToken.isToShutdown()) {// log the alarmSystem.err.println("rejected alarm:" + id + "," + extraInfo);return;}try {AlarmInfo alarm = new AlarmInfo(id, type);alarm.setExtraInfo(extraInfo);terminationToken.reservations.incrementAndGet();alarms.add(alarm);} catch (Throwable t) {t.printStackTrace();}}public void init() {alarmSendingThread.start();}public synchronized void shutdown() {if (shutdownRequested) {throw new IllegalStateException("shutdown already requested!");}alarmSendingThread.terminate();shutdownRequested = true;}public int pendingAlarms() {return alarmSendingThread.terminationToken.reservations.get();}}class AlarmAgent {// 省略其它代碼private volatile boolean connectedToServer = false;public void sendAlarm(AlarmInfo alarm) throws Exception {// 省略其它代碼System.out.println("Sending " + alarm);try {Thread.sleep(50);} catch (Exception e) {}}public void init() {// 省略其它代碼connectedToServer = true;}public void disconnect() {// 省略其它代碼System.out.println("disconnected from alarm server.");}public boolean waitUntilConnected() {// 省略其它代碼return connectedToServer;} }從上面的代碼可以看出,AlarmMgr每接受一個告警信息放入緩存隊列便將terminationToken的reservations值增加1,而告警發(fā)送線程每發(fā)送一個告警到告警服務(wù)器則將terminationToken的reservations值減少1。這為我們可以在停止告警發(fā)送線程前確保隊列中現(xiàn)有的告警信息會被處理完畢提供了線索:AbstractTerminatableThread的run方法會根據(jù)terminationToken的reservations是否為0來判斷待停止的線程已無未處理的任務(wù),或者無需關(guān)心其是否有待處理的任務(wù)。
AbstractTerminatableThread的源碼見清單2:
清單 2. AbstractTerminatableThread源碼
public abstract class AbstractTerminatableThread extends Thread implements Terminatable {public final TerminationToken terminationToken;public AbstractTerminatableThread() {super();this.terminationToken = new TerminationToken();}/*** * @param terminationToken 線程間共享的線程終止標(biāo)志實例*/public AbstractTerminatableThread(TerminationToken terminationToken) {super();this.terminationToken = terminationToken;}protected abstract void doRun() throws Exception;protected void doCleanup(Exception cause) {}protected void doTerminiate() {}@Overridepublic void run() {Exception ex = null;try {while (true) {/** 在執(zhí)行線程的處理邏輯前先判斷線程停止的標(biāo)志。*/if (terminationToken.isToShutdown()&& terminationToken.reservations.get() <= 0) {break;}doRun();}} catch (Exception e) {// Allow the thread to terminate in response of a interrupt invocationex = e;} finally {doCleanup(ex);}}@Overridepublic void interrupt() {terminate();}@Overridepublic void terminate() {terminationToken.setToShutdown(true);try {doTerminiate();} finally {// 若無待處理的任務(wù),則試圖強(qiáng)制終止線程if (terminationToken.reservations.get() <= 0) {super.interrupt();}}}}AbstractTerminatableThread是一個可復(fù)用的TerminatableThread參與者實例。其terminate方法完成了線程停止的準(zhǔn)備階段。該方法首先將terminationToken的toShutdown變量設(shè)置為true,指示目標(biāo)線程可以準(zhǔn)備停止了。但是,此時目標(biāo)線程可能處于一些阻塞(Blocking)方法的調(diào)用,如調(diào)用Object.sleep、InputStream.read等,無法偵測到該變量。調(diào)用目標(biāo)線程的interrupt方法可以使一些阻塞方法(參見表1)通過拋出異常從而使目標(biāo)線程停止。但也有些阻塞方法如InputStream.read并不對interrupt方法調(diào)用作出響應(yīng),此時需要由TerminatableThread的子類實現(xiàn)doTerminiate方法,在該方法中實現(xiàn)一些關(guān)閉目標(biāo)線程所需的額外操作。例如,在Socket同步I/O中通過關(guān)閉socket使得使用該socket的線程若處于I/O等待會拋出SocketException。因此,terminate方法下一步調(diào)用doTerminate方法。接著,若terminationToken.reservations的值為非正數(shù)(表示目標(biāo)線程無待處理任務(wù)、或者我們不關(guān)心其是否有待處理任務(wù)),則terminate方法會調(diào)用目標(biāo)線程的interrupt方法,強(qiáng)制目標(biāo)線程的阻塞方法中斷,從而強(qiáng)制終止目標(biāo)線程。
執(zhí)行階段在AbstractTerminatableThread的run方法中完成。該方法通過對TerminationToken的toShutdown屬性和reservations屬性的判斷或者通過捕獲由interrupt方法調(diào)用而拋出的異常來終止線程。并在線程終止前調(diào)用由TerminatableThread子類實現(xiàn)的doCleanup方法用于執(zhí)行一些清理動作。
在執(zhí)行階段,由于AbstractTerminatableThread.run方法每次執(zhí)行線程處理邏輯(通過調(diào)用doRun方法實現(xiàn))前都先判斷下toShutdown屬性和reservations屬性的值,在目標(biāo)線程處理完其待處理的任務(wù)后(此時reservations屬性的值為非正數(shù))目標(biāo)線程run方法也就退出了while循環(huán)。因此,線程的處理邏輯代碼(doRun方法)將不再被調(diào)用,從而使本案例在不使用Two-phase Termination模式的情況下停止目標(biāo)線程存在的兩個問題得以解決(目標(biāo)線程停止前可以保證其處理完待處理的任務(wù)——發(fā)送隊列中現(xiàn)有的告警信息到服務(wù)器)和規(guī)避(目標(biāo)線程發(fā)送完隊列中現(xiàn)有的告警信息后,doRun方法不再被調(diào)用,從而避免了隊列為空時BlockingQueue.take調(diào)用導(dǎo)致的阻塞)。
從上可知,準(zhǔn)備階段、執(zhí)行階段需要通過TerminationToken作為“中介”來協(xié)調(diào)二者的動作。TerminationToken的源碼如清單3所示:
清單 3. TerminationToken源碼
public class TerminationToken {//使用volatile修飾,以保證無需顯示鎖的情況下該變量的內(nèi)存可見性protected volatile boolean toShutdown = false;public final AtomicInteger reservations = new AtomicInteger(0);public boolean isToShutdown() {return toShutdown;}protected void setToShutdown(boolean toShutdown) {this.toShutdown = true;}}Two-phase Termination模式的評價與實現(xiàn)考量
Two-phase Termination模式使得我們可以對各種形式的目標(biāo)線程進(jìn)行優(yōu)雅的停止。如目標(biāo)線程調(diào)用了能夠?qū)nterrupt方法調(diào)用作出響應(yīng)的阻塞方法、目標(biāo)線程調(diào)用了不能對interrupt方法調(diào)用作出響應(yīng)的阻塞方法、目標(biāo)線程作為消費(fèi)者處理其它線程生產(chǎn)的“產(chǎn)品”在其停止前需要處理完現(xiàn)有“產(chǎn)品”等。Two-phase Termination模式實現(xiàn)的線程停止可能出現(xiàn)延遲,即客戶端代碼調(diào)用完ThreadOwner.shutdown后,該線程可能仍在運(yùn)行。
本文案例展示了一個可復(fù)用的Two-phase Termination模式實現(xiàn)代碼。讀者若要自行實現(xiàn)該模式,可能需要注意以下幾個問題。
線程停止標(biāo)志
本文案例使用了TerminationToken作為目標(biāo)線程可以準(zhǔn)備停止的標(biāo)志。從清單3的代碼我們可以看到,TerminationToken使用了toShutdown這個boolean變量作為主要的停止標(biāo)志,而非使用Thread.isInterrupted()。這是因為,調(diào)用目標(biāo)線程的interrupt方法無法保證目標(biāo)線程的isInterrupted()方法返回值為true:目標(biāo)線程可能調(diào)用一些能夠捕獲InterruptedException而不保留線程中斷狀態(tài)的代碼。另外,toShutdown這個變量為了保證內(nèi)存可見性而又能避免使用顯式鎖的開銷,采用了volatile修飾。這點也很重要,筆者曾經(jīng)見過一些采用boolean變量作為線程停止標(biāo)志的代碼,只是這些變量沒有用volatile修飾,對其訪問也沒有加鎖,這就可能無法停止目標(biāo)線程。
生產(chǎn)者——消費(fèi)者問題中的線程停止
在多線程編程中,許多問題和一些多線程編程模式都可以看作生產(chǎn)者——消費(fèi)者問題。停止處于生產(chǎn)者——消費(fèi)者問題中的線程,需要考慮更多的問題:需要注意線程的停止順序,如果消費(fèi)者線程比生產(chǎn)者線程先停止則會導(dǎo)致生產(chǎn)者生產(chǎn)的新”產(chǎn)品“無法被處理,而如果先停止生產(chǎn)者線程又可能使消費(fèi)者線程處于空等待(如生產(chǎn)者消費(fèi)者采用阻塞隊列中轉(zhuǎn)”產(chǎn)品“)。并且,停止消費(fèi)者線程前是否考慮要等待其處理完所有待處理的任務(wù)或者將這些任務(wù)做個備份也是個問題。本文案例部分地展示生產(chǎn)者——消費(fèi)者問題中線程停止的處理,其核心就是通過使用TerminationToken的reservations變量:生產(chǎn)者每”生產(chǎn)“一個產(chǎn)品,Two-phase Termination模式的調(diào)用方代碼要使reservations變量值增加1(terminationToken.reservations.incrementAndGet());消費(fèi)者線程每處理一個產(chǎn)品,Two-phase Termination模式的調(diào)用方代碼要使reservations變量值減少1(terminationToken.reservations.decrementAndGet())。當(dāng)然,在停止消費(fèi)者線程時如果我們不關(guān)心其待處理的任務(wù),Two-phase Termination模式的調(diào)用方代碼可以忽略對reservations變量的操作。清單4展示了一個完整的停止生產(chǎn)者——消費(fèi)者問題中的線程的例子:
清單 4. 停止生產(chǎn)者——消費(fèi)者問題中的線程的例子
public class ProducerConsumerStop {class SampleConsumer<P> {private final BlockingQueue<P> queue = new LinkedBlockingQueue<P>();private AbstractTerminatableThread workThread = new AbstractTerminatableThread() {@Overrideprotected void doRun() throws Exception {terminationToken.reservations.decrementAndGet();P product = queue.take();// ...System.out.println(product);}};public void placeProduct(P product) {if (workThread.terminationToken.isToShutdown()) {throw new IllegalStateException("Thread shutdown");}try {queue.put(product);workThread.terminationToken.reservations.incrementAndGet();} catch (InterruptedException e) {}}public void shutdown() {workThread.terminate();}public void start() {workThread.start();}}public void test() {final SampleConsumer<String> aConsumer = new SampleConsumer<String>();AbstractTerminatableThread aProducer = new AbstractTerminatableThread() {private int i = 0;@Overrideprotected void doRun() throws Exception {aConsumer.placeProduct(String.valueOf(i));}@Overrideprotected void doCleanup(Exception cause) {// 生產(chǎn)者線程停止完畢后再請求停止消費(fèi)者線程aConsumer.shutdown();}};aProducer.start();aConsumer.start();} }隱藏而非暴露可停止的線程
為了保證可停止的線程不被其它代碼誤停止,一般我們將可停止線程隱藏在線程擁有者背后,而使系統(tǒng)中其它代碼無法直接訪問該線程,正如本案例代碼(見清單1)所展示:AlarmMgr定義了一個private字段alarmSendingThread用于引用告警發(fā)送線程(可停止的線程),系統(tǒng)中的其它代碼只能通過調(diào)用AlarmMgr的shutdown方法來請求該線程停止,而非通過引用該線程對象自身來停止它。
總結(jié)
本文介紹了Two-phase Termination模式的意圖及架構(gòu)。并結(jié)合筆者工作經(jīng)歷提供了一個實際的案例用于展示一個可復(fù)用的Two-phase Termination模式實現(xiàn)代碼,在此基礎(chǔ)上對該模式進(jìn)行了評價并分享在實際運(yùn)用該模式時需要注意的事項。
參考資源
- 本文的源代碼在線閱讀:https://github.com/Viscent/JavaConcurrencyPattern/
- Brian G?etz et al.,Java Concurrency In Practice
- Mark Grand,Patterns in Java,Volume 1, 2nd Edition
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/4249078.html
總結(jié)
以上是生活随笔為你收集整理的Java多线程编程模式实战指南(三):Two-phase Termination模式--转载的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java多线程编程模式实战指南(二):I
- 下一篇: Referenced file cont