ExecutorCompletionService分析及使用
生活随笔
收集整理的這篇文章主要介紹了
ExecutorCompletionService分析及使用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
當我們通過Executor提交一組并發執行的任務,并且希望在每一個任務完成后能立即得到結果,有兩種方式可以采取:
方式一:
通過一個list來保存一組future,然后在循環中輪訓這組future,直到每個future都已完成。如果我們不希望出現因為排在前面的任務阻塞導致后面先完成的任務的結果沒有及時獲取的情況,那么在調用get方式時,需要將超時時間設置為0?
1 public class ExecutorCompletionServiceTest { 2 3 static class Task implements Callable<String> { 4 private int i; 5 6 public Task(int i) { 7 this.i = i; 8 } 9 10 @Override 11 public String call() throws Exception { 12 Thread.sleep(10000); 13 return Thread.currentThread().getName() + "執行完任務:" + i; 14 } 15 } 16 17 public static void main(String[] args) { 18 testUseFuture(); 19 } 20 21 private static void testUseFuture() { 22 int numThread = 5; 23 ExecutorService executor = Executors.newFixedThreadPool(numThread); 24 List<Future<String>> futureList = new ArrayList<Future<String>>(); 25 for (int i = 0; i < numThread; i++) { 26 Future<String> future = executor 27 .submit(new ExecutorCompletionServiceTest.Task(i)); 28 futureList.add(future); 29 } 30 31 while (numThread > 0) { 32 for (Future<String> future : futureList) { 33 String result = null; 34 try { 35 result = future.get(0, TimeUnit.SECONDS); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } catch (ExecutionException e) { 39 e.printStackTrace(); 40 } catch (TimeoutException e) { 41 // 超時異常直接忽略 42 } 43 if (null != result) { 44 futureList.remove(future); 45 numThread--; 46 System.out.println(result); 47 // 此處必須break,否則會拋出并發修改異常。(也可以通過將futureList聲明為CopyOnWriteArrayList類型解決) 48 break; 49 } 50 } 51 } 52 } 53 }?方式二:
第一種方式顯得比較繁瑣,通過使用ExecutorCompletionService,則可以達到代碼最簡化的效果。
1 public class ExecutorCompletionServiceTest { 2 3 static class Task implements Callable<String> { 4 private int i; 5 6 public Task(int i) { 7 this.i = i; 8 } 9 10 @Override 11 public String call() throws Exception { 12 Thread.sleep(10000); 13 return Thread.currentThread().getName() + "執行完任務:" + i; 14 } 15 } 16 17 public static void main(String[] args) throws InterruptedException, ExecutionException { 18 testExecutorCompletionService(); 19 } 20 21 private static void testExecutorCompletionService() throws InterruptedException, ExecutionException{ 22 int numThread = 5; 23 ExecutorService executorService = Executors.newFixedThreadPool(numThread); 24 CompletionService<String> completionService = new ExecutorCompletionService<>(executorService); 25 for (int i = 0; i < numThread; i++) { 26 completionService.submit(new ExecutorCompletionServiceTest.Task(i)); 27 } 28 for (int i = 0; i < numThread; i++) { 29 System.out.println(completionService.take().get()); 30 } 31 executorService.shutdown(); 32 } 33 }ExecutorCompletionService實現了CompletionService接口,CompletionService是Executor和BlockingQueue的結合體。可以看下構造函數
1 public ExecutorCompletionService(Executor executor) { 2 if (executor == null) 3 throw new NullPointerException(); 4 this.executor = executor; 5 this.aes = (executor instanceof AbstractExecutorService) ? 6 (AbstractExecutorService) executor : null; 7 this.completionQueue = new LinkedBlockingQueue<Future<V>>(); 8 }任務的提交和執行都是委托給Executor來完成。當提交某個任務時,該任務首先將被包裝為一個QueueingFuture,
1 public Future<V> submit(Callable<V> task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<V> f = newTaskFor(task); 4 executor.execute(new QueueingFuture(f)); 5 return f; 6 }QueueingFuture是FutureTask的一個子類,通過改寫該子類的done方法,可以實現當任務完成時,將結果放入到BlockingQueue中。
1 private class QueueingFuture extends FutureTask<Void> { 2 QueueingFuture(RunnableFuture<V> task) { 3 super(task, null); 4 this.task = task; 5 } 6 protected void done() { completionQueue.add(task); } 7 private final Future<V> task; 8 }而通過使用BlockingQueue的take或poll方法,則可以得到結果。在BlockingQueue不存在元素時,這兩個操作會阻塞,一旦有結果加入,則立即返回。
1 public Future<V> take() throws InterruptedException { 2 return completionQueue.take(); 3 } 4 5 public Future<V> poll() { 6 return completionQueue.poll(); 7 }?
總結
以上是生活随笔為你收集整理的ExecutorCompletionService分析及使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用Axure制作App原型的尺寸设置
- 下一篇: js模块化和使用