常用并发工具类(线程池)
文章目錄
- 概述
- ThreadPoolExecutor
- ThreadPoolExecutor 的主要屬性
- Worker 主要屬性
- 線程池的狀態
- 線程池的狀態流轉
- 線程池提交任務的執行流程
- 線程數量的設置
- 線程池的種類
- FixedThreadPool
- CachedThreadPool
- SingleThreadExecutor
- ScheduledThreadPoolExecutor
- SingleThreadScheduledExecutor
- ForkJoinPool
- work-stealing
- ForkJoinPool 注意事項
概述
線程池,是資源池化思想的一種實現。線程是一種寶貴且有限的 CPU 資源,一個線程的創建跟銷毀的成本是比較高的。
所以創建線程池,主要有以下兩個目的:
- 復用線程:單個線程創建使用完畢后,可以不用立馬銷毀,而是把這個線程放入到線程池中,等待下次執行任務
- 管理線程:線程是一種寶貴且有限的 CPU 資源,其數量并不是無上限的
- 可以通過線程池來限制創建線程的數量,也可以通過線程池來決定何時銷毀冗余創建的線程
- 在線程池中存在多個線程時,可以通過線程池來協調每個線程的任務執行情況,從而避免出現線程處于空閑狀態,造成線程資源的浪費
ThreadPoolExecutor
ThreadPoolExecutor 是一個通用的線程池的實現,類圖如下所示:
可以看到,ThreadPoolExecutor 主要實現了 Executor、ExecutorService、AbstractExecutorService
ThreadPoolExecutor 的主要屬性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();/*** Set containing all worker threads in pool. Accessed only when* holding mainLock.*/private final HashSet<Worker> workers = new HashSet<Worker>();/*** Wait condition to support awaitTermination*/private final Condition termination = mainLock.newCondition();/*** Tracks largest attained pool size. Accessed only under* mainLock.*/private int largestPoolSize;/*** Counter for completed tasks. Updated only on termination of* worker threads. Accessed only under mainLock.*/private long completedTaskCount;private volatile ThreadFactory threadFactory;/*** Handler called when saturated or shutdown in execute.*/private volatile RejectedExecutionHandler handler;/*** Timeout in nanoseconds for idle threads waiting for work.* Threads use this timeout when there are more than corePoolSize* present or if allowCoreThreadTimeOut. Otherwise they wait* forever for new work.*/private volatile long keepAliveTime;/*** If false (default), core threads stay alive even when idle.* If true, core threads use keepAliveTime to time out waiting* for work.*/private volatile boolean allowCoreThreadTimeOut;/*** Core pool size is the minimum number of workers to keep alive* (and not allow to time out etc) unless allowCoreThreadTimeOut* is set, in which case the minimum is zero.*/private volatile int corePoolSize;/*** Maximum pool size. Note that the actual maximum is internally* bounded by CAPACITY.*/private volatile int maximumPoolSize;/*** The default rejected execution handler*/private static final RejectedExecutionHandler defaultHandler =new AbortPolicy();- 內置一個 AtomicInteger,高 3 位用于表示線程池當前狀態,低 29 位用于表示線程個數
- 內置一個阻塞隊列,用于核心線程都處于執行狀態后保存任務
- 內置一個 ReentrantLock 獨占鎖,以及一個由鎖對象創建出來的條件對象 Condition termination
- 一個去重集合 HashSet<Worker> workers,用于保存線程
- 歷史出現過的最多線程數量 int largestPoolSize
- 已完成的任務數量 long completedTaskCount
- 由 volatile 關鍵字修飾的線程組 ThreadFactory threadFactory
- 由 volatile 關鍵字修飾的拒絕策略 RejectedExecutionHandler handler
- 由 volatile 關鍵字修飾的空閑線程最大存活時間 long keepAliveTime
- 由 volatile 關鍵字修飾的是否允許核心線程超時 boolean allowCoreThreadTimeOut
- 由 volatile 關鍵字修飾的核心線程數量 int corePoolSize
- 由 volatile 關鍵字修飾的最大線程數量 int maximumPoolSize
- 默認的拒絕策略 new AbortPolicy(),即默認的拒絕策略是拋出異常
Worker 主要屬性
Worker 即工作線程。Worker 繼承了 AQS 并實現了 Runnable 接口,主要有以下幾個屬性:
- Thread thread:線程對象
- Runnable firstTask:首次執行的任務
線程池的狀態
線程池一共有 5 個狀態,分別是:
- RUNNING:運行狀態,接受新任務并且處理阻塞隊列里的任務
- SHUTDOWN:標記關閉狀態,拒絕新任務,但是處理正在執行和阻塞隊列里的任務
- STOP:關閉狀態,拒絕新任務并且清空阻塞隊列,同時會中斷所有線程
- TIDYING:清場狀態,線程池和阻塞隊列都為空,將要調用 terminated() 方法
線程池的狀態流轉
- 線程池一旦被創建,就是 RUNNING 狀態,可以正常地執行任務,以及接受新任務
- 在 RUNNING 狀態調用 shutdown() 方法后,線程池就會切換到 SHUTDOWN 狀態,標記關閉狀態下
- 遍歷線程池,當前所有空閑線程都會被調用 interrupt() 方法設置中斷( 被設置中斷的線程將會從線程池中移除)
- 不再接受新任務,新提交的任務將直接丟棄
- 當前正在執行任務的線程把正在執行的任務繼續完成,且這些線程將會繼續從阻塞隊列中獲取任務執行
- 在 RUNNING 或者 SHUTDOWN 狀態調用 shutdownNow() 方法后,線程池就會切換到 STOP 狀態
- 不再接受新任務
- 遍歷線程池,調用每個線程的 interrupt() 方法設置中斷
- 遍歷阻塞隊列,移除所有任務
- 在 SHUTDOWN 或者 STOP 狀態持續一段時間后,當線程池中沒有線程,并且阻塞隊列也為空后,線程池將會切換到 TIDYING 狀態
- TIDYING 狀態將會做最后的收尾工作,確保所有資源都被釋放
線程池提交任務的執行流程
- 當有新任務提交時,判斷當前核心線程數是否小于最大核心線程數
- 如果小于最大核心線程數,則創建一個新線程執行任務
- 如果大于核心線程數,則嘗試將任務提交到阻塞隊列中
- 如果阻塞隊列未滿,則將任務提交到阻塞隊列中
- 如果阻塞隊列已滿,則嘗試開啟新線程
- 如果當前線程數量小于最大線程數,則創建一個新線程執行任務
- 如果當前線程數量等于最大線程數,則執行拒絕策略
- AbortPolicy:直接拋出異常
- CallerRunsPolicy:由提交任務的線程執行
- DiscardPolicy:直接丟棄任務
- DiscardOldestPolicy:將阻塞隊列隊頭的任務取出并丟棄,然后重試提交
線程數量的設置
線程數量的經驗計算公式為:
線程數 = CPU 核心數 *(1 + 平均等待時間 / 平均工作時間)線程池的種類
FixedThreadPool
FixedThreadPool,即固定數量線程池,特點是核心線程數量 = 最大線程數量,且阻塞隊列容量較大甚至無界。
- 固定數量,意味著線程不會頻繁地創建和銷毀,在線程數量達到核心線程數量后,在執行任務過程中沒有發生異常的情況下,線程數量將不會再發生變化
- 阻塞隊列容量非常大,代表著任務可以被無條件地,一直不斷地添加進來,可能會引發 GC 問題
- 當任務的執行速度遠小于任務的添加速度時,可能會導致阻塞隊列中的節點數量特別巨大,吃光堆內存空間,導致 OOM
FixedThreadExecutor 適用于計算密集型任務, 確保 CPU 在長時間被單個工作線程使用的情況下, 盡可能少地創建、分配、銷毀線程, 即適用于長期且數量可控的任務。
CachedThreadPool
CachedThreadPool,即緩存線程池,特點是
- 核心線程數量為 0,意味著在線程池空閑時,不會占用任何的線程資源
- 最大線程數非常巨大,意味著可以創建非常多的線程,可能會導致系統線程資源耗盡的問題
- 線程有較短的最大存活時間(如 60s),意味著每個工作線程的存活時間比較短
- 阻塞隊列不存儲元素,意味著每次有新任務提交時,要么可以立馬被一個空閑線程執行,要么可以立馬創建一個新線程執行去執行
CachedThreadPool 適用于存在數量多且耗時少的任務場景,由于未限制線程最大數量,在任務的執行速度遠小于任務的添加速度時,有可能會導致系統線程資源耗盡或引發 OOM。
SingleThreadExecutor
SingleThreadExecutor,即單線程線程池,特點是:
- 核心線程數量 = 最大線程數量 = 1,意味著線程池中最多只會有一個線程,意味著所有提交的任務都會被串行化執行,任意時刻不會有同時兩個任務在被同時執行
- 阻塞隊列容量非常大甚至無界,代表著任務可以被無條件地,一直不斷地添加進來,可能會引發 GC 問題
SingleThreadExecutor 適用于多個任務需要串行化執行的場景,由于阻塞隊列無界,還是有可能引發 OOM。
注意,Executors.newSingleThreadExecutor() 并不等價于 Executors.newFixedThreadPool(1),Executors 中的源碼為:
可以看到,newSingleThreadExecutor() 方法創建出來的 ThreadPoolExecutor 對象被包了一層 FinalizableDelegatedExecutorService,所以這兩者不等價
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor,是定時任務線程池,可以使用它來實現定時任務,主要的構造方法為:
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}可以看到,ScheduledThreadPoolExecutor 的特點是:
- 最大線程數無界
- 空閑線程的最大存活時間為 0,即在線程池不會存儲空閑線程
- 阻塞隊列 DelayedWorkQueue 是一個底層數據結構為堆的延遲阻塞隊列,無界
SingleThreadScheduledExecutor
SingleThreadScheduledExecutor 即單線程定時任務線程池,可以保證所有定時任務都會由一個線程串行執行
ForkJoinPool
ForkJoinPool,是一個用于管理 ForkJoinWrokerThread 的線程池,ForkJoin 是一種基于分治思想的并發編程框架,通過將任務分解后使用多個線程并行計算,最后合并所有子任務的計算結果得到最終的計算結果。
ForkJoinPool 線程池可以把一個大任務拆分成小任務并行執行,但是任務類必須繼承自 RecursiveTask 或者 RecursiveAction
work-stealing
工作竊取算法(work-stealing)算法是指某個線程從其他隊列里竊取任務來執行。主要實現的思路有一下幾個關鍵點:
- 每個工作者線程都會對應一個雙端任務隊列
- 當某個工作者線程自己的任務隊列中的任務執行完畢之后,將會從其他工作者線程的任務隊列里竊取任務出來執行
- 工作者線程從自己的任務隊列的頭部來獲取任務(removeFirst)
- 工作者線程執行任務竊取時,將從其他工作者線程的任務隊列的尾部來獲取任務(removeLast)
ForkJoinPool 注意事項
- 任務類必須繼承自 RecursiveTask 或者 RecursiveAction
- 子任務間沒有相互依賴
- 任務最好不要包含阻塞 IO 操作
總結
以上是生活随笔為你收集整理的常用并发工具类(线程池)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nfine框架 上传文件_MVC之Str
- 下一篇: oracle util_mail,Ora