并发编程-线程池
為什么使用線程池?
- 線程是稀缺資源,不能頻繁的創建。
- 解耦作用;線程的創建與執行完全分開,方便維護。
- 應當將其放入一個池子中,可以給其他任務進行復用。
線程池原理
核心的思想就是把寶貴的資源放到一個池子中;每次使用都從里面獲取,用完之后又放回池子供其他人使用。
如何配置線程
在 JDK 1.5 之后推出了相關的 api,常見的創建線程池方式有以下幾種:
- Executors.newCachedThreadPool():無限線程池。
- Executors.newFixedThreadPool(nThreads):創建固定大小的線程池。
- Executors.newSingleThreadExecutor():創建單個線程的線程池。
查看代碼會發現,其實看這三種方式創建的源碼就會發現,以上三種都是利用利用?ThreadPoolExecutor?類實現的。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler); }這幾個核心參數的作用:
- corePoolSize 為線程池的基本大小。
- maximumPoolSize 為線程池最大線程大小。
- keepAliveTime 和 unit 則是線程空閑后的存活時間。
- workQueue 用于存放任務的阻塞隊列。
- handler 當隊列和最大線程池都滿了之后的飽和策略。
通常我們都是使用:
?
threadPool.execute(new?Job());這樣的方式來提交一個任務到線程池中,所以核心的邏輯就是 execute() 函數了。
在具體分析之前先了解下線程池中所定義的狀態,這些狀態都和線程的執行密切相關:
?
private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS- RUNNING?
(1)狀態說明:自然是運行狀態,指可以接受任務執行隊列里的任務線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處于RUNNING狀態,并且線程池中的任務數為0
? ? ? (2)狀態切換:線程池的初始化狀態是RUNNING。換句話說,線程池被一旦被創建,就處于RUNNING狀態,并且線程池中的任務數為0!
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));- SHUTDOWN?
(1) 狀態說明:線程池處在SHUTDOWN狀態時,不接收新任務,但能處理已添加的任務。?
(2) 狀態切換:調用線程池的shutdown()接口時,線程池由RUNNING -> SHUTDOWN。
- STOP?
- TIDYING?
(1) 狀態說明:當所有的任務已終止,任務數量”為0,線程池會變為TIDYING狀態。當線程池變為TIDYING狀態時,會執行鉤子函數terminated()。terminated()在ThreadPoolExecutor類中是空的,若用戶想在線程池變為TIDYING時,進行相應的處理;可以通過重載terminated()函數來實現。?
(2) 狀態切換:當線程池在SHUTDOWN狀態下,阻塞隊列為空并且線程池中執行的任務也為空時,就會由 SHUTDOWN -> TIDYING。 當線程池在STOP狀態下,線程池中執行的任務為空時,就會由STOP -> TIDYING。
- TERMINATED?終止狀態,當執行?terminated()?后會更新為這個狀態。
?
?
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();//獲取當前線程池的狀態 if (workerCountOf(c) < corePoolSize) {//當前線程數量小于 coreSize 時創建一個新的線程運行if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {//如果當前線程處于運行狀態,并且寫入阻塞隊列成功int recheck = ctl.get();if (! isRunning(recheck) && remove(command)) //雙重檢查,再次獲取線程狀態;如果線程狀態變了(非運行狀態)就需要從阻塞隊列移除任務,并嘗試判斷線程是否全部執行完畢。同時執行拒絕策略。 reject(command);else if (workerCountOf(recheck) == 0) //如果當前線程池為空就新創建一個線程并執行。addWorker(null, false);}else if (!addWorker(command, false)) //如果在第三步的判斷為非運行狀態,嘗試新建線程,如果失敗則執行拒絕策略 reject(command); }如何配置線程
流程聊完了再來看看上文提到了幾個核心參數應該如何配置呢?
有一點是肯定的,線程池肯定是不是越大越好。
通常我們是需要根據這批任務執行的性質來確定的。
?
- IO 密集型任務:由于線程并不是一直在運行,所以可以盡可能的多配置線程,比如 CPU 個數 * 2
- CPU 密集型任務(大量復雜的運算)應當分配較少的線程,比如 CPU 個數相當的大小。
當然這些都是經驗值,最好的方式還是根據實際情況測試得出最佳配置。
?
優雅的關閉線程池
有運行任務自然也有關閉任務,從上文提到的 5 個狀態就能看出如何來關閉線程池。
其實無非就是兩個方法?shutdown()/shutdownNow()。
但他們有著重要的區別:
- shutdown()?執行后停止接受新任務,會把隊列的任務執行完畢。
- shutdownNow()?也是停止接受新任務,但會中斷所有的任務,將線程池狀態變為 stop。
示例:
import java.util.Random; import java.util.concurrent.TimeUnit;public class TaskDemo implements Runnable{@Overridepublic void run() {System.out.println(Thread.currentThread().getName()+" is running");try {TimeUnit.SECONDS.sleep(new Random().nextInt(10));} catch (InterruptedException e) {e.printStackTrace();}} }import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class CachedPool {public static void main(String[] args) {ExecutorService pool=Executors.newCachedThreadPool();for (int i = 0; i < 10; i++) {//創建任務Runnable task=new TaskDemo();//把任務交給pool去執行 pool.execute(task);}} } import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class FixedPoolDemo {public static void main(String[] args) {//創建固定大小線程池ExecutorService pool=Executors.newFixedThreadPool(5);//創建10個任務給poolfor (int i = 0; i < 10; i++) {//創建任務Runnable task=new TaskDemo();//把任務交給pool去執行 pool.execute(task);}//關閉pool.shutdown();//shutdownwhile (!pool.isTerminated()){}System.out.println("finished");}} import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService;public class ScheduledDemo {public static void main(String[] args) {ScheduledExecutorService pool=Executors.newScheduledThreadPool(5);for (int i = 0; i < 10; i++) {Runnable task=new TaskDemo();//把任務交給pool去執行 pool.execute(task);}} }單一線程:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;public class SingleThreadDemo {public static void main(String[] args) {ExecutorService pool=Executors.newSingleThreadExecutor();for (int i = 0; i < 10; i++) {//創建任務Runnable task=new TaskDemo();//把任務交給pool去執行 pool.execute(task);}} }?
轉載于:https://www.cnblogs.com/yintingting/p/11429423.html
總結