Java高并发编程:线程池
這里首先介紹了java5中的并發的小工具包:java.util.concurrent.atomic,然后介紹了線程池的概念,對使用java5的方式創建不同形式的線程進行了演示,之后介紹了兩個 對象:Callable和Future,用于獲取線程執行后的結果,對于線程鎖技術則在另外一篇文章中介紹。
Java5中的線程并發庫都在java.util.concurrent包及子包中
1. Executor類的繼承結構
Executor是線程池的頂級接口,只有一個執行任務的方法execute()
ExecutorService是Executor的子接口,該接口中包含了線程池常用的一些方法
| execute() | 執行任務 |
| shutdown() | 調用后不再接收新任務,如果里面有任務,就執行完 |
| shutdownNow() | 調用后不再接受新任務,如果有等待任務,移出隊列;有正在執行的,嘗試停止之 |
| isShutdown() | 判斷線程池是否關閉 |
| isTerminated() | 判斷線程池中任務是否執行完成 |
| submit() | 提交任務 |
| invokeAll() | 執行一組任務 |
2. ThreadPoolExecutor
ExecutorService的默認實現,同時也是Executors的底層實現
2.1 構造方法
public ThreadPoolExecutor(int corePoolSize, //核心線程數int maximumPoolSize, //最大線程數long keepAliveTime, //保持時間TimeUnit unit, //時間單位BlockingQueue<Runnable> workQueue, //阻塞隊列ThreadFactory threadFactory, //線程工廠RejectedExecutionHandler handler //異常捕獲器 )2.1.1 int corePoolSize
核心池的大小,這個參數跟后面講述的線程池的實現原理有非常大的關系。在創建了線程池后,默認情況下,線程池中并沒有任何線程,而是等待有任務到來才創建線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預創建線程的意思,即在沒有任務到來之前就創建corePoolSize個線程或者一個線程。默認情況下,在創建了線程池后,線程池中的線程數為0,當有任務來之后,就會創建一個線程去執行任務,當線程池中的線程數目達到corePoolSize后,就會把到達的任務放到緩存隊列當中
2.1.2 int maximumPoolSize
線程池最大線程數,這個參數也是一個非常重要的參數,它表示在線程池中最多能創建多少個線程
2.1.3 long keepAliveTime
表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大于corePoolSize,即當線程池中的線程數大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大于corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0
2.1.4 TimeUnit unit
參數keepAliveTime的時間單位,有7種取值
- TimeUnit.DAYS //天
- TimeUnit.HOURS //小時
- TimeUnit.MINUTES //分鐘
- TimeUnit.SECONDS //秒
- TimeUnit.MILLISECONDS //毫秒
- TimeUnit.MICROSECONDS //微妙
- TimeUnit.NANOSECONDS //納秒
2.1.5 RejectedExecutionHandler
ThreadPoolExecutor.AbortPolicy
當添加任務出錯時的策略捕獲器,丟棄任務并拋出RejectedExecutionException異常ThreadPoolExecutor.DiscardPolicy
也是丟棄任務,但是不拋出異常ThreadPoolExecutor.DiscardOldestPolicy
丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)ThreadPoolExecutor.CallerRunsPolicy
由調用線程處理該任務
3. 任務提交給線程池之后的處理策略
1、如果當前線程池中的線程數目小于corePoolSize,則每來一個任務,就會創建執行這個任務
2、如果當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中
2.1、若添加成功,則該任務會等待空閑線程將其取出去執行
2.2、若添加失敗(一般來說是任務緩存隊列已滿),則會嘗試創建新的線程去執行這個任務
3、如果當前線程池中的線程數目達到maximumPoolSize,則會采取任務拒絕策略進行處理
如果線程池中的線程數量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大于corePoolSize;如果允許為核心池中的線程設置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止
4. 阻塞隊列的介紹
4.1 BlockingQueue
| BlockingQueue | 阻塞隊列的頂級接口,主要用于實現生產者消費者隊列 |
| BlockingDeque | 雙端隊列 |
| SynchronousQueue | 同步隊列,無界隊列,直接提交策略,交替隊列,在某次添加元素后必須等待其他線程取走后才能繼續添加 |
| LinkedBlockingQueue | 無界隊列,基于鏈表的阻塞隊列,可以并發運行,FIFO |
| ArrayBlockingQueue | 基于數組的有界(固定大小的數組)阻塞隊列,只有put方法和take方法才具有阻塞功能,公平性 fairness |
| PriorityBlockingQueue | 基于優先級的阻塞隊列,依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序 |
| DelayQueue | 延時隊列 |
4.2 排隊策略
直接提交
工作隊列的默認選項是 SynchronousQueue,它將任務直接提交給線程而不保持它們。在此,如果不存在可用于立即運行任務的線程,則試圖把任務加入隊列將失敗,因此會構造一個新的線程。此策略可以避免在處理可能具有內部依賴性的請求集時出現鎖。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務。當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性。
無界隊列
使用無界隊列(例如,不具有預定義容量的 LinkedBlockingQueue)使用無界隊列將導致在所有 corePoolSize 線程都忙時新任務在隊列中等待。這樣,創建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當每個任務完全獨立于其他任務,即任務執行互不影響時,適合于使用無界隊列。例如,在 Web 頁服務器中。這種排隊可用于處理瞬態突發請求,當命令以超過隊列所能處理的平均數連續到達時,此策略允許無界線程具有增長的可能性
有界隊列
當使用有限的 maximumPoolSizes 時,有界隊列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調整和控制。隊列大小和最大池大小可能需要相互折衷:使用大型隊列和小型池可以最大限度地降低 CPU 使用率、操作系統資源和上下文切換開銷,但是可能導致人工降低吞吐量。如果任務頻繁阻塞(例如,如果它們是 I/O 邊界),則系統可能為超過您許可的更多線程安排時間。使用小型隊列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調度開銷,這樣也會降低吞吐量。
4.3 BlockingQueue
| Insert | add() | offer() | put() | offer(e,time,unit) |
| Remove | remove() | poll() | take() | poll(time,unit) |
| Examine檢查 | element() | peek() | 不可用 | 不可用 |
BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會拋出 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。
BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 附加元素。沒有任何內部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩余容量。
BlockingQueue 實現主要用于生產者-使用者隊列,但它另外還支持 Collection 接口。因此,舉例來說,使用 remove(x) 從隊列中移除任意一個元素是有可能的。然而,這種操作通常不 會有效執行,只能有計劃地偶爾使用,比如在取消排隊信息時。
BlockingQueue 實現是線程安全的。所有排隊方法都可以使用內部鎖或其他形式的并發控制來自動達到它們的目的。然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有必要自動執行,除非在實現中特別說明。因此,舉例來說,在只添加了 c 中的一些元素后,addAll(c) 有可能失敗(拋出一個異常)。
BlockingQueue 實質上不支持使用任何一種“close”或“shutdown”操作來指示不再添加任何項。這種功能的需求和使用有依賴于實現的傾向。例如,一種常用的策略是:對于生產者,插入特殊的end-of-stream或poison對象,并根據使用者獲取這些對象的時間來對它們進行解釋。
4.4 BlockingDeque
雙端隊列
4.5 ArrayBlockingQueue
一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。創建其對象必須明確大小,像數組一樣。其內部實現是將對象放到一個數組里。有界也就意味著,它不能夠存儲無限多數量的元素。它有一個同一時間能夠存儲元素數量的上限。你可以在對其初始化的時候設定這個上限,但之后就無法對這個上限進行修改了(譯者注:因為它是基于數組實現的,也就具有數組的特性:一旦初始化,大小就無法修改)。
實現互斥,你一下我一下
public class BlockingQueueCondition {public static void main(String[] args) {ExecutorService service = Executors.newSingleThreadExecutor();final Business3 business = new Business3();service.execute(new Runnable(){public void run() {for(int i=0;i<50;i++){business.sub();}}});for(int i=0;i<50;i++){business.main();}} } class Business3{BlockingQueue subQueue = new ArrayBlockingQueue(1);BlockingQueue mainQueue = new ArrayBlockingQueue(1);{try {mainQueue.put(1);} catch (InterruptedException e) {e.printStackTrace();}}public void sub(){try{mainQueue.take();for(int i=0;i<10;i++){System.out.println(Thread.currentThread().getName() + " : " + i);}subQueue.put(1);}catch(Exception e){}}public void main(){try{subQueue.take();for(int i=0;i<5;i++){System.out.println(Thread.currentThread().getName() + " : " + i);}mainQueue.put(1);}catch(Exception e){}} }輸出結果
pool-1-thread-1 : 0 pool-1-thread-1 : 1 pool-1-thread-1 : 2 pool-1-thread-1 : 3 pool-1-thread-1 : 4 pool-1-thread-1 : 5 pool-1-thread-1 : 6 pool-1-thread-1 : 7 pool-1-thread-1 : 8 pool-1-thread-1 : 9 main : 0 main : 1 main : 2 main : 3 main : 4 pool-1-thread-1 : 0 pool-1-thread-1 : 1 pool-1-thread-1 : 2 pool-1-thread-1 : 3 pool-1-thread-1 : 4 pool-1-thread-1 : 5 pool-1-thread-1 : 6 pool-1-thread-1 : 7 pool-1-thread-1 : 8 pool-1-thread-1 : 9 main : 0 main : 1 main : 2 main : 3 main : 4 pool-1-thread-1 : 0 pool-1-thread-1 : 1 pool-1-thread-1 : 2 pool-1-thread-1 : 3 pool-1-thread-1 : 4 pool-1-thread-1 : 5 pool-1-thread-1 : 6 pool-1-thread-1 : 7 pool-1-thread-1 : 8 pool-1-thread-1 : 9 ...4.6 LinkedBlockingQueue
一個可改變大小的阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。創建其對象如果沒有明確大小,默認值是Integer.MAX_VALUE。鏈接隊列的吞吐量通常要高于基于數組的隊列,但是在大多數并發應用程序中,其可預知的性能要低。
4.7 SynchronousQueue
同步隊列。同步隊列沒有任何容量,每個插入必須等待另一個線程移除,反之亦然。是一個特殊的隊列,它的內部同時只能夠容納單個元素。如果該隊列已有一元素的話,試圖向隊列中插入一個新元素的線程將會阻塞,直到另一個線程將該元素從隊列中抽走。同樣,如果該隊列為空,試圖向隊列中抽取一個元素的線程將會阻塞,直到另一個線程向隊列中插入了一條新的元素。據此,把這個類稱作一個隊列顯然是夸大其詞了。它更多像是一個匯合點。
4.8 DelayQueue
延時隊列,對元素進行持有直到一個特定的延遲到期,只有在延遲期滿時才能從中提取元素。注入其中的元素必須實現 java.util.concurrent.Delayed 接口。
4.9 PriorityBlockingQueue
基于優先級的阻塞隊列,依據對象的自然排序順序或者是構造函數所帶的Comparator決定的順序,應用:Volley
4.10 生產者消費者模式
生產者生產任務,消費者消費任務,那么這時就需要一個任務隊列,生產者向隊列里插入任務,消費者從隊列里提取任務執行
5. 線程池工具類Executors
jdk1.5之后的一個新類,提供了一些靜態工廠,生成一些常用的線程池,ThreadPoolExecutor是Executors類的底層實現
| newCachedThreadPool() | 創建一個可緩存的線程池 |
| newFixedThreadPool() | 創建一個固定大小的線程池 |
| newScheduledThreadPool() | 創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求 |
| newSingleThreadExecutor() | 創建單個線程的線程池,始終保證線程池中會有一個線程在。當某線程死去,會找繼任者 |
| defaultThreadFactory() | 創建一個默認線程池工廠 |
6. 線程池
在線程池的編程模式下,任務是提交給整個線程池,而不是直接交給某個線程,線程池在拿到任務后,它就在內部找有無空閑的線程,再把任務交給內部某個空閑的線程,這就是封裝
記住:任務是提交給整個線程池,一個線程同時只能執行一個任務,但可以同時向一個線程池提交多個任務。
示例:
- 創建固定大小的線程池
- 創建緩存線程池
- 用線程池創建定時器
- 創建單一線程池(始終保證線程池中會有一個線程在。當某線程死去,會找繼任者)
注意
定時器中總是相對時間,我們要想指定具體時間的方法:比如明天早上10點鐘執行,則可以使用明天早上10點的時間減去當前的時間,得到時間間隔
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class ThreadPoolTest { public static void main(String[] args){ //創建固定大小的線程池,這里只能完成3個任務 //ExecutorService threadPool = Executors.newFixedThreadPool(3); //創建緩存線程池,根據任務來自動創建線程的數量,可以完成創建的所有任務 //ExecutorService threadPool = Executors.newCachedThreadPool(); //創建單一線程池(始終保持線程池中有一個線程存活。當唯一線程死去,會創建新的繼任者、 ExecutorService threadPool = Executors.newSingleThreadExecutor(); for(int i=1;i<=10;i++){ //內部類不能訪問外部類的局部變量,所以i要定義為final,又由于i++. //所以在循環內部定義一個變量接收i final int task = i; threadPool.execute(new Runnable() { @Override public void run() { for(int j=1;j<=10;j++){ System.out.println(Thread.currentThread().getName() +" is looping of "+ j+" for task of " +task); } } }); } //驗證10個任務都提交給了線程池 System.out.println("all of 10 tasks have committed! "); //threadPool.shutdown(); //等任務完成后,殺死線程、 //threadPool.shutdownNow(); //立即停止線程 //用線程池啟動定時器 Executors.newScheduledThreadPool(3).schedule( new Runnable() { //任務 @Override public void run() { System.out.println("bombing!"); } }, 5, //5秒以后執行 TimeUnit.SECONDS); //單位 //在某個時間執行一次后,再指定后續的執行間隔時間 Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable(){ @Override public void run() { System.out.println("bombing!"); } }, 10, //第一次在10秒時爆炸 3, //以后每隔3秒爆炸一次。 TimeUnit.SECONDS); } }7. 線程池的簡單使用
/*** 一個簡易的線程池管理類,提供三個線程池*/ public class ThreadManager {public static final String DEFAULT_SINGLE_POOL_NAME = "DEFAULT_SINGLE_POOL_NAME";private static ThreadPoolProxy mLongPool = null;private static Object mLongLock = new Object();private static ThreadPoolProxy mShortPool = null;private static Object mShortLock = new Object();private static ThreadPoolProxy mDownloadPool = null;private static Object mDownloadLock = new Object();private static Map<String, ThreadPoolProxy> mMap = new HashMap<String, ThreadPoolProxy>();private static Object mSingleLock = new Object();/** 獲取下載線程 */public static ThreadPoolProxy getDownloadPool() {synchronized (mDownloadLock) {if (mDownloadPool == null) {mDownloadPool = new ThreadPoolProxy(3, 3, 5L);}return mDownloadPool;}}/** 獲取一個用于執行長耗時任務的線程池,避免和短耗時任務處在同一個隊列而阻塞了重要的短耗時任務,通常用來聯網操作 */public static ThreadPoolProxy getLongPool() {synchronized (mLongLock) {if (mLongPool == null) {mLongPool = new ThreadPoolProxy(5, 5, 5L);}return mLongPool;}}/** 獲取一個用于執行短耗時任務的線程池,避免因為和耗時長的任務處在同一個隊列而長時間得不到執行,通常用來執行本地的IO/SQL */public static ThreadPoolProxy getShortPool() {synchronized (mShortLock) {if (mShortPool == null) {mShortPool = new ThreadPoolProxy(2, 2, 5L);}return mShortPool;}}/** 獲取一個單線程池,所有任務將會被按照加入的順序執行,免除了同步開銷的問題 */public static ThreadPoolProxy getSinglePool() {return getSinglePool(DEFAULT_SINGLE_POOL_NAME);}/** 獲取一個單線程池,所有任務將會被按照加入的順序執行,免除了同步開銷的問題 */public static ThreadPoolProxy getSinglePool(String name) {synchronized (mSingleLock) {ThreadPoolProxy singlePool = mMap.get(name);if (singlePool == null) {singlePool = new ThreadPoolProxy(1, 1, 5L);mMap.put(name, singlePool);}return singlePool;}}public static class ThreadPoolProxy {private ThreadPoolExecutor mPool;private int mCorePoolSize;private int mMaximumPoolSize;private long mKeepAliveTime;private ThreadPoolProxy(int corePoolSize, int maximumPoolSize, long keepAliveTime) {mCorePoolSize = corePoolSize;mMaximumPoolSize = maximumPoolSize;mKeepAliveTime = keepAliveTime;}/** 執行任務,當線程池處于關閉,將會重新創建新的線程池 */public synchronized void execute(Runnable run) {if (run == null) {return;}if (mPool == null || mPool.isShutdown()) {//參數說明//當線程池中的線程小于mCorePoolSize,直接創建新的線程加入線程池執行任務//當線程池中的線程數目等于mCorePoolSize,將會把任務放入任務隊列BlockingQueue中//當BlockingQueue中的任務放滿了,將會創建新的線程去執行,//但是當總線程數大于mMaximumPoolSize時,將會拋出異常,交給RejectedExecutionHandler處理//mKeepAliveTime是線程執行完任務后,且隊列中沒有可以執行的任務,存活的時間,后面的參數是時間單位//ThreadFactory是每次創建新的線程工廠mPool = new ThreadPoolExecutor(mCorePoolSize, mMaximumPoolSize, mKeepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new AbortPolicy());}mPool.execute(run);}/** 取消線程池中某個還未執行的任務 */public synchronized void cancel(Runnable run) {if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {mPool.getQueue().remove(run);}}/** 取消線程池中某個還未執行的任務 */public synchronized boolean contains(Runnable run) {if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {return mPool.getQueue().contains(run);} else {return false;}}/** 立刻關閉線程池,并且正在執行的任務也將會被中斷 */public void stop() {if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {mPool.shutdownNow();}}/** 平緩關閉單任務線程池,但是會確保所有已經加入的任務都將會被執行完畢才關閉 */public synchronized void shutdown() {if (mPool != null && (!mPool.isShutdown() || mPool.isTerminating())) {mPool.shutdownNow();}}} }總結
以上是生活随笔為你收集整理的Java高并发编程:线程池的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java高并发编程:线程范围内共享数据
- 下一篇: Java高并发编程:HandlerThr