高并发编程-自定义简易的线程池(2),体会原理
生活随笔
收集整理的這篇文章主要介紹了
高并发编程-自定义简易的线程池(2),体会原理
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
文章目錄
- 概述
- 示例
概述
高并發(fā)編程-自定義簡(jiǎn)易的線程池(1),體會(huì)原理 中只實(shí)現(xiàn)了任務(wù)隊(duì)列,我們這里把其余的幾個(gè)也補(bǔ)充進(jìn)來
- 拒絕策略
- 關(guān)閉線程池
- 最小 最大 活動(dòng)線程數(shù)
- …
示例
比較簡(jiǎn)單,直接上代碼,見注釋
package com.artisan.test;import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List;public class SimpleThreadPool extends Thread {// 線程數(shù)private int size;// 隊(duì)列大小private final int queueSize;// 默認(rèn)隊(duì)列大小private final static int DEFAULT_TASK_QUEUE_SIZE = 2000;// 工作線程相關(guān)屬性private static volatile int seq = 0;private final static String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");// 任務(wù)隊(duì)列private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();// 線程隊(duì)列private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();// 拒絕策略private final DiscardPolicy discardPolicy;// 默認(rèn)拒絕策略public final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {throw new DiscardException("Discard This Task.");};//是否銷毀private volatile boolean destroy = false;// 最小線程數(shù)private int min;// 最大線程數(shù)private int max;// 活動(dòng)線程數(shù)private int active;/*** 默認(rèn)無參構(gòu)造函數(shù)*/public SimpleThreadPool() {// 調(diào)用有參構(gòu)造函數(shù)this(4, 8, 12, DEFAULT_TASK_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);}/*** 構(gòu)造函數(shù)* @param min* @param active* @param max* @param queueSize* @param discardPolicy*/public SimpleThreadPool(int min, int active, int max, int queueSize, DiscardPolicy discardPolicy) {this.min = min;this.active = active;this.max = max;this.queueSize = queueSize;this.discardPolicy = discardPolicy;init();}/*** 初始化方法*/private void init() {for (int i = 0; i < this.min; i++) {//createWorkTask();}this.size = min;this.start();}/*** 提交任務(wù)到 TASK_QUEUE* @param runnable*/public void submit(Runnable runnable) {if (destroy)throw new IllegalStateException("The thread pool already destroy and not allow submit task.");synchronized (TASK_QUEUE) {if (TASK_QUEUE.size() > queueSize)discardPolicy.discard();TASK_QUEUE.addLast(runnable);TASK_QUEUE.notifyAll();}}/*** 管理線程池本身的參數(shù)*/@Overridepublic void run() {while (!destroy) {System.out.printf("Pool#Min:%d,Active:%d,Max:%d,Current:%d,QueueSize:%d\n",this.min, this.active, this.max, this.size, TASK_QUEUE.size());try {Thread.sleep(5_000L);if (TASK_QUEUE.size() > active && size < active) {for (int i = size; i < active; i++) {createWorkTask();}System.out.println("The pool incremented to active.");size = active;} else if (TASK_QUEUE.size() > max && size < max) {for (int i = size; i < max; i++) {createWorkTask();}System.out.println("The pool incremented to max.");size = max;}synchronized (THREAD_QUEUE) {if (TASK_QUEUE.isEmpty() && size > active) {System.out.println("=========Reduce========");int releaseSize = size - active;for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator(); it.hasNext(); ) {if (releaseSize <= 0)break;WorkerTask task = it.next();task.close();task.interrupt();it.remove();releaseSize--;}size = active;}}} catch (InterruptedException e) {e.printStackTrace();}}}/*** 創(chuàng)建工作線程*/private void createWorkTask() {WorkerTask task = new WorkerTask(GROUP, THREAD_PREFIX + (seq++));task.start();THREAD_QUEUE.add(task);}/*** 關(guān)閉線程池* @throws InterruptedException*/public void shutdown() throws InterruptedException {while (!TASK_QUEUE.isEmpty()) {Thread.sleep(50);}synchronized (THREAD_QUEUE) {int initVal = THREAD_QUEUE.size();while (initVal > 0) {for (WorkerTask task : THREAD_QUEUE) {if (task.getTaskState() == TaskState.BLOCKED) {task.interrupt();task.close();initVal--;} else {Thread.sleep(10);}}}}System.out.println(GROUP.activeCount());this.destroy = true;System.out.println("The thread pool disposed.");}public int getQueueSize() {return queueSize;}public int getSize() {return size;}public boolean isDestroy() {return this.destroy;}public int getMin() {return min;}public int getMax() {return max;}public int getActive() {return active;}/*** 工作線程的狀態(tài)*/private enum TaskState {FREE, RUNNING, BLOCKED, DEAD}/*** 觸發(fā)決絕策略拋出的異常*/public static class DiscardException extends RuntimeException {public DiscardException(String message) {super(message);}}/*** 拒絕策略接口*/public interface DiscardPolicy {void discard() throws DiscardException;}/*** 工作線程*/private static class WorkerTask extends Thread {private volatile TaskState taskState = TaskState.FREE;public WorkerTask(ThreadGroup group, String name) {super(group, name);}public TaskState getTaskState() {return this.taskState;}public void run() {OUTER:while (this.taskState != TaskState.DEAD) {Runnable runnable;synchronized (TASK_QUEUE) {while (TASK_QUEUE.isEmpty()) {try {taskState = TaskState.BLOCKED;TASK_QUEUE.wait();} catch (InterruptedException e) {System.out.println("Closed.");break OUTER;}}runnable = TASK_QUEUE.removeFirst();}if (runnable != null) {taskState = TaskState.RUNNING;runnable.run();taskState = TaskState.FREE;}}}/*** 關(guān)閉*/public void close() {this.taskState = TaskState.DEAD;}}/*** 測(cè)試* @param args* @throws InterruptedException*/public static void main(String[] args) throws InterruptedException {SimpleThreadPool threadPool = new SimpleThreadPool();for (int i = 0; i < 40; i++) {threadPool.submit(() -> {System.out.println("The runnable be serviced by " + Thread.currentThread() + " start.");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("The runnable be serviced by " + Thread.currentThread() + " finished.");});}Thread.sleep(10000);threadPool.shutdown();/* Thread.sleep(10000);threadPool.shutdown();threadPool.submit(() -> System.out.println("======="));*/} }運(yùn)行日志:
"E:\Program Files\Java\jdk1.8.0_161\bin\java" "-javaagent:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\lib\idea_rt.jar=53638:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\bin" -Dfile.encoding=UTF-8 -classpath "E:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\IdeaProjects\mvc\target\classes" com.artisan.test.SimpleThreadPool The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. Pool#Min:4,Active:8,Max:12,Current:4,QueueSize:38 The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The pool incremented to active. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] start. Pool#Min:4,Active:8,Max:12,Current:8,QueueSize:31 The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-8,5,Pool_Group] start. The pool incremented to max. Pool#Min:4,Active:8,Max:12,Current:12,QueueSize:15 The runnable be serviced by Thread[SIMPLE_THREAD_POOL-9,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-10,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-11,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-8,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-9,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-8,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-9,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-10,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-11,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-10,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-11,5,Pool_Group] start. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-4,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-7,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-6,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-5,5,Pool_Group] finished. Closed. Closed. Closed. Closed. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-0,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-2,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-1,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-3,5,Pool_Group] finished. Closed. Closed. Closed. Closed. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-8,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-10,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-11,5,Pool_Group] finished. The runnable be serviced by Thread[SIMPLE_THREAD_POOL-9,5,Pool_Group] finished. Closed. 4 =========Reduce======== The thread pool disposed. Closed. Closed. Closed.Process finished with exit code 0總結(jié)
以上是生活随笔為你收集整理的高并发编程-自定义简易的线程池(2),体会原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 高并发编程-自定义简易的线程池(1),体
- 下一篇: 高并发编程-Wait Set 多线程的“