當我們通過Executor提交一組并發執行的任務,并且希望在每一個任務完成后能立即得到結果,有兩種方式可以采取:
?
方式一:
通過一個list來保存一組future,然后在循環中輪訓這組future,直到每個future都已完成。如果我們不希望出現因為排在前面的任務阻塞導致后面先完成的任務的結果沒有及時獲取的情況,那么在調用get方式時,需要將超時時間設置為0?
Java代碼 ?
public?class?CompletionServiceTest?{????????static?class?Task?implements?Callable<String>{??????????private?int?i;????????????????????public?Task(int?i){??????????????this.i?=?i;??????????}????????????@Override??????????public?String?call()?throws?Exception?{??????????????Thread.sleep(10000);??????????????return?Thread.currentThread().getName()?+?"執行完任務:"?+?i;??????????}?????????}????????????public?static?void?main(String[]?args){??????????testUseFuture();??????}????????????private?static?void?testUseFuture(){??????????int?numThread?=?5;??????????ExecutorService?executor?=?Executors.newFixedThreadPool(numThread);??????????List<Future<String>>?futureList?=?new?ArrayList<Future<String>>();??????????for(int?i?=?0;i<numThread;i++?){??????????????Future<String>?future?=?executor.submit(new?CompletionServiceTest.Task(i));??????????????futureList.add(future);??????????}????????????????????????????while(numThread?>?0){??????????????for(Future<String>?future?:?futureList){??????????????????String?result?=?null;??????????????????try?{??????????????????????result?=?future.get(0,?TimeUnit.SECONDS);??????????????????}?catch?(InterruptedException?e)?{??????????????????????e.printStackTrace();??????????????????}?catch?(ExecutionException?e)?{??????????????????????e.printStackTrace();??????????????????}?catch?(TimeoutException?e)?{??????????????????????//超時異常直接忽略??????????????????}??????????????????if(null?!=?result){??????????????????????futureList.remove(future);??????????????????????numThread--;??????????????????????System.out.println(result);??????????????????????//此處必須break,否則會拋出并發修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決)??????????????????????break;??????????????????}??????????????}??????????}??????}??}?? ?方式二:
第一種方式顯得比較繁瑣,通過使用ExecutorCompletionService,則可以達到代碼最簡化的效果。
Java代碼 ?
public?class?CompletionServiceTest?{????????static?class?Task?implements?Callable<String>{??????????private?int?i;????????????????????public?Task(int?i){??????????????this.i?=?i;??????????}????????????@Override??????????public?String?call()?throws?Exception?{??????????????Thread.sleep(10000);??????????????return?Thread.currentThread().getName()?+?"執行完任務:"?+?i;??????????}?????????}????????????public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException{??????????testExecutorCompletionService();??????}????????????private?static?void?testExecutorCompletionService()?throws?InterruptedException,?ExecutionException{??????????int?numThread?=?5;??????????ExecutorService?executor?=?Executors.newFixedThreadPool(numThread);??????????CompletionService<String>?completionService?=?new?ExecutorCompletionService<String>(executor);??????????for(int?i?=?0;i<numThread;i++?){??????????????completionService.submit(new?CompletionServiceTest.Task(i));??????????}??}????????????????????for(int?i?=?0;i<numThread;i++?){???????????????????System.out.println(completionService.take().get());??????????}????????????????}?? ?
ExecutorCompletionService分析:
?CompletionService是Executor和BlockingQueue的結合體。
Java代碼 ?
public?ExecutorCompletionService(Executor?executor)?{??????????if?(executor?==?null)??????????????throw?new?NullPointerException();??????????this.executor?=?executor;??????????this.aes?=?(executor?instanceof?AbstractExecutorService)????????????????(AbstractExecutorService)?executor?:?null;??????????this.completionQueue?=?new?LinkedBlockingQueue<Future<V>>();??????}?? ?任務的提交和執行都是委托給Executor來完成。當提交某個任務時,該任務首先將被包裝為一個QueueingFuture,
Java代碼 ?
public?Future<V>?submit(Callable<V>?task)?{??????????if?(task?==?null)?throw?new?NullPointerException();??????????RunnableFuture<V>?f?=?newTaskFor(task);??????????executor.execute(new?QueueingFuture(f));??????????return?f;??????}?? ?QueueingFuture是FutureTask的一個子類,通過改寫該子類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。
?
Java代碼 ?
private?class?QueueingFuture?extends?FutureTask<Void>?{??????????QueueingFuture(RunnableFuture<V>?task)?{??????????????super(task,?null);??????????????this.task?=?task;??????????}??????????protected?void?done()?{?completionQueue.add(task);?}??????????private?final?Future<V>?task;??????}?? ?而通過使用BlockingQueue的take或poll方法,則可以得到結果。在BlockingQueue不存在元素時,這兩個操作會阻塞,一旦有結果加入,則立即返回。
Java代碼 ?
public?Future<V>?take()?throws?InterruptedException?{??????return?completionQueue.take();??}????public?Future<V>?poll()?{??????return?completionQueue.poll();??}?原文:http://xw-z1985.iteye.com/blog/1997077
轉載于:https://www.cnblogs.com/langtianya/p/5013353.html
總結
以上是生活随笔為你收集整理的获取Executor提交的并发执行的任务返回结果的两种方式/ExecutorCompletionService使用...的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。