Java并发编程之线程池及示例
1、Executor
線程池頂級接口。定義方法,void execute(Runnable)。方法是用于處理任務的一個服務方法。調用者提供Runnable 接口的實現,線程池通過線程執行這個 Runnable。服務方法無返回值的。是 Runnable 接口中的 run 方法無返回值。
常用方法 -voidexecute(Runnable) 作用是: 啟動線程任務的。示例如下:
2、ExecutorService
Executor 接口的子接口。提供了一個新的服務方法,submit。有返回值(Future 類型)。 submit 方法提供了 overload 方法。其中有參數類型為 Runnable 的,不需要提供返回值的; 有參數類型為 Callable,可以提供線程執行后的返回值。
Future,是 submit 方法的返回值。代表未來,也就是線程執行結束后的一種結果。如返 回值。
常見方法 -void execute(Runnable), Future submit(Callable), Future submit(Runnable) 線程池狀態: Running, ShuttingDown,?Terminated。
Running- 線程池正在執行中。活動狀態。
ShuttingDown- 線程池正在關閉過程中。優雅關閉。一旦進入這個狀態,線程池不再接收新的任務,處理所有已接收的任務,處理完畢后,關閉線程池。
Terminated- 線程池已經關閉。
3、Future
未來結果,代表線程任務執行結束后的結果。獲取線程執行結果的方式是通過 get 方法獲取的。get 無參,阻塞等待線程執行結束,并得到結果。get 有參,阻塞固定時長,等待 線程執行結束后的結果,如果在阻塞時長范圍內,線程未執行結束,拋出異常。
常用方法: T get()、T get(long, TimeUnit) 。
/*** 線程池* 固定容量線程池*/ import java.util.concurrent.*;public class Test_03_Future {public static void main(String[] args) throws InterruptedException, ExecutionException {/*FutureTask<String> task = new FutureTask<>(new Callable<String>() {@Overridepublic String call() throws Exception {return "first future task";}});new Thread(task).start();System.out.println(task.get());*/ExecutorService service = Executors.newFixedThreadPool(1);Future<String> future = service.submit(new Callable<String>() {@Overridepublic String call() {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("aaa");return Thread.currentThread().getName() + " - test executor";}});System.out.println(future);System.out.println(future.isDone()); // 查看線程是否結束, 任務是否完成。 call方法是否執行結束 System.out.println(future.get()); // 獲取call方法的返回值。 System.out.println(future.isDone());} }4、Callable
可執行接口。 類似 Runnable 接口。也是可以啟動一個線程的接口。其中定義的方法是 call。call 方法的作用和 Runnable 中的 run 方法完全一致。call 方法有返回值。
接口方法 : Object call();相當于 Runnable 接口中的 run 方法。區別為此方法有返回值。 不能拋出已檢查異常。
Callable、Runnable 接口的選擇:需要返回值或需要拋出異常時,使用 Callable;其他情況可任意選擇。
5、Executors
工具類型。為 Executor 線程池提供工具方法。可以快速的提供若干種線程池。如:固定 容量的,無限容量的,容量為 1 等各種線程池。
線程池是一個進程級的重量級資源。默認的生命周期和 JVM 一致。當開啟線程池后, 直到 JVM 關閉為止,是線程池的默認生命周期。如果手工調用 shutdown 方法,那么線程池 執行所有的任務后,自動關閉。
開始 - 創建線程池。
結束 - JVM 關閉或調用 shutdown 并處理完所有的任務。 類似 Arrays,Collections 等工具類型的功用。
6、FixedThreadPool
容量固定的線程池。活動狀態和線程池容量是有上限的線程池。所有的線程池中,都有 一個任務隊列。使用的是 BlockingQueue<Runnable>作為任務的載體。當任務數量大于線程池容量的時候,沒有運行的任務保存在任務隊列中,當線程有空閑的,自動從隊列中取出任務執行。
使用場景: 大多數情況下,使用的線程池,首選推薦 FixedThreadPool。OS 系統和硬件是有線程支持上限。不能隨意的無限制提供線程池。
線程池默認的容量上限是 Integer.MAX_VALUE。 常見的線程池容量: PC:200。 服務器:1000~10000
線程池容量和并發能力換算關系大約為:并發量= 10*線程池容量 ~ 18*線程池容量。
queued tasks- 任務隊列
completed tasks- 結束任務隊列
/*** 線程池* 固定容量線程池* FixedThreadPool - 固定容量線程池。創建線程池的時候,容量固定。構造的時候,提供線程池最大容量* Executors.newFixedThreadPool(int) -> ExecutorService - 線程池服務類型。所有的線程池類型都實現這個接口。* 實現這個接口,代表可以提供線程池能力。* shutdown - 優雅關閉。 不是強行關閉線程池,回收線程池中的資源。而是不再處理新的任務,將已接收的任務處理完畢后再關閉。* Executors - Executor的工具類。類似Collection和Collections的關系,可以更簡單的創建若干種線程池。*/ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;public class Test_02_FixedThreadPool {public static void main(String[] args) {ExecutorService service =Executors.newFixedThreadPool(5);for (int i = 0; i < 6; i++) {service.execute(new Runnable() {@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " - test executor");}});}System.out.println(service);service.shutdown();// 是否已經結束, 相當于回收了資源。 System.out.println(service.isTerminated());// 是否已經關閉, 是否調用過shutdown方法 System.out.println(service.isShutdown());System.out.println(service);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}// service.shutdown(); System.out.println(service.isTerminated());System.out.println(service.isShutdown());System.out.println(service);} }執行結果:
java.util.concurrent.ThreadPoolExecutor@71f2a7d5[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] false true java.util.concurrent.ThreadPoolExecutor@71f2a7d5[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] pool-1-thread-1 - test executor pool-1-thread-2 - test executor pool-1-thread-3 - test executor pool-1-thread-4 - test executor pool-1-thread-5 - test executor pool-1-thread-1 - test executor true true java.util.concurrent.ThreadPoolExecutor@71f2a7d5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]注意:FixedThreadPool實現是基于LinkedBlockingQueue的。
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()); }7、CachedThreadPool
緩存的線程池。容量不限(Integer.MAX_VALUE)。自動擴容。容量管理策略:如果線程 池中的線程數量不滿足任務執行,創建新的線程。每次有新任務無法即時處理的時候,都會 創建新的線程。當線程池中的線程空閑時長達到一定的臨界值(默認 60 秒),自動釋放線程。
默認線程空閑 60 秒,自動銷毀。
應用場景: 內部應用或測試應用。 內部應用,有條件的內部數據瞬間處理時應用,如:
電信平臺夜間執行數據整理:有把握在短時間內處理完所有工作,且對硬件和軟件有足夠的信心。 測試應用:在測試的時候,嘗試得到硬件或軟件的最高負載量,用于提供 FixedThreadPool 容量的指導。
/*** 線程池* 無容量限制的線程池(最大容量默認為Integer.MAX_VALUE)*/ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit;public class Test_05_CachedThreadPool {public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();System.out.println(service);for (int i = 0; i < 5; i++) {service.execute(new Runnable() {@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " - test executor");}});}System.out.println(service);try {TimeUnit.SECONDS.sleep(65);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(service);} }運行結果:
java.util.concurrent.ThreadPoolExecutor@483bf400[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] java.util.concurrent.ThreadPoolExecutor@483bf400[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0] pool-1-thread-1 - test executor pool-1-thread-2 - test executor pool-1-thread-3 - test executor pool-1-thread-5 - test executor pool-1-thread-4 - test executor 注意:CachedThreadPool實現是基于SynchronousQueue的。 public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>()); }8、ScheduledThreadPool
計劃任務線程池。可以根據計劃自動執行任務的線程池。
scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)
runnable - 要執行的任務。
start_limit - 第一次任務執行的間隔。
limit - 多次任務執行的間隔。
timeunit - 多次任務執行間隔的時間單位。使用場景: 計劃任務時選用(DelaydQueue),如:電信行業中的數據整理,每分鐘整 理,每小時整理,每天整理等。
/*** 線程池* 計劃任務線程池。*/ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;public class Test_07_ScheduledThreadPool {public static void main(String[] args) {ScheduledExecutorService service = Executors.newScheduledThreadPool(3);System.out.println(service);// 定時完成任務。 scheduleAtFixedRate(Runnable, start_limit, limit, timeunit)// runnable - 要執行的任務。service.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());}}, 0, 300, TimeUnit.MILLISECONDS);} }運行結果:
java.util.concurrent.ScheduledThreadPoolExecutor@483bf400[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] pool-1-thread-1 pool-1-thread-1 pool-1-thread-2 pool-1-thread-2 pool-1-thread-2注意:ScheduledThreadPool實現是基于DelayedWorkQueue的。
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue()); }9、SingleThreadExceutor
單一容量的線程池。
使用場景: 保證任務順序時使用。如: 游戲大廳中的公共頻道聊天。秒殺。
運行結果:
java.util.concurrent.Executors$FinalizableDelegatedExecutorService@483bf400 pool-1-thread-1 - test executor pool-1-thread-1 - test executor pool-1-thread-1 - test executor pool-1-thread-1 - test executor pool-1-thread-1 - test executor注意:SingleThreadExceutor實現是基于LinkedBlockingQueue的。
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>())); }10、ForkJoinPool
分支合并線程池(mapduce 類似的設計思想)。適合用于處理復雜任務。 初始化線程容量與 CPU 核心數相關。
線程池中運行的內容必須是 ForkJoinTask 的子類型(RecursiveTask,RecursiveAction)。 ForkJoinPool - 分支合并線程池。 可以遞歸完成復雜任務。要求可分支合并的任務必須是 ForkJoinTask 類型的子類型。其中提供了分支和合并的能力。ForkJoinTask 類型提供了兩個抽象子類型,RecursiveTask 有返回結果的分支合并任務,RecursiveAction無返回結果的分支合并任務。(Callable/Runnable)compute 方法:就是任務的執行邏輯。
ForkJoinPool 沒有所謂的容量。默認都是 1 個線程。根據任務自動的分支新的子線程。 當子線程任務結束后,自動合并。所謂自動是根據 fork 和 join 兩個方法實現的。
應用: 主要是做科學計算或天文計算的。數據分析的。
/*** 線程池* 分支合并線程池。*/ import java.io.IOException; import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask;public class Test_08_ForkJoinPool {final static int[] numbers = new int[1000000];final static int MAX_SIZE = 50000;final static Random r = new Random();static {for (int i = 0; i < numbers.length; i++) {numbers[i] = r.nextInt(1000);}}public static void main(String[] args) throws InterruptedException, ExecutionException, IOException {long result = 0L;for (int i = 0; i < numbers.length; i++) {result += numbers[i];}System.out.println(result);ForkJoinPool pool = new ForkJoinPool();AddTask task = new AddTask(0, numbers.length);Future<Long> future = pool.submit(task);System.out.println(future.get());}static class AddTask extends RecursiveTask<Long> { // RecursiveActionint begin, end;public AddTask(int begin, int end) {this.begin = begin;this.end = end;}// protected Long compute() {if ((end - begin) < MAX_SIZE) {long sum = 0L;for (int i = begin; i < end; i++) {sum += numbers[i];}// System.out.println("form " + begin + " to " + end + " sum is : " + sum);return sum;} else {int middle = begin + (end - begin) / 2;AddTask task1 = new AddTask(begin, middle);AddTask task2 = new AddTask(middle, end);task1.fork();// fork - 就是用于開啟新的任務的。 就是分支工作的。 就是開啟一個新的線程任務。 task2.fork();// join - 合并。將任務的結果獲取。 這是一個阻塞方法。一定會得到結果數據。return task1.join() + task2.join();}}} }11、WorkStealingPool
JDK1.8 新增的線程池。工作竊取線程池。當線程池中有空閑連接時,自動到等待隊列中 竊取未完成任務,自動執行。
初始化線程容量與 CPU 核心數相關。此線程池中維護的是精靈線程。 ExecutorService.newWorkStealingPool ();
12、ThreadPoolExecutor
線程池底層實現。除 ForkJoinPool 外,其他常用線程池底層都是使用 ThreadPoolExecutor實現的。
public ThreadPoolExecutor(int corePoolSize, // 核心容量,創建線程池的時候,默認有多少線程。也是線程池保持的最少線程數。int maximumPoolSize, // 最大容量,線程池最多有多少線程long keepAliveTime, // 生命周期,0為永久。當線程空閑多久后,自動回收。TimeUnit unit, // 生命周期單位,為生命周期提供單位,如:秒,毫秒BlockingQueue<Runnable> workQueue // 任務隊列,阻塞隊列。 注意:泛型必須是 Runnable );使用場景: 默認提供的線程池不滿足條件時使用。如:初始線程數據 4,最大線程數 200,線程空閑周期 30 秒。
/*** 線程池* 模擬固定容量線程池*/ import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;public class Test_09_ThreadPoolExecutor {public static void main(String[] args) {// 模擬fixedThreadPool, 核心線程5個,最大容量5個,線程的生命周期無限。ExecutorService service =new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());for (int i = 0; i < 6; i++) {service.execute(new Runnable() {@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + " - test executor");}});}System.out.println(service);service.shutdown();System.out.println(service.isTerminated());System.out.println(service.isShutdown());System.out.println(service);try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}service.shutdown();System.out.println(service.isTerminated());System.out.println(service.isShutdown());System.out.println(service);} }運行結果:
java.util.concurrent.ThreadPoolExecutor@71f2a7d5[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] false true java.util.concurrent.ThreadPoolExecutor@71f2a7d5[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0] pool-1-thread-2 - test executor pool-1-thread-1 - test executor pool-1-thread-5 - test executor pool-1-thread-3 - test executor pool-1-thread-4 - test executor pool-1-thread-2 - test executor true true java.util.concurrent.ThreadPoolExecutor@71f2a7d5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]13、性能問題
如下示例為線程池和單線程運算時的性能測試。
/*** 線程池* 固定容量線程池, 簡單應用*/ import java.util.ArrayList; import java.util.List; import java.util.concurrent.*;public class Test_04_ParallelComputingWithFixedThreadPool {public static void main(String[] args) throws InterruptedException, ExecutionException {long start = System.currentTimeMillis();computing(1, 200000);long end = System.currentTimeMillis();System.out.println("computing times : " + (end - start));ExecutorService service = Executors.newFixedThreadPool(5);ComputingTask t1 = new ComputingTask(1, 60000);ComputingTask t2 = new ComputingTask(60001, 110000);ComputingTask t3 = new ComputingTask(110001, 150000);ComputingTask t4 = new ComputingTask(150001, 180000);ComputingTask t5 = new ComputingTask(180001, 200000);Future<List<Integer>> f1 = service.submit(t1);Future<List<Integer>> f2 = service.submit(t2);Future<List<Integer>> f3 = service.submit(t3);Future<List<Integer>> f4 = service.submit(t4);Future<List<Integer>> f5 = service.submit(t5);start = System.currentTimeMillis();f1.get();f2.get();f3.get();f4.get();f5.get();end = System.currentTimeMillis();System.out.println("parallel computing times : " + (end - start));}private static List<Integer> computing(Integer start, Integer end) {List<Integer> results = new ArrayList<>();boolean isPrime = true;for (int i = start; i <= end; i++) {for (int j = 1; j < Math.sqrt(i); j++) {if (i % j == 0) {isPrime = false;break;}}if (isPrime) {results.add(i);}isPrime = true;}return results;}static class ComputingTask implements Callable<List<Integer>> {int start, end;public ComputingTask(int start, int end) {this.start = start;this.end = end;}public List<Integer> call() throws Exception {List<Integer> results = new ArrayList<>();boolean isPrime = true;for (int i = start; i <= end; i++) {for (int j = 1; j < Math.sqrt(i); j++) {if (i % j == 0) {isPrime = false;break;}}if (isPrime) {results.add(i);}isPrime = true;}return results;}} }運行結果:
computing times : 9 parallel computing times : 1轉載于:https://www.cnblogs.com/jing99/p/10817449.html
總結
以上是生活随笔為你收集整理的Java并发编程之线程池及示例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python反转链表和成对反转
- 下一篇: Directx教程(27) 简单的光照模