Java中如何使用非阻塞异步编程——CompletableFuture
分享一波:程序員賺外快-必看的巔峰干貨
對于Node開發(fā)者來說,非阻塞異步編程是他們引以為傲的地方。而在JDK8中,也引入了非阻塞異步編程的概念。所謂非阻塞異步編程,就是一種不需要等待返回結(jié)果的多線程的回調(diào)方法的封裝。使用非阻塞異步編程,可以很大程度上解決高并發(fā)場景的各種問題,提高程序的運(yùn)行效率。
為什么要使用非阻塞異步編程
在jdk8之前,我們使用java的多線程編程,一般是通過Runnable中的run方法進(jìn)行的。這種方法有個(gè)明顯的缺點(diǎn):沒有返回值。這時(shí)候,大家會使用Callable+Future的方式去實(shí)現(xiàn),代碼如下。
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future stringFuture = executor.submit(new Callable() {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return “async thread”;
}
});
Thread.sleep(1000);
System.out.println(“main thread”);
System.out.println(stringFuture.get());
}
這無疑是對高并發(fā)訪問的一種緩沖方法。這種方式有一個(gè)致命的缺點(diǎn)就是阻塞式調(diào)用,當(dāng)調(diào)用了get方法之后,會有大量的時(shí)間耗費(fèi)在等待返回值之中。
不管怎么看,這種做法貌似都不太妥當(dāng),至少在代碼美觀性上就看起來很蛋疼。而且某些場景無法使用,比如:
多個(gè)異步線程執(zhí)行時(shí)間可能不一致,我們的主線程不能一直等著。 兩個(gè)異步任務(wù)之間相互獨(dú)立,但是第二個(gè)依賴第一個(gè)的執(zhí)行結(jié)果在這種場景下,CompletableFuture的優(yōu)勢就展現(xiàn)出來了 。同時(shí),CompletableFuture的封裝中使用了函數(shù)式編程,這讓我們的代碼顯得更加簡潔、優(yōu)雅。
不了解函數(shù)式編程的朋友,可以參考我之前的博客。JDK8新特性
CompletableFuture使用詳解
runAsync和supplyAsync方法
CompletableFuture提供了四個(gè)靜態(tài)方法來創(chuàng)建一個(gè)異步操作。
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執(zhí)行異步代碼。如果指定線程池,則使用指定的線程池運(yùn)行。以下所有的方法都類同。
runAsync方法不支持返回值。 supplyAsync可以支持返回值。代碼示例
/*** 無返回值** @throws Exception*/ @Test public void testRunAsync() throws Exception {CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (Exception ignored) {}System.out.println("run end ...");});future.get(); }/*** 有返回值** @throws Exception*/ @Test public void testSupplyAsync() throws Exception {CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {System.out.println("run end...");return System.currentTimeMillis();});Long time = future.get();System.out.println(time); }計(jì)算結(jié)果完成時(shí)的回調(diào)方法
當(dāng)CompletableFuture的計(jì)算結(jié)果完成,或者拋出異常的時(shí)候,可以執(zhí)行特定的操作。
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function<Throwable,? extends T> fn)
這里需要說的一點(diǎn)是,whenComplete和whenCompleteAsync的區(qū)別。
whenComplete:使用執(zhí)行當(dāng)前任務(wù)的線程繼續(xù)執(zhí)行whenComplete的任務(wù)。 whenCompleteAsync:使用新的線程執(zhí)行任務(wù)。 exceptionally:執(zhí)行出現(xiàn)異常時(shí),走這個(gè)方法。代碼示例
/*** 當(dāng)CompletableFuture的計(jì)算結(jié)果完成,或者拋出異常的時(shí)候,可以執(zhí)行特定的Action。* whenComplete:是執(zhí)行當(dāng)前任務(wù)的線程執(zhí)行繼續(xù)執(zhí)行 whenComplete 的任務(wù)。* whenCompleteAsync:是執(zhí)行把 whenCompleteAsync 這個(gè)任務(wù)繼續(xù)提交給線程池來進(jìn)行執(zhí)行。* exceptionally:執(zhí)行出現(xiàn)異常時(shí),走這個(gè)方法** @throws Exception*/ @Test public void testWhenComplete() throws Exception {CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("運(yùn)行結(jié)束");}).whenComplete((t, action) -> {System.out.println("執(zhí)行完成");}).exceptionally(t -> {System.out.println("出現(xiàn)異常:" + t.getMessage());return null;});TimeUnit.SECONDS.sleep(2); }thenApply
當(dāng)一個(gè)線程依賴另一個(gè)線程時(shí),可以使用thenApply方法把這兩個(gè)線程串行化,第二個(gè)任務(wù)依賴第一個(gè)任務(wù)的返回值。
代碼示例
/*** 當(dāng)一個(gè)線程依賴另一個(gè)線程時(shí),可以使用 thenApply 方法來把這兩個(gè)線程串行化。* 第二個(gè)任務(wù)依賴第一個(gè)任務(wù)的結(jié)果** @throws Exception*/ @Test public void testThenApply() throws Exception {CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {long result = new Random().nextInt();System.out.println("result:" + result);return result;}).thenApply(t -> {long result = t * 5;System.out.println("result2:" + result);return result;});Long result = future.get();System.out.println(result); }handle
handle是執(zhí)行任務(wù)完成時(shí)對結(jié)果的處理。與thenApply方法處理方式基本一致,
不同的是,handle是在任務(wù)完成后執(zhí)行,不管這個(gè)任務(wù)是否出現(xiàn)了異常,而thenApply只可以執(zhí)行正常的任務(wù),任務(wù)出現(xiàn)了異常則不執(zhí)行。
代碼示例
/*** handle 是執(zhí)行任務(wù)完成時(shí)對結(jié)果的處理。* handle 方法和 thenApply 方法處理方式基本一樣。* 不同的是 handle 是在任務(wù)完成后再執(zhí)行,還可以處理異常的任務(wù)。* thenApply 只可以執(zhí)行正常的任務(wù),任務(wù)出現(xiàn)異常則不執(zhí)行 thenApply 方法。** @throws Exception*/ @Test public void testHandle() throws Exception {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int i = 10 / 0;return i;}).handle((p, t) -> {int result = -1;if (t == null) {result = p * 2;} else {System.out.println(t.getMessage());}return result;});System.out.println(future.get()); }thenAccept
thenAccept用于接收任務(wù)的處理結(jié)果,并消費(fèi)處理,無返回結(jié)果。
代碼示例
/*** 接收任務(wù)的處理結(jié)果,并消費(fèi)處理,無返回結(jié)果。** @throws Exception*/ @Test public void testThenAccept() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return new Random().nextInt();}).thenAccept(num -> {System.out.println(num);});System.out.println(future.get()); }thenRun
上個(gè)任務(wù)執(zhí)行完之后再執(zhí)行thenRun的任務(wù),二者只存在先后執(zhí)行順序的關(guān)系,后者并不依賴前者的計(jì)算結(jié)果,同時(shí),沒有返回值。
代碼示例
/*** 該方法同 thenAccept 方法類似。不同的是上個(gè)任務(wù)處理完成后,并不會把計(jì)算的結(jié)果傳給 thenRun 方法。* 只是處理玩任務(wù)后,執(zhí)行 thenRun 的后續(xù)操作。** @throws Exception*/ @Test public void testThenRun() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return new Random().nextInt();}).thenRun(() -> {System.out.println("進(jìn)入了thenRun");});System.out.println(future.get()); }thenCombine
thenCombine會把兩個(gè)CompletableFuture的任務(wù)都執(zhí)行完成后,把兩個(gè)任務(wù)的返回值一塊交給thenCombine處理(有返回值)。
代碼示例
/*** thenCombine 會把 兩個(gè) CompletableFuture 的任務(wù)都執(zhí)行完成后* 把兩個(gè)任務(wù)的結(jié)果一塊交給 thenCombine 來處理。** @throws Exception*/ @Test public void testThenCombine() throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "hello";}).thenCombine(CompletableFuture.supplyAsync(() -> {return "world";}), (t1, t2) -> {return t1 + " " + t2;});System.out.println(future.get()); }thenAcceptBoth
當(dāng)兩個(gè)CompletableFuture都執(zhí)行完成后,把結(jié)果一塊交給thenAcceptBoth處理(無返回值)
代碼示例
/*** 當(dāng)兩個(gè) CompletableFuture 都執(zhí)行完成后* 把結(jié)果一塊交給thenAcceptBoth來進(jìn)行消耗** @throws Exception*/ @Test public void testThenAcceptBoth() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello";}).thenAcceptBoth(CompletableFuture.supplyAsync(() -> {return "world";}), (t1, t2) -> {System.out.println(t1 + " " + t2);});System.out.println(future.get()); }applyToEither
兩個(gè)CompletableFuture,誰執(zhí)行返回的結(jié)果快,就用哪個(gè)的結(jié)果進(jìn)行下一步操作(有返回值)。
代碼示例
/*** 兩個(gè)CompletableFuture,誰執(zhí)行返回的結(jié)果快,我就用那個(gè)CompletionStage的結(jié)果進(jìn)行下一步的轉(zhuǎn)化操作** @throws Exception*/ @Test public void testApplyToEither() throws Exception {CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}return "hello";}).applyToEither(CompletableFuture.supplyAsync(() -> {return "world";}), (t) -> {return t;});System.out.println(future.get()); }acceptEither
兩個(gè)CompletableFuture,誰執(zhí)行返回的結(jié)果快,就用哪個(gè)的結(jié)果進(jìn)行下一步操作(無返回值)。
代碼示例
/*** 兩個(gè)CompletableFuture,誰執(zhí)行返回的結(jié)果快,我就用那個(gè)CompletionStage的結(jié)果進(jìn)行下一步的消耗操作。** @throws Exception*/ @Test public void testAcceptEither() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello";}).acceptEither(CompletableFuture.supplyAsync(() -> {return "world";}), t1 -> {System.out.println(t1);});System.out.println(future.get()); }runAfterEither
兩個(gè)CompletableFuture,任何一個(gè)完成了都會執(zhí)行下一步操作
代碼示例
/*** 兩個(gè)CompletableFuture,任何一個(gè)完成了都會執(zhí)行下一步的操作** @throws Exception*/ @Test public void testRunAfterEither() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello";}).runAfterEither(CompletableFuture.supplyAsync(() -> {return "world";}), () -> {System.out.println("執(zhí)行完了");});System.out.println(future.get()); }runAfterBoth
兩個(gè)CompletableFuture,都完成了才會執(zhí)行下一步操作。
代碼示例
/*** 兩個(gè)CompletableFuture,都完成了計(jì)算才會執(zhí)行下一步的操作** @throws Exception*/ @Test public void testRunAfterBoth() throws Exception {CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {return "hello";}).runAfterBoth(CompletableFuture.supplyAsync(() -> {return "world";}), () -> {System.out.println("執(zhí)行完了");});System.out.println(future.get()); }thenCompose
thenCompose方法允許對兩個(gè)CompletableFuture進(jìn)行流水線操作,當(dāng)?shù)谝粋€(gè)操作完成時(shí),將其結(jié)果作為參數(shù)傳遞給第二個(gè)操作。
代碼示例
/*** thenCompose 方法允許你對兩個(gè) CompletableFuture 進(jìn)行流水線操作,* 第一個(gè)操作完成時(shí),將其結(jié)果作為參數(shù)傳遞給第二個(gè)操作。* @throws Exception*/ @Test public void testThenCompose() throws Exception {CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {int t = new Random().nextInt();System.out.println(t);return t;}).thenCompose(param -> {return CompletableFuture.supplyAsync(() -> {int t = param * 2;System.out.println("t2=" + t);return t;});});System.out.println(future.get()); }結(jié)語
CompletableFuture是jdk8中新增的一個(gè)特性,特點(diǎn)是非阻塞異步編程。合理的使用非阻塞異步編程,比如將兩步關(guān)聯(lián)不大的操作并行處理,可以優(yōu)化代碼的執(zhí)行效率。同時(shí),在高并發(fā)場景下,CompletableFuture也可以進(jìn)行有效的性能優(yōu)化。
*************************************優(yōu)雅的分割線 **********************************
分享一波:程序員賺外快-必看的巔峰干貨
如果以上內(nèi)容對你覺得有用,并想獲取更多的賺錢方式和免費(fèi)的技術(shù)教程
請關(guān)注微信公眾號:HB荷包
一個(gè)能讓你學(xué)習(xí)技術(shù)和賺錢方法的公眾號,持續(xù)更新
總結(jié)
以上是生活随笔為你收集整理的Java中如何使用非阻塞异步编程——CompletableFuture的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: dubbo控制台安装
- 下一篇: JDBC 连接Hive 简单样例(开启K