Future 和 ExecutorCompletionService 对比和使用
附加:Java 4種線程池介紹請查看?
談談new Thread的弊端及Java四種線程池的使用
當我們通過Executor提交一組并發執行的任務,并且希望在每一個任務完成后能立即得到結果,有兩種方式可以采取:
?
方式一:
通過一個list來保存一組future,然后在循環中輪訓這組future,直到每個future都已完成。如果我們不希望出現因為排在前面的任務阻塞導致后面先完成的任務的結果沒有及時獲取的情況,那么在調用get方式時,需要將超時時間設置為0
public class CompletionServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "執行完任務:" + i; } } public static void main(String[] args){ testUseFuture(); } private static void testUseFuture(){ int numThread = 5; ExecutorService executor = Executors.newFixedThreadPool(numThread); List<Future<String>> futureList = new ArrayList<Future<String>>(); for(int i = 0;i<numThread;i++ ){ Future<String> future = executor.submit(new CompletionServiceTest.Task(i)); futureList.add(future); } while(numThread > 0){ for(Future<String> future : futureList){ String result = null; try { result = future.get(0, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { //超時異常直接忽略//future.cancel(true);//超時設置任務取消} if(null != result){ futureList.remove(future); numThread--; System.out.println(result); //此處必須break,否則會拋出并發修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決) break; } } } } }
?
?方式二:
第一種方式顯得比較繁瑣,通過使用ExecutorCompletionService,則可以達到代碼最簡化的效果。
public class CompletionServiceTest { static class Task implements Callable<String>{ private int i; public Task(int i){ this.i = i; } @Override public String call() throws Exception { Thread.sleep(10000); return Thread.currentThread().getName() + "執行完任務:" + i; } } public static void main(String[] args) throws InterruptedException, ExecutionException{ testExecutorCompletionService(); } private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{ int numThread = 3; ExecutorService executor = Executors.newFixedThreadPool(numThread); CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); for(int i = 0;i<numThread;i++ ){ completionService.submit(new CompletionServiceTest.Task(i)); } } for(int i = 0;i<numThread;i++ ){ System.out.println(completionService.take().get()); //獲取執行結果} }?
ExecutorCompletionService分析:
?CompletionService是Executor和BlockingQueue的結合體。
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue<Future<V>>(); }?
任務的提交和執行都是委托給Executor來完成。在構造函數中創建一個BlockingQueue來保存計算完成的結果,當提交某個任務時,該任務首先將被包裝為一個QueueingFuture,
public Future<V> submit(Callable<V> task) { if (task == null) throw new NullPointerException(); RunnableFuture<V> f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; }?QueueingFuture,這是FutureTask的一個子類,通過改寫該子類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。
??
private class QueueingFuture extends FutureTask<Void> {
? ? ? ? QueueingFuture(RunnableFuture<V> task) {
? ? ? ? ? ? super(task, null);
? ? ? ? ? ? this.task = task;
? ? ? ? }
? ? ? ? protected void done() { completionQueue.add(task); }
? ? ? ? private final Future<V> task;
? ? }
?
?
?而通過使用BlockingQueue的take(阻塞獲取)或poll(非阻塞獲取)方法,則可以得到結果。在BlockingQueue不存在元素時,這兩個操作會阻塞,一旦有結果加入,則立即返回。
附加知識點:
take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到Blocking有新的對象被加入為止;
poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,取不到時返回nul
public Future<V> take() throws InterruptedException { return completionQueue.take(); } public Future<V> poll() { return completionQueue.poll(); }?
轉載于:https://www.cnblogs.com/cnmenglang/p/6273401.html
總結
以上是生活随笔為你收集整理的Future 和 ExecutorCompletionService 对比和使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一些零碎知识
- 下一篇: Prim算法的3个版本