java中ThreadPool的介绍和使用
文章目錄
- Thread Pool簡介
- Executors, Executor 和 ExecutorService
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
- ForkJoinPool
java中ThreadPool的介紹和使用
Thread Pool簡介
在Java中,threads是和系統的threads相對應的,用來處理一系列的系統資源。不管在windows和linux下面,能開啟的線程個數都是有限的,如果你在java程序中無限制的創建thread,那么將會遇到無線程可創建的情況。
CPU的核數是有限的,如果同時有多個線程正在運行中,那么CPU將會根據線程的優先級進行輪循,給每個線程分配特定的CPU時間。所以線程也不是越多越好。
在java中,代表管理ThreadPool的接口有兩個:ExecutorService和Executor。
我們運行線程的步驟一般是這樣的:1. 創建一個ExecutorService。 2.將任務提交給ExecutorService。3.ExecutorService調度線程來運行任務。
畫個圖來表示:
下面我講一下,怎么在java中使用ThreadPool。
Executors, Executor 和 ExecutorService
Executors 提供了一系列簡便的方法,來幫助我們創建ThreadPool。
Executor接口定義了一個方法:
public interface Executor {/*** Executes the given command at some time in the future. The command* may execute in a new thread, in a pooled thread, or in the calling* thread, at the discretion of the {@code Executor} implementation.** @param command the runnable task* @throws RejectedExecutionException if this task cannot be* accepted for execution* @throws NullPointerException if command is null*/void execute(Runnable command); }ExecutorService繼承了Executor,提供了更多的線程池的操作。是對Executor的補充。
根據接口實現分離的原則,我們通常在java代碼中使用ExecutorService或者Executor,而不是具體的實現類。
我們看下怎么通過Executors來創建一個Executor和ExecutorService:
Executor executor = Executors.newSingleThreadExecutor();executor.execute(() -> log.info("in Executor"));ExecutorService executorService= Executors.newCachedThreadPool();executorService.submit(()->log.info("in ExecutorService"));executorService.shutdown();關于ExecutorService的細節,我們這里就多講了,感興趣的朋友可以參考之前我寫的ExecutorService的詳細文章。
ThreadPoolExecutor
ThreadPoolExecutor是ExecutorService接口的一個實現,它可以為線程池添加更加精細的配置,具體而言它可以控制這三個參數:corePoolSize, maximumPoolSize, 和 keepAliveTime。
PoolSize就是線程池里面的線程個數,corePoolSize表示的是線程池里面初始化和保持的最小的線程個數。
如果當前等待線程太多,可以設置maximumPoolSize來提供最大的線程池個數,從而線程池會創建更多的線程以供任務執行。
keepAliveTime是多余的線程未分配任務將會等待的時間。超出該時間,線程將會被線程池回收。
我們看下怎么創建一個ThreadPoolExecutor:
ThreadPoolExecutor threadPoolExecutor =new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());threadPoolExecutor.submit(()->log.info("submit through threadPoolExecutor"));threadPoolExecutor.shutdown();上面的例子中我們通過ThreadPoolExecutor的構造函數來創建ThreadPoolExecutor。
通常來說Executors已經內置了ThreadPoolExecutor的很多實現,我們來看下面的例子:
ThreadPoolExecutor executor1 =(ThreadPoolExecutor) Executors.newFixedThreadPool(2);executor1.submit(() -> {Thread.sleep(1000);return null;});executor1.submit(() -> {Thread.sleep(1000);return null;});executor1.submit(() -> {Thread.sleep(1000);return null;});log.info("executor1 poolsize {}",executor1.getPoolSize());log.info("executor1 queuesize {}", executor1.getQueue().size());executor1.shutdown();上的例子中我們Executors.newFixedThreadPool(2)來創建一個ThreadPoolExecutor。
上面的例子中我們提交了3個task。但是我們pool size只有2。所以還有一個1個不能立刻被執行,需要在queue中等待。
我們再看一個例子:
ThreadPoolExecutor executor2 =(ThreadPoolExecutor) Executors.newCachedThreadPool();executor2.submit(() -> {Thread.sleep(1000);return null;});executor2.submit(() -> {Thread.sleep(1000);return null;});executor2.submit(() -> {Thread.sleep(1000);return null;});log.info("executor2 poolsize {}", executor2.getPoolSize());log.info("executor2 queue size {}", executor2.getQueue().size());executor2.shutdown();上面的例子中我們使用Executors.newCachedThreadPool()來創建一個ThreadPoolExecutor。 運行之后我們可以看到poolsize是3,而queue size是0。這表明newCachedThreadPool會自動增加pool size。
如果thread在60秒鐘之類沒有被激活,則會被收回。
這里的Queue是一個SynchronousQueue,因為插入和取出基本上是同時進行的,所以這里的queue size基本都是0.
ScheduledThreadPoolExecutor
還有個很常用的ScheduledThreadPoolExecutor,它繼承自ThreadPoolExecutor, 并且實現了ScheduledExecutorService接口。
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService我們看下怎么使用:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(5);executor.schedule(() -> {log.info("Hello World");}, 500, TimeUnit.MILLISECONDS);上面的例子中,我們定義了一個定時任務將會在500毫秒之后執行。
之前我們也講到了ScheduledExecutorService還有兩個非常常用的方法:
- scheduleAtFixedRate - 以開始時間為間隔。
- scheduleWithFixedDelay - 以結束時間為間隔。
ForkJoinPool
ForkJoinPool是在java 7 中引入的新框架,我們將會在后面的文章中詳細講解。 這里做個簡單的介紹。
ForkJoinPool主要用來生成大量的任務來做算法運算。如果用線程來做的話,會消耗大量的線程。但是在fork/join框架中就不會出現這個問題。
在fork/join中,任何task都可以生成大量的子task,然后通過使用join()等待子task結束。
這里我們舉一個例子:
static class TreeNode {int value;Set<TreeNode> children;TreeNode(int value, TreeNode... children) {this.value = value;this.children = Sets.newHashSet(children);} }定義一個TreeNode,然后遍歷所有的value,將其加起來:
public class CountingTask extends RecursiveTask<Integer> {private final TreeNode node;public CountingTask(TreeNode node) {this.node = node;}@Overrideprotected Integer compute() {return node.value + node.children.stream().map(childNode -> new CountingTask(childNode).fork()).mapToInt(ForkJoinTask::join).sum();} }下面是調用的代碼:
public static void main(String[] args) {TreeNode tree = new TreeNode(5,new TreeNode(3), new TreeNode(2,new TreeNode(2), new TreeNode(8)));ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();int sum = forkJoinPool.invoke(new CountingTask(tree));}本文的例子請參考https://github.com/ddean2009/learn-java-concurrency/tree/master/threadPool
更多精彩內容且看:
- 區塊鏈從入門到放棄系列教程-涵蓋密碼學,超級賬本,以太坊,Libra,比特幣等持續更新
- Spring Boot 2.X系列教程:七天從無到有掌握Spring Boot-持續更新
- Spring 5.X系列教程:滿足你對Spring5的一切想象-持續更新
- java程序員從小工到專家成神之路(2020版)-持續更新中,附詳細文章教程
更多教程請參考 flydean的博客
總結
以上是生活随笔為你收集整理的java中ThreadPool的介绍和使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java中的daemon thread
- 下一篇: java 中的fork join框架