Java线程池Executor框架
一、為什么要引入線程池
當存在大量并發任務時,創建、銷毀線程需要很大的開銷,運用線程池可以大大減小開銷。
二、Executor框架
說明:
線程池是一個復雜的任務調度工具,它涉及到任務、線程池等的生命周期問題。要配置一個線程池是比較復雜的,尤其是對于線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的。
JDK中的線程池均由ThreadPoolExecutor類實現。其構造方法如下:
參數說明:
1.corePoolSize:核心線程數。
2.maximumPoolSize:最大線程數。
3.keepAliveTime:非核心線程如果處于閑置狀態超過該值,就會被銷毀。如果設置allowCoreThreadTimeOut(true),則會也作用于核心線程。
4.unit:keepAliveTime的單位。java.util.concurrent.TimeUnit類存在靜態靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS
5.workQueue:Runnable的阻塞隊列。若線程池已經被占滿,則該隊列用于存放無法再放入線程池中的Runnable。
JDK內部提供了五種最常見的線程池。由Executors類的五個靜態工廠方法創建。
1.newFixedThreadPool(…)
2.newSingleThreadExecutor(…)
3.newCachedThreadPool(…)
4.newScheduledThreadPool(…)
5.newSingleThreadScheduledExecutor()
返回單線程的Executor,將多個任務交給此Exector時,這個線程處理完一個任務后接著處理下一個任務,若該線程出現異常,將會有一個新的線程來替代。此線程池保證所有任務的執行順序按照任務的提交順序執行。
說明:LinkedBlockingQueue會無限的添加需要執行的Runnable。
每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
static ExecutorSevice newFixedThreadPool()
返回一個包含指定數目線程的線程池,如果任務數量多于線程數目,那么沒有沒有執行的任務必須等待,直到有任務完成為止。
可緩存的線程池 newCachedThreadPool
如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴于操作系統(或者說JVM)能夠創建的最大線程大小。
newCachedThreadPool方法創建的線程池可以自動的擴展線程池的容量。核心線程數量為0。
SynchronousQueue是個特殊的隊列。
SynchronousQueue隊列的容量為0。
當試圖為SynchronousQueue添加Runnable,則執行會失敗。只有當一邊從SynchronousQueue取數據,一邊向SynchronousQueue添加數據才可以成功。
SynchronousQueue僅僅起到數據交換的作用,并不保存線程。但newCachedThreadPool()方法沒有線程上限。Runable添加到SynchronousQueue會被立刻取出。
根據用戶的任務數創建相應的線程來處理,該線程池不會對線程數目加以限制,完全依賴于JVM能創建線程的數量,可能引起內存不足。
定時任務調度的線程池 newScheduledThreadPool
創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。
此線程池支持定時以及周期性執行任務的需求。
注意:實際開發中,都是自定義自己的線程池
三、Executor接口
Executor是一個線程執行接口。任務執行的主要抽象不是Thead,而是Executor。
public interface Executor{void executor(Runnable command); }Executor將任務的提交過程與執行過程解耦,并用Runnable來表示任務。執行的任務放入run方法中即可,將Runnable接口的實現類交給線程池的execute方法,作為它的一個參數。如果需要給任務傳遞參數,可以通過創建一個Runnable接口的實現類來完成。
Executor可以支持多種不同類型的任務執行策略。
Executor基于生產者消費者模式,提交任務的操作相當于生產者,執行任務的線程則相當于消費者。
線程池接口。ExecutorService在Executor的基礎上增加了一些方法,其中有兩個核心的方法:
這兩個方法都是向線程池中提交任務,它們的區別在于Runnable在執行完畢后沒有結果,Callable執行完畢后有一個結果。這在多個線程中傳遞狀態和結果是非常有用的。另外他們的相同點在于都返回一個Future對象。Future對象可以阻塞線程直到運行完畢(獲取結果,如果有的話),也可以取消任務執行,當然也能夠檢測任務是否被取消或者是否執行完畢。
在沒有Future之前我們檢測一個線程是否執行完畢通常使用Thread.join()或者用一個死循環加狀態位來描述線程執行完畢。現在有了更好的方法能夠阻塞線程,檢測任務執行完畢甚至取消執行中或者未開始執行的任務。
ScheduledExecutorService描述的功能和Timer/TimerTask類似,解決那些需要任務重復執行的問題。這包括延遲時間一次性執行、延遲時間周期性執行以及固定延遲時間周期性執行等。當然了繼承ExecutorService的ScheduledExecutorService擁有ExecutorService的全部特性。
四、線程池生命周期
線程是有多種執行狀態的,同樣管理線程的線程池也有多種狀態。JVM會在所有線程(非后臺daemon線程)全部終止后才退出,為了節省資源和有效釋放資源關閉一個線程池就顯得很重要。有時候無法正確的關閉線程池,將會阻止JVM的結束。
線程池Executor是異步的執行任務,因此任何時刻不能夠直接獲取提交的任務的狀態。這些任務有可能已經完成,也有可能正在執行或者還在排隊等待執行。因此關閉線程池可能出現一下幾種情況:
1.平緩關閉:已經啟動的任務全部執行完畢,同時不再接受新的任務。
2.立即關閉:取消所有正在執行和未執行的任務。
另外關閉線程池后對于任務的狀態應該有相應的反饋信息。
線程池在構造前(new操作)是初始狀態,一旦構造完成線程池就進入了執行狀態RUNNING。嚴格意義上講線程池構造完成后并沒有線程被立即啟動,只有進行"預啟動"或者接收到任務的時候才會啟動線程。線程池是處于運行狀態,隨時準備接受任務來執行。
線程池運行中可以通過shutdown()和shutdownNow()來改變運行狀態。
shutdown():平緩的關閉線程池。線程池停止接受新的任務,同時等待已經提交的任務執行完畢,包括那些進入隊列還沒有開始的任務。shutdown()方法執行過程中,線程池處于SHUTDOWN狀態。
shutdownNow():立即關閉線程池。線程池停止接受新的任務,同時線程池取消所有執行的任務和已經進入隊列但是還沒有執行的任務。shutdownNow()方法執行過程中,線程池處于STOP狀態。shutdownNow方法本質是調用Thread.interrupt()方法。但我們知道該方法僅僅是讓線程處于interrupted狀態,并不會讓線程真正的停止!所以若只調用或只調用一次shutdownNow()方法,不一定會讓線程池中的線程都關閉掉,線程中必須要有處理interrupt事件的機制。
3. 線程池結束
一旦shutdown()或者shutdownNow()執行完畢,線程池就進入TERMINATED狀態,即線程池就結束了。isTerminating() 如果關閉后所有任務都已完成,則返回 true。isShutdown() 如果此執行程序已關閉,則返回 true。
例:使用固定大小的線程池。并將任務添加到線程池。
public class JavaThreadPool {public static void main(String[] args) {// 創建一個可重用固定線程數的線程池ExecutorService pool = Executors.newFixedThreadPool(2);// 創建實現了Runnable接口對象,Thread對象當然也實現了Runnable接口Thread t1 = new MyThread();Thread t2 = new MyThread();Thread t3 = new MyThread();Thread t4 = new MyThread();Thread t5 = new MyThread();// 將線程放入池中進行執行pool.execute(t1);pool.execute(t2);pool.execute(t3);pool.execute(t4);pool.execute(t5);// 關閉線程池pool.shutdown();}} class MyThread extends Thread {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "正在執行。。。");} }五、Java線程池擴展
ThreadPoolExecutor線程池的執行監控
ThreadPoolExecutor中定義了三個空方法,用于監控線程的執行情況。
例:使用覆蓋方法,定義新的線程池。
public class ExtThreadPoolTest {static class MyTask implements Runnable {public String name;public MyTask(String name) {super();this.name = name;}@Overridepublic void run() {try {Thread.sleep(500);System.out.println("執行中:"+this.name);Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException {ExecutorService es = new ThreadPoolExecutor(5,5,0,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()){@Overrideprotected void beforeExecute(Thread t, Runnable r) {System.out.println("準備執行:" + ((MyTask)r).name);}@Overrideprotected void afterExecute(Runnable r, Throwable t) {System.out.println("執行完成:" + ((MyTask)r).name);}@Overrideprotected void terminated() {System.out.println("執行退出");} };for(int i=0;i<5;i++){MyTask task = new MyTask("Task-"+i);es.execute(task);}Thread.sleep(10); // 等待terminated()執行es.shutdown(); // 若無該方法,主線程不會結束。} }六、ThreadPoolExecutor的拒絕策略
線程池不可能處理無限多的線程。所以一旦線程池中中需要執行的任務過多,線程池對于某些任務就無法處理了。拒絕策略即對這些無法處理的任務進行處理。可能丟棄掉這些不能處理的任務,也可能用其他方式。
ThreadPoolExecutor類還有另一個構造方法。該構造方法中的RejectedExecutionHandler用于定義拒絕策略。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { }JDK內部已經提供一些拒絕策略:
- AbortPolicy 一旦線程不能處理,則拋出異常。
- DiscardPolicy 一旦線程不能處理,則丟棄任務。
- CallerRunsPolicy 一旦線程不能處理,則將任務返回給提交任務的線程處理。
- DiscardOldestPolicy 一旦線程不能處理,丟棄掉隊列中最老的任務。
使用例子如下:
int corePoolSize = 1; int maximumPoolSize = 1; BlockingQueue queue = new ArrayBlockingQueue<Runnable>(1); ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,0, TimeUnit.SECONDS, queue ) ; pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy ()); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());注意:一般都是自定義拒絕策略
例:自定義拒絕策略。打印并丟棄無法處理的任務。
七、ThreadFactory 線程工廠
ThreadPoolExecutor類構造器的參數其中之一即為ThreadFactory線程工廠。
ThreadFactory的實現類中一般定義線程了線程組,線程數與線程名稱。
DefaultThreadFactory源碼:
static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}這里需要稍微提一下的是CompletionService接口,它是用于描述順序獲取執行結果的一個線程池包裝器。它依賴一個具體的線程池調度,但是能夠根據任務的執行先后順序得到執行結果,這在某些情況下可能提高并發效率。
文章轉載
總結
以上是生活随笔為你收集整理的Java线程池Executor框架的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Condition接口详解
- 下一篇: 2021上半年短视频及电商生态研究报告