多线程:线程池
深入淺出線程池
jdk1.5引入Executor線程池框架,通過它把任務(wù)的提交和執(zhí)行進(jìn)行解耦,只需要定義好任務(wù),然后提交給線程池,而不用關(guān)心該任務(wù)是如何執(zhí)行、被哪個線程執(zhí)行,以及什么時候執(zhí)行。
初始化線程池(4種)
簡介:
Java線程池的工廠類:Executors類,
初始化4種類型的線程池:
newFixedThreadPool()
說明:初始化一個指定線程數(shù)的線程池,其中corePoolSize== maxiPoolSize,使用LinkedBlockingQuene作為阻塞隊列
特點:即使當(dāng)線程池沒有可執(zhí)行任務(wù)時,也不會釋放線程。
newCachedThreadPool()
說明:初始化一個可以緩存線程的線程池,默認(rèn)緩存60s,線程池的線程數(shù)可達(dá)到Integer.MAX_VALUE,即2147483647,內(nèi)部使用SynchronousQueue作為阻塞隊列;
特點:在沒有任務(wù)執(zhí)行時,當(dāng)線程的空閑時間超過keepAliveTime,會自動釋放線程資源;當(dāng)提交新任務(wù)時,如果沒有空閑線程,則創(chuàng)建新線程執(zhí)行任務(wù),會導(dǎo)致一定的系統(tǒng)開銷;
因此,使用時要注意控制并發(fā)的任務(wù)數(shù),防止因創(chuàng)建大量的線程導(dǎo)致而降低性能。
newSingleThreadExecutor()
說明:初始化只有一個線程的線程池,內(nèi)部使用LinkedBlockingQueue作為阻塞隊列。
特點:如果該線程異常結(jié)束,會重新創(chuàng)建一個新的線程繼續(xù)執(zhí)行任務(wù),唯一的線程可以保證所提交任務(wù)的順序執(zhí)行
newScheduledThreadPool()
特點:初始化的線程池可以在指定的時間內(nèi)周期性的執(zhí)行所提交的任務(wù),在實際的業(yè)務(wù)場景中可以使用該線程池定期的同步數(shù)據(jù)。
1.CacheThreadPool?
這是一個線程數(shù)變動性非常強的線程池,默認(rèn)配置下,它可以開啟無限多個線程(Integer.maxSize 和 JVM允許線程數(shù)范圍內(nèi))。且如果該線程池里的線程在60秒內(nèi)如果是處于空閑狀態(tài)(即沒任務(wù)執(zhí)行),那么該線程就會被回收,不再由線程池維護(hù)。如果有新任務(wù)進(jìn)來時,由于之前的線程池里的線程已被回收,那么新的線程也會再次創(chuàng)建。當(dāng)執(zhí)行完任務(wù),60秒內(nèi)依舊無新任務(wù)的可執(zhí)行話,那么該線程又會被再次回收。?
綜合該線程池的特性,我們可以思考下什么情況下應(yīng)該使用這類線程池。比如:我們的應(yīng)用服務(wù)器上面,會在非固定時間(時間跨域度會盡可能大)和非固定的任務(wù)數(shù)量。?
2.FixedThreadPool
FixedThreadPool一個固定數(shù)量的線程池,且該線程池不會隨著任務(wù)的變化而增多或減少線程數(shù)量。即該線程池下的線程池如果你不主動調(diào)用銷毀shutdowm、purge之類的方法。那么這些線程將會永遠(yuǎn)被線程池維護(hù)著。
3.SingleThreadExecutor
SingleThreadExecutor是一個固定單線程的線程池,該線程池會永遠(yuǎn)都保持著一個線程的活動狀態(tài),如果該線程池的單線程因某些異常而退出后,線程池會繼續(xù)創(chuàng)建一個新的線程。
4.ScheduledThreadPool
ScheduledThreadPool是一個支持任務(wù)定時調(diào)度的線程池。
總結(jié):除了newScheduledThreadPool的內(nèi)部實現(xiàn)特殊一點之外,其它線程池內(nèi)部都是基于ThreadPoolExecutor類(Executor的子類)實現(xiàn)的。
ThreadPoolExecutor內(nèi)部具體實現(xiàn):
ThreadPoolExecutor類構(gòu)造器語法形式:
ThreadPoolExecutor(corePoolSize,maxiPoolSize,keepAliveTime,timeUnit,workQueue,threadFactory,handle);??
方法參數(shù):
? ? ?corePoolSize:核心線程數(shù)
? ? ?maxPoolSize:最大線程數(shù)
?????keepAliveTime:線程存活時間(在corePore<*<maxPoolSize情況下有用)
?????timeUnit:存活時間的時間單位
?????workQueue:阻塞隊列(用來保存等待被執(zhí)行的任務(wù))
注:關(guān)于workQueue參數(shù)的取值,JDK提供了4種阻塞隊列類型供選擇:
?????ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO排序任務(wù);
? ? ?LinkedBlockingQueue:基于鏈表結(jié)構(gòu)的阻塞隊列,按FIFO排序任務(wù),有界有上限,所生成的BlockingQueue的大小由Integer.MAX_VALUE來決定。
????SynchronousQueue:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于ArrayBlockingQueue;
?????PriorityBlockingQueue:具有優(yōu)先級的無界阻塞隊列;
?????threadFactory:線程工廠,主要用來創(chuàng)建線程;
?????handler:表示當(dāng)拒絕處理任務(wù)時的策略,有以下四種取值
?注:?當(dāng)線程池的飽和策略,當(dāng)阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。? 默認(rèn)策略
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)。用于被拒絕任務(wù)的處理程序,它直接在?execute?方法的調(diào)用線程中運行被拒絕的任務(wù);如果執(zhí)行程序已關(guān)閉,則會丟棄該任務(wù)。
RejectedExecutionHandler?handler?=? new?ThreadPoolExecutor.CallerRunsPolicy();
當(dāng)然也可以根據(jù)應(yīng)用場景實現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。
?線程池的狀態(tài)(5種)
其中AtomicInteger變量ctl的功能非常強大:利用低29位表示線程池中線程數(shù),通過高3位表示線程池的運行狀態(tài):
1、RUNNING:-1 << COUNT_BITS,即高3位為111,該狀態(tài)的線程池會接收新任務(wù),并處理阻塞隊列中的任務(wù);
2、SHUTDOWN:?0 << COUNT_BITS,即高3位為000,該狀態(tài)的線程池不會接收新任務(wù),但會處理阻塞隊列中的任務(wù);
3、STOP?:?1<< COUNT_BITS,即高3位為001,該狀態(tài)的線程不會接收新任務(wù),也不會處理阻塞隊列中的任務(wù),而且會中斷正在運行的任務(wù);
4、TIDYING?:?2<< COUNT_BITS,即高3位為010,該狀態(tài)表示線程池對線程進(jìn)行整理優(yōu)化;
5、TERMINATED:?3 << COUNT_BITS,即高3位為011,該狀態(tài)表示線程池停止工作;
?向線程池提交任務(wù)(2種)
有兩種方式:
? ???Executor.execute(Runnablecommand);
?????ExecutorService.submit(Callable<T> task);
execute()內(nèi)部實現(xiàn)
1.首次通過workCountof()獲知當(dāng)前線程池中的線程數(shù),
??如果小于corePoolSize, 就通過addWorker()創(chuàng)建線程并執(zhí)行該任務(wù);
否則,將該任務(wù)放入阻塞隊列;
2. 如果能成功將任務(wù)放入阻塞隊列中, ?
如果當(dāng)前線程池是非RUNNING狀態(tài),則將該任務(wù)從阻塞隊列中移除,然后執(zhí)行reject()處理該任務(wù);
如果當(dāng)前線程池處于RUNNING狀態(tài),則需要再次檢查線程池(因為可能在上次檢查后,有線程資源被釋放),是否有空閑的線程;如果有則執(zhí)行該任務(wù);
3、如果不能將任務(wù)放入阻塞隊列中,說明阻塞隊列已滿;那么將通過addWoker()嘗試創(chuàng)建一個新的線程去執(zhí)行這個任務(wù);如果addWoker()執(zhí)行失敗,說明線程池中線程數(shù)達(dá)到maxPoolSize,則執(zhí)行reject()處理任務(wù);
?sumbit()內(nèi)部實現(xiàn)
會將提交的Callable任務(wù)會被封裝成了一個FutureTask對象
FutureTask類實現(xiàn)了Runnable接口,這樣就可以通過Executor.execute()提交FutureTask到線程池中等待被執(zhí)行,最終執(zhí)行的是FutureTask的run方法;?
比較:
?兩個方法都可以向線程池提交任務(wù),execute()方法的返回類型是void,它定義在Executor接口中, 而submit()方法可以返回持有計算結(jié)果的Future對象,它定義在ExecutorService接口中,它擴(kuò)展了Executor接口,其它線程池類像ThreadPoolExecutor和ScheduledThreadPoolExecutor都有這些方法。?
線程池的關(guān)閉(2種)
ThreadPoolExecutor提供了兩個方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:
shutdown():不會立即終止線程池,而是要等所有任務(wù)緩存隊列中的任務(wù)都執(zhí)行完后才終止,但再也不會接受新的任務(wù)
shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊列,返回尚未執(zhí)行的任務(wù)
?線程池容量的動態(tài)調(diào)整
ThreadPoolExecutor提供了動態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
總結(jié):
線程池中的核心線程數(shù),當(dāng)提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊列中,等待被執(zhí)行;如果阻塞隊列滿了,那就創(chuàng)建新的線程執(zhí)行當(dāng)前任務(wù);直到線程池中的線程數(shù)達(dá)到maxPoolSize,這時再有任務(wù)來,只能執(zhí)行reject()處理該任務(wù);
注:如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。
?
總結(jié)
- 上一篇: JVM:jstack
- 下一篇: 多线程:线程同步的几种方式