CompletableFuture 实现异步计算
在Markdown的語法中,<u>下劃線</u>中的文字會被解析器加上下劃線,為了不影響閱讀,本文中JDK文檔涉及到<U>都會替換為<N>,請各位注意。
概述
Java 1.8 新增加的 CompletableFuture 類內部是使用 ForkJoinPool 來實現的,CompletableFuture 實現了 Future接口 和 CompletionStage接口。
在之前的Future的介紹和基本用法一文中,我們了解到 Future 表示異步計算的結果。
CompletionStage 代表了一個特定的計算的階段,可以同步或者異步的被完成。
CompletionStage 可以被看作一個計算流水線上的一個單元,最終會產生一個最終結果,這意味著幾個 CompletionStage 可以串聯起來,一個完成的階段可以觸發下一階段的執行,接著觸發下一次,直到完成所有階段。
Future的弊端
Future是Java 5添加的類,用來描述一個異步計算的結果。你可以使用isDone方法檢查計算是否完成,或者使用get阻塞住調用線程,直到計算完成返回結果,你也可以使用cancel方法停止任務的執行。
雖然Future以及相關使用方法提供了異步執行任務的能力,但是對于結果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的CPU資源,而且也不能及時地得到計算結果。
package net.ijiangtao.tech.concurrent.jsd.future.completable;import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future;/*** java5 future** @author ijiangtao* @create 2019-07-22 9:40**/ public class Java5Future {public static void main(String[] args) throws ExecutionException, InterruptedException {//通過 while 循環等待異步計算處理成功ExecutorService pool = Executors.newFixedThreadPool(10);Future<Integer> f1 = pool.submit(() -> {// 長時間的異步計算 ……Thread.sleep(1);// 然后返回結果return 1001;});while (!f1.isDone())System.out.println("is not done yet");;System.out.println("while isDone,result=" + f1.get());//通過阻塞的方式等待異步處理成功Future<Integer> f2 = pool.submit(() -> {// 長時間的異步計算 ……Thread.sleep(1);// 然后返回結果return 1002;});System.out.println("after blocking,result=" + f2.get());}} 復制代碼CompletableFuture的方法
前面我們提到 CompletableFuture 為我們提供了異步計算的實現,而這些實現都是通過它的方法實現的。
如果你打開它的文檔CompletableFuture-Java8Docs,你會發現CompletableFuture提供了將近60個方法。雖然方法很多,如果你仔細看的話,你會這些方法其中很多都是有相似性的。
只要你熟練掌握了這些方法,就能夠得心應手地使用 CompletableFuture 來進行異步計算了。
Java的CompletableFuture類總是遵循這樣的原則:
- 方法名不以Async結尾的方法由原來的線程計算
- 方法名以Async結尾且參數中沒有Executor的方法由默認的線程池ForkJoinPool.commonPool()計算
- 方法名以Async結尾且參數中有Executor的方法由指定的線程池Executor計算
下面不再一一贅述了。
supplyAsync ... 異步計算結果
| static CompletableFuture | supplyAsync(Supplier supplier) | Returns a new CompletableFuture that is asynchronously completed by a task running in the ForkJoinPool.commonPool() with the value obtained by calling the given Supplier. |
| static CompletableFuture | supplyAsync(Supplier supplier, Executor executor) | Returns a new CompletableFuture that is asynchronously completed by a task running in the given executor with the value obtained by calling the given Supplier. |
supplyAsync方法以Supplier函數式接口類型為參數,CompletableFuture的計算結果類型為U。因為該方法的參數類型都是函數式接口,所以可以使用lambda表達式實現異步任務。后面講解其他方法的時候,會舉例子。
runAsync方法也好理解,它以Runnable函數式接口類型為參數,所以CompletableFuture的計算結果也為空(Runnable的run方法返回值為空)。這里就不再一一介紹了,感興趣的小伙伴可以查看API文檔。
get ... 阻塞結果
| T | get() | Waits if necessary for this future to complete, and then returns its result. |
| T | get(long timeout, TimeUnit unit) | Waits if necessary for at most the given time for this future to complete, and then returns its result, if available. |
| T | getNow(T valueIfAbsent) | Returns the result value (or throws any encountered exception) if completed, else returns the given valueIfAbsent. |
| T | join() | Returns the result value when complete, or throws an (unchecked) exception if completed exceptionally. |
你可以像使用Future一樣使用CompletableFuture,來進行阻塞式的計算(雖然不推薦使用)。其中getNow方法比較特殊:結果已經計算完則返回結果或者拋出異常,否則返回給定的valueIfAbsent值。
join和get方法都可以阻塞到計算完成然后獲得返回結果,但兩者的處理流程有所不同,可以參考下面一段代碼來比較兩者處理異常的不同之處:
public static void main(String[] args) {try {new CompletableFutureDemo().test2();new CompletableFutureDemo().test3();} catch (Exception e) {e.printStackTrace();}}public void test2() throws Exception {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 100;});future.join();}public void test3() throws Exception {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 1 / 0;return 100;});future.get();} 復制代碼輸出如下:
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zeroat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582)at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) Caused by: java.lang.ArithmeticException: / by zeroat net.ijiangtao.tech.concurrent.jsd.future.completable.CompletableFutureDemo.lambda$test2$0(CompletableFutureDemo.java:32)at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)... 5 more 復制代碼thenApply ... 轉換結果
| CompletableFuture | thenApply(Function<? super T,? extends U> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function. |
| CompletableFuture | thenApplyAsync(Function<? super T,? extends U> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed using this stage's default asynchronous execution facility, with this stage's result as the argument to the supplied function. |
| CompletableFuture | thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | Returns a new CompletionStage that, when this stage completes normally, is executed using the supplied Executor, with this stage's result as the argument to the supplied function. |
使用CompletableFuture,我們不必因為等待一個計算完成而阻塞著調用線程,而是告訴CompletableFuture當計算完成的時候請執行某個function。而且我們還可以將這些操作串聯起來,或者將CompletableFuture組合起來。
這一組函數的功能是當原來的CompletableFuture計算完后,將結果傳遞給函數fn,將fn的結果作為新的CompletableFuture計算結果。因此它的功能相當于將CompletableFuture 轉換成 CompletableFuture 。
請看下面的例子:
public static void main(String[] args) throws Exception {try {// new CompletableFutureDemo().test1();} catch (Exception e) {e.printStackTrace();}try {//new CompletableFutureDemo().test2();//new CompletableFutureDemo().test3();} catch (Exception e) {e.printStackTrace();}new CompletableFutureDemo().test4();}public void test4() throws Exception {// Create a CompletableFutureCompletableFuture<Integer> calculateFuture = CompletableFuture.supplyAsync(() -> {System.out.println("1");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new IllegalStateException(e);}System.out.println("2");return 1 + 2;});// Attach a callback to the Future using thenApply()CompletableFuture<String> resultFuture = calculateFuture.thenApply(number -> {System.out.println("3");return "1 + 2 is " + number;});// Block and get the result of the future.System.out.println(resultFuture.get());} 復制代碼輸出結果為:
1 2 3 1 + 2 is 3 復制代碼thenAccept ... 純消費結果
| CompletableFuture | thenAccept(Consumer<? super T> action) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied action. |
| CompletableFuture | thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) | Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied action. |
篇幅原因,Async和Executor方法不再列舉。
只對結果執行Action,而不返回新的計算值,因此計算值為Void。這就好像生產者生產了消息,消費者消費消息以后不再進行消息的生產一樣,因此thenAccept是對計算結果的純消費。
例如如下方法:
public void test5() throws Exception {// thenAccept() exampleCompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "ijiangtao";}).thenAccept(name -> {System.out.println("Hi, " + name);});System.out.println(future.get()); } 復制代碼thenAccept的get返回為null:
Hi, ijiangtao null 復制代碼thenAcceptBoth可以消費兩者(生產和消費)的結果,下面提供一個例子:
public void test6() throws Exception {CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}return "ijiangtao";}), (s1, s2) -> {System.out.println(s1 + " " + s2);});while (true){} } 復制代碼輸出如下:
hello ijiangtao 復制代碼thenCombine ... 消費結果并返回
| <U,V> CompletableFuture | thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | Returns a new CompletionStage that, when this and the other given stage both complete normally, is executed with the two results as arguments to the supplied function. |
| <U,V> CompletableFuture | thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | Returns a new CompletionStage that, when this and the other given stage complete normally, is executed using this stage's default asynchronous execution facility, with the two results as arguments to the supplied function. |
從功能上來講, thenCombine的功能更類似thenAcceptBoth,只不過thenAcceptBoth是純消費,它的函數參數沒有返回值,而thenCombine的函數參數fn有返回值。
public void test7() throws Exception {CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {return 1+2;});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {return "1+2 is";});CompletableFuture<String> f = future1.thenCombine(future2, (x, y) -> y + " " + x);System.out.println(f.get()); // 輸出:1+2 is 3 } 復制代碼thenCompose ... 非嵌套整合
| CompletableFuture | thenCompose(Function<? super T,? extends CompletionStage> fn) | Returns a new CompletionStage that, when this stage completes normally, is executed with this stage as the argument to the supplied function. |
由于篇幅原因,Async和Executor方法不再列舉。
thenCompose方法接受一個Function作為參數,這個Function的輸入是當前的CompletableFuture的計算值,返回結果將是一個新的CompletableFuture。
假如你需要將兩個CompletableFutures相互整合,如果使用thenApply,則結果會是嵌套的CompletableFuture:
CompletableFuture<String> getUsersName(Long id) {return CompletableFuture.supplyAsync(() -> {return "ijiangtao";}); }CompletableFuture<Integer> getUserAge(String userName) {return CompletableFuture.supplyAsync(() -> {return 20;}); }public void test8(Long id) throws Exception {CompletableFuture<CompletableFuture<Integer>> result1 = getUsersName(id).thenApply(userName -> getUserAge(userName)); } 復制代碼這時候可以使用thenCompose來獲得第二個計算的CompletableFuture:
public void test9(Long id) throws Exception {CompletableFuture<Integer> result2 = getUsersName(id).thenCompose(userName -> getUserAge(userName)); } 復制代碼whenComplete ... 完成以后
| CompletableFuture | whenComplete(BiConsumer<? super T,? super Throwable> action) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action when this stage completes. |
| CompletableFuture | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using this stage's default asynchronous execution facility when this stage completes. |
| CompletableFuture | whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) | Returns a new CompletionStage with the same result or exception as this stage, that executes the given action using the supplied Executor when this stage completes. |
當CompletableFuture的計算結果完成,或者拋出異常的時候,我們可以執行特定的Action。whenComplete的參數Action的類型是BiConsumer<? super T,? super Throwable>,它可以處理正常的計算結果,或者異常情況。注意這幾個方法都會返回CompletableFuture,當Action執行完畢后它的結果返回原始的CompletableFuture的計算結果或者返回異常。
whenComplete方法不以Async結尾,意味著Action使用相同的線程執行,而以Async結尾的方法可能會使用其它的線程去執行(如果使用相同的線程池,也可能會被同一個線程選中執行)。
下面演示一下異常情況:
public void test10() throws Exception {String result = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}if (1 == 1) {throw new RuntimeException("an RuntimeException");}return "s1";}).whenComplete((s, t) -> {System.out.println("whenComplete s:"+s);System.out.println("whenComplete exception:"+t.getMessage());}).exceptionally(e -> {System.out.println("exceptionally exception:"+e.getMessage());return "hello ijiangtao";}).join();System.out.println(result); } 復制代碼輸出:
whenComplete s:null whenComplete exception:java.lang.RuntimeException: an RuntimeException exceptionally exception:java.lang.RuntimeException: an RuntimeException hello ijiangtao 復制代碼總結
Java5新增的Future類,可以實現阻塞式的異步計算,但這種阻塞的方式顯然和我們的異步編程的初衷相違背。為了解決這個問題,JDK吸收了Guava的設計思想,加入了Future的諸多擴展功能形成了CompletableFuture。
本文重點介紹了CompletableFuture的不同類型的API,掌握了這些API對于使用非阻塞的函數式異步編程進行日常開發非常有幫助,同時也為下面深入了解異步編程的各種原理和特性打下了良好的基礎。
相關資源
CompletableFuture - javase 8 docs
CompletableFuture - Guide To CompletableFuture
CompletableFuture - Java CompletableFuture Tutorial with Examples
CompletableFuture - Java 8: Writing asynchronous code with CompletableFuture
《寫給大忙人的JavaSE9核心技術》- 10.2 異步計算
CompletableFuture - 通過實例理解 JDK8 的 CompletableFuture
CompletableFuture - CompletableFuture 詳解
CompletableFuture - 使用 Java 8 的 CompletableFuture 實現函數式的回調
CompletableFuture - Java CompletableFuture 詳解
CompletableFuture - [譯]20個使用 Java CompletableFuture的例子
轉載于:https://juejin.im/post/5d35d6ebe51d45109b01b270
總結
以上是生活随笔為你收集整理的CompletableFuture 实现异步计算的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CMS 源码解读
- 下一篇: 从生产故障解锁RocketMQ集群部署的