CompletableFuture 异步编程
本文從實例出發,介紹?CompletableFuture?基本用法。不過講的再多,不如親自上手練習一下。所以建議各位小伙伴看完,上機練習一把,快速掌握?CompletableFuture。
全文摘要:
-
Future?VS?CompletableFuture
-
CompletableFuture?基本用法
0x00. 前言
一些業務場景我們需要使用多線程異步執行任務,加快任務執行速度。Java 提供?Runnable?Future<V>?兩個接口用來實現異步任務邏輯。
雖然?Future<V>?可以獲取任務執行結果,但是獲取方式十方不變。我們不得不使用Future#get?阻塞調用線程,或者使用輪詢方式判斷?Future#isDone?任務是否結束,再獲取結果。
這兩種處理方式都不是很優雅,JDK8 之前并發類庫沒有提供相關的異步回調實現方式。沒辦法,我們只好借助第三方類庫,如?Guava,擴展?Future,增加支持回調功能。相關代碼如下:
雖然這種方式增強了 Java 異步編程能力,但是還是無法解決多個異步任務需要相互依賴的場景。
舉一個生活上的例子,假如我們需要出去旅游,需要完成三個任務:
-
任務一:訂購航班
-
任務二:訂購酒店
-
任務三:訂購租車服務
很顯然任務一和任務二沒有相關性,可以單獨執行。但是任務三必須等待任務一與任務二結束之后,才能訂購租車服務。
為了使任務三時執行時能獲取到任務一與任務二執行結果,我們還需要借助?CountDownLatch?。
0x01. CompletableFuture
JDK8 之后,Java 新增一個功能十分強大的類:CompletableFuture。單獨使用這個類就可以輕松的完成上面的需求:
大家可以先不用管?CompletableFuture?相關?API,下面將會具體講解。
對比?Future<V>,CompletableFuture?優點在于:
-
不需要手工分配線程,JDK 自動分配
-
代碼語義清晰,異步任務鏈式調用
-
支持編排異步任務
怎么樣,是不是功能很強大?接下來抓穩了,小黑哥要發車了。
?
1.1 方法一覽
首先來通過 IDE 查看下這個類提供的方法:
稍微數一下,這個類總共有 50 多個方法,我的天。。。
?
不過也不要怕,小黑哥幫你們歸納好了,跟著小黑哥的節奏,帶你們掌握?CompletableFuture。
若圖片不清晰,可以關注『程序通事』,回復:『233』,獲取該思維導圖
1.2 創建 CompletableFuture 實例
創建?CompletableFuture?對象實例我們可以使用如下幾個方法:
第一個方法創建一個具有默認結果的?CompletableFuture,這個沒啥好講。我們重點講述下下面四個異步方法。
前兩個方法?runAsync?不支持返回值,而?supplyAsync可以支持返回結果。
這個兩個方法默認將會使用公共的?ForkJoinPool?線程池執行,這個線程池默認線程數是?CPU?的核數。
可以設置 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 來設置 ForkJoinPool 線程池的線程數
使用共享線程池將會有個弊端,一旦有任務被阻塞,將會造成其他任務沒機會執行。所以強烈建議使用后兩個方法,根據任務類型不同,主動創建線程池,進行資源隔離,避免互相干擾。
1.3 設置任務結果
CompletableFuture?提供以下方法,可以主動設置任務結果。
1?boolean?complete(T?value) 2?boolean?completeExceptionally(Throwable?ex)第一個方法,主動設置?CompletableFuture?任務執行結果,若返回?true,表示設置成功。如果返回?false,設置失敗,這是因為任務已經執行結束,已經有了執行結果。
示例代碼如下:
1//?執行異步任務2CompletableFuture?cf?=?CompletableFuture.supplyAsync(()?->?{3??System.out.println("cf?任務執行開始");4??sleep(10,?TimeUnit.SECONDS);5??System.out.println("cf?任務執行結束");6??return?"樓下小黑哥";7});8//9Executors.newSingleThreadScheduledExecutor().execute(()?->?{ 10??sleep(5,?TimeUnit.SECONDS); 11??System.out.println("主動設置?cf?任務結果"); 12??//?設置任務結果,由于?cf?任務未執行結束,結果返回?true 13??cf.complete("程序通事"); 14}); 15//?由于 cf 未執行結束,將會被阻塞。5 秒后,另外一個線程主動設置任務結果 16System.out.println("get:"?+?cf.get()); 17//?等待?cf?任務執行結束 18sleep(10,?TimeUnit.SECONDS); 19//?由于已經設置任務結果,cf?執行結束任務結果將會被拋棄 20System.out.println("get:"?+?cf.get()); 21/*** 22???*?cf?任務執行開始 23???*?主動設置?cf?任務結果 24???* get:程序通事 25???*?cf?任務執行結束 26???* get:程序通事 27*/這里需要注意一點,一旦?complete?設置成功,CompletableFuture?返回結果就不會被更改,即使后續?CompletableFuture?任務執行結束。
第二個方法,給?CompletableFuture?設置異常對象。若設置成功,如果調用?get?等方法獲取結果,將會拋錯。
示例代碼如下:
1//?執行異步任務2CompletableFuture?cf?=?CompletableFuture.supplyAsync(()?->?{3????System.out.println("cf?任務執行開始");4????sleep(10,?TimeUnit.SECONDS);5????System.out.println("cf?任務執行結束");6????return?"樓下小黑哥";7});8//9Executors.newSingleThreadScheduledExecutor().execute(()?->?{ 10????sleep(5,?TimeUnit.SECONDS); 11????System.out.println("主動設置?cf?異常"); 12????//?設置任務結果,由于?cf?任務未執行結束,結果返回?true 13????cf.completeExceptionally(new?RuntimeException("啊,掛了")); 14}); 15//?由于 cf 未執行結束,前 5 秒將會被阻塞。后續程序拋出異常,結束 16System.out.println("get:"?+?cf.get()); 17/*** 18?*?cf?任務執行開始 19?*?主動設置?cf?異常 20?*?java.util.concurrent.ExecutionException:?java.lang.RuntimeException:?啊,掛了 21?*?...... 22?*/1.4 CompletionStage
CompletableFuture?分別實現兩個接口?Future與?CompletionStage。
Future?接口大家都比較熟悉,這里主要講講?CompletionStage。
CompletableFuture?大部分方法來自CompletionStage?接口,正是因為這個接口,CompletableFuture才有如此強大功能。
想要理解?CompletionStage?接口,我們需要先了解任務的時序關系的。我們可以將任務時序關系分為以下幾種:
-
串行執行關系
-
并行執行關系
-
AND 匯聚關系
-
OR 匯聚關系
1.5 串行執行關系
任務串行執行,下一個任務必須等待上一個任務完成才可以繼續執行。
CompletionStage?有四組接口可以描述串行這種關系,分別為:
thenApply?方法需要傳入核心參數為?Function<T,R>類型。這個類核心方法為:
1?R?apply(T?t)所以這個接口將會把上一個任務返回結果當做入參,執行結束將會返回結果。
thenAccept?方法需要傳入參數對象為?Consumer<T>類型,這個類核心方法為:
1void?accept(T?t)返回值?void?可以看出,這個方法不支持返回結果,但是需要將上一個任務執行結果當做參數傳入。
thenRun?方法需要傳入參數對象為?Runnable?類型,這個類大家應該都比較熟悉,核心方法既不支持傳入參數,也不會返回執行結果。
thenCompose?方法作用與?thenApply?一樣,只不過?thenCompose?需要返回新的 ?CompletionStage。這么理解比較抽象,可以集合代碼一起理解。
方法中帶有?Async?,代表可以異步執行,這個系列還有重載方法,可以傳入自定義的線程池,上圖未展示,讀者只可以自行查看 API。
最后我們通過代碼展示?thenApply?使用方式:
1CompletableFuture<String>?cf 2????????=?CompletableFuture.supplyAsync(()?->?"hello,樓下小黑哥")//?1 3????????.thenApply(s?->?s?+?"@程序通事")?//?2 4????????.thenApply(String::toUpperCase);?//?3 5System.out.println(cf.join()); 6//?輸出結果?HELLO,樓下小黑哥@程序通事這段代碼比較簡單,首先我們開啟一個異步任務,接著串行執行后續兩個任務。任務 2 需要等待任務1 執行完成,任務 3 需要等待任務 2。
上面方法,大家需要記住了 ?Function<T,R>,Consumer<T>,Runnable?三者區別,根據場景選擇使用。
1.6 AND 匯聚關系
AND 匯聚關系代表所有任務完成之后,才能進行下一個任務。
如上所示,只有任務 A 與任務 B 都完成之后,任務 C 才會開始執行。
CompletionStage?有以下接口描述這種關系。
thenCombine?方法核心參數?BiFunction?,作用與?Function一樣,只不過?BiFunction?可以接受兩個參數,而?Function?只能接受一個參數。
thenAcceptBoth?方法核心參數BiConsumer?作用也與?Consumer一樣,不過其需要接受兩個參數。
runAfterBoth??方法核心參數最簡單,上面已經介紹過,不再介紹。
這三組方法只能完成兩個任務 AND 匯聚關系,如果需要完成多個任務匯聚關系,需要使用?CompletableFuture#allOf,不過這里需要注意,這個方法是不支持返回任務結果。
AND 匯聚關系相關示例代碼,開頭已經使用過了,這里再粘貼一下,方便大家理解:
1.7 OR 匯聚關系
有 AND 匯聚關系,當然也存在 OR 匯聚關系。OR 匯聚關系代表只要多個任務中任一任務完成,就可以接著接著執行下一任務。
CompletionStage?有以下接口描述這種關系:
前面三組接口方法傳參與 AND 匯聚關系一致,這里也不再詳細解釋了。
當然 OR 匯聚關系可以使用?CompletableFuture#anyOf?執行多個任務。
下面示例代碼展示如何使用?applyToEither?完成 OR 關系。
1CompletableFuture<String>?cf2????????=?CompletableFuture.supplyAsync(()?->?{3????sleep(5,?TimeUnit.SECONDS);4????return?"hello,樓下小黑哥";5});//?167CompletableFuture<String>?cf2?=?cf.supplyAsync(()?->?{8????sleep(3,?TimeUnit.SECONDS);9????return?"hello,程序通事"; 10}); 11//?執行?OR?關系 12CompletableFuture<String>?cf3?=?cf2.applyToEither(cf,?s?->?s); 13 14//?輸出結果,由于?cf2?只休眠?3?秒,優先執行完畢 15System.out.println(cf2.join()); 16//?結果:hello,程序通事1.8 異常處理
CompletableFuture??方法執行過程若產生異常,當調用?get,join獲取任務結果才會拋出異常。
上面代碼我們顯示使用?try..catch?處理上面的異常。不過這種方式不太優雅,CompletionStage?提供幾個方法,可以優雅處理異常。
exceptionally?使用方式類似于?try..catch?中?catch代碼塊中異常處理。
whenComplete?與?handle?方法就類似于?try..catch..finanlly?中?finally?代碼塊。無論是否發生異常,都將會執行的。這兩個方法區別在于 ?handle??支持返回結果。
下面示例代碼展示?handle?用法:
1CompletableFuture<Integer>2????????f0?=?CompletableFuture.supplyAsync(()?->?(7?/?0))3????????.thenApply(r?->?r?*?10)4????????.handle((integer,?throwable)?->?{5????????????//?如果異常存在,打印異常,并且返回默認值6????????????if?(throwable?!=?null)?{7????????????????throwable.printStackTrace();8????????????????return?0;9????????????}?else?{ 10????????????????//?如果 11????????????????return?integer; 12????????????} 13????????}); 14 15 16System.out.println(f0.join()); 17/** 18?*java.util.concurrent.CompletionException:?java.lang.ArithmeticException:?/?by?zero 19?*?..... 20?*? 21?*?0 22?*/0x02. 總結
JDK8 提供?CompletableFuture?功能非常強大,可以編排異步任務,完成串行執行,并行執行,AND 匯聚關系,OR 匯聚關系。
不過這個類方法實在太多,且方法還需要傳入各種函數式接口,新手剛開始使用會直接會被弄懵逼。這里幫大家在總結一下三類核心參數的作用
-
Function?這類函數接口既支持接收參數,也支持返回值
-
Consumer?這類接口函數只支持接受參數,不支持返回值
-
Runnable?這類接口不支持接受參數,也不支持返回值
搞清楚函數參數作用以后,然后根據串行,AND 匯聚關系,OR 匯聚關系歸納一下相關方法,這樣就比較好理解了
最后再貼一下,文章開頭的思維導圖,希望對你有幫助。
0x03. 幫助文檔
極客時間-并發編程專欄
https://colobu.com/2016/02/29/Java-CompletableFuture
https://www.ibm.com/developerworks/cn/java/j-cf-of-jdk8/index.html
最后說一句(求關注)
CompletableFuture?很早之前就有關注,本以為跟?Future一樣,使用挺簡單,誰知道學的時候才發現好難。各種 API 方法看的頭有點大。
后來看到極客時間-『并發編程』專欄使用歸納方式分類?CompletableFuture?各種方法,一下子就看懂了。所以這篇文章也參考這種歸納方式。
總結
以上是生活随笔為你收集整理的CompletableFuture 异步编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑常见毛病及治理
- 下一篇: linux的grub损坏,如何利用Gru