线程执行器
為什么80%的碼農都做不了架構師?>>> ??
????通常我們使用JAVA來開發一個簡單的并發應用時,會創建一些Runnable對象,然后創建對應的Thread對象來執行他們,但是,如果需要開發一個程序需要運行大量并發任務的時候,這個方法顯然不合適。Java提供了執行器框架(Executor Framework)來解決這些問題。
????Executor Framework機制分離了任務的創建和執行。通過執行器,僅需要實現Runnable接口的對象,然后把這個對象發送給執行器即可。執行器通過創建所需要的線程來負責這些Runnable對象的創建、實例化以及運行。執行器使用了線程池來提高應用程序的性能。當發送一個任務執行器時,執行器會嘗試使用線程池中的線程來執行這個任務,避免了不斷地創建和銷毀線程而導致系統性能下降。
????執行器框架另一個重要的優勢是Callable接口。這個接口的主方法是call(),可以返回結果。當發送一個Callable對象給執行器時,將獲得一個實現了Future接口的對象。可以使用這個對象來控制Callable對象的狀態和結果。
1、創建線程執行器。
????使用執行器框架(Executor Framework)的第一步是創建ThreadPoolExecutor對象。可以使用ThreadPoolExecutor類提供的四個構造器或者使用Executor工廠類來創建ThreadPoolExecutor對象。一旦有了執行器,就可以將Runnable或者Callable對象發送給它去執行了。下面將用實例來演示Java創建線程執行器。
package?org.concurrency.executorframework; import?java.util.Date; import?java.util.concurrent.TimeUnit; /***?@author?Administrator*?定義一個任務類,實現Runnable接口*?只是定義,不執行*/ public?class?Task?implements?Runnable?{private?Date?initDate;//存儲任務創建時間private?String?name;//存儲任務的名稱public?Task()?{}public?Task(String?name)?{initDate?=?new?Date();this.name?=?name;}@Overridepublic?void?run()?{//?TODO?Auto-generated?method?stubSystem.out.printf("%s:?Task?%s?Created?on:?%s\n",Thread.currentThread().getName(),name,initDate);System.out.printf("%s:?Task?%s?Started?on:?%s\n",Thread.currentThread().getName(),name,initDate);try?{Long?duration?=?(long)(Math.random()*10);System.out.printf("%s:?Task?%s:?Doing?a?task?during?%d?seconds\n",Thread.currentThread().getName(),name,duration);TimeUnit.SECONDS.sleep(duration);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.printf("%s:?Task?%s:?Finished?on:?%s\n",Thread.currentThread().getName(),name,new?Date());} }package?org.concurrency.executorframework; import?java.util.concurrent.Executors; import?java.util.concurrent.ThreadPoolExecutor; /***?@author?Administrator*?它將執行通過執行器接收到的每一個任務。*/ public?class?Server?{private?ThreadPoolExecutor?executor;public?Server()?{/*線程執行器的創建有兩個方式:*?一個是直接使用ThreadPoolExecutor的構造器來實現*?一個是通過Executors工廠類來構造執行器和其他相關對象。*?但是由于TheadPoolExecutor構造器在使用上的復雜性,推薦使用Executors工廠類類創建。*?這里使用了Executors工廠類的newCacheThreadPoolExecutor()方法來創建一個緩存線程池*?返回一個ExecutorService對象,因此被強制轉換成ThreadPoolExecutor類型。*?使用線程池的優點是減少新建線程所花費的時間。此類緩存池的缺點是,如果發送過多任務給執行器,系統的復合會過載。*?當且僅當線程的數量是合理的,或者線程只會運行很短的時間時,適合采用緩存線程池類。*?*/executor?=?(ThreadPoolExecutor)?Executors.newCachedThreadPool();}/***?創建了執行器之后,就可以使用執行器的execute()方法來發送Runnable或者Callable類型的任務。*?這里的Task是實現了Runnable接口的對象。*?這里也有一些執行器相關的日志信息:*?getPoolSize():返回執行器線程池中實際的線程數*?getActiveCount():返回執行器中正在執行任務的線程數*?getCompleteTaskCount():返回執行器中已經完成的任務數*?*/public?void?executeTask(Task?task){System.out.printf("Server:?A?new?task?hs?arrived\n");executor.execute(task);System.out.printf("Server:?Pool?Size:?%d\n",executor.getPoolSize());System.out.printf("Server:?Active?Count:?%d\n",executor.getActiveCount());System.out.printf("Server:?Completed?Tasks:?%d\n",executor.getCompletedTaskCount());}/***?執行器以及ThreadPoolExecutor類一個重要的特性是,通常需要顯示地區結束,如果不這樣做,那么執行器將繼續執行。*?為了完成執行器的執行,可以使用ThreadPoolExecutor類的shutdown()方法。當執行器執行完所有待運行的任務,它將結束執行。*?如果再shutdown()方法之后,有新的任務發送給執行器,那么會報出RejectExecutionException異常。*?*/public?void?endServer(){executor.shutdown();} }package?org.concurrency.executorframework; /***?@author?Administrator*?main主程序,循環創建Task*/ public?class?Task_Main?{public?static?void?main(String[]?args)?{//?TODO?Auto-generated?method?stubServer?server?=?new?Server();for(int?i?=?0;i?<?100;i++){Task?task?=?new?Task("Task"+i);server.executeTask(task);}server.endServer();} }執行結果:????
ThreadPoolExecutor類提供了其他結束執行器的方法:
shutdownNow():這個方法會立即關閉執行器。執行器將不再執行那些正在等待執行的任務。這個方法將返回等待執行的任務列表。調用時,正在執行的任務將繼續執行,但這個方法不等待這個任務的完成。
isTerminated():如果調用了shutdown()或shutdownNow()方法,并且執行器完成了關閉過程,那么這個方法將返回true。
isShutdown():如果調用了shutdown()方法,則返回true。
awaitTermination(long timeout,TimeUnit unit):這個方法將阻塞所調用的線程,知道執行器完成任務或者達到所指定的timeout值。
2、創建固定大小的線程執行器
????當使用Executors類的newCachedThreadPool()方法創建的ThreadPoolExecutor時,執行器運行過程中將碰到線程數量問題。如果線程池中沒有空閑的線程可用,那么執行器將為接收到的每一個任務創建一個新的線程,當發送大量的任務給執行器并且任務需要持續較長的時間時,系統將會超負荷,應用程序也將隨之不佳。
????為了避免這個問題,Executors工廠類提供了一個方法來床架一個固定大小的線程執行器。這個執行器有一個線程數的最大值,如果發送超過這個最大值的任務給執行器,執行器將不會創建額外的線程,剩下的任務將被阻塞直到執行器有空閑的線程可用。這個特性可以保證執行器不會給應用程序帶來性能不佳的問題。
????可以對上述示例進行修改
public?Server()?{executor?=?(ThreadPoolExecutor)?Executors.newFixedThreadPool(5);}public?void?executeTask(Task?task){System.out.printf("Server:?A?new?task?hs?arrived\n");executor.execute(task);System.out.printf("Server:?Pool?Size:?%d\n",executor.getPoolSize());System.out.printf("Server:?Active?Count:?%d\n",executor.getActiveCount());System.out.printf("Server:?Completed?Tasks:?%d\n",executor.getCompletedTaskCount());System.out.printf("Server:?Task?Count:?%d\n",executor.getTaskCount());}????在這個示例中使用了Executors工廠類的newFixedThreadPool()方法來創建執行器。這個方法創建了具有線程數量最大值的執行器。如果發送超過線程數的任務給執行器,剩余的任務將被阻塞知道線程池里有空閑的線程來處理他們。
3、在執行器中執行任務并返回結果
????執行器框架(Executor Framework)的優勢之一是,可以運行并發任務并返回結果。Callable:這個接口聲明了call()方法。可以在這個方法里實現任務的具體邏輯操作。Callable接口是一個泛型接口,這意味著必須聲明call()方法返回的數據類型。Future:這個接口聲明了一些方法來獲取由Callable對象產生的結果,并管理它們的狀態。
package?org.concurrency.executorframework.callable; import?java.util.ArrayList; import?java.util.List; import?java.util.Random; import?java.util.concurrent.Callable; import?java.util.concurrent.ExecutionException; import?java.util.concurrent.Executors; import?java.util.concurrent.Future; import?java.util.concurrent.ThreadPoolExecutor; import?java.util.concurrent.TimeUnit; public?class?FactorialCalculator?implements?Callable<Integer>?{private?Integer?number;//存儲任務即將用來計算的數字public?FactorialCalculator(Integer?number)?{this.number?=?number;}@Overridepublic?Integer?call()?throws?Exception?{//?TODO?Auto-generated?method?stubint?result?=?1;if(number?==0?||?number?==1){result?=?1;}else{for(int?i?=2;i<number;i++){result?*=?i;TimeUnit.MILLISECONDS.sleep(20);}}System.out.printf("%s:?%d\n",Thread.currentThread().getName(),result);return?result;}public?static?void?main(String[]?args)?{/*通過Executors工廠類的newFixedThreadPool()方法創建ThreadPoolExecutor執行器來運行任務。這里最多創建2個線程*/ThreadPoolExecutor?executor?=?(ThreadPoolExecutor)?Executors.newFixedThreadPool(2);List<Future<Integer>>?resultList?=?new?ArrayList<Future<Integer>>();Random?random?=?new?Random();for(int?i=0;i<10;i++){int?number?=?random.nextInt(10);FactorialCalculator?calculator?=?new?FactorialCalculator(number);Future<Integer>?result?=?executor.submit(calculator);resultList.add(result);}do{System.out.printf("Main:?Number?of?Completed?Tasks:%d\n",executor.getCompletedTaskCount());for(int?i=0;i<resultList.size();i++){Future<Integer>?result?=?resultList.get(i);System.out.printf("Main:?Task?%d:?%s\n",i,result.isDone());try?{TimeUnit.MILLISECONDS.sleep(50);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}}}while(executor.getCompletedTaskCount()?<?resultList.size());System.out.printf("Main:?Results\n");for(int?i?=0;i<resultList.size();i++){Future<Integer>?result?=?resultList.get(i);Integer?number?=?null;try?{number?=?result.get();}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}?catch?(ExecutionException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.printf("Main:?Task?%d\n",i,number);}executor.shutdown();} }????在本節中我們學習了如何使用Callable接口來啟動并發任務并返回結果。我們編寫了FactorialCaculator類,它實現了帶有泛型參數Integer類型的Callable接口。因此,這個Integer類型將作為調用call()方法時返回的類型。
????我們通過submit()方法發送一個Callable對象給執行去執行,這個submit()方法接收Callable對象作為參數,并返回Future對象。Future對象可以用于以下兩個目的。
控制任務狀態:可以取消任務或者檢查任務是否已經完成。為了達到這個目的,可使用isDone()方法來檢查任務是否已經完成。
公國call()方法獲取返回結果。為了達到這個目的,可以使用get()方法。這個方法一直等待直到Callable對象的call()方法執行完成并返回結果。如果get()方法在等待結果時中斷了,則會拋出異常。如果call()方法拋出異常,那個get()也會拋出異常。
4、運行多個任務并處理第一個結果
????并發編程中比較常見的一個問題是,當采用多個并發任務解決一個問題時,往往只關系這些任務的第一個結果。例如允許兩種驗證機制,只要有一種驗證機制成功,那么就驗證通過。這主要是用到了ThreadPoolExecutor類的invokeAny()方法。
5、運行多個任務并處理所有結果。
????執行器框架(Executor Framework)允許執行并發任務而不需要去考慮線程創建和執行。它還提供了可以用來控制在執行器中執行任務的狀態和獲取任務結果的Future類。
6、在執行器中周期性執行任務。
????執行器框架提供了ThreadPoolExecutor類,通過線程池來執行并發任務從而避免了執行所有線程的創建操作。當一個任務給執行器后,根據執行器的配置,它將盡快地執行這個任務。當任務執行結束后,這個任務就會從執行器中刪除;如果想再次執行這個任務,則需要再次發送這個任務到執行器。
????但是執行器框架提供了ScheduledThreadPoolExecutor類來執行周期性的任務。通過Executors工廠類的newScheduledThreadPoolExecutor()方法創建ScheduledThreadPoolExecutor執行器對象。這個方法接收一個表示線程中的線程數類作參數。一旦有了可以執行周期性的執行器,就可以發送任務給這個執行器。使用scheduledAtFixedRate()方法發送任務。scheduledAtFixedRate()方法返回一個ScheduledFuture對象,ScheduledFuture接口則擴展了Future接口,于是它帶有了定時任務的相關操作方法。使用getDelay()方法返回任務到下一次執行時所要等待的剩余時間。我們將通過一個實例來演示周期性執行任務
package?org.concurrency.executorframework.scheduled; import?java.util.Date; /***?@author?Administrator*?創建任務線程*/ public?class?Task?implements?Runnable?{private?String?name;public?Task(String?name)?{this.name?=?name;} //?@Override //?public?String?call()?throws?Exception?{ //??//?TODO?Auto-generated?method?stub //??System.out.printf("%s:?Starting?at?:?%s\n",name,new?Date()); //??return?"Hello,world"; //?}@Overridepublic?void?run()?{//?TODO?Auto-generated?method?stubSystem.out.printf("%s:?Starting?at?:?%s\n",name,new?Date()); //??return?"Hello,world";} }package?org.concurrency.executorframework.scheduled; import?java.util.Date; import?java.util.concurrent.Executors; import?java.util.concurrent.ScheduledFuture; import?java.util.concurrent.ScheduledThreadPoolExecutor; import?java.util.concurrent.TimeUnit; /***?@author?Administrator*?主線程類*/ public?class?Main?{public?static?void?main(String[]?args)?{//?TODO?Auto-generated?method?stub/*使用scheduledThreadPoolExecutor()方法創建ScheduledExecutorService對象,并轉化為ScheduledThreadPoolExecutor*?這個方法接收一個表示線程數量的整數作為參數。*?*/ScheduledThreadPoolExecutor?executor?=?(ScheduledThreadPoolExecutor)?Executors.newScheduledThreadPool(1);System.out.printf("Main:?Starting?at:?%s\n",new?Date());Task?task?=?new?Task("Task");/**?使用scheduledAtFixedRate()方法發送任務。這個方法接收四個參數*?1.被周期執行的任務*?2.執行第一次任務執行后的延時時間*?3.兩次執行的時間周期*?4.第2個和第3個參數的時間單位*?兩次執行之間的周期是指任務咋兩次執行開始的時間間隔。*?這期間可能會存在多個任務實例*?*/ScheduledFuture<?>?result?=?executor.scheduleAtFixedRate(task,?1,?2,?TimeUnit.SECONDS);for(int?i?=?0;i<10;i++){System.out.printf("Main:?Delay:?%d\n",result.getDelay(TimeUnit.MILLISECONDS));try?{TimeUnit.MILLISECONDS.sleep(500);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}}executor.shutdown();try?{TimeUnit.SECONDS.sleep(5);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.printf("Main:?Finished?at:?%s\n",new?Date());} }????執行結果截圖:
7、在執行器中取消任務。如果需要取消已經發送給執行器的任務,則需要使用Future接口的cancle()方法來執行取消操作。
8、在執行器中控制任務的完成。FutureTask類中提供了一個名為done()方法,允許在執行器中的任務執行結束后還可以執行一些代碼。例如生成報表,通過郵件發送結果或釋放一些系統資源等。我們可以可以覆蓋FutureTask類的done()方法來控制任務的完成。
9、在執行器中分離任務的啟動和出結果的處理
????通常情況下,使用執行器執行并發任務時,將Runnable或Callable任務發送給執行器,并獲得Future對象來控制任務。此外,還會碰到如下情形,需要在一個對象里發送任務給執行器,然后唉另一個對象里處理結果。對于這種情況,Java API提供了CompletionService類。
????CompletionService類有一個方法用來發送任務給執行器,還有一個方法為下一個已經執行結束的任務獲取Future對象。
????我們將通過一個實例學習如何使用CompletionService類,在執行器中分離任務的啟動與結果的處理。
package?org.concurrency.executorframework.callable; import?java.util.concurrent.Callable; import?java.util.concurrent.TimeUnit; /***?@author?Administrator**/ public?class?ReportGenerator?implements?Callable<String>?{/*用來表示數據和報告*/private?String?sender;private?String?title;public?ReportGenerator(String?sender,?String?title)?{super();this.sender?=?sender;this.title?=?title;}@Overridepublic?String?call()?throws?Exception?{//?TODO?讓線程休眠一段隨機時間long?duration?=?(long)?(Math.random()*10);System.out.printf("%s_%s:?ReportGenerator:Generating?a?report?during?%d?seconds\n",this.sender,this.title,duration);TimeUnit.SECONDS.sleep(duration);String?ret?=?sender+":"+title;return?ret;} }package?org.concurrency.executorframework.callable; import?java.util.concurrent.CompletionService; /***?@author?Administrator*?用來模擬請求報告*/ public?class?ReportRequest?implements?Runnable?{private?String?name;private?CompletionService<String>?service;public?ReportRequest(String?name,?CompletionService<String>?service)?{this.name?=?name;this.service?=?service;}@Overridepublic?void?run()?{//?TODO?創建了ReportGenerator對象,并使用submit()方法將此對對象發送給CompletionService。ReportGenerator?reportGenerator?=?new?ReportGenerator(name,?"Report");service.submit(reportGenerator);} }package?org.concurrency.executorframework.callable; import?java.util.concurrent.CompletionService; import?java.util.concurrent.ExecutionException; import?java.util.concurrent.Future; import?java.util.concurrent.TimeUnit; /***?@author?Administrator*?這個類將獲取ReportGenerator任務的結果*/ public?class?ReportProcessor?implements?Runnable?{private?CompletionService<String>?service;private?boolean?end;public?ReportProcessor(CompletionService<String>?service)?{this.service?=?service;end?=?false;}@Overridepublic?void?run()?{//?TODO?獲取下一個已經完成任務的Future對象;當然這個任務是采用CompletionService來完成/*當?*完成服務*任務結束,這些任務中的一個任務就執行結束了,完成服務中存儲著Future對象,用來空載它在隊列中的隊形?*?調用poll()方法訪問這個隊列,查看是否有任務已經完成,如果有就返回隊列中的第一個元素,即一個任務執行完成后的Future對象。*?當poll()方法返回Future對象后,它將從隊列中刪除這個Future對象。*?*/while(!end){try?{Future<String>?result?=?service.poll(20,?TimeUnit.SECONDS);if(result?!=?null){String?report?=?result.get();System.out.println("ReportReciver:Report?Received:"+?report);}}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}?catch?(ExecutionException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.println("ReportSender:?End");}}public?void?setEnd(boolean?end)?{this.end?=?end;} }package?org.concurrency.executorframework.callable; import?java.util.concurrent.CompletionService; import?java.util.concurrent.ExecutorCompletionService; import?java.util.concurrent.ExecutorService; import?java.util.concurrent.Executors; import?java.util.concurrent.TimeUnit; /***?@author?Administrator*?線程啟動類*/ public?class?Main?{public?static?void?main(String[]?args)?{//?TODO?Auto-generated?method?stubExecutorService?executor?=?Executors.newCachedThreadPool();CompletionService<String>?service?=?new?ExecutorCompletionService<>(executor);ReportRequest?faceRequest?=?new?ReportRequest("Face",?service);ReportRequest?onlineRequest?=?new?ReportRequest("Online",?service);Thread?faceThread?=?new?Thread(faceRequest);Thread?onlineThread?=?new?Thread(onlineRequest);ReportProcessor?processor?=?new?ReportProcessor(service);Thread?senderThread?=?new?Thread(processor);System.out.println("Main:?Staring?the?Threads");faceThread.start();onlineThread.start();senderThread.start();try?{System.out.println("Main:?Waiting?for?the?reportgenerators.");faceThread.join();onlineThread.join();}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}System.out.println("Main:?Shutting?down?the?executor.");executor.shutdown();try?{/*調用awaitTerminated()方法等待所有任務執行結束*/executor.awaitTermination(1,?TimeUnit.DAYS);}?catch?(InterruptedException?e)?{//?TODO?Auto-generated?catch?blocke.printStackTrace();}processor.setEnd(true);System.out.println("Main:Ends");} }執行結果截圖:
10、處理在執行器中被拒絕的任務。
當我們想結束執行器的執行時,調用shutdown()方法來表示執行器應當結束,但是,執行器只有等待正在運行的任務或者等待執行的任務結束后,才能真正結束。如果在此期間發送給一個任務給執行器,這個任務會被拒絕,ThreadPoolExecutor提供了一套機制來處理被拒絕的任務。這些任務實現了RejectExecutionHandler接口。
本文出自 “阿酷博客源” 博客,請務必保留此出處http://aku28907.blog.51cto.com/5668513/1788500
轉載于:https://my.oschina.net/mrku/blog/693731
總結
- 上一篇: dedecms--需要注意的细节
- 下一篇: 重构随笔——重构的原则