谈谈java的线程池(创建、机制)
目錄
Executors創(chuàng)建線程池默認(rèn)方法
自定義線程池
Executors創(chuàng)建線程池默認(rèn)方法
? ? newFixedThreadPool()方法,該方法返回一個(gè)固定數(shù)量的線程池,該方法的線程數(shù)始終不變,當(dāng)有一個(gè)任務(wù)提交時(shí),若線程池中空閑,則立即執(zhí)行,若沒有,則會(huì)被暫緩在一個(gè)任務(wù)隊(duì)列中等待有空閑的線程去執(zhí)行。
? ? newSingleThreadExecutor()方法,創(chuàng)建一個(gè)線程的線程池,若空閑則執(zhí)行,若沒有空閑線程則暫緩在任務(wù)隊(duì)列。
? ?newCachedThreadPool()方法,返回一個(gè)可根據(jù)實(shí)際情況調(diào)整線程個(gè)數(shù)的線程池,不限制最大線程數(shù)量,若有空閑的線程則直接執(zhí)行任務(wù),若無空閑則創(chuàng)建線程,若無任務(wù)則不創(chuàng)建線程。并且每一個(gè)空閑線程會(huì)在60秒后自動(dòng)回收。
? ?newScheduledThreadPool()該方法返回一個(gè)ScheduledExecutorService對(duì)象,但該線程池可以指定線程的數(shù)量,可以定時(shí)。
前三種線程池添加線程: ExecutorService pool = Executors.newSingleThreadExecutor(); pool.execute(new Thread()); //submit和execute的區(qū)別: 第一點(diǎn)是submit可以傳入實(shí)現(xiàn)Callable接口的實(shí)例對(duì)象, 第二點(diǎn)是submit方法有返回值 Future f1 = pool.submit(new Thread()); f1.get()//如果為空,則線程執(zhí)行完畢了,否則會(huì)一直阻塞在這里。 //ScheduledThreadPool相當(dāng)于定時(shí)任務(wù) class Temp extends Thread {public void run() {System.out.println("run");} } public class ScheduledJob {public static void main(String args[]) throws Exception {Temp command = new Temp();ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);ScheduledFuture<?> scheduleTask = scheduler.scheduleWithFixedDelay(command, 5, 1, TimeUnit.SECONDS);//5秒初始化之后執(zhí)行一次,以后每1秒執(zhí)行一次} }其實(shí)底層都是new了ThreadPoolExecutor。
? ? 若Executors工廠類無法滿足我們的需求,可以自己去創(chuàng)建自定義的線程池,其實(shí)Executors工廠類里面的創(chuàng)建線程方法其內(nèi)部實(shí)現(xiàn)均是用了ThreadPoolExecutor這個(gè)類,這個(gè)類可以自定義線程。構(gòu)造方法如下:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {} * corePoolSize: 線程池核心線程數(shù)(線程池初始化存在的線程) * maximumPoolSize:線程池最大數(shù) * keepAliveTime: 空閑線程存活時(shí)間 * unit: 時(shí)間單位 * workQueue: 線程池所使用的緩沖隊(duì)列 * threadFactory:線程池創(chuàng)建線程使用的工廠 * handler: 線程池對(duì)拒絕任務(wù)的處理策略自定義線程池
這個(gè)構(gòu)造方法對(duì)于隊(duì)列是什么類型的比較關(guān)鍵。
使用有界隊(duì)列時(shí):若有新的任務(wù)需要執(zhí)行,如果線程池實(shí)際線程小于corePoolSize,則優(yōu)先創(chuàng)建線程,若大于corePoolSize,則會(huì)將任務(wù)加入隊(duì)列,若隊(duì)列已滿,則在總線程數(shù)不大于maximumPoolSize的前提下,創(chuàng)建新的線程去執(zhí)行新任務(wù),若線程數(shù)大于maximumPoolSize,則執(zhí)行拒絕策略。或其他自定義方式。
使用無界隊(duì)列時(shí):LinkedBlockingQueue。與有界隊(duì)列相比,除非系統(tǒng)資源耗盡,否則無界的任務(wù)隊(duì)列不存在任務(wù)入隊(duì)失敗的情況。當(dāng)有新任務(wù)到來,系統(tǒng)的線程數(shù)小于corePoolSize時(shí),則新建線程執(zhí)行任務(wù)。當(dāng)達(dá)到corePoolSize后,就不會(huì)繼續(xù)增加。若后續(xù)仍有新的任務(wù)加入,而又沒有空閑的線程資源,則任務(wù)直接進(jìn)入隊(duì)列等待。若任務(wù)創(chuàng)建和處理的速度差異很大,無界隊(duì)列會(huì)保持快速增長,直到耗盡系統(tǒng)內(nèi)存。
JDK拒絕策略:
? ? AbortPolicy:直接拋出異常組織系統(tǒng)正常工作。
? ? CallerRunsPolicy:只要線程池未關(guān)閉,該策略直接在調(diào)用者線程中,運(yùn)行當(dāng)前被丟棄的任務(wù)。
? ? DiscardOldestPolicy:丟棄最老的一個(gè)請(qǐng)求,嘗試再次提交當(dāng)前任務(wù)。
? ? DiscardPolicy:丟棄無法處理的任務(wù),不給予任何處理。
//如果需要自定義拒絕策略可以實(shí)現(xiàn)RejectedExecutionHandler接口。 public class MyTask implements Runnable {private int taskId;private String taskName;public MyTask(int taskId, String taskName){this.taskId = taskId;this.taskName = taskName;}public int getTaskId() {return taskId;}public void setTaskId(int taskId) {this.taskId = taskId;}public String getTaskName() {return taskName;}public void setTaskName(String taskName) {this.taskName = taskName;}@Overridepublic void run() {try {System.out.println("run taskId =" + this.taskId);Thread.sleep(5*1000);//System.out.println("end taskId =" + this.taskId);} catch (InterruptedException e) {e.printStackTrace();} }public String toString(){return Integer.toString(this.taskId);} }public class UseThreadPoolExecutor1 {public static void main(String[] args) {/*** 在使用有界隊(duì)列時(shí),若有新的任務(wù)需要執(zhí)行,如果線程池實(shí)際線程數(shù)小于corePoolSize,則優(yōu)先創(chuàng)建線程,* 若大于corePoolSize,則會(huì)將任務(wù)加入隊(duì)列,* 若隊(duì)列已滿,則在總線程數(shù)不大于maximumPoolSize的前提下,創(chuàng)建新的線程,* 若線程數(shù)大于maximumPoolSize,則執(zhí)行拒絕策略。或其他自定義方式。**/ ThreadPoolExecutor pool = new ThreadPoolExecutor(1, //coreSize2, //MaxSize60, //60TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(3) //指定一種隊(duì)列 (有界隊(duì)列)//new LinkedBlockingQueue<Runnable>(), new MyRejected()//, new DiscardOldestPolicy());MyTask mt1 = new MyTask(1, "任務(wù)1");MyTask mt2 = new MyTask(2, "任務(wù)2");MyTask mt3 = new MyTask(3, "任務(wù)3");MyTask mt4 = new MyTask(4, "任務(wù)4");MyTask mt5 = new MyTask(5, "任務(wù)5");MyTask mt6 = new MyTask(6, "任務(wù)6");pool.execute(mt1);pool.execute(mt2);pool.execute(mt3);pool.execute(mt4);pool.execute(mt5);pool.execute(mt6);pool.shutdown();} }public class MyRejected implements RejectedExecutionHandler{public MyRejected(){}@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("自定義處理..");System.out.println("當(dāng)前被拒絕任務(wù)為:" + r.toString());} } public class UseThreadPoolExecutor2 implements Runnable{private static AtomicInteger count = new AtomicInteger(0);@Overridepublic void run() {try {int temp = count.incrementAndGet();System.out.println("任務(wù)" + temp);Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}public static void main(String[] args) throws Exception{//System.out.println(Runtime.getRuntime().availableProcessors());BlockingQueue<Runnable> queue =//new LinkedBlockingQueue<Runnable>();new ArrayBlockingQueue<Runnable>(10);ExecutorService executor = new ThreadPoolExecutor(5, //core10, //max無界隊(duì)列的這個(gè)參數(shù)其實(shí)沒啥作用了120L, //2fenzhongTimeUnit.SECONDS,queue);for(int i = 0 ; i < 20; i++){executor.execute(new UseThreadPoolExecutor2());}Thread.sleep(1000);System.out.println("queue size:" + queue.size()); //10Thread.sleep(2000);} }線程池的方法
向線程池提交任務(wù)
execute()
execute()方法用于提交不需要返回值的任務(wù)Runnable,所以無法判斷任務(wù)是否被線程池執(zhí)行成功。
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*; import java.util.stream.IntStream;@Slf4j public class ExecuteDemo {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());IntStream.rangeClosed(1, 3).forEach(i -> threadPoolExecutor.execute(() -> log.info("Task {} working...", i))); // 提交任務(wù)threadPoolExecutor.shutdown(); // 關(guān)閉線程池} }運(yùn)行結(jié)果如下:
2020-10-13 14:58:09,500 INFO [pool-1-thread-1] (ExecuteDemo.java:12) - Task 1 working... 2020-10-13 14:58:09,502 INFO [pool-1-thread-1] (ExecuteDemo.java:12) - Task 2 working... 2020-10-13 14:58:09,502 INFO [pool-1-thread-1] (ExecuteDemo.java:12) - Task 3 working...submit()
submit()方法用于提交需要返回值的任務(wù)。線程池會(huì)返回一個(gè)future類型的對(duì)象,通過這個(gè)future對(duì)象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會(huì)阻塞當(dāng)前線程直到任務(wù)完成,get()方法也支持帶超時(shí)時(shí)間的阻塞。
運(yùn)行結(jié)果如下:
2020-10-13 15:03:22,306 INFO [pool-1-thread-1] (SubmitDemo.java:14) - Task begin... 2020-10-13 15:03:25,315 INFO [pool-1-thread-1] (SubmitDemo.java:20) - Task end. 2020-10-13 15:03:25,316 INFO [main] (SubmitDemo.java:27) - get result:task resultinvokeAll()
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.List; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream;@Slf4j public class InvokeAllDemo {public static void main(String[] args) throws InterruptedException, ExecutionException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());List<Callable<String>> tasks = IntStream.rangeClosed(1, 3).boxed().map(i -> (Callable<String>) () -> "task " + i).collect(Collectors.toList());List<Future<String>> futures = threadPoolExecutor.invokeAll(tasks);// 批量提交任務(wù)for (Future<String> future : futures) {log.info("result:{}", future.get()); // 阻塞獲取結(jié)果}threadPoolExecutor.shutdown();} }運(yùn)行結(jié)果如下:
2020-10-13 16:09:00,518 INFO [main] (InvokeAllDemo.java:25) - result:task 1 2020-10-13 16:09:00,520 INFO [main] (InvokeAllDemo.java:25) - result:task 2 2020-10-13 16:09:00,520 INFO [main] (InvokeAllDemo.java:25) - result:task 3invokeAny()
invokeAny批量提交任務(wù),哪個(gè)任務(wù)先執(zhí)行完畢,立刻返回此任務(wù)執(zhí)行結(jié)果,其它任務(wù)取消。
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.List; import java.util.Random; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream;@Slf4j public class InvokeAnyDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());List<Callable<String>> tasks = IntStream.rangeClosed(1, 3).boxed().map(i -> (Callable<String>) () -> {log.info("task{} begin ...", i);TimeUnit.SECONDS.sleep(new Random().nextInt(10));log.info("task{} end .", i);return "task" + i;}).collect(Collectors.toList());String result = threadPoolExecutor.invokeAny(tasks); // 哪個(gè)任務(wù)執(zhí)行完就立馬返回,其他任務(wù)直接中斷l(xiāng)og.info("result:{}", result); // 阻塞獲取結(jié)果threadPoolExecutor.shutdown();} }運(yùn)行結(jié)果如下:
2020-10-13 16:09:35,121 INFO [pool-1-thread-1] (InvokeAnyDemo.java:21) - task1 begin ... 2020-10-13 16:09:35,121 INFO [pool-1-thread-2] (InvokeAnyDemo.java:21) - task2 begin ... 2020-10-13 16:09:35,121 INFO [pool-1-thread-3] (InvokeAnyDemo.java:21) - task3 begin ... 2020-10-13 16:09:38,125 INFO [pool-1-thread-3] (InvokeAnyDemo.java:23) - task3 end . 2020-10-13 16:09:38,125 INFO [main] (InvokeAnyDemo.java:29) - result:task3線程池的關(guān)閉
shutdown()
shutdown()方法會(huì)先將線程池的狀態(tài)改為SHUTDOWN,然后中斷空閑的線程,不會(huì)再接收新任務(wù),但已提交任務(wù)會(huì)執(zhí)行完。
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.Random; import java.util.concurrent.*; import java.util.stream.IntStream;/*** 線程池的關(guān)閉之shutdown()*/ @Slf4j public class ShutdownDemo {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());IntStream.rangeClosed(1, 10).forEach(i -> threadPoolExecutor.submit(() -> {log.info("task{} begin ...", i);try {TimeUnit.SECONDS.sleep(new Random().nextInt(3));} catch (InterruptedException e) {e.printStackTrace();}log.info("task{} end .", i);}));threadPoolExecutor.shutdown();threadPoolExecutor.submit(() -> log.info("submit task after shutdown"));} }運(yùn)行結(jié)果如下:
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@18769467 rejected from java.util.concurrent.ThreadPoolExecutor@46ee7fe8[Shutting down, pool size = 2, active threads = 2, queued tasks = 8, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)at com.morris.concurrent.threadpool.threadpoolexecutor.api.ShutdownDemo.main(ShutdownDemo.java:30) 2020-10-13 16:54:42,637 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task1 begin ... 2020-10-13 16:54:42,637 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task2 begin ... 2020-10-13 16:54:43,639 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task2 end . 2020-10-13 16:54:43,639 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task1 end . 2020-10-13 16:54:43,640 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task3 begin ... 2020-10-13 16:54:43,640 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task4 begin ... 2020-10-13 16:54:43,641 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task4 end . 2020-10-13 16:54:43,641 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task5 begin ... 2020-10-13 16:54:44,642 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task5 end . 2020-10-13 16:54:44,642 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task6 begin ... 2020-10-13 16:54:45,642 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task3 end . 2020-10-13 16:54:45,642 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task7 begin ... 2020-10-13 16:54:46,642 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task7 end . 2020-10-13 16:54:46,642 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task6 end . 2020-10-13 16:54:46,642 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task8 begin ... 2020-10-13 16:54:46,642 INFO [pool-1-thread-1] (ShutdownDemo.java:19) - task9 begin ... 2020-10-13 16:54:47,642 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task8 end . 2020-10-13 16:54:47,642 INFO [pool-1-thread-2] (ShutdownDemo.java:19) - task10 begin ... 2020-10-13 16:54:47,642 INFO [pool-1-thread-2] (ShutdownDemo.java:25) - task10 end . 2020-10-13 16:54:48,643 INFO [pool-1-thread-1] (ShutdownDemo.java:25) - task9 end .shutdownNow()
shutdownNow()會(huì)先將會(huì)先將線程池的狀態(tài)改為STOP,不會(huì)再接收新任務(wù),然后中斷所有的線程,如果執(zhí)行中的任務(wù)沒有對(duì)中斷進(jìn)行處理,那么這個(gè)任務(wù)將會(huì)繼續(xù)執(zhí)行直至完成,如果執(zhí)行中的任務(wù)對(duì)中斷進(jìn)行的處理,那么將按中斷進(jìn)行執(zhí)行,最后方法會(huì)返回等待隊(duì)列中未執(zhí)行的任務(wù)。
?
運(yùn)行結(jié)果如下:
2020-10-13 16:55:08,985 INFO [main] (ShutdownNowDemo.java:30) - tasks size:8 2020-10-13 16:55:08,987 INFO [pool-1-thread-2] (ShutdownNowDemo.java:20) - task2 begin ... 2020-10-13 16:55:08,987 INFO [pool-1-thread-1] (ShutdownNowDemo.java:20) - task1 begin ... 2020-10-13 16:55:08,988 INFO [pool-1-thread-2] (ShutdownNowDemo.java:26) - task2 end . 2020-10-13 16:55:08,989 INFO [pool-1-thread-1] (ShutdownNowDemo.java:26) - task1 end .awaitTermination()
調(diào)用shutdown()后,調(diào)用線程并不會(huì)等待所有任務(wù)運(yùn)行結(jié)束,可以利用awaitTermination()方法設(shè)置一個(gè)超時(shí)時(shí)間進(jìn)行阻塞等待,此方法返回了有兩種情況,要么所有的任務(wù)執(zhí)行完成,要么超時(shí)時(shí)間到了。
package com.morris.concurrent.threadpool.threadpoolexecutor.api;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /*** threadPoolExecutor.awaitTermination()的使用*/ @Slf4j public class AwaitTerminationDemo {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());threadPoolExecutor.submit(() -> {log.info("task begin ...");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}log.info("task end .");});threadPoolExecutor.shutdown(); // 關(guān)閉線程池threadPoolExecutor.awaitTermination(10, TimeUnit.SECONDS);log.info("main thread exit");} }運(yùn)行結(jié)果如下:
2020-10-13 17:01:45,397 INFO [pool-1-thread-1] (AwaitTerminationDemo.java:16) - task begin ... 2020-10-13 17:01:48,398 INFO [pool-1-thread-1] (AwaitTerminationDemo.java:22) - task end . 2020-10-13 17:01:48,398 INFO [main] (AwaitTerminationDemo.java:27) - main thread exit創(chuàng)建線程的工廠ThreadFactory
可以自定義ThreadFactory為線程池中的線程取一個(gè)有意義的名字,方便后面快速定位問題。
package com.morris.concurrent.threadpool.threadpoolexecutor.construct;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;/*** 使用ThreadFactory自定義線程名*/ @Slf4j public class ThreadFactoryDemo {public static void main(String[] args) {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,new LinkedBlockingQueue<>(), new ThreadFactory() {private AtomicInteger idx = new AtomicInteger(1);private static final String THREAD_NAME_PREFIX = "mythread-pool-";@Overridepublic Thread newThread(Runnable r) {return new Thread(r, THREAD_NAME_PREFIX + idx.getAndIncrement());}});threadPoolExecutor.submit(()->{log.info("task begin ...");try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}log.info("task end .");});threadPoolExecutor.shutdown();} }?
總結(jié)
以上是生活随笔為你收集整理的谈谈java的线程池(创建、机制)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 谈谈java的并发容器、Queue
- 下一篇: 谈谈java并发锁(重入锁、读写锁、公平