手写Java线程池_超详细解说_绝对能运行_代码超详细注释
線程池
問題背景
只是單純使用 new Thread(runnable).start(); 的方式創(chuàng)建線程, 將會導致嚴重的程序性能問題: 1.線程創(chuàng)建, 銷毀需要消耗很大的系統(tǒng)資源; 2.虛擬機創(chuàng)建線程的數量是有限的; 2.線程調度切換也將使程序性能下降; 針對這些問題, 對線程數量進行管理, 有效地重復利用線程, 將會很好地提高程序性能.
線程池原理
使用隊列創(chuàng)建一定數量的線程, 當有任務的時候, 使用隊列中線程執(zhí)行任務(如果任務過多, 就將其放入任務隊列, 進入等待執(zhí)行狀態(tài)), 任務執(zhí)行完就自動回收線程隊列中的線程(任務過少或者任務數量小于線程數量, 超出的線程將會銷毀, 做到線程隊列具有伸縮性);
根據上面描述, 我們自己的線程池將具有一下特點:
1.內部使用隊列來管理線程, 管理提交的任務.
2.控制線程數量, 做到線程隊列具有良好的伸縮性.
3.當任務數過多, 或者任務隊列已經飽和, 將使用任務拒絕策略, 告訴對應的任務提交者.
4.使用線程工廠定制線程隊列中, 每個線程的名字, 狀態(tài), 是否為守護線程等等.
線程池類圖結構
任務隊列, 隊列使用limit限制提交任務的大小, 實現RunnableQueue接口(RunnableQueue接口負責: 1.接收用戶提交的任務; 2.獲取任務隊列中的任務; 3.查看任務隊列大小), LinkedRunnableQueue實現RunnableQueue中的方法, 并且針對用戶提交不同的任務以及線程池種類(ThreadPool)的不同, 決定是否執(zhí)行拒絕策略(拒絕策略具有多個, 拒絕方式取決于用戶自定義, 在線程池內部具有默認的拒絕策略實現);
實現Runnable接口, 在run方法中獲取RunnableQueue中的任務, 然后執(zhí)行RunnQueue中的任務, InternalTask中的run方法是一個while循環(huán)循環(huán)結束條件取決于是否關閉該線程(關閉線程據需要設置flag變量, 當flage為false, 線程run方法結束, 自動結束生命), 而不是當前用戶提交的任務是否執(zhí)行完!!!; InternalTask主要是對RunnableQueue的一種封裝; stop方法主要是設置線程flag(flag主要判斷當前線程是否關閉)
線程池原型:
1.實現Runnable接口(此處注明, BasicThreadPool是繼承Thread, 但是Thread內容太多了不能很好地在UML圖中顯示, 所以我就把他刪除了, 只留下了實現Runnable接口), 因為線程池自身執(zhí)行也需要一個線程, 所以繼承Thread, 這樣可以在BasicThreadPool的構造方法中執(zhí)行start(), run方法中執(zhí)行創(chuàng)建線程的操作(線程池內部執(zhí)行任務的線程); 創(chuàng)建線程取決于線程池設置的最大線程數, 核心線程數, 初始化線程數, 用戶提交的任務數;
2.實現ThreadPool接口(該接口主要用于定義線程池的基本操作, 比如執(zhí)行任務, 獲取線程池的一些基本屬性) ;
3.內部具有2個內部類(ThreadTask負責對InternalTask進行封裝, DefaultThreadFactory主要定義默認的線程創(chuàng)建方式), 不同的線程池中擁有不同的默認創(chuàng)建方式, 因此將線程創(chuàng)建方式設置為內部類;
4.在BasicThreadPool中使用newThread方法創(chuàng)建線程(這些線程用于執(zhí)行ThreadTask中的任務);
5.線程池原型中具有2個隊列, 第一個是剛才上面提的RunnQueue(負責執(zhí)行的任務), 第二個是ThreadQueue(負責存儲創(chuàng)建的每一個線程, 使用ArrayQueue實現, 這樣很好地維護管理了線程, 做到資源重用)
6.removeThread方法: 刪除多余的線程, 當用戶提交的任務數量小于線程池中創(chuàng)建的線程數量, 那么就刪除一定數量的線程, 這樣才不會浪費線程資源.
7.在構造方法中設置基本屬性, 以及當前線程池的拒絕策略.
每個接口, 類的詳細定義
ThreadPool(interface 定義線程池基本操作)
package com.concurrent.customthreadpool;/*** 線程池接口* @author regotto*/ public interface ThreadPool {/*** 執(zhí)行提交的Runnable任務* @param runnable*/void execute(Runnable runnable);/*** 關閉線程池*/void shutdown();/*** 獲得線程池初始化大小* @return initSize*/int getInitSize();/*** 獲得線程池最大線程數* @return maxSize*/int getMaxSize();/*** 獲取線程池核心線程數* @return coreSize*/int getCoreSize();/*** 獲取線程池中用于緩存任務隊列的大小* @return queueSize*/int getQueueSize();/*** 獲取線程池中國活躍的線程數量* @return activeCount*/int getActiveCount();/*** 查看線程池是否shutdown* @return boolan*/boolean isShutdown();}RunnableQueue(interface 任務隊列)
package com.concurrent.customthreadpool;/*** 存放提交的Runnable, 使用BlockedQueue, 設置limit* @author regotto*/ public interface RunnableQueue {/*** 緩存提交到線程池中的任務* @param runnable*/void offer(Runnable runnable);/*** 從緩存中獲取Runnable任務* 如果沒有任務, 調用者線程掛起, 在某些特定的時候拋出中斷異常* @throws InterruptedException* @return runnable*/Runnable take() throws InterruptedException;/*** 緩沖區(qū)大小* @return size*/int size();}LinkedRunnableQueue(class 對RunnableQueue的封裝, 用戶提交任務, 線程執(zhí)行任務, 此過程使用生產者-消費者模式實現)
package com.concurrent.customthreadpool;import java.util.LinkedList;/*** 線程池的內部線程隊列, 緩沖區(qū)* @author regotto*/ public class LinkedRunnableQueue implements RunnableQueue{/*** limit: 限制當前runnableList中還能存放多少內容* denyPolicy: 拒絕策略* runnableList: 存放runnable的緩沖區(qū)* threadPool: 線程池*/private final int limit;private final RunnableDenyPolicy denyPolicy;private final LinkedList<Runnable> runnableList = new LinkedList<>();private final ThreadPool threadPool;public LinkedRunnableQueue(int limit, RunnableDenyPolicy denyPolicy, ThreadPool threadPool){this.limit = limit;this.denyPolicy = denyPolicy;this.threadPool = threadPool;}@Overridepublic void offer(Runnable runnable) {synchronized (runnableList) {if (runnableList.size() >= limit) {//用戶提交的任務大于限制條件, 執(zhí)行對應的拒絕策略System.out.println(runnableList.size() + " >= " + limit + " execute deny policy");denyPolicy.reject(runnable, threadPool);} else {//添加任務到任務隊列尾部, 有任務存在, 喚醒剛才wait的線程runnableList.addLast(runnable);runnableList.notifyAll();}}}@Overridepublic Runnable take() throws InterruptedException{synchronized (runnableList) {while (runnableList.isEmpty()) {try {//從RunnableQueue中取出任務, 如果任務為空, 使當前線程waitrunnableList.wait();} catch (InterruptedException e) {throw e;}}//移除任務緩沖區(qū)的第一個return runnableList.removeFirst();}}@Overridepublic int size() {synchronized (runnableList) {return runnableList.size();}} }InternalTask(class 對RunnableQueue中任務的執(zhí)行)
package com.concurrent.customthreadpool;/*** 用于線程池內部, 獲取runnableQueue中的runnable* @author regotto*/ public class InternalTask implements Runnable {private final RunnableQueue runnableQueue;private volatile boolean running = true;public InternalTask(RunnableQueue runnableQueue){this.runnableQueue = runnableQueue;}@Overridepublic void run() {//如果線程沒有關閉, 就讓該線程死循環(huán), 處理每一個提交的任務while (running && !Thread.currentThread().isInterrupted()){try {//處于中斷時候的線程不做處理//獲取RunnableQueue中任務, 然后執(zhí)行Runnable take = runnableQueue.take();System.out.println("runnableQueue.take(): " + take.toString());take.run();} catch (InterruptedException e) {running = false;break;}}}/*** 停止當前任務, 設置其running為false, 在shutdown中處理*/public void stop(){this.running = false;}}RunnableDenyPolicy(interface 任務拒絕策略)
package com.concurrent.customthreadpool;/*** 當任務數提交超過緩沖區(qū)limit, 執(zhí)行對應的任務拒絕策略* @author regotto*/ @FunctionalInterface public interface RunnableDenyPolicy {/*** 對提交到threadPool的runnable是否執(zhí)行reject* @param runnable* @param threadPool*/void reject(Runnable runnable, ThreadPool threadPool);/*** 該策略使用空方法直接丟棄任務*/class DiscardDenyPolicy implements RunnableDenyPolicy {@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {System.out.println(runnable + "不做處理");}}/*** 該策略拋出一個RunnableDenyException*/class AbortDenyPolicy implements RunnableDenyPolicy {@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {throw new RunnableDenyException("The" + runnable + "will be abort");}}/***該策略Runnable給提交者所在的線程中運行, 不加入到線程中*/class RunnerDenyPolicy implements RunnableDenyPolicy{@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {if (threadPool.isShutdown()) {runnable.run();}}}}RunnableDenyException(class 處理RunnableDenyPolicy拋出的運行時異常)
package com.concurrent.customthreadpool;public class RunnableDenyException extends RuntimeException {public RunnableDenyException(String message) {super(message);} }ThreadFactory(interface 定義創(chuàng)建線程的接口)
package com.concurrent.customthreadpool;/*** 創(chuàng)建線程接口, 定制線程屬于哪一個group, 是否為守護線程, 優(yōu)先級, 名字等* @author regotto*/ public interface ThreadFactory {/*** 創(chuàng)建定制化線程* @param runnable* @return thread*/Thread createThread(Runnable runnable);}BasicThreadPool(class 線程池的實現)
package com.concurrent.customthreadpool;import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger;/*** 默認的自定義的線程池, 內部使用Queue進行維護* @author regotto*/ public class BasicThreadPool extends Thread implements ThreadPool{/**initSize: 初始化線程數* maxSize: 線程池最大線程數* coreSize: 線程核心數* activeCount: 當前活躍線程數* threadFactory: 線程工廠, 配置線程創(chuàng)建需要的參數* runnableQueue: 任務隊列* isShutdown: 是否關閉線程池* threadQueue: 工作線程隊列* DEFAULT_THREAD_FACTORY: 默認的線程工廠* keepAliveTime: 線程存活時間*/private final int initSize;private final int maxSize;private final int coreSize;private int activeCount;private final ThreadFactory threadFactory;private final RunnableQueue runnableQueue;private volatile boolean isShutdown = false;private Queue<ThreadTask> threadQueue = new ArrayDeque<>();private final static RunnableDenyPolicy DEFAULT_DENY_POLICY = new RunnableDenyPolicy.DiscardDenyPolicy();private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();private final static long DEFAULT_KEEP_ALIVE_TIME = 10;private final long keepAliveTime;private final TimeUnit timeUnit;public BasicThreadPool(int initSize, int maxSize, int coreSize, int queueSize) {this(initSize, maxSize, coreSize, DEFAULT_THREAD_FACTORY, queueSize, DEFAULT_DENY_POLICY,DEFAULT_KEEP_ALIVE_TIME, TimeUnit.SECONDS);}public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory, int queueSize,RunnableDenyPolicy denyPolicy, long keepAliveTime, TimeUnit timeUnit) {this.initSize = initSize;this.maxSize = maxSize;this.coreSize = coreSize;this.threadFactory = threadFactory;this.runnableQueue = new LinkedRunnableQueue(queueSize, denyPolicy, this);this.keepAliveTime = keepAliveTime;this.timeUnit = timeUnit;this.init();}/*** 初始化線程池, 創(chuàng)建initThread*/private void init() {start();for (int i = 0; i < initSize; ++i) {newThread();}}/*** 創(chuàng)建線程添加到線程隊列, 然后用該線程執(zhí)行ThreadTask任務(層層封裝, 封裝用戶提交的任務)*/private void newThread() {InternalTask internalTask = new InternalTask(runnableQueue);//使用自定義的線程工廠創(chuàng)建線程Thread thread = this.threadFactory.createThread(internalTask);ThreadTask threadTask = new ThreadTask(thread, internalTask);System.out.println(threadTask.thread.getName() + "被添加");//添加到線程隊列threadQueue.offer(threadTask);this.activeCount++;//被添加后的線程執(zhí)行startthread.start();}@Overridepublic void execute(Runnable runnable) {if (this.isShutdown) {throw new IllegalStateException("The thread pool id destroy");}//將用戶提交的任務放到runnableQueue中, 等待線程隊列中線程執(zhí)行this.runnableQueue.offer(runnable);}private void removeThread() {//ArrayDeque的remove就是removeFirstThreadTask threadTask = threadQueue.remove();//設置當前線程flag, 在InternalTask中跳出循環(huán)自動結束線程生命threadTask.internalTask.stop();this.activeCount--;}@Overridepublic void run() {while (!isShutdown && !isInterrupted()) {try {timeUnit.sleep(keepAliveTime);} catch (InterruptedException e) {isShutdown = true;break;}synchronized (this) {if (isShutdown) {break;}//當前隊列中有任務還沒有處理, 且activeCount < coreSizeif (runnableQueue.size() > 0 && activeCount < coreSize) {//此處i曾寫做i=0,導致多創(chuàng)建了一個線程,在沒有任務的時候該線程一直保持wait//因為關閉pool,該線程沒有add到threadQueue,導致Interrupt失敗,最終導致線程一直運行中for (int i = initSize; i < coreSize; ++i) {newThread();}//防止后面的if判斷創(chuàng)建線程數超過coreSize, 在coreSize還沒有滿的時候, 只執(zhí)行當前的ifcontinue;}//當上面if中創(chuàng)建的線程數不足的時候, 就擴大線程池線程數, 直到maxSizeif (runnableQueue.size() > 0 && activeCount < maxSize) {for (int i = coreSize; i < maxSize; ++i) {newThread();}}//當沒有任務, 但是activeCount線程數超出coreSize大小, 回收超出coreSize的線程if (runnableQueue.size() == 0 && activeCount > coreSize) {for (int i = coreSize; i < activeCount; ++i) {removeThread();}}}}}@Overridepublic void shutdown() {synchronized (this) {if (!isShutdown) {isShutdown = true;System.out.println("threadQueue size:" + threadQueue.size());threadQueue.forEach(threadTask -> {//調用internalTask中stop, 設置當前線程運行標志為falsethreadTask.internalTask.stop();//設置線程中斷狀態(tài)threadTask.thread.interrupt();System.out.println(threadTask.thread.getName());});System.out.println("threadQueue中線程已經關閉");//當前線程池自己也要關閉this.interrupt();}}}@Overridepublic int getInitSize() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.initSize;}@Overridepublic int getMaxSize() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.maxSize;}@Overridepublic int getCoreSize() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.coreSize;}@Overridepublic int getQueueSize() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.runnableQueue.size();}@Overridepublic int getActiveCount() {if (isShutdown) {throw new IllegalStateException("The thread pool is destroy");}return this.activeCount;}@Overridepublic boolean isShutdown() {return this.isShutdown;}/*** 內部類, 定義自己默認的線程工廠*/private static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);//設置線程組private static final ThreadGroup GROUP = new ThreadGroup("MyThreadPool-" + GROUP_COUNTER.getAndDecrement());private static final AtomicInteger COUNTER = new AtomicInteger(0);@Overridepublic Thread createThread(Runnable runnable) {//創(chuàng)建定制化的線程return new Thread(GROUP, runnable, " thread-pool-" + COUNTER.getAndDecrement());}}/*** 封裝InternalTask, 與每次創(chuàng)建的線程綁定在一起*/private class ThreadTask {Thread thread;InternalTask internalTask;ThreadTask(Thread thread, InternalTask internalTask){this.thread = thread;this.internalTask = internalTask;}} }ThreadPoolTest(class 測試類)
package com.concurrent.customthreadpool;import java.util.concurrent.TimeUnit;/*** 用于測試自定的線程池* @author regotto*/ public class ThreadPoolTest {public static void main(String[] args) {//初始化線程數:2, 最大線程數:6, 核心線程數:4, 任務隊列大小:1000final BasicThreadPool threadPool = new BasicThreadPool(2, 6, 4, 1000);//創(chuàng)建20個任務提交進行執(zhí)行for (int i = 0; i < 20; ++i) {threadPool.execute(() -> {try {TimeUnit.SECONDS.sleep(3);System.out.println(Thread.currentThread().getName() + "is running and done.");} catch (InterruptedException e) {e.printStackTrace();}});}//此處用于測試線程池運行時基本信息狀態(tài) // for (int j = 0; j < 1000; ++j) { // System.out.println("getActiveCount: " + threadPool.getActiveCount()); // System.out.println("getQueueSize: " + threadPool.getQueueSize()); // System.out.println("getCoreSize: " + threadPool.getCoreSize()); // System.out.println("getMaxSize: " + threadPool.getMaxSize()); // try { // TimeUnit.SECONDS.sleep(3); // } catch (InterruptedException e) { // e.printStackTrace(); // } // }try {TimeUnit.SECONDS.sleep(25);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("shutdown");//測試線程池shutdown功能threadPool.shutdown();} }總結
上面有錯, 還請指出, 如果認為我寫的還不錯, 還請點個贊, 多多支持一下, O(∩_∩)O~~
總結
以上是生活随笔為你收集整理的手写Java线程池_超详细解说_绝对能运行_代码超详细注释的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: sql 整改措施 注入_SQL注入的漏洞
- 下一篇: 台达n2系列变频器_台达变频器C2000