高级同步器:可重用的同步屏障Phaser
引自:https://shift-alt-ctrl.iteye.com/blog/2302923
在JAVA 1.7引入了一個新的并發(fā)API:Phaser,一個可重用的同步barrier。在此前,JAVA已經(jīng)有CyclicBarrier、CountDownLatch這兩種同步barrier,但是Phaser更加靈活,而且側(cè)重于“重用”。
一、簡述
? ? 1、注冊機制:與其他barrier不同的是,Phaser中的“注冊的同步者(parties)”會隨時間而變化,Phaser可以通過構(gòu)造器初始化parties個數(shù),也可以在Phaser運行期間隨時加入(register)新的parties,以及在運行期間注銷(deregister)parties。運行時可以隨時加入、注銷parties,只會影響Phaser內(nèi)部的計數(shù)器,它建立任何內(nèi)部的bookkeeping(賬本),因此task不能查詢自己是否已經(jīng)注冊了,當(dāng)然你可以通過實現(xiàn)子類來達成這一設(shè)計要求。
Java代碼???
? ? 此外,CyclicBarrier、CountDownLatch需要在初始化的構(gòu)造函數(shù)中指定同步者的個數(shù),且運行時無法再次調(diào)整。
Java代碼???
? ? 2、同步機制:類似于CyclicBarrier,Phaser也可以awaited多次,它的arrivedAndAwaitAdvance()方法的效果類似于CyclicBarrier的await()。Phaser的每個周期(generation)都有一個phase數(shù)字,phase 從0開始,當(dāng)所有的已注冊的parties都到達后(arrive)將會導(dǎo)致此phase數(shù)字自增(advance),當(dāng)達到Integer.MAX_VALUE后繼續(xù)從0開始。這個phase數(shù)字用于表示當(dāng)前parties所處于的“階段周期”,它既可以標(biāo)記和控制parties的wait行為、喚醒等待的時機。
? ? 1)Arrival:Phaser中的arrive()、arriveAndDeregister()方法,這兩個方法不會阻塞(block),但是會返回相應(yīng)的phase數(shù)字,當(dāng)此phase中最后一個party也arrive以后,phase數(shù)字將會增加,即phase進入下一個周期,同時觸發(fā)(onAdvance)那些阻塞在上一phase的線程。這一點類似于CyclicBarrier的barrier到達機制;更靈活的是,我們可以通過重寫onAdvance方法來實現(xiàn)更多的觸發(fā)行為。
?
? ? 2)Waiting:Phaser中的awaitAdvance()方法,需要指定一個phase數(shù)字,表示此Thread阻塞直到phase推進到此周期,arriveAndAwaitAdvance()方法阻塞到下一周期開始(或者當(dāng)前phase結(jié)束)。不像CyclicBarrier,即使等待Thread已經(jīng)interrupted,awaitAdvance方法會繼續(xù)等待。Phaser提供了Interruptible和Timout的阻塞機制,不過當(dāng)線程Interrupted或者timout之后將會拋出異常,而不會修改Phaser的內(nèi)部狀態(tài)。如果必要的話,你可以在遇到此類異常時,進行相應(yīng)的恢復(fù)操作,通常是在調(diào)用forceTermination()方法之后。
? ? Phaser通常在ForJoinPool中執(zhí)行tasks,它可以在有task阻塞等待advance時,確保其他tasks的充分并行能力。
?
? ? 3、中斷(終止):Phaser可以進入Termination狀態(tài),可以通過isTermination()方法判斷;當(dāng)Phaser被終止后,所有的同步方法將會立即返回(解除阻塞),不需要等到advance(即advance也會解除阻塞),且這些阻塞方法將會返回一個負值的phase值(awaitAdvance方法、arriveAndAwaitAdvance方法)。當(dāng)然,向一個termination狀態(tài)的Phaser注冊party將不會有效;此時onAdvance()方法也將會返回true(默認實現(xiàn)),即所有的parties都會被deregister,即register個數(shù)為0。
?
? ? 4、Tiering(分層):Phaser可以“分層”,以tree的方式構(gòu)建Phaser來降低“競爭”。如果一個Phaser中有大量parties,這會導(dǎo)致嚴(yán)重的同步競爭,所以我們可以將它們分組并共享一個parent Phaser,這樣可以提高吞吐能力;Phaser中注冊和注銷parties都會有Child 和parent Phaser自動管理。當(dāng)Child Phaser中中注冊的parties變?yōu)榉?時(在構(gòu)造函數(shù)Phaser(Phaser parent,int parties),或者register()方法),Child Phaser將會注冊到其Parent上;當(dāng)Child Phaser中的parties變?yōu)?時(比如由arrivedAndDegister()方法),那么此時Child Phaser也將從其parent中注銷出去。
?
? ? 5、監(jiān)控:同步的方法只會被register操作調(diào)用,對于當(dāng)前state的監(jiān)控方法可以在任何時候調(diào)用,比如getRegisteredParties()獲取已經(jīng)注冊的parties個數(shù),getPhase()獲取當(dāng)前phase周期數(shù)等;因為這些方法并非同步,所以只能反映當(dāng)時的瞬間狀態(tài)。
?
二、常用的Barrier比較
? ? 1、CountDownLatch
Java代碼?? //創(chuàng)建時,就需要指定參與的parties個數(shù) int parties = 12; CountDownLatch latch = new CountDownLatch(parties); //線程池中同步task ExecutorService executor = Executors.newFixedThreadPool(parties); for(int i = 0; i < parties; i++) { executor.execute(new Runnable() { @Override public void run() { try { //可以在任務(wù)執(zhí)行開始時執(zhí)行,表示所有的任務(wù)都啟動后,主線程的await即可解除 //latch.countDown(); //run //.. Thread.sleep(3000); } catch (Exception e) { } finally { //任務(wù)執(zhí)行完畢后:到達 //表示所有的任務(wù)都結(jié)束,主線程才能繼續(xù) latch.countDown(); } } }); } latch.await();//主線程阻塞,直到所有的parties到達 //latch上所有的parties都達到后,再次執(zhí)行await將不會有效, //即barrier是不可重用的 executor.shutdown();?
? ? 2、CyclicBarrier
Java代碼?? //創(chuàng)建時,就需要指定參與的parties個數(shù) int parties = 12; CyclicBarrier barrier = new CyclicBarrier(parties); //線程池中同步task ExecutorService executor = Executors.newFixedThreadPool(parties); for(int i = 0; i < parties; i++) { executor.execute(new Runnable() { @Override public void run() { try { int i = 0; while (i < 3 && !barrier.isBroken()) { System.out.println("generation begin:" + i + ",tid:" + Thread.currentThread().getId()); Thread.sleep(3000); //如果所有的parties都到達,則開啟新的一次周期(generation) //barrier可以被重用 barrier.await(); i++; } } catch (Exception e) { e.printStackTrace(); } finally { } } }); } Thread.sleep(100000);?
? ? 3、Phaser
Java代碼?? package com.thread;import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Phaser;/*** 同步器:Phaser,可重用的同步屏障* @author Administrator**/ public class PhaserTest {public static void main(String[] args) {// 創(chuàng)建時,就需要指定參與的parties個數(shù)// 可以在創(chuàng)建時不指定parties// 而是在運行時,隨時注冊和注銷新的partiesint parties = 12;Phaser phaser = new Phaser();// 主線程先注冊一個// 對應(yīng)下文中,主線程可以等待所有的parties到達后再解除阻塞(類似與CountDownLatch) phaser.register();ExecutorService executor = Executors.newFixedThreadPool(parties);for (int i = 0; i < parties; i++) {phaser.register();// 每創(chuàng)建一個task,我們就注冊一個partyexecutor.execute(new Runnable() {@Overridepublic void run() {try {int i = 0;while (i < 3 && !phaser.isTerminated()) {//getPhase()獲取當(dāng)前phase周期數(shù)System.out.println("Generation:" + phaser.getPhase());Thread.sleep(3000);// 等待同一周期內(nèi),其他Task到達// 然后進入新的周期,并繼續(xù)同步進行 phaser.arriveAndAwaitAdvance();i++;// 我們假定,運行三個周期即可 }} catch (Exception e) {} finally {phaser.arriveAndDeregister();}}});}// 主線程到達,且注銷自己// 此后線程池中的線程即可開始按照周期,同步執(zhí)行。 phaser.arriveAndDeregister();executor.shutdown();} }?
三、API簡述
? ? ?1、Phaser():構(gòu)造函數(shù),創(chuàng)建一個Phaser;默認parties個數(shù)為0。此后我們可以通過register()、bulkRegister()方法來注冊新的parties。每個Phaser實例內(nèi)部,都持有幾個狀態(tài)數(shù)據(jù):termination狀態(tài)、已經(jīng)注冊的parties個數(shù)(registeredParties)、當(dāng)前phase下已到達的parties個數(shù)(arrivedParties)、當(dāng)前phase周期數(shù),還有2個同步阻塞隊列Queue。Queue中保存了所有的waiter,即因為advance而等待的線程信息;這兩個Queue分別為evenQ和oddQ,這兩個Queue在實現(xiàn)上沒有任何區(qū)別,Queue的元素為QNode,每個QNode保存一個waiter的信息,比如Thread引用、阻塞的phase、超時的deadline、是否支持interrupted響應(yīng)等。兩個Queue,其中一個保存當(dāng)前phase中正在使用的waiter,另一個備用,當(dāng)phase為奇數(shù)時使用evenQ、oddQ備用,偶數(shù)時相反,即兩個Queue輪換使用。當(dāng)advance事件觸發(fā)期間,新register的parties將會被放在備用的Queue中,advance只需要響應(yīng)另一個Queue中的waiters即可,避免出現(xiàn)混亂。
?
? ? 2、Phaser(int parties):構(gòu)造函數(shù),初始一定數(shù)量的parties;相當(dāng)于直接regsiter此數(shù)量的parties。
? ? 3、arrive():到達,阻塞,等到當(dāng)前phase下其他parties到達。如果沒有register(即已register數(shù)量為0),調(diào)用此方法將會拋出異常,此方法返回當(dāng)前phase周期數(shù),如果Phaser已經(jīng)終止,則返回負數(shù)。
? ? 4、arriveAndDeregister():到達,并注銷一個parties數(shù)量,非阻塞方法。注銷,將會導(dǎo)致Phaser內(nèi)部的parties個數(shù)減一(只影響當(dāng)前phase),即下一個phase需要等待arrive的parties數(shù)量將減一。異常機制和返回值,與arrive方法一致。
? ? 5、arriveAndAwaitAdvance():到達,且阻塞直到其他parties都到達,且advance。此方法等同于awaitAdvance(arrive())。如果你希望阻塞機制支持timeout、interrupted響應(yīng),可以使用類似的其他方法(參見下文)。如果你希望到達后且注銷,而且阻塞等到當(dāng)前phase下其他的parties到達,可以使用awaitAdvance(arriveAndDeregister())方法組合。此方法的異常機制和返回值同arrive()。
? ? 6、awaitAdvance(int phase):阻塞方法,等待phase周期數(shù)下其他所有的parties都到達。如果指定的phase與Phaser當(dāng)前的phase不一致,則立即返回。
? ? 7、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支持interrupted響應(yīng),即waiter線程如果被外部中斷,則此方法立即返回,并拋出InterrutedException。
? ? 8、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支持timeout類型的interrupted響應(yīng),即當(dāng)前線程阻塞等待約定的時長,超時后以TimeoutException異常方式返回。
? ? 9、forceTermination():強制終止,此后Phaser對象將不可用,即register等將不再有效。此方法將會導(dǎo)致Queue中所有的waiter線程被喚醒。
? ? 10、register():新注冊一個party,導(dǎo)致Phaser內(nèi)部registerPaties數(shù)量加1;如果此時onAdvance方法正在執(zhí)行,此方法將會等待它執(zhí)行完畢后才會返回。此方法返回當(dāng)前的phase周期數(shù),如果Phaser已經(jīng)中斷,將會返回負數(shù)。
? ? 11、bulkRegister(int parties):批量注冊多個parties數(shù)組,規(guī)則同10、。
? ? 12、getArrivedParties():獲取已經(jīng)到達的parties個數(shù)。
? ? 13、getPhase():獲取當(dāng)前phase周期數(shù)。如果Phaser已經(jīng)中斷,則返回負值。
? ? 14、getRegisteredParties():獲取已經(jīng)注冊的parties個數(shù)。
? ? 15、getUnarrivedParties():獲取尚未到達的parties個數(shù)。
? ? 16、onAdvance(int phase,int registeredParties):這個方法比較特殊,表示當(dāng)進入下一個phase時可以進行的事件處理,如果返回true表示此Phaser應(yīng)該終止(此后將會把Phaser的狀態(tài)為termination,即isTermination()將返回true。),否則可以繼續(xù)進行。phase參數(shù)表示當(dāng)前周期數(shù),registeredParties表示當(dāng)前已經(jīng)注冊的parties個數(shù)。
? ? 默認實現(xiàn)為:return registeredParties == 0;在很多情況下,開發(fā)者可以通過重寫此方法,來實現(xiàn)自定義的advance時間處理機制。
?
? ? 內(nèi)部原理,比較簡單(簡述):
? ? 1)兩個計數(shù)器:分別表示parties個數(shù)和當(dāng)前phase。register和deregister會觸發(fā)parties變更(CAS),全部parties到達(arrive)會觸發(fā)phase變更。
? ? 2)一個主要的阻塞隊列:非AQS實現(xiàn),對于arriveAndWait的線程,會被添加到隊列中并被park阻塞,知道當(dāng)前phase中最后一個party到達后觸發(fā)喚醒。
轉(zhuǎn)載于:https://www.cnblogs.com/x-jingxin/p/10655164.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的高级同步器:可重用的同步屏障Phaser的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [原创]Enterprise Archi
- 下一篇: 2013网易实习生招聘笔试题