ExecutorCompletionService 源码分析
概要
在ExecutorService的submit方法中可以獲取返回值,通過Future的get方法,但是這個Future類存在缺陷,Future接口調用get()方法取得處理后的返回結果時具有阻塞性,也就是說調用Future的get方法時,任務沒有執(zhí)行完成,則get方法要一直阻塞等到任務完成為止。 這樣大大的影響了系統(tǒng)的性能,這就是Future的最大缺點。為此,java1.5以后提供了CompletionServlice來解決這個問題。
CompletionService 接口CompletionService的功能是異步的方式,一邊生產任務,一邊處理完成的任務結果,這樣可以將執(zhí)行的任務與處理任務隔離開來進行處理,使用submit執(zhí)行任務,使用塔克獲取已完成的任務,并按照這些任務的完成的時間順序來處理他們的結果。
示例
向ExecutorService 提交一組任務,哪個任務先完成,就把完成任務的返回結果打印出來。
public class CompletionServiceExecutorDemo {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(10);// 同時運行多個任務,那個任務先返回數據,就先獲取該數據CompletionService<String> completionService = new ExecutorCompletionService<String>(threadPool);for (int i = 1; i <= 10; i++) {final int seq = i;completionService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {int waitTime = new Random().nextInt(10);TimeUnit.SECONDS.sleep(waitTime);return "callable:"+seq+" 執(zhí)行時間:"+waitTime+"s";}});}for (int i = 1; i <= 10; i++) {try {Future<String> future = completionService.take();System.out.println(future.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}threadPool.shutdown();} }執(zhí)行結果如下:
callable:6 執(zhí)行時間:1s callable:2 執(zhí)行時間:3s callable:10 執(zhí)行時間:3s callable:1 執(zhí)行時間:4s callable:4 執(zhí)行時間:5s callable:8 執(zhí)行時間:5s callable:7 執(zhí)行時間:7s callable:5 執(zhí)行時間:8s callable:9 執(zhí)行時間:9s callable:3 執(zhí)行時間:9s從打印結果可以看出,這些任務是按照任務執(zhí)行完成的順序打印的,先執(zhí)行完就先返回結果。
ExecutorCompletionService 源碼分析
ExecutorCompletionService 類結構如下
public class ExecutorCompletionService<V> implements CompletionService<V> {private final Executor executor; //線程池private final AbstractExecutorService aes; private final BlockingQueue<Future<V>> completionQueue; //任務完成隊列private class QueueingFuture extends FutureTask<Void> {QueueingFuture(RunnableFuture<V> task) {super(task, null);this.task = task;}protected void done() { completionQueue.add(task); }private final Future<V> task;}ExecutorCompletionService 類中定義了一個QueueingFuture 的內部類,繼承于FutureTask類,內部重寫了FutureTask的done方法,該方法是在FutureTask任務執(zhí)行完成后會調用的方法,在FutureTask中該方法未實現任何邏輯。
重寫done方法,在任務處理完成后把該FutureTask任務放入到阻塞隊列(BlockingQueue)中,然后我們就可以從阻塞隊列中take執(zhí)行完成的任務,進行想用的處理。
這里是實現ExecutorCompletionService的核心邏輯。
newTaskFor 方法
private RunnableFuture<V> newTaskFor(Callable<V> task) {if (aes == null)return new FutureTask<V>(task);elsereturn aes.newTaskFor(task);}private RunnableFuture<V> newTaskFor(Runnable task, V result) {if (aes == null)return new FutureTask<V>(task, result);elsereturn aes.newTaskFor(task, result);}ExecutorCompletionService 支持Callable和Runnable任務
1. 把用戶提交的Callable任務轉成FutureTask。
2. 把用戶提交的Runnable任務轉成FutureTask。
ExecutorCompletionService 構造方法1
public ExecutorCompletionService(Executor executor) {if (executor == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = new LinkedBlockingQueue<Future<V>>();}(aes作用:把用戶提交的Callable和Runnable任務轉換成FutureTask)
ExecutorCompletionService 構造方法2
public ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) {if (executor == null || completionQueue == null)throw new NullPointerException();this.executor = executor;this.aes = (executor instanceof AbstractExecutorService) ?(AbstractExecutorService) executor : null;this.completionQueue = completionQueue;}該構造可以指定一個阻塞隊列,其它功能同上構造方法。
submit方法
public Future<V> submit(Callable<V> task) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task);executor.execute(new QueueingFuture(f));return f;}public Future<V> submit(Runnable task, V result) {if (task == null) throw new NullPointerException();RunnableFuture<V> f = newTaskFor(task, result);executor.execute(new QueueingFuture(f));return f;}該方法可以向ExecutorCompletionService 中提交要執(zhí)行的任務。
支持Callable和Runnable兩種類型的任務。
如果提交的Runnable任務,則執(zhí)行完后返回的結果為null。
從阻塞隊列中獲取執(zhí)行完成的任務的,如果隊列為空且任務沒有全部完成,則阻塞當前線程,直到有任務執(zhí)行完成。
public Future<V> poll() {return completionQueue.poll();}public Future<V> poll(long timeout, TimeUnit unit)throws InterruptedException {return completionQueue.poll(timeout, unit);}}ExecutorCompletionService支持非阻塞方式從阻塞隊列中獲取已完成的任務
1. 可以通過poll方法來從阻塞隊列中獲取任務,如果隊列為空,則直接返回null,不會阻塞當前線程。
2.支持等待多長時間來從阻塞隊列中獲取已經完成的任務。
總結
ExecutorCompletionService的實現原理是內部使用了FutureTask來實現異步的任務執(zhí)行。通過一個內部類繼承FutureTask,并實現了FutureTask的一個done方法。該done方法會在任務執(zhí)行完成之后調用該方法,在任務執(zhí)行完之后把當前的FutureTask放入到阻塞隊列中。這樣就實現了先執(zhí)行完成的任務先存放到阻塞隊列中,應用程序可以從阻塞隊列中提前獲取先執(zhí)行完的任務。
本人簡書blog地址:http://www.jianshu.com/u/1f0067e24ff8????
點擊這里快速進入簡書
GIT地址:http://git.oschina.net/brucekankan/
點擊這里快速進入GIT
總結
以上是生活随笔為你收集整理的ExecutorCompletionService 源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于ReentrantLock发生死锁的
- 下一篇: BufferedInputStream