Java并发编程实战————Executor框架与任务执行
引言
本篇博客介紹通過“執(zhí)行任務(wù)”的機(jī)制來設(shè)計應(yīng)用程序時需要掌握的一些知識。所有的內(nèi)容均提煉自《Java并發(fā)編程實戰(zhàn)》中第六章的內(nèi)容。
大多數(shù)并發(fā)應(yīng)用程序都是圍繞“任務(wù)執(zhí)行”來構(gòu)造的:任務(wù)通常是一些抽象的且離散的工作單元。
當(dāng)圍繞“任務(wù)執(zhí)行”來設(shè)計應(yīng)用程序結(jié)構(gòu)時,第一步,就是要找出清晰的任務(wù)邊界。在理想情況下,各個任務(wù)之間是相互獨(dú)立的:任務(wù)并不依賴于其他任務(wù)的狀態(tài)、結(jié)果或邊界效應(yīng)。
大多數(shù)服務(wù)器應(yīng)用程序提供了一種自然的任務(wù)邊界選擇方式:以獨(dú)立的客戶請求為邊界。
不好的例子:串行與“為每個任務(wù)創(chuàng)建線程”
在書中6.1節(jié),介紹了由最簡單的串行執(zhí)行任務(wù)到為每個任務(wù)創(chuàng)建一個線程這兩種執(zhí)行任務(wù)的方式。應(yīng)該說這兩種方式都是不可取的。
這一節(jié)主要是為了引出下一節(jié)介紹的“任務(wù)執(zhí)行框架”。
其中“串行執(zhí)行任務(wù)”的缺點(diǎn)是在一般的服務(wù)器應(yīng)用程序中,無法提高吞吐率或快速響應(yīng)性。
而“為每個任務(wù)創(chuàng)建線程”的方式的問題在于可能導(dǎo)致:高性能開銷、高資源消耗、影響穩(wěn)定性。
【重點(diǎn)】在工作或面試中也會遇到這個極富針對性的問題,即大量創(chuàng)建線程會存在哪些問題?
1、高性能開銷:創(chuàng)建和銷毀都需要一定的代價,創(chuàng)建過程需要時間,延遲處理請求,也需要jvm和操作系統(tǒng)提供一些輔助操作。
2、高資源消耗:活躍的線程會消耗系統(tǒng)資源,尤其是內(nèi)存。當(dāng)可運(yùn)行的線程數(shù)量多余可用處理器的數(shù)量,那么會有大量空閑的線程占用內(nèi)存,不僅給垃圾回收帶來壓力,在競爭CPU的時候還將產(chǎn)生額外的性能開銷。
3、影響穩(wěn)定性:大量線程占用內(nèi)存,內(nèi)存不足,導(dǎo)致可能拋出OutOfMemoryError,系統(tǒng)崩潰。
線程數(shù)量的限制
書中在這里簡單引出一個概念:穩(wěn)定性。
根據(jù)前后文的聯(lián)系,這里具體指的是:應(yīng)用程序不會因為線程過多而拋出OutOfMemoryError異常。
為了達(dá)到這種穩(wěn)定性,在可創(chuàng)建線程數(shù)量上存在一個限制。這個限制受平臺以及多個因素影響,包括JVM啟動參數(shù)、Thread構(gòu)造函數(shù)中請求的棧大小、底層操作系統(tǒng)對線程的限制等。例如,在32位機(jī)器上,其中一個主要的限制因素是線程棧的地址空間。每個線程都維護(hù)兩個執(zhí)行棧,一個用于Java代碼,另一個用于原生代碼。
通常,JVM在默認(rèn)情況下會生成一個復(fù)合棧,大小約0.5M~1M(這個值可以通過JVM標(biāo)志 -Xss或通過Thread的構(gòu)造函數(shù)來修改),那么:線程數(shù)量 ≈ 2^32(bit) / 0.5(MB) ≈幾千或幾萬。
因此,在一定范圍內(nèi),增加線程可以提高系統(tǒng)的吞吐率,但如果超出這個范圍,再創(chuàng)建更多的線程只會降低程序的執(zhí)行速度。
Executor接口
public?interface?Executor?{??void?execute(Runnable?command);?? }??Executor是一個非常簡單的接口,只有一個execute(Runnable) 方法,它是其他的靈活且強(qiáng)大的異步任務(wù)框架的基礎(chǔ)。通過這種方式,用Runnable來表示任務(wù),可以將任務(wù)的提交過程與執(zhí)行過程解耦。
Executor本身就是基于生產(chǎn)者消費(fèi)者,提交任務(wù)相當(dāng)于生產(chǎn)者,執(zhí)行任務(wù)相當(dāng)于消費(fèi)者,因此,如果要在程序中實現(xiàn)一個生產(chǎn)者-消費(fèi)者的設(shè)計,那么最簡單的方式通常就是使用Executor。
什么是執(zhí)行策略?
執(zhí)行策略,定義了任務(wù)執(zhí)行的“what、where、when、how”等方面,主要是描述根據(jù)不同的資源而選擇不同的執(zhí)行方式,一個最優(yōu)執(zhí)行策略應(yīng)當(dāng)是與硬件資源最匹配的。
線程池
先來看一下四種常用線程池的創(chuàng)建:
ExecutorService?newFixedThreadPool?=?Executors.newFixedThreadPool(10);?? ExecutorService?newCachedThreadPool?=?Executors.newCachedThreadPool();?? ScheduledExecutorService?newScheduledThreadPool?=?Executors.newScheduledThreadPool(10);?? ExecutorService?newSingleThreadExecutor?=?Executors.newSingleThreadExecutor();其中:ExecutorService extends Executor,ScheduledExecutorService extends ExecutorService 。?
1、newFixedThreadPool(int) :創(chuàng)建一個定額線程池,每提交一個任務(wù)創(chuàng)建一個線程,達(dá)到數(shù)量限制后不再增加,這時線程池的規(guī)模將不再變化(如果某個線程由于發(fā)生了未預(yù)期的異常而結(jié)束,那么線程池會補(bǔ)充一個新的線程)
2、NewCachedThreadPool() : 創(chuàng)建一個可緩存的線程池,線程池的規(guī)模不存在任何限制,當(dāng)線程多余任務(wù)時,回收空閑線程;當(dāng)任務(wù)增加時,創(chuàng)建新線程。
3、NewSingleThreadExecutor:單線程的Executor,如果這個線程異常結(jié)束,會創(chuàng)建另一個線程來替代。NewSingleThreadExecutor能確保依照任務(wù)在隊列中的順序串行執(zhí)行(例如FIFO、LIFO、優(yōu)先級)。
4、NewScheduleThreadPool:創(chuàng)建一個固定長度的線程池,而且以延遲或定時的方式來執(zhí)行任務(wù),類似于Timer。
Executor的生命周期
?JVM只有在所有(非守護(hù))線程全部終止后才會退出,無法正確地關(guān)閉Executor,JVM將無法結(jié)束。
Executor以異步的方式來執(zhí)行任務(wù),導(dǎo)致了提交任務(wù)的狀態(tài)不是立即可見的,即有些任務(wù)可能已經(jīng)完成,有些可能正在執(zhí)行,還有些可能正在隊列中等待執(zhí)行。
ExecutorSevice接口就是為了解決執(zhí)行服務(wù)的生命周期問題,擴(kuò)展了Executor接口。它添加了一些用于聲明周期管理的方法(同時還有一些用于任務(wù)提交的便利方法):
public?interface?ExecutorService?extends?Executor?{??void?shutdown();??List<Runnable>?shutdownNow();??boolean?isShutdown();??boolean?isTerminated();??boolean?awaitTermination(long?timeout,?TimeUnit?unit)??throws?InterruptedException;??//?......其他用于任務(wù)提交的便利方法?? }??這五個方法是聲明周期管理的方法,其余的都是與任務(wù)提交相關(guān)的方法,比如,可以提交比較大的集合Callable對象的方法:
invokeAll(Collection<? extends Callable<T>> tasks)【重點(diǎn)】ExecutorService的三種狀態(tài):運(yùn)行、關(guān)閉、已終止 。
ExecutorService在初始創(chuàng)建時處于運(yùn)行狀態(tài)。shutdown()方法將執(zhí)行平緩的關(guān)閉過程:不再接受新的任務(wù),同時等待已經(jīng)提交的任務(wù)執(zhí)行完成——包括那些還未開始執(zhí)行的任務(wù)。shutdownNow()方法將執(zhí)行粗暴的關(guān)閉方式:它將嘗試取消所有運(yùn)行中的任務(wù),并且不再啟動隊列中尚未開始執(zhí)行的任務(wù)。
延遲任務(wù)與周期任務(wù)
Timer類負(fù)責(zé)管理延遲任務(wù)以及周期任務(wù),但它本身存在缺陷,因此通常要用ScheduleThreadPoolExecutor的構(gòu)造函數(shù)或newScheduleThreadPool工廠方法來創(chuàng)建該類對象。
Timer的缺陷在于,Timer在執(zhí)行所有定時任務(wù)時只會創(chuàng)建一個線程。如果某個任務(wù)的執(zhí)行時間過長,那么將破壞其他TimerTask的定時精確性。
Timer還有一個問題就是,Timer線程不會捕獲異常,當(dāng)TimerTask拋出未檢查異常時將終止定時線程。Timer也不會恢復(fù)線程的執(zhí)行,而是會錯誤地任務(wù)整個Timer都被取消了。這就造成:已經(jīng)被調(diào)度但尚未執(zhí)行的TimerTask將不會再執(zhí)行,新的任務(wù)也不會被調(diào)度。稱之為“線程泄漏”。
【重點(diǎn)】生命周期小結(jié)
Runnable和Callable等任務(wù)的生命周期:創(chuàng)建、提交、開始、完成、取消。
Future表示的就是一個任務(wù)的生命周期。
Thread的生命周期:創(chuàng)建、就緒、運(yùn)行、阻塞、死亡(或結(jié)束)。
ExecutorService的生命周期(因為它繼承自Executor,因此也是Executor的生命周期):創(chuàng)建、運(yùn)行、關(guān)閉、已終止。
Callable與Future
callable
Runnable有一個局限性是沒有返回值,也沒辦法拋出受檢異常。對于某些異步獲得結(jié)果的任務(wù)無法勝任,Callable應(yīng)運(yùn)而生。
它是Runnable的升級版,既可以使用Callable<Void>來達(dá)到Runnable一樣的效果,同時也可以使用Callable<T> 來指定返回結(jié)果。
創(chuàng)建Callable的方式有兩種:構(gòu)造函數(shù)、靜態(tài)的封裝方法。
Callable<String>?callableTask?=?new?Callable<String>()?{??@Override??public?String?call()?throws?Exception?{??return?"this?is?a?callable?task....";??}?? };?Java 8 style:
Callable<String>?callableTask?=?()?->?{??return?"this?is?a?callable?task....";?? };靜態(tài)方法:Executors.callable(Runnable task, T result):
Callable<String>?call?=?Executors.callable(()?->?{System.out.println("this?is?a?runnable?task..."); },?"done!");Future
future表示一個任務(wù)的生命周期。主要提供了一些方法用于判斷任務(wù)處于哪個階段,還可以獲取任務(wù)的結(jié)果甚至是取消任務(wù)。它本身還有一層隱含意義是,任務(wù)的生命周期只能前進(jìn),不能后退,當(dāng)一個任務(wù)處于“完成”狀態(tài),就永遠(yuǎn)停留在“完成”狀態(tài)上。這一點(diǎn)和ExecutorService的生命周期一樣。
Future接口:
public?interface?Future<V>?{??boolean?cancel(boolean?mayInterruptIfRunning);??boolean?isCancelled();??boolean?isDone();??V?get()?throws?InterruptedException,?ExecutionException;??V?get(long?timeout,?TimeUnit?unit)??throws?InterruptedException,?ExecutionException,?TimeoutException;?? }創(chuàng)建Future的方式通常是使用ExecutorService的submit()方法獲取返回值。如果想通過構(gòu)造器的方式顯式地創(chuàng)建一個任務(wù)的生命周期管理對象,可以使用FutureTask。
FutureTask<String>?runnFutureTask?=?new?FutureTask<String>(runnable,?"done!");?? FutureTask<String> callFutureTask = new FutureTask<>(callable);FutureTask類實現(xiàn)了Runnable和Future兩個接口。
(說明:FutureTask是Java 5加入的類,Java 6又為它補(bǔ)充了一個新的RunnableFuture接口,Runnable接口和Future接口被提升到了RunnableFuture接口上,這更像是一種重構(gòu)手段,我個人認(rèn)為在實際開發(fā)中用途可能不及直接使用FutureTask)
由于FutureTask實現(xiàn)了Runnable接口,因此可以將它提交給Executor來執(zhí)行,或者直接調(diào)用它的run方法。
是的,FutureTask的run()方法可以直接執(zhí)行任務(wù),而不需要什么start。
Future.get
get()方法的行為取決于任務(wù)的狀態(tài)(尚未開始、正在運(yùn)行、已完成)如果任務(wù)已經(jīng)完成,那么get會立即返回或拋出一個Exception;如果任務(wù)沒有完成,那么get將阻塞直到任務(wù)完成。如果任務(wù)拋出異常,那么get將該異常封裝成ExecutionException并重新拋出,可以通過getCause來進(jìn)一步獲得被封裝的初始異常。如果任務(wù)被取消,那么get將拋出CancellationException。
異構(gòu)任務(wù)并行化存在的局限
A與B兩個完全不同的任務(wù)通過并行方式可以實現(xiàn)小幅度的性能提升,但是如果想大幅度的提升存在一定的困難。因此,得出一個結(jié)論是,只有當(dāng)大量相互獨(dú)立且同構(gòu)的任務(wù)可以并發(fā)進(jìn)行處理時,才能體現(xiàn)出真正的性能提升。
CompletionService與它的子類ExecutorCompletionService
CompletionService是Executor與BlockingQueue的融合。
回顧一下BlockingQueue的一些特性:
BlockingQueue接口是Queue的子接口,有兩個最主要的實現(xiàn),LinkedBlockingQueue(無界隊列)和ArrayBlockingQueue(有界隊列)。take()或poll()方法都是BlockingQueue的取頭元素的方法,唯一不同的是當(dāng)沒有可用的頭元素時,take會無限期等待(阻塞),poll可以設(shè)置一個超時時間,一旦超時,將返回null。
CompletionService是在任務(wù)執(zhí)行的功能上加入了隊列的特性,很明顯是用于處理一批允許有返回值的任務(wù)。
用法:創(chuàng)建一個CompletionService(ExecutorCompletionService對象)。【ExecutorCompletionService的構(gòu)造器允許我們傳入一個ExecutorService(用于采取不同的執(zhí)行策略)和一個BlockingQueue(該參數(shù)可選,默認(rèn)LinkedBlockingQueue)】然后可以將一組Callable任務(wù)提交給CompletionService來執(zhí)行,然后使用類似隊列操作的take或poll方法來獲取已完成的結(jié)果,這些結(jié)果會在完成時被封裝為Future。
【擴(kuò)展】ExecutorCompletionService的實現(xiàn)很簡單。首先通過構(gòu)造函數(shù)創(chuàng)建一個BlockingQueue來保存計算結(jié)果,然后當(dāng)計算完成時,調(diào)用FutureTask的done方法,放入隊列。展開:當(dāng)提交某個任務(wù)時,該任務(wù)將首先包裝為一個QueueingFuture,這是FutureTask【回顧:FutureTask實現(xiàn)了Future、Runnable】的一個子類,QueueingFuture改寫了FutureTask的done方法——將結(jié)果放入BlockingQueue中。take和poll方法委托給BlockingQueue方法,這些方法會在得到結(jié)果之前阻塞。
為任務(wù)設(shè)置時限
有時候,如果某個任務(wù)無法在指定時間內(nèi)完成,那么將不再需要它的結(jié)果,此時可以放棄這個任務(wù)。Future.get中支持這種需求:當(dāng)結(jié)果可用時,它將立即返回,如果在指定時限內(nèi)沒有計算出結(jié)果,那么拋出TimeoutException。
在使用時限任務(wù)時需要注意,當(dāng)這些任務(wù)超市后應(yīng)該立即停止,從而避免為繼續(xù)計算一個不再使用的結(jié)果而浪費(fèi)計算資源。
【使用Future.get為單個任務(wù)設(shè)置時限,如果希望對一組任務(wù)設(shè)置計算時限,比如前面介紹的CompletionService,那么可以使用poll方法來設(shè)置執(zhí)行時間】
invokeAll方法
ExecutorServie接口中有兩個重載的invokeAll方法:
<T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks)??throws?InterruptedException;?? <T>?List<Future<T>>?invokeAll(Collection<??extends?Callable<T>>?tasks,??long?timeout,?TimeUnit?unit)??throws?InterruptedException;invokeAll方法支持將多個任務(wù)提交到一個ExecutorService并獲得結(jié)果。invokeAll方法的參數(shù)為一組任務(wù),并返回一組Future。invokeAll按照任務(wù)集合中迭代器的順序?qū)⑺械腇uture添加到返回的集合中,從而使調(diào)用者能夠?qū)⒏鱾€Future與其表示的Callable關(guān)聯(lián)起來。
當(dāng)所有任務(wù)都執(zhí)行完畢時,或者調(diào)用線程被中斷時,又或者超過指定時限時,invokeAll都會返回。當(dāng)超過指定時限,任何還未完成的任務(wù)都會取消。當(dāng)invokeAll返回后,每個任務(wù)要么正常完成,要么被取消,而客戶端代碼可以調(diào)用get或isCancelled來判斷究竟是何種情況。
第六章小結(jié)
通過圍繞任務(wù)執(zhí)行來設(shè)計應(yīng)用程序,可以簡化開發(fā)過程,并有助于實現(xiàn)并發(fā)。
Executor框架將任務(wù)提交與執(zhí)行策略解耦開來,同時還支持多種不同類型的執(zhí)行策略。當(dāng)需要創(chuàng)建線程來執(zhí)行任務(wù)時,可以考慮使用Executor。
要想在將應(yīng)用程序分解為不同的任務(wù)時獲得最大的好處,必須定義清晰的任務(wù)邊界。某些應(yīng)用程序中存在著比較明顯的任務(wù)邊界,而在其他一些程序中則需要進(jìn)一步分析才能揭示出粒度更細(xì)的并行性。
總結(jié)
以上是生活随笔為你收集整理的Java并发编程实战————Executor框架与任务执行的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: LeetCode算法入门- Genera
- 下一篇: 制图折断线_CAD制图初学入门之CAD标