Java——线程池
構造一個線程池為什么需要幾個參數?如果避免線程池出現OOM?Runnable和Callable的區別是什么?本文將對這些問題一一解答,同時還將給出使用線程池的常見場景和代碼片段。
基礎知識
Executors創建線程池
Java中創建線程池很簡單,只需要調用Executors中相應的便捷方法即可,比如Executors.newFixedThreadPool(int nThreads),但是便捷不僅隱藏了復雜性,也為我們埋下了潛在的隱患(OOM,線程耗盡)。
Executors創建線程池便捷方法列表:
| newFixedThreadPool(int nThreads) | 創建固定大小的線程池 |
| newSingleThreadExecutor() | 創建只有一個線程的線程池 |
| newCachedThreadPool() | 創建一個不限線程數上限的線程池,任何提交的任務都將立即執行 |
小程序使用這些快捷方法沒什么問題,對于服務端需要長期運行的程序,創建線程池應該直接使用ThreadPoolExecutor的構造方法。沒錯,上述Executors方法創建的線程池就是ThreadPoolExecutor。
ThreadPoolExecutor構造方法
Executors中創建線程池的快捷方法,實際上是調用了ThreadPoolExecutor的構造方法(定時任務使用的是ScheduledThreadPoolExecutor),該類構造方法參數列表如下:
// Java線程池的完整構造函數 public ThreadPoolExecutor( int corePoolSize, // 線程池長期維持的線程數,即使線程處于Idle狀態,也不會回收。 int maximumPoolSize, // 線程數的上限 long keepAliveTime, TimeUnit unit, // 超過corePoolSize的線程的idle時長, // 超過這個時間,多余的線程會被回收。 BlockingQueue<Runnable> workQueue, // 任務的排隊隊列 ThreadFactory threadFactory, // 新線程的產生方式 RejectedExecutionHandler handler) // 拒絕策略竟然有7個參數,很無奈,構造一個線程池確實需要這么多參數。這些參數中,比較容易引起問題的有corePoolSize,?maximumPoolSize,?workQueue以及handler:
- corePoolSize和maximumPoolSize設置不當會影響效率,甚至耗盡線程;
- workQueue設置不當容易導致OOM;
- handler設置不當會導致提交任務時拋出異常。
正確的參數設置方式會在下文給出。
線程池的工作順序
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
corePoolSize -> 任務隊列 -> maximumPoolSize -> 拒絕策略
Runnable和Callable
可以向線程池提交的任務有兩種:Runnable和Callable,二者的區別如下:
Callable是JDK1.5時加入的接口,作為Runnable的一種補充,允許有返回值,允許拋出異常。
三種提交任務的方式:
| Future<T> submit(Callable<T> task) | 是 |
| void execute(Runnable command) | 否 |
| Future<?> submit(Runnable task) | 否,雖然返回Future,但是其get()方法總是返回null |
如何正確使用線程池
避免使用無界隊列
不要使用Executors.newXXXThreadPool()快捷方法創建線程池,因為這種方式會使用無界的任務隊列,為避免OOM,我們應該使用ThreadPoolExecutor的構造方法手動指定隊列的最大長度:
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), // 使用有界隊列,避免OOM new ThreadPoolExecutor.DiscardPolicy());明確拒絕任務時的行為
任務隊列總有占滿的時候,這是再submit()提交新的任務會怎么樣呢?RejectedExecutionHandler接口為我們提供了控制方式,接口定義如下:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }線程池給我們提供了幾種常見的拒絕策略:
| AbortPolicy | 拋出RejectedExecutionException |
| DiscardPolicy | 什么也不做,直接忽略 |
| DiscardOldestPolicy | 丟棄執行隊列中最老的任務,嘗試為當前提交的任務騰出位置 |
| CallerRunsPolicy | 直接由提交任務者執行這個任務 |
線程池默認的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。如果不關心任務被拒絕的事件,可以將拒絕策略設置成DiscardPolicy,這樣多余的任務會悄悄的被忽略。
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.DiscardPolicy());// 指定拒絕策略獲取處理結果和異常
線程池的處理結果、以及處理過程中的異常都被包裝到Future中,并在調用Future.get()方法時獲取,執行過程中的異常會被包裝成ExecutionException,submit()方法本身不會傳遞結果和任務執行過程中的異常。獲取執行結果的代碼可以這樣寫:
ExecutorService executorService = Executors.newFixedThreadPool(4); Future<Object> future = executorService.submit(new Callable<Object>() {上述代碼輸出類似如下:
線程池的常用場景
正確構造線程池
int poolSize = Runtime.getRuntime().availableProcessors() * 2; BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512); RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); executorService = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, queue, policy);獲取單個結果
過submit()向線程池提交任務后會返回一個Future,調用V Future.get()方法能夠阻塞等待執行結果,V get(long timeout, TimeUnit unit)方法可以指定等待的超時時間。
獲取多個結果
如果向線程池提交了多個任務,要獲取這些任務的執行結果,可以依次調用Future.get()獲得。但對于這種場景,我們更應該使用ExecutorCompletionService,該類的take()方法總是阻塞等待某一個任務完成,然后返回該任務的Future對象。向CompletionService批量提交任務后,只需調用相同次數的CompletionService.take()方法,就能獲取所有任務的執行結果,獲取順序是任意的,取決于任務的完成順序:
void solve(Executor executor, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 構造器 for (Callable<Result> s : solvers)// 提交所有任務 ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) {// 獲取每一個完成的任務 Result r = ecs.take().get(); if (r != null) use(r); } }單個任務的超時時間
V Future.get(long timeout, TimeUnit unit)方法可以指定等待的超時時間,超時未完成會拋出TimeoutException。
多個任務的超時時間
等待多個任務完成,并設置最大等待時間,可以通過CountDownLatch完成:
public void testLatch(ExecutorService executorService, List<Runnable> tasks) throws InterruptedException{ CountDownLatch latch = new CountDownLatch(tasks.size()); for(Runnable r : tasks){ executorService.submit(new Runnable() {線程池和裝修公司
以運營一家裝修公司做個比喻。公司在辦公地點等待客戶來提交裝修請求;公司有固定數量的正式工以維持運轉;旺季業務較多時,新來的客戶請求會被排期,比如接單后告訴用戶一個月后才能開始裝修;當排期太多時,為避免用戶等太久,公司會通過某些渠道(比如人才市場、熟人介紹等)雇傭一些臨時工(注意,招聘臨時工是在排期排滿之后);如果臨時工也忙不過來,公司將決定不再接收新的客戶,直接拒單。
線程池就是程序中的“裝修公司”,代勞各種臟活累活。上面的過程對應到線程池上:
// Java線程池的完整構造函數 public ThreadPoolExecutor( int corePoolSize, // 正式工數量 int maximumPoolSize, // 工人數量上限,包括正式工和臨時工 long keepAliveTime, TimeUnit unit, // 臨時工游手好閑的最長時間,超過這個時間將被解雇 BlockingQueue<Runnable> workQueue, // 排期隊列 ThreadFactory threadFactory, // 招人渠道 RejectedExecutionHandler handler) // 拒單方式總結
Executors為我們提供了構造線程池的便捷方法,對于服務器程序我們應該杜絕使用這些便捷方法,而是直接使用線程池ThreadPoolExecutor的構造方法,避免無界隊列可能導致的OOM以及線程個數限制不當導致的線程數耗盡等問題。ExecutorCompletionService提供了等待所有任務執行結束的有效方式,如果要設置等待的超時時間,則可以通過CountDownLatch完成。
參考
ThreadPoolExecutor API Doc
來源:http://www.cnblogs.com/CarpenterLee/p/9558026.html
轉載于:https://www.cnblogs.com/CaptainFM/p/10690018.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
- 上一篇: 前后端token机制 识别用户登录信息
- 下一篇: 企业——memcache对PHP页面的缓