Java高并发编程:Callable、Future和FutureTask
1. Callable
泛型接口,用于獲取線程執行完的結果,Callable的聲明如下
public interface Callable<V> {// 返回 V 類型的結果V call() throws Exception; }Callable 接口類似于Runnable,兩者都是為那些其實例可能被另一個線程執行的類設計的。但是 Runnable 不會返回結果,并且無法拋出經過檢查的異常,而Callable是可返回結果并且可以拋出異常的任務
public class MyCallable implements Callable{private int number;public MyCallable(int number) {this.number = number;}@Overridepublic Integer call() throws Exception {int sum = 0;for (int x = 1; x <= number; x++) {sum += x;}return sum;} }public class CallableDemo {ExecutorService pool = Executors.newFixedThreadPool(2);Future<Integer> future = pool.submit(new SumCallable(10));Integer sum = future.get();pool.shutdown();class SumCallable implements Callable<Integer> {private int number;public SumCallable(int number){this.number = number;}@Overridepublic Integer call() throws Exception {int sum = 0;for (int i=0; i<number; i++){sum += i;}return sum;}} }2. Future
Future 為線程池提供了一個可管理的任務標準,它提供了對Runnable或Callable任務的執行結果進行取消、查詢是否完成、獲取結果、設置結果操作
Future 接口表示異步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,并獲取計算的結果。計算完成后只能使用get()方法來獲取結果,如有必要,計算完成前可以阻塞此方法
Future取得的結果類型和Callable返回的結果類型必須一致,這是通過泛型來實現的。Callable要采用ExecutorService的submit()方法提交,返回的future對象可以取消任務。Future聲明如下
public interface Future<V> {boolean cancel(boolean var1);// 該任務是否已經取消boolean isCancelled();// 判斷是否已經完成boolean isDone();// 獲取結果,該方法會阻塞V get() throws InterruptedException, ExecutionException;// 獲取結果,如果還未完成那么等待,直到timeout或返回結果,該方法會阻塞V get(long var1, TimeUnit var3) throws InterruptedException, ExecutionException, TimeoutException; }Future的簡單使用
// 創建線程池對象 ExecutorService pool = Executors.newFixedThreadPool(2); // 可以執行Runnable對象或者Callable對象代表的線程 Future<Integer> future = pool.submit(new MyCallable(100)); Integer i1 = future.get(); pool.shutdown();Future常用方法
| cancel() | 取消任務 |
| isCanceled() | 任務是否取消 |
| isDone() | 任務是否完成 |
| get() | 獲取結果,get()方法會阻塞,直到任務返回結果 |
| set() | 設置結果 |
3. FutureTask
Future只是定義了一些接口規范,而FutureTask則是它的實現類
public class FutureTask<V> implements RunnableFuture<V>{// 代碼省略 }public interface RunnableFuture<V> extends Runnable, Future<V>{void run(); }FutureTask會像Thread包裝Runnable那樣對Runnable和Callable進行包裝,Runnable和Callable由構造函數注入
public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW; // ensure visibility of callable }由于FutureTask實現了Runnable,因此,它既可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行,并且還可以直接通過get()方法獲取執行結果,該方法會阻塞,直到結果返回。因此,FutureTask既是Future、Runnable,又是包裝了Callable,它是兩者的合體
public class FutureDemo {// 線程池static ExecutorService mExecutor = Executors.newSingleThreadExecutor();// main函數public static void main(String[] args) {try {futureWithRunnable();futureWithCallable();futureTask();} catch (Exception e) {}}/*** 其中Runnable實現的是void run()方法,無返回值;Callable實現的是 V* call()方法,并且可以返回執行結果。其中Runnable可以提交給Thread來包裝下* ,直接啟動一個線程來執行,而Callable則一般都是提交給ExecuteService來執行。*/private static void futureWithRunnable() throws InterruptedException, ExecutionException {// 提交runnable則沒有返回值, future沒有數據Future<?> result = mExecutor.submit(new Runnable() {@Overridepublic void run() {fibc(20);}});System.out.println("future result from runnable : " + result.get());}private static void futureWithCallable() throws InterruptedException, ExecutionException {/*** 提交Callable, 有返回值, future中能夠獲取返回值*/Future<Integer> result2 = mExecutor.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {return fibc(20);}});System.out.println("future result from callable : "+ result2.get());}private static void futureTask() throws InterruptedException, ExecutionException {/*** FutureTask則是一個RunnableFuture<V>,即實現了Runnbale又實現了Futrue<V>這兩個接口,* 另外它還可以包裝Runnable(實際上會轉換為Callable)和Callable* <V>,所以一般來講是一個符合體了,它可以通過Thread包裝來直接執行,也可以提交給ExecuteService來執行* ,并且還可以通過v get()返回執行結果,在線程體沒有執行完成的時候,主線程一直阻塞等待,執行完則直接返回結果。*/FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {return fibc(20);}});// 提交futureTaskmExecutor.submit(futureTask);System.out.println("future result from futureTask : "+ futureTask.get());}// 效率底下的斐波那契數列, 耗時的操作private static int fibc(int num) {if (num == 0) {return 0;}if (num == 1) {return 1;}return fibc(num - 1) + fibc(num - 2);} }輸出結果
future result from runnable : null future result from Callable : 6765 future result from futureTask : 67654. CompletionService
CompletionService用于提交一組Callable任務,其take()方法返回已完成的一個Callable任務對應的Future對象。
示例:這里數據的獲取好比同時種了好幾塊地的麥子,然后等待收割,秋收時,哪塊先熟,先收割哪塊
import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; public class CallableAndFuture { public static void main(String[] args){ //創建一個單獨的線程 ExecutorService threadPool = Executors.newSingleThreadExecutor(); //future泛型與Callable的類型一致 Future<String> future = threadPool.submit(new Callable<String>(){ @Override public String call() throws Exception { Thread.sleep(3000); return "hello"; } }); System.out.println("等待結果……"); //在指定時timeout內等待,未等到拋出TimeoutException //System.out.println("拿到結果:" + future.get(long timeout,TimeUnit unit)); try { System.out.println("拿到結果:" + future.get()); //獲取線程執行后的結果 } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } //CompletionService用于提交一組Callable任務, //其take方法返回已完成的一個Callable任務對應的Future對象。 ExecutorService threadPool2 = Executors.newFixedThreadPool(10); CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool2); //創建10任務 for(int i=1;i<=10;i++){ final int seq = i; //將任務提交給線程池 completionService.submit(new Callable<Integer>(){ @Override public Integer call() throws Exception { Thread.sleep(new Random().nextInt(5000)); return seq; } }); } //獲取結果,哪個結果先返回就先獲得 for(int i=0;i<10;i++){ try { System.out.println(completionService.take().get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } } }輸出結果
等待結果…… 拿到結果:hello 3 6 10 2 4 8 5 1 7 9總結
以上是生活随笔為你收集整理的Java高并发编程:Callable、Future和FutureTask的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java高并发编程:HandlerThr
- 下一篇: Java高并发编程:原子类