线程池任务拒绝策略
http://www.blogjava.net/xylz/archive/2011/01/08/342609.html
上一節中提到關閉線程池過程中需要對新提交的任務進行處理。這個是java.util.concurrent.RejectedExecutionHandler處理的邏輯。
?
在沒有分析線程池原理之前先來分析下為什么有任務拒絕的情況發生。
這里先假設一個前提:線程池有一個任務隊列,用于緩存所有待處理的任務,正在處理的任務將從任務隊列中移除。因此在任務隊列長度有限的情況下就會出現新任務的拒絕處理問題,需要有一種策略來處理應該加入任務隊列卻因為隊列已滿無法加入的情況。另外在線程池關閉的時候也需要對任務加入隊列操作進行額外的協調處理。
?
RejectedExecutionHandler提供了四種方式來處理任務拒絕策略。
這四種策略是獨立無關的,是對任務拒絕處理的四中表現形式。最簡單的方式就是直接丟棄任務。但是卻有兩種方式,到底是該丟棄哪一個任務,比如可以丟棄當前將要加入隊列的任務本身(DiscardPolicy)或者丟棄任務隊列中最舊任務(DiscardOldestPolicy)。丟棄最舊任務也不是簡單的丟棄最舊的任務,而是有一些額外的處理。除了丟棄任務還可以直接拋出一個異常(RejectedExecutionException),這是比較簡單的方式。拋出異常的方式(AbortPolicy)盡管實現方式比較簡單,但是由于拋出一個RuntimeException,因此會中斷調用者的處理過程。除了拋出異常以外還可以不進入線程池執行,在這種方式(CallerRunsPolicy)中任務將有調用者線程去執行。
?
上面是一些理論知識,下面結合一些例子進行分析討論。
package?xylz.study.concurrency;import?java.lang.reflect.Field;
import?java.util.concurrent.ArrayBlockingQueue;
import?java.util.concurrent.ThreadPoolExecutor;
import?java.util.concurrent.TimeUnit;
import?java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import?java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
public?class?ExecutorServiceDemo {
????static?void?log(String msg) {
??????? System.out.println(System.currentTimeMillis()?+?"?->?"?+?msg);
??? }
????static?int?getThreadPoolRunState(ThreadPoolExecutor pool)?throws?Exception {
??????? Field f?=?ThreadPoolExecutor.class.getDeclaredField("runState");
??????? f.setAccessible(true);
????????int?v?=?f.getInt(pool);
????????return?v;
??? }
????public?static?void?main(String[] args)?throws?Exception {
??????? ThreadPoolExecutor pool?=?new?ThreadPoolExecutor(1,?1,?0, TimeUnit.SECONDS,
????????????????new?ArrayBlockingQueue<Runnable>(1));
??????? pool.setRejectedExecutionHandler(new?ThreadPoolExecutor.DiscardPolicy());
????????for?(int?i?=?0; i?<?10; i++) {
????????????final?int?index?=?i;
??????????? pool.submit(new?Runnable() {
????????????????public?void?run() {
??????????????????? log("run task:"?+?index?+?"?->?"?+?Thread.currentThread().getName());
????????????????????try?{
??????????????????????? Thread.sleep(1000L);
??????????????????? }?catch?(Exception e) {
??????????????????????? e.printStackTrace();
??????????????????? }
??????????????????? log("run over:"?+?index?+?"?->?"?+?Thread.currentThread().getName());
??????????????? }
??????????? });
??????? }
??????? log("before sleep");
??????? Thread.sleep(4000L);
??????? log("before shutdown()");
??????? pool.shutdown();
??????? log("after shutdown(),pool.isTerminated="?+?pool.isTerminated());
??????? pool.awaitTermination(1000L, TimeUnit.SECONDS);
??????? log("now,pool.isTerminated="?+?pool.isTerminated()?+?", state="
????????????????+?getThreadPoolRunState(pool));
??? }
}
?
?
第一種方式直接丟棄(DiscardPolicy)的輸出結果是:
1294494050696?->?run task:01294494050696?->?before sleep
1294494051697?->?run over:0?->?pool-1-thread-1
1294494051697?->?run task:1
1294494052697?->?run over:1?->?pool-1-thread-1
1294494054697?->?before shutdown()
1294494054697?->?after shutdown(),pool.isTerminated=false
1294494054698?->?now,pool.isTerminated=true, state=3
?
對于上面的結果需要補充幾點。
如果把策略換成丟棄最舊任務(DiscardOldestPolicy),結果會稍有不同。
1294494484622?->?run task:01294494484622?->?before sleep
1294494485622?->?run over:0?->?pool-1-thread-1
1294494485622?->?run task:9
1294494486622?->?run over:9?->?pool-1-thread-1
1294494488622?->?before shutdown()
1294494488622?->?after shutdown(),pool.isTerminated=false
1294494488623?->?now,pool.isTerminated=true, state=3
?
這里依然只是執行兩個任務,但是換成了任務task0和task9。實際上task1~task8還是進入了任務隊列,只不過被task9擠出去了。
對于異常策略(AbortPolicy)就比較簡單,這回調用線程的任務執行。
對于調用線程執行方式(CallerRunsPolicy),輸出的結果就有意思了。
1294496076266?->?run task:2?->?main1294496076266?->?run task:0?->?pool-1-thread-1
1294496077266?->?run over:0?->?pool-1-thread-1
1294496077266?->?run task:1?->?pool-1-thread-1
1294496077266?->?run over:2?->?main
1294496077266?->?run task:4?->?main
1294496078267?->?run over:4?->?main
1294496078267?->?run task:5?->?main
1294496078267?->?run over:1?->?pool-1-thread-1
1294496078267?->?run task:3?->?pool-1-thread-1
1294496079267?->?run over:3?->?pool-1-thread-1
1294496079267?->?run over:5?->?main
1294496079267?->?run task:7?->?main
1294496079267?->?run task:6?->?pool-1-thread-1
1294496080267?->?run over:7?->?main
1294496080267?->?run task:9?->?main
1294496080267?->?run over:6?->?pool-1-thread-1
1294496080267?->?run task:8?->?pool-1-thread-1
1294496081268?->?run over:9?->?main
1294496081268?->?before sleep
1294496081268?->?run over:8?->?pool-1-thread-1
1294496085268?->?before shutdown()
1294496085268?->?after shutdown(),pool.isTerminated=false
1294496085269?->?now,pool.isTerminated=true, state=3
?
由于啟動線程有稍微的延時,因此一種可能的執行順序是這樣的。
?
如果有興趣可以看看ThreadPoolExecutor中四種RejectedExecutionHandler的源碼,都非常簡單。
轉載于:https://www.cnblogs.com/davidwang456/articles/10523348.html
總結
- 上一篇: Thrown KeeperErrorCo
- 下一篇: zookeeper服务发现实战及原理--