【多线程】线程池拒绝策略详解与自定义拒绝策略
線程池的拒絕策略
ThreadPoolExecutor內(nèi)部有實(shí)現(xiàn)4個(gè)拒絕策略,默認(rèn)為AbortPolicy策略
- CallerRunsPolicy:由調(diào)用execute方法提交任務(wù)的線程來(lái)執(zhí)行這個(gè)任務(wù)
- AbortPolicy:拋出異常RejectedExecutionException拒絕提交任務(wù)
- DiscardPolicy:直接拋棄任務(wù),不做任何處理
- DiscardOldestPolicy:去除任務(wù)隊(duì)列中的第一個(gè)任務(wù),重新提交
線程池中,有三個(gè)重要的參數(shù),決定影響了拒絕策略:corePoolSize - 核心線程數(shù),也即最小的線程數(shù)。workQueue - 阻塞隊(duì)列 。 maximumPoolSize - 最大線程數(shù)
當(dāng)提交任務(wù)數(shù)大于 corePoolSize 的時(shí)候,會(huì)優(yōu)先將任務(wù)放到 workQueue 阻塞隊(duì)列中。當(dāng)阻塞隊(duì)列飽和后,會(huì)擴(kuò)充線程池中線程數(shù),直到達(dá)到 maximumPoolSize 最大線程數(shù)配置。此時(shí),再多余的任務(wù),則會(huì)觸發(fā)線程池的拒絕策略了。
總結(jié)起來(lái),也就是一句話,當(dāng)提交的任務(wù)數(shù)大于(workQueue.size() + maximumPoolSize ),就會(huì)觸發(fā)線程池的拒絕策略。
拒絕策略的源碼
CallerRunsPolicy
/*** A handler for rejected tasks that runs the rejected task* directly in the calling thread of the {@code execute} method,* unless the executor has been shut down, in which case the task* is discarded.* 用于拒絕任務(wù)的處理程序,* 可以直接在{@code execute}方法的調(diào)用線程中運(yùn)行被拒絕的任務(wù)* 除非執(zhí)行器已被關(guān)閉,否則將丟棄該任務(wù)。*/public static class CallerRunsPolicy implements RejectedExecutionHandler {/*** Creates a {@code CallerRunsPolicy}.* 創(chuàng)建一個(gè){@code CallerRunsPolicy}。*/public CallerRunsPolicy() { }/*** Executes task r in the caller's thread, unless the executor* has been shut down, in which case the task is discarded.* 除非執(zhí)行器已關(guān)閉,否則在調(diào)用者線程中執(zhí)行任務(wù),* r 在這種情況下,該任務(wù)將被丟棄。** @param r the runnable task requested to be executed* r 請(qǐng)求執(zhí)行的可運(yùn)行任務(wù)* @param e the executor attempting to execute this task* e 嘗試執(zhí)行此任務(wù)的執(zhí)行者*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}分析:
CallerRunsPolicy:線程調(diào)用運(yùn)行該任務(wù)的 execute 本身。此策略提供簡(jiǎn)單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。
這個(gè)策略顯然不想放棄執(zhí)行任務(wù)。但是由于池中已經(jīng)沒(méi)有任何資源了,那么就直接使用調(diào)用該execute的線程本身來(lái)執(zhí)行。(開(kāi)始我總不想丟棄任務(wù)的執(zhí)行,但是對(duì)某些應(yīng)用場(chǎng)景來(lái)講,很有可能造成當(dāng)前線程也被阻塞。如果所有線程都是不能執(zhí)行的,很可能導(dǎo)致程序沒(méi)法繼續(xù)跑了。需要視業(yè)務(wù)情景而定吧。)
這樣生產(chǎn)者雖然沒(méi)有被阻塞,但提交任務(wù)也會(huì)被暫停。
但這種策略也有隱患,當(dāng)生產(chǎn)者較少時(shí),生產(chǎn)者消費(fèi)任務(wù)的時(shí)間里,消費(fèi)者可能已經(jīng)把任務(wù)都消費(fèi)完了,隊(duì)列處于空狀態(tài),當(dāng)生產(chǎn)者執(zhí)行完任務(wù)后才能再繼續(xù)生產(chǎn)任務(wù),這個(gè)過(guò)程中可能導(dǎo)致消費(fèi)者線程的饑餓。
AbortPolicy
/*** A handler for rejected tasks that throws a* {@code RejectedExecutionException}.* 拋出{@code RejectedExecutionException}的拒絕任務(wù)處理程序。*/public static class AbortPolicy implements RejectedExecutionHandler {/*** Creates an {@code AbortPolicy}.*/public AbortPolicy() { }/*** Always throws RejectedExecutionException.* 總是拋出RejectedExecutionException* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task* @throws RejectedExecutionException always*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}分析:
該策略是默認(rèn)飽和策略。
使用該策略時(shí)在飽和時(shí)會(huì)拋出RejectedExecutionException(繼承自RuntimeException),調(diào)用者可捕獲該異常自行處理。
DiscardPolicy
/*** A handler for rejected tasks that silently discards the* rejected task.* 拒絕任務(wù)的處理程序,默認(rèn)丟棄拒絕任務(wù)。*/public static class DiscardPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardPolicy}.*/public DiscardPolicy() { }/*** Does nothing, which has the effect of discarding task r.* 不執(zhí)行任何操作,這具有丟棄任務(wù) r 的作用。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}分析:
如代碼所示,不做任何處理直接拋棄任務(wù)
DiscardOldestPolicy
/*** A handler for rejected tasks that discards the oldest unhandled* request and then retries {@code execute}, unless the executor* is shut down, in which case the task is discarded.* 處理被拒絕任務(wù)的處理程序,它丟棄最舊的未處理請(qǐng)求,* 然后重試{@code execute},* 除非執(zhí)行器*被關(guān)閉,在這種情況下,該任務(wù)將被丟棄。*/public static class DiscardOldestPolicy implements RejectedExecutionHandler {/*** Creates a {@code DiscardOldestPolicy} for the given executor.*/public DiscardOldestPolicy() { }/*** Obtains and ignores the next task that the executor* would otherwise execute, if one is immediately available,* and then retries execution of task r, unless the executor* is shut down, in which case task r is instead discarded.* 獲取并忽略執(zhí)行者*會(huì)立即執(zhí)行的下一個(gè)任務(wù)(如果一個(gè)任務(wù)立即可用),* 然后重試任務(wù)r的執(zhí)行,除非執(zhí)行者*被關(guān)閉,在這種情況下,任務(wù)r會(huì)被丟棄。* @param r the runnable task requested to be executed* @param e the executor attempting to execute this task*/public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}分析:
如代碼,先將阻塞隊(duì)列中的頭元素出隊(duì)拋棄,再嘗試提交任務(wù)。如果此時(shí)阻塞隊(duì)列使用PriorityBlockingQueue優(yōu)先級(jí)隊(duì)列,將會(huì)導(dǎo)致優(yōu)先級(jí)最高的任務(wù)被拋棄,因此不建議將該種策略配合優(yōu)先級(jí)隊(duì)列使用。
自定義策略
看完發(fā)現(xiàn)默認(rèn)的幾個(gè)拒絕策略并不是特別的友好,那么可不可以咱們自己搞個(gè)呢?
可以發(fā)現(xiàn),所有的拒絕策略都是實(shí)現(xiàn)了 RejectedExecutionHandler 接口
public interface RejectedExecutionHandler {/*** Method that may be invoked by a {@link ThreadPoolExecutor} when* {@link ThreadPoolExecutor#execute execute} cannot accept a* task. This may occur when no more threads or queue slots are* available because their bounds would be exceeded, or upon* shutdown of the Executor.** <p>In the absence of other alternatives, the method may throw* an unchecked {@link RejectedExecutionException}, which will be* propagated to the caller of {@code execute}.** @param r the runnable task requested to be executed* @param executor the executor attempting to execute this task* @throws RejectedExecutionException if there is no remedy*/void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }這個(gè)接口只有一個(gè) rejectedExecution 方法。
r 為待執(zhí)行任務(wù);executor 為線程池;方法可能會(huì)拋出拒絕異常。
那么咱們就可以通過(guò)實(shí)現(xiàn) RejectedExecutionHandler 接口擴(kuò)展
兩個(gè)栗子:一
netty自己實(shí)現(xiàn)的線程池里面私有的一個(gè)拒絕策略。單獨(dú)啟動(dòng)一個(gè)新的臨時(shí)線程來(lái)執(zhí)行任務(wù)。
private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {try {final Thread t = new Thread(r, "Temporary task executor");t.start();} catch (Throwable e) {throw new RejectedExecutionException("Failed to start a new thread", e);}}}兩個(gè)栗子:二
dubbo的一個(gè)例子,它直接繼承的 AbortPolicy ,加強(qiáng)了日志輸出,并且輸出dump文件
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor e) {String msg = String.format("Thread pool is EXHAUSTED!" +" Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +" Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),url.getProtocol(), url.getIp(), url.getPort());logger.warn(msg);dumpJStack();throw new RejectedExecutionException(msg);} }自己玩
參考類似的思路,最簡(jiǎn)單的做法,我們可以直接定義一個(gè)RejectedExecutionHandler,當(dāng)隊(duì)列滿時(shí)改為調(diào)用BlockingQueue.put來(lái)實(shí)現(xiàn)生產(chǎn)者的阻塞:
new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {try {executor.getQueue().put(r);} catch (InterruptedException e) {// should not be interrupted}}}};這樣,我們就無(wú)需再關(guān)心Queue和Consumer的邏輯,只要把精力集中在生產(chǎn)者和消費(fèi)者線程的實(shí)現(xiàn)邏輯上,只管往線程池提交任務(wù)就行了。
相比最初的設(shè)計(jì),這種方式的代碼量能減少不少,而且能避免并發(fā)環(huán)境的很多問(wèn)題。當(dāng)然,你也可以采用另外的手段,例如在提交時(shí)采用信號(hào)量做入口限制等,但是如果僅僅是要讓生產(chǎn)者阻塞,那就顯得復(fù)雜了。
總結(jié)
四種線程池拒絕策略,具體使用哪種策略,還得根據(jù)實(shí)際業(yè)務(wù)場(chǎng)景才能做出抉擇。
總結(jié)
以上是生活随笔為你收集整理的【多线程】线程池拒绝策略详解与自定义拒绝策略的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 【多线程】ThreadPoolExecu
- 下一篇: 【MySQL】MySQL开发注意事项与S