Java 8:CompletableFuture的权威指南
Java 8即將到來,因此該學習新功能了。 盡管Java 7和Java 6只是次要的發行版,但版本8將向前邁出一大步。 也許太大了? 今天,我將為您詳細介紹JDK 8中的新抽象– CompletableFuture<T> 。 眾所周知,Java 8有望在不到一年的時間內發布,因此本文基于具有lambda支持的JDK 8 build 88 。 CompletableFuture<T>通過提供功能性的單子(!)操作并促進異步的,事件驅動的編程模型(而不是在較早的Java中進行阻塞)來擴展Future<T> 。 如果您打開CompletableFuture<T> JavaDoc,您肯定會不知所措。 大約有五十種方法 (!),其中一些方法非常隱秘和奇特,例如:
public <U,V> CompletableFuture<V> thenCombineAsync(CompletableFuture<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor)別擔心,請繼續閱讀。 CompletableFuture使用SettableFuture收集了番石榴中ListenableFuture所有功能。 此外,內置的lambda支持使它更接近Scala / Akka期貨 。 聽起來好得令人難以置信,但請繼續閱讀。 CompletableFuture有兩個優于Future<T>主要方面-異步回調/轉換支持以及可以在任何時間從任何線程設置CompletableFuture值的功能。
提取/修改包裝值
通常,期貨代表由其他線程運行的一段代碼。 但這并非總是如此。 有時您想創建一個Future表示某個已知事件,例如JMS消息到達 。 因此,您具有Future<Message>但此未來沒有任何異步作業。 您只想在JMS消息到達時完成(解決)將來,而這是由事件驅動的。 在這種情況下,您可以簡單地創建CompletableFuture ,將其返回給客戶端,并且只要您認為結果可用,就可以complete() future并解鎖等待該將來的所有客戶端。
對于初學者,您可以簡單地憑空創建新的CompletableFuture并將其提供給您的客戶:
public CompletableFuture<String> ask() {final CompletableFuture<String> future = new CompletableFuture<>();//...return future; }請注意,此未來與任何Callable<String> ,任何線程池,任何異步作業都沒有關聯。 如果現在客戶端代碼調用ask().get() ,它將永遠阻塞。 如果它注冊了一些完成回調,它們將永遠不會觸發。 那有什么意義呢? 現在您可以說:
future.complete("42")…這時,所有在Future.get()上阻止的客戶端都將獲得結果字符串。 完成回調也會立即觸發。 當您要表示將來的任務時,這非常方便,但不一定要在某個執行線程上運行計算任務。 CompletableFuture.complete()只能被調用一次,后續調用將被忽略。 但是有一個稱為CompletableFuture.obtrudeValue(...)的后門,它將用新值覆蓋Future先前值。 請謹慎使用。
有時您想發出失敗的信號。 如您所知, Future對象可以處理包裝的結果或異常。 如果您想進一步傳遞一些異常,則可以使用CompletableFuture.completeExceptionally(ex) (和obtrudeException(ex)替代以前的異常的邪惡兄弟)。 completeExceptionally()還會解鎖所有正在等待的客戶端,但是這次從get()拋出異常。 說到get() ,還有CompletableFuture.join()方法,在錯誤處理方面有一些細微的變化。 但總的來說,它們是相同的。 最后還有一個CompletableFuture.getNow(valueIfAbsent)方法不會阻塞,但是如果Future還沒有完成,則返回默認值。 在構建我們不想等待太多的強大系統時很有用。
最后一個static實用程序方法為completedFuture(value) ,該方法返回已完成的Future對象。 對于測試或編寫某些適配器層可能很有用。
創建并獲取
好的,那么手動創建CompletableFuture是我們唯一的選擇嗎? 不完全的。 與正常的Future一樣,我們可以使用以下工廠方法系列將現有任務包裝到CompletableFuture :
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); static CompletableFuture<Void> runAsync(Runnable runnable); static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);不將Executor作為參數而是以...Async結尾的方法將使用ForkJoinPool.commonPool() (JDK 8中引入的全局通用池)。 這適用于CompletableFuture類中的大多數方法。 runAsync()很容易理解,請注意,它采用Runnable ,因此它返回CompletableFuture<Void>因為Runnable不返回任何內容。 如果需要異步處理某些東西并返回結果,請使用Supplier<U> :
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {//...long running...return "42";} }, executor);但是,嘿,我們在Java 8中有lambda!
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {//...long running...return "42"; }, executor);甚至:
final CompletableFuture<String> future =CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);本文與Lambda項目無關,但是我將廣泛使用Lambda。
轉換并作用于一個
所以我說CompletableFuture優于Future但是您還沒有看到為什么? 簡而言之,這是因為CompletableFuture是一個monad和一個函子。 我幫不上忙嗎? 當將來完成時, Scala和JavaScript都允許注冊異步回調。 我們無需等待就可以準備就緒。 我們可以簡單地說: 在結果到達時運行此函數 。 此外,我們可以堆疊這些函數,將多個Future組合在一起,等等。例如,如果我們有一個從String到Integer的函數,則可以將CompletableFuture<String>為CompletableFuture<Integer而不用拆開它。 這是通過thenApply()系列方法實現的:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn); <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);如前所述...Async為CompletableFuture上的大多數操作提供了...Async版本,因此在后續部分中將跳過它們。 只需記住,第一種方法將在將來完成的同一線程中應用函數,而其余兩種將在不同的線程池中異步應用它。
讓我們看看thenApply()工作方式:
CompletableFuture<String> f1 = //... CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt); CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);或在一個陳述中:
CompletableFuture<Double> f3 =f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);您會在此處看到一系列轉換。 從String到Integer ,再到Double 。 但是最重??要的是,這些轉換既不會立即執行也不會阻塞。 只需記住它們,當原始f1完成時便會為您執行。 如果某些轉換很耗時,則可以提供自己的Executor來異步運行它們。 請注意,此操作等效于Scala中的單子map 。
完成時運行代碼(
CompletableFuture<Void> thenAccept(Consumer<? super T> block); CompletableFuture<Void> thenRun(Runnable action);這兩種方法是未來管道中典型的“最終”階段。 它們使您可以在準備就緒時消費未來的價值。 在thenAccept()提供最終值的同時, thenRun執行了甚至無法訪問計算值的Runnable 。 例:
future.thenAcceptAsync(dbl -> log.debug("Result: {}", dbl), executor); log.debug("Continuing"); ...Async變體也可用于兩種方法,具有隱式和顯式執行器。 我對此不夠強調:
thenAccept() / thenRun()方法不會阻塞 (即使沒有顯式executor )。 像對待事件監聽器/處理程序那樣對待它們,將它們附加到將來,并將在將來執行。 即使future甚至還沒有完成, "Continuing"消息也會立即出現。
單個
到目前為止,我們僅討論了計算結果。 但是異常呢? 我們也可以異步處理它們嗎? 當然!
CompletableFuture<String> safe =future.exceptionally(ex -> "We have a problem: " + ex.getMessage());exceptionally()接受一個函數,當原始的future拋出異常時將調用該函數。 然后,我們就有機會通過將此異常轉換為與Future的類型兼容的值來進行恢復。 safe進一步轉換將不再產生異常,而是從提供的函數返回的String 。
一個更靈活的方法是handle() ,它接受一個接收正確結果或異常的函數:
CompletableFuture<Integer> safe = future.handle((ok, ex) -> {if (ok != null) {return Integer.parseInt(ok);} else {log.warn("Problem", ex);return -1;} });總是調用handle() ,結果或異常參數都不為null 。 這是一站式的萬能策略。
將兩個
一個CompletableFuture異步處理很不錯,但是當多個此類期貨以各種方式組合在一起時,它的確顯示了其強大功能。
結合(鏈接)兩個期貨(
有時,您想根據未來的價值運行某些功能(準備就緒時)。 但是此函數也將返回將來。 CompletableFuture應該足夠聰明,以至于與CompletableFuture<CompletableFuture<T>>相對,我們的函數結果現在應該用作頂級將來。 因此, thenCompose()方法等效于Scala中的flatMap :
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);...Async變體也可用。 在下面的示例中,當應用返回CompletableFuture<Double>的calculateRelevance()函數時,請仔細查看thenApply() ( map )和thenCompose() ( flatMap )之間的類型和區別:
CompletableFuture<Document> docFuture = //...CompletableFuture<CompletableFuture<Double>> f =docFuture.thenApply(this::calculateRelevance);CompletableFuture<Double> relevanceFuture =docFuture.thenCompose(this::calculateRelevance);//...private CompletableFuture<Double> calculateRelevance(Document doc) //...thenCompose()是一種必不可少的方法,它允許構建健壯的異步管道,而無需阻塞或等待中間步驟。
轉換兩個期貨的價值(
盡管thenCompose()用于鏈接一個依賴于另一個的期貨,然后當兩個都完成時, thenCombine了兩個獨立的期貨:
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)...Async變體也可用。 假設您有兩個CompletableFuture ,一個加載Customer ,另一個加載最近的Shop 。 它們彼此完全獨立,但是當它們都完成時,您想使用它們的值來計算Route 。 這是一個剝離的示例:
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123); CompletableFuture<Shop> shopFuture = closestShop(); CompletableFuture<Route> routeFuture =customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));//...private Route findRoute(Customer customer, Shop shop) //...請注意,在Java 8中,您可以使用以下簡單的this::findRoute方法參考來替換(cust, shop) -> findRoute(cust, shop) :
customerFuture.thenCombine(shopFuture, this::findRoute);這樣您就知道了。 我們有customerFuture和shopFuture 。 然后routeFuture將它們包裝起來并“等待”完成。 當它們都準備就緒時,它將運行我們提供的findRoute()結果的函數( findRoute() )。 因此,當兩個基礎的期貨被解析并完成 findRoute()時, routeFuture將完成。
等待
如果不是只希望在完成結果時就通知兩個結果,而不會產生新的CompletableFuture ,而是可以使用thenAcceptBoth() / runAfterBoth()系列方法( ...Async變體也可用)。 它們的工作方式與thenAccept()和thenRun()類似,但是要等待兩個thenRun() ,而不是一個thenRun() :
<U> CompletableFuture<Void> thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> block) CompletableFuture<Void> runAfterBoth(CompletableFuture<?> other, Runnable action)想象一下,在上面的示例中,您只是想發送一些事件或立即刷新GUI,而不是生成新的CompletableFuture<Route> 。 這可以通過thenAcceptBoth()輕松實現:
customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {final Route route = findRoute(cust, shop);//refresh GUI with route });我希望我錯了,但也許你們中的一些人在問自己一個問題: 為什么我不能簡單地阻止這兩個期貨交易? 像這兒:
Future<Customer> customerFuture = loadCustomerDetails(123); Future<Shop> shopFuture = closestShop(); findRoute(customerFuture.get(), shopFuture.get());好吧,當然可以。 但是CompletableFuture的全部目的是允許異步的,事件驅動的編程模型,而不是阻塞并急于等待結果。 因此,從功能上講,上面的兩個代碼段是等效的,但后者不必要地占用了一個執行線程。
等待第一個
CompletableFuture API的另一個有趣的部分是能夠等待第一個 (而不是全部 )完成的將來。 當您有兩個任務產生相同類型的結果,而您只關心響應時間,而不關注哪個任務首先產生時,這會很方便。 API方法( ...Async變體也可用):
CompletableFuture<Void> acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> block) CompletableFuture<Void> runAfterEither(CompletableFuture<?> other, Runnable action)例如,您要集成兩個系統。 一個具有較小的平均響應時間但具有較高的標準偏差。 另一個通常較慢,但更可預測。 為了同時兼顧兩個方面(性能和可預測性),您需要同時調用兩個系統,并等待第一個系統完成。 通常它是第一個,但是如果變慢,第二個會在可接受的時間內結束:
CompletableFuture<String> fast = fetchFast(); CompletableFuture<String> predictable = fetchPredictably(); fast.acceptEither(predictable, s -> {System.out.println("Result: " + s); });s表示來自fetchFast()或fetchPredictably() String回復。 我們既不知道也不在乎。
改造先完成
applyToEither()是一個大哥哥acceptEither() 當兩個期貨中的較快完成時,后者只是簡單地調用一些代碼,而applyToEither()將返回一個新的期貨。 當兩個基礎期貨中的第一個完成時,該未來將完成。 API有點類似( ...Async版本也可用):
<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)在完成的第一個Future的結果上調用額外的fn函數。 我真的不確定這種專門方法的目的是什么,畢竟可以簡單地使用: fast.applyToEither(predictable).thenApply(fn) 。 由于我們一直使用此API,但實際上并不需要額外的功能應用程序,因此我將僅使用Function.identity()占位符:
CompletableFuture<String> fast = fetchFast(); CompletableFuture<String> predictable = fetchPredictably(); CompletableFuture<String> firstDone =fast.applyToEither(predictable, Function.<String>identity());然后可以傳遞firstDone未來。 注意,從客戶的角度來看,兩個期貨實際上落后于firstDone是隱藏的。 客戶端只是等待將來完成, applyToEither()會在兩者中的任何一個先完成時通知客戶端。
將多個
因此,我們現在知道如何等待兩個期貨完成(使用thenCombine() )和第一個期貨完成( applyToEither() )。 但是它可以擴展到任意數量的期貨嗎? 當然,使用static助手方法:
static CompletableFuture<Void< allOf(CompletableFuture<?<... cfs) static CompletableFuture<Object< anyOf(CompletableFuture<?<... cfs)allOf()接收一組期貨,并返回一個期貨,該期貨在所有基礎期貨都完成時(屏障等待所有)完成。 另一方面, anyOf()將僅等待最快的基礎期貨。 請查看退貨期貨的通用類型。 不太符合您的期望嗎? 我們將在下一篇文章中解決這個問題。
摘要
我們研究了幾乎整個CompletableFuture API 。 我敢肯定,這是不堪重負的,因此在下一篇文章中,我們將很快利用CompletableFuture功能和Java 8 lambda來開發簡單的Web爬網程序的另一種實現。 我們還將研究CompletableFuture缺點和不足。
參考: Java 8: JCG合作伙伴 Tomasz Nurkiewicz在Java和社區博客上提供的CompletableFuture權威指南 。翻譯自: https://www.javacodegeeks.com/2013/05/java-8-definitive-guide-to-completablefuture.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Java 8:CompletableFuture的权威指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 分数DRL:在OptaPlanner中更
- 下一篇: 美国失业率公布时间是在什么时候?