java 检视_Java高并发系列——检视阅读(五)
JUC中工具類CompletableFuture
CompletableFuture是java8中新增的一個類,算是對Future的一種增強,用起來很方便,也是會經常用到的一個工具類,熟悉一下。
CompletionStage接口
CompletionStage代表異步計算過程中的某一個階段,一個階段完成以后可能會觸發另外一個階段
一個階段的計算執行可以是一個Function,Consumer或者Runnable。比如:stage.thenApply(x -> square(x)).thenAccept(x -> System.out.print(x)).thenRun(() -> System.out.println())
一個階段的執行可能是被單個階段的完成觸發,也可能是由多個階段一起觸發
CompletableFuture類
在Java8中,CompletableFuture提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復雜性,并且提供了函數式編程的能力,可以通過回調的方式處理計算結果,也提供了轉換和組合 CompletableFuture 的方法。
它可能代表一個明確完成的Future,也有可能代表一個完成階段( CompletionStage ),它支持在計算完成以后觸發一些函數或執行某些動作。
它實現了Future和CompletionStage接口
常見的方法,熟悉一下:
runAsync 和 supplyAsync方法
CompletableFuture 提供了四個靜態方法來創建一個異步操作。
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static CompletableFuture supplyAsync(Supplier supplier)
public static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
沒有指定Executor的方法會使用ForkJoinPool.commonPool() 作為它的線程池執行異步代碼。如果指定線程池,則使用指定的線程池運行。以下所有的方法都類同。
runAsync方法不支持返回值。
supplyAsync可以支持返回值。
示例:
public class CompletableFutureTest1 {
public static void main(String[] args) throws Exception {
CompletableFutureTest1.runAsync();
CompletableFutureTest1.supplyAsync();
}
//runAsync方法不支持返回值 Runnable
public static void runAsync() throws Exception {
CompletableFuture future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
});
future.get();
}
//supplyAsync可以支持返回值 Supplier
public static void supplyAsync() throws Exception {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
System.out.println("run end ...");
return System.currentTimeMillis();
});
//如果沒有future.get()阻塞等待結果的話,因為CompletableFuture.supplyAsync()方法默認是守護線程形式執行任務,在主線程結束后會跟著退出,
// 如果傳入的是線程池去執行,這不是守護線程,不會導致退出
long time = future.get();
System.out.println("time = "+time);
}
}
輸出:
run end ...
run end ...
time = 1599556248764
計算結果完成時的回調方法
當CompletableFuture的計算結果完成,或者拋出異常的時候,可以執行特定的Action。主要是下面的方法:
public CompletableFuture whenComplete(BiConsumer super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function fn)
可以看到Action的類型是BiConsumer它可以處理正常的計算結果,或者異常情況。
whenComplete 和 whenCompleteAsync 的區別:
whenComplete:當前任務的線程繼續執行 whenComplete 的任務。
whenCompleteAsync:把 whenCompleteAsync 這個任務繼續提交給線程池來進行執行。
示例:
public class CompletableFutureTest1 {
public static void main(String[] args) throws Exception {
CompletableFutureTest1.whenComplete();
CompletableFutureTest1.whenCompleteBySupply();
}
public static void whenComplete() throws Exception {
CompletableFuture future = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 >= 0) {
int i = 12 / 0;
//run end ...
//執行完成!
//int i = 12 / 0;
}
System.out.println("run end ...");
});
//對執行成功或者執行異常做處理的回調方法
future.whenComplete((avoid, throwable) -> {
//先判斷是否拋異常分開處理
if (throwable != null) {
System.out.println("執行失敗!" + throwable.getMessage());
} else {
System.out.println("執行完成!");
}
});
//對執行異常做處理的回調方法
future.exceptionally(throwable -> {
System.out.println("執行失敗!" + throwable.getMessage());
return null;
}
);
TimeUnit.SECONDS.sleep(2);
}
public static void whenCompleteBySupply() throws Exception {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
if (new Random().nextInt() % 2 >= 0) {
//int i = 12 / 0;
//run end ...
//執行完成!
//int i = 12 / 0;
}
System.out.println("run end ...");
return "success";
});
//whenComplete在thenAccept之前執行
future.thenAccept(result -> {
System.out.println(result);
});
//對執行成功或者執行異常做處理的回調方法
future.whenComplete((avoid, throwable) -> {
//先判斷是否拋異常分開處理
if (throwable != null) {
System.out.println("執行失敗!" + throwable.getMessage());
} else {
System.out.println("執行完成!");
}
});
//對執行異常做處理的回調方法
future.exceptionally(throwable -> {
System.out.println("執行失敗!" + throwable.getMessage());
return null;
}
);
TimeUnit.SECONDS.sleep(2);
}
}
輸出:
執行失敗!java.lang.ArithmeticException: / by zero
執行失敗!java.lang.ArithmeticException: / by zero
run end ...
執行完成!
success
thenApply 方法
當一個線程依賴另一個線程時,可以使用 thenApply、thenApplyAsync 方法來把這兩個線程串行化。
public CompletableFuture thenApply(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
public CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)
Function super T,? extends U>
T:上一個任務返回結果的類型
U:當前任務的返回值類型
示例:
public class CompletableFutureTest2 {
public static void main(String[] args) throws Exception {
CompletableFutureTest2.thenApply();
}
//多個CompletableFuture可以串行執行
//當一個線程依賴另一個線程時,可以使用 thenApply 方法來把這兩個線程串行化。
//多個任務串行執行,第二個任務依賴第一個任務的結果。
private static void thenApply() throws Exception {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
long result = new Random().nextInt(100);
System.out.println("result1=" + result);
return result;
}
).thenApply((t -> {
long result = t * 5;
System.out.println("result2=" + result);
return result;
}));
//方式一:阻塞等待結果
long result = future.get();
System.out.println("result2: " + result);
//方式二:調用成功后接收任務的處理結果,并消費處理,無返回結果
future.thenAccept((r) -> {
System.out.println("result2: " + r);
});
}
}
輸出:
result1=41
result2=205
result2: 205
result2: 205
handle 方法——可以處理正常和異常情況的thenApply 方法
handle 是執行任務完成時對結果的處理。
handle 方法和 thenApply 方法處理方式基本一樣。不同的是 handle 是在任務完成后再執行,還可以處理異常的任務。thenApply 只可以執行正常的任務,任務出現異常則不執行 thenApply 方法。
public CompletionStage handle(BiFunction super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction super T, Throwable, ? extends U> fn);
public CompletionStage handleAsync(BiFunction super T, Throwable, ? extends U> fn,Executor executor);
示例:在 handle 中可以根據任務是否有異常來進行做相應的后續處理操作。而 thenApply 方法,如果上個任務出現錯誤,則不會執行 thenApply 方法。
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.handle();
}
public static void handle() throws Exception {
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int i = 10 / 0;
return new Random().nextInt(10);
}
}).handle(
(param, throwable) -> {
int result = -1;
if (throwable == null) {
result = param * 2;
} else {
System.out.println(throwable.getMessage());
}
return result;
}
/*new BiFunction() {
@Override
public Integer apply(Integer param, Throwable throwable) {
int result = -1;
if(throwable==null){
result = param * 2;
}else{
System.out.println(throwable.getMessage());
}
return result;
}
}*/);
System.out.println(future.get());
}
}
輸出:
java.lang.ArithmeticException: / by zero
-1
thenAccept 消費處理結果——無返回結果
接收任務的處理結果,并消費處理,無返回結果。
public CompletionStage thenAccept(Consumer super T> action);
public CompletionStage thenAcceptAsync(Consumer super T> action);
public CompletionStage thenAcceptAsync(Consumer super T> action,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
//CompletableFutureTest3.handle();
CompletableFutureTest3.thenAccept();
}
public static void thenAccept() throws Exception {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return new Random().nextInt(10);
}
).thenAccept(integer -> {
System.out.println(integer);
});
future.get();
}
}
//輸出:5
thenRun 方法——繼續執行下一個Runnable任務,不獲取上一個任務的處理結果
跟 thenAccept 方法不一樣的是,不關心任務的處理結果。只要上面的任務執行完成,就開始執行 thenRun 。
public CompletionStage thenRun(Runnable action);
public CompletionStage thenRunAsync(Runnable action);
public CompletionStage thenRunAsync(Runnable action,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.thenRun();
}
public static void thenRun() throws Exception{
CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Random().nextInt(10);
}
}).thenRun(() -> {
System.out.println("thenRun ...");
});
future.get();
}
}
//2秒后輸出:thenRun ...
thenCombine 合并任務
thenCombine 會把 兩個 CompletionStage 的任務都執行完成后,把兩個任務的結果一塊交給 thenCombine 來處理。
public CompletionStage thenCombine(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn);
public CompletionStage thenCombineAsync(CompletionStage extends U> other,BiFunction super T,? super U,? extends V> fn,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.thenCombine();
}
private static void thenCombine() throws Exception {
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
return "hello";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
return "world";
});
CompletableFuture result = future1.thenCombine(future2, (result1, result2) -> {
return result1 + " " + result2;
});
System.out.println(result.get());
}
}
//輸出:hello world
thenAcceptBoth
當兩個CompletionStage都執行完成后,把結果一塊交給thenAcceptBoth來進行消耗。
public CompletionStage thenAcceptBoth(CompletionStage extends U> other,BiConsumer super T, ? super U> action);
public CompletionStage thenAcceptBothAsync(CompletionStage extends U> other,BiConsumer super T, ? super U> action);
public CompletionStage thenAcceptBothAsync(CompletionStage extends U> other,BiConsumer super T, ? super U> action, Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.thenAcceptBoth();
//等待守護進程執行完
TimeUnit.SECONDS.sleep(5);
}
private static void thenAcceptBoth() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
});
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
});
f1.thenAcceptBoth(f2, (result1, result2) -> {
System.out.println("f1=" + result1 + ";f2=" + result2 + ";");
});
}
}
輸出:
f1=1
f2=1
f1=1;f2=1;
applyToEither 方法——有返回值消耗
兩個CompletionStage,誰執行返回的結果快,我就用那個CompletionStage的結果進行下一步的轉化操作。
public CompletionStage applyToEither(CompletionStage extends T> other,Function super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn);
public CompletionStage applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.applyToEither();
}
private static void applyToEither() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(()->{
int t = 1;
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
});
CompletableFuture f2 = CompletableFuture.supplyAsync(()->{
int t = 2;
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
});
CompletableFuture result = f1.applyToEither(f2, (r)->{
System.out.println(r);
return r * 2;
});
System.out.println(result.get());
}
輸出:
f1=1
1
2
acceptEither 方法——無返回值消耗
兩個CompletionStage,誰執行返回的結果快,我就用那個CompletionStage的結果進行下一步的消耗操作。注意,這時候其實兩個CompletionStage都是會執行完的,只是我們只獲取其中的一個比較快的結果而已,參考示例的輸出。
public CompletionStage acceptEither(CompletionStage extends T> other,Consumer super T> action);
public CompletionStage acceptEitherAsync(CompletionStage extends T> other,Consumer super T> action);
public CompletionStage acceptEitherAsync(CompletionStage extends T> other,Consumer super T> action,Executor executor);
示例:
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
//CompletableFutureTest3.applyToEither();
CompletableFutureTest3.acceptEither();
TimeUnit.SECONDS.sleep(5);
}
private static void acceptEither() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
});
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
});
f1.acceptEither(f2, (t) -> {
System.out.println(t);
});
}
}
輸出:
f1=1
1
f2=2
runAfterEither 方法
兩個CompletionStage,任何一個完成了都會執行下一步的操作(Runnable),兩個CompletionStage都是會執行完的.
public CompletionStage runAfterEither(CompletionStage> other,Runnable action);
public CompletionStage runAfterEitherAsync(CompletionStage> other,Runnable action);
public CompletionStage runAfterEitherAsync(CompletionStage> other,Runnable action,Executor executor);
示例代碼
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
//CompletableFutureTest3.applyToEither();
//CompletableFutureTest3.acceptEither();
CompletableFutureTest3.runAfterEither();
TimeUnit.SECONDS.sleep(5);
}
private static void runAfterEither() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1=" + t);
return t;
}
});
CompletableFuture f2 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2=" + t);
return t;
}
});
f1.runAfterEither(f2, ()->{
System.out.println("上面有一個已經完成了。");
});
}
}
輸出:
f1=0
上面有一個已經完成了。
f2=1
runAfterBoth
兩個CompletionStage,都完成了計算才會執行下一步的操作(Runnable),注意輸出順序,runAfterBoth方法要等兩個CompletionStage都執行完了才會執行。
public CompletionStage runAfterBoth(CompletionStage> other,Runnable action);
public CompletionStage runAfterBothAsync(CompletionStage> other,Runnable action);
public CompletionStage runAfterBothAsync(CompletionStage> other,Runnable action,Executor executor);
示例代碼
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
//CompletableFutureTest3.applyToEither();
//CompletableFutureTest3.acceptEither();
//CompletableFutureTest3.runAfterEither();
CompletableFutureTest3.runAfterBoth();
TimeUnit.SECONDS.sleep(5);
}
private static void runAfterBoth() throws Exception {
CompletableFuture f1 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f1="+t);
return t;
}
});
CompletableFuture f2 = CompletableFuture.supplyAsync(new Supplier() {
@Override
public Integer get() {
int t = new Random().nextInt(3);
try {
TimeUnit.SECONDS.sleep(t);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("f2="+t);
return t;
}
});
f1.runAfterBoth(f2, new Runnable() {
@Override
public void run() {
System.out.println("上面兩個任務都執行完成了。");
}
});
}
}
輸出:
f1=1
f2=2
上面兩個任務都執行完成了。
thenCompose 方法
thenCompose 方法允許你對兩個 CompletionStage 進行流水線操作,第一個操作完成時,將其結果作為參數傳遞給第二個操作。
public CompletableFuture thenCompose(Function super T, ? extends CompletionStage> fn);
public CompletableFuture thenComposeAsync(Function super T, ? extends CompletionStage> fn) ;
public CompletableFuture thenComposeAsync(Function super T, ? extends CompletionStage> fn, Executor executor) ;
示例代碼
public class CompletableFutureTest3 {
public static void main(String[] args) throws Exception {
CompletableFutureTest3.thenCompose();
TimeUnit.SECONDS.sleep(3);
}
private static void thenCompose() throws Exception {
CompletableFuture f = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
System.out.println("t1=" + t);
return t;
}).thenCompose((param) -> {
return CompletableFuture.supplyAsync(() -> {
int t = param * 2;
System.out.println("t2=" + t);
return t;
});
});
System.out.println("thenCompose result : " + f.get());
}
}
輸出:
t1=1
t2=2
thenCompose result : 2
疑問:
Q:thenAcceptBoth與thenCombine 的區別是什么?
A:thenAcceptBoth無返回值消耗執行,thenCombine 會有返回值。一般accept都是沒有返回值的,apply是有返回值的。
Q:thenCompose 與thenApply 方法 的區別是什么?不都是串行執行下一個任務,并把第一個任務作為參數傳遞給第二個任務么?
獲取線程執行結果的6種方法
方式1:Thread的join()方法實現
代碼中通過join方式阻塞了當前主線程,當thread線程執行完畢之后,join方法才會繼續執行。
join的方式,只能阻塞一個線程,如果其他線程中也需要獲取thread線程的執行結果,join方法無能為力了。
示例:
public class ThreadJoinTest {
//用于封裝結果
static class Result {
T result;
public T getResult() {
return result;
}
public void setResult(T result) {
this.result = result;
}
}
public static void main(String[] args) throws InterruptedException {
Result result = new Result<>();
Thread t = new Thread(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
result.setResult("success");
System.out.println("end thread!");
});
t.start();
//讓主線程等待thread線程執行完畢之后再繼續,join方法會讓當前線程阻塞
t.join();
System.out.println("main get result="+result.getResult());
}
}
輸出:
start thread!
end thread!
main get result=success
方式2:CountDownLatch實現
使用CountDownLatch可以讓一個或者多個線程等待一批線程完成之后,自己再繼續.
示例:
public class CountDownLatchTest2 {
static class Result{
private T result;
public T getResult() {
return result;
}
public void setResult(T result) {
this.result = result;
}
}
public static void main(String[] args) throws InterruptedException {
Result result = new Result<>();
CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
result.setResult("success");
System.out.println("end thread!");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
latch.countDown();
}
});
t.start();
latch.await();
System.out.println("main get result="+result.getResult());
}
}
輸出:
start thread!
end thread!
main get result=success
方式3:ExecutorService.submit方法實現——ThreadPoolExecutor
使用ExecutorService.submit方法實現的,此方法返回一個Future,future.get()會讓當前線程阻塞,直到Future關聯的任務執行完畢。
示例:
public class ThreadPoolExecutorTest2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//自定義包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 5, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("訂單創建組"), new ThreadPoolExecutor.AbortPolicy());
Future future = executor.submit(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end thread!");
return "success";
});
executor.shutdown();
System.out.println("main get result="+future.get());
}
}
輸出同上。
方式4:FutureTask方式1——作為Runnable傳給Thread執行
線程池的submit方法傳入的Callable對象本質上也是包裝成一個FutureTask來執行。
代碼中使用FutureTask實現的,FutureTask實現了Runnable接口,并且內部帶返回值,所以可以傳遞給Thread直接運行,futureTask.get()會阻塞當前線程,直到FutureTask構造方法傳遞的任務執行完畢,get方法才會返回。
示例:
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//創建一個FutureTask
FutureTask futureTask = new FutureTask<>(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end thread!");
return "success";
});
//將futureTask傳遞一個線程運行
new Thread(futureTask).start();
//futureTask.get()會阻塞當前線程,直到futureTask執行完畢
String result = futureTask.get();
System.out.println("main get result=" + result);
}
}
方式5:FutureTask方式2——構造FutureTask對象及執行內容,直接在Thread里面跑run方法
當futureTask的run()方法執行完畢之后,futureTask.get()會從阻塞中返回。
示例:
public class FutureTaskTest1 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//創建一個FutureTask
FutureTask futureTask = new FutureTask<>(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end thread!");
return "success";
});
//將futureTask傳遞一個線程運行
new Thread(()->futureTask.run()).start();
//futureTask.get()會阻塞當前線程,直到futureTask執行完畢
String result = futureTask.get();
System.out.println("main get result=" + result);
}
}
方式6:CompletableFuture方式實現
CompletableFuture.supplyAsync可以用來異步執行一個帶返回值的任務,調用completableFuture.get()
會阻塞當前線程,直到任務執行完畢,get方法才會返回。
public class CompletableFutureTest4 {
public static void main(String[] args) throws Exception {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("start thread!");
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end thread!");
return "success";
});
// future.get()會阻塞當前線程直到獲得結果
System.out.println("main get result="+future.get());
}
}
高并發中計數器的四種實現方式
需求:一個jvm中實現一個計數器功能,需保證多線程情況下數據正確性。
我們來模擬50個線程,每個線程對計數器遞增100萬次,最終結果應該是5000萬。
我們使用4種方式實現,看一下其性能,然后引出為什么需要使用LongAdder、LongAccumulator。
方式一:使用加鎖的方式實現——synchronized或Lock
從示例輸出結果看,ReentrantLock的效率明顯比synchronized差了2-3倍。
示例:
public class SynchronizeCalculator {
private static long count = 0;
private static Lock lock = new ReentrantLock();
public synchronized static void incrment() {
count++;
}
public static void incrmentByLock() {
lock.lock();
try {
count++;
}finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count = 0;
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定義包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("訂單創建組"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
incrment();
//incrmentByLock();
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("結果:%s,耗時(ms):%s", count, (t2 - t1)));
executor.shutdown();
}
}
輸出:
//synchronized
結果:50000000,耗時(ms):490
結果:50000000,耗時(ms):1574
結果:50000000,耗時(ms):399
結果:50000000,耗時(ms):395
結果:50000000,耗時(ms):396
//lock
結果:50000000,耗時(ms):1289
結果:50000000,耗時(ms):1239
結果:50000000,耗時(ms):1224
結果:50000000,耗時(ms):1219
結果:50000000,耗時(ms):1246
方式2:AtomicLong實現
AtomicLong內部采用CAS的方式實現,并發量大的情況下,CAS失敗率比較高,導致性能比synchronized還低一些。并發量不是太大的情況下,CAS性能還是可以的。
示例:
public class AtomicLongTest {
private static AtomicLong count = new AtomicLong(0);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count.set(0);
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定義包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("訂單創建組"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
count.getAndIncrement();
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("結果:%s,耗時(ms):%s", count.get(), (t2 - t1)));
executor.shutdown();
}
}
輸出:
結果:50000000,耗時(ms):1018
結果:50000000,耗時(ms):1442
結果:50000000,耗時(ms):1033
結果:50000000,耗時(ms):935
結果:50000000,耗時(ms):1320
方式3:LongAdder實現——相當于鎖分段技術
先介紹一下LongAdder,說到LongAdder,不得不提的就是AtomicLong,AtomicLong是JDK1.5開始出現的,里面主要使用了一個long類型的value作為成員變量,然后使用循環的CAS操作去操作value的值,并發量比較大的情況下,CAS操作失敗的概率較高,內部失敗了會重試,導致耗時可能會增加。
LongAdder是JDK1.8開始出現的,所提供的API基本上可以替換掉原先的AtomicLong。LongAdder在并發量比較大的情況下,操作數據的時候,相當于把這個數字分成了很多份數字,然后交給多個人去管控,每個管控者負責保證部分數字在多線程情況下操作的正確性。當多線程訪問的時,通過hash算法映射到具體管控者去操作數據,最后再匯總所有的管控者的數據,得到最終結果。相當于降低了并發情況下鎖的粒度,所以效率比較高,看一下下面的圖,方便理解:
示例:
代碼中new LongAdder()創建一個LongAdder對象,內部數字初始值是0,調用increment()方法可以對LongAdder內部的值原子遞增1。reset()方法可以重置LongAdder的值,使其歸0。
public class LongAdderTest {
private static LongAdder count = new LongAdder();
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count.reset();
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定義包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("訂單創建組"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
count.increment();
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("結果:%s,耗時(ms):%s", count.sum(), (t2 - t1)));
executor.shutdown();
}
}
輸出:
結果:50000000,耗時(ms):209
結果:50000000,耗時(ms):133
結果:50000000,耗時(ms):149
結果:50000000,耗時(ms):146
結果:50000000,耗時(ms):148
方式4:LongAccumulator實現
LongAccumulator介紹
LongAccumulator是LongAdder的功能增強版。LongAdder的API只有對數值的加減,而LongAccumulator提供了自定義的函數操作,其構造函數如下:
/**
* accumulatorFunction:需要執行的二元函數(接收2個long作為形參,并返回1個long)
* identity:初始值
**/
public LongAccumulator(LongBinaryOperator accumulatorFunction, long identity) {
this.function = accumulatorFunction;
base = this.identity = identity;
}
示例:
LongAccumulator的效率和LongAdder差不多,不過更靈活一些。
調用new LongAdder()等價于new LongAccumulator((x, y) -> x + y, 0L)。
從上面4個示例的結果來看,LongAdder、LongAccumulator全面超越同步鎖及AtomicLong的方式,建議在使用AtomicLong的地方可以直接替換為LongAdder、LongAccumulator,吞吐量更高一些。
public class LongAccumulatorTest {
private static volatile LongAccumulator count = new LongAccumulator((x, y) -> x + y, 0);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count.reset();
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定義包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("訂單創建組"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
count.accumulate(1);
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("結果:%s,耗時(ms):%s", count.longValue(), (t2 - t1)));
executor.shutdown();
}
}
輸出:
結果:50000000,耗時(ms):152
結果:50000000,耗時(ms):148
結果:50000000,耗時(ms):137
結果:50000000,耗時(ms):137
結果:50000000,耗時(ms):144
疑問:
Q:LongAccumulator.reset方法并不能重置重置LongAccumulator的identity:初始值正確,使其恢復原來的初始值。當初始值為0是不會發生這個問題,而當我們設置初始值如1時,就會導致后續的計算操作增加了5份初始值,目前猜測原因是因為代碼中LongAccumulator在并發量比較大的情況下,操作數據的時候,相當于把這個數字分成了很多份數字 ,而初始化的時候也是初始化了多份數據,導致初始值疊加了多份。不知道這是個bug么?待解惑。
在此記錄下來希望有遇到這種情況的同學注意。解決方法便是要么初始值identity=0不會有這種問題;或者有需要使用reset方法重置的改為重新創建個LongAccumulator處理。
源碼:
public void reset() {
Cell[] as = cells; Cell a;
base = identity;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
//對多個cell進行初始值賦值導致后面計算疊加了多份初始值
a.value = identity;
}
}
}
示例:
public class LongAccumulatorTest {
//設置初始值為1查看輸出結果
private static volatile LongAccumulator count = new LongAccumulator((x, y) -> x + y, 1);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 5; i++) {
count.reset();
averageTest();
}
}
public static void averageTest() throws InterruptedException {
long t1 = System.currentTimeMillis();
//自定義包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory("訂單創建組"), new ThreadPoolExecutor.AbortPolicy());
CountDownLatch latch = new CountDownLatch(50);
for (int i = 0; i < 50; i++) {
executor.execute(() -> {
try {
for (int j = 0; j < 1000000; j++) {
count.accumulate(1);
}
} finally {
latch.countDown();
}
});
}
latch.await();
long t2 = System.currentTimeMillis();
System.out.println(String.format("結果:%s,耗時(ms):%s", count.longValue(), (t2 - t1)));
executor.shutdown();
}
}
輸出:這時候你會發現只有第一次計算是正確的,只要使用了rest方法重置就會導致這個錯誤。
結果:50000001,耗時(ms):185
結果:50000005,耗時(ms):143
結果:50000005,耗時(ms):139
結果:50000005,耗時(ms):162
結果:50000005,耗時(ms):142
演示公平鎖和非公平鎖
先理解一下什么是公平鎖、非公平鎖?
公平鎖和非公平鎖體現在別人釋放鎖的一瞬間,如果前面已經有排隊的,新來的是否可以插隊,如果可以插隊表示是非公平的,如果不可用插隊,只能排在最后面,是公平的方式。
示例:
@Slf4j
public class ReentrantLockTest2 {
public static void main(String[] args) throws InterruptedException {
ReentrantLockTest2.LockTest(false);
TimeUnit.SECONDS.sleep(4);
log.error("-------------------------------");
ReentrantLockTest2.LockTest(true);
}
public static void LockTest(boolean fair) throws InterruptedException {
ReentrantLock lock = new ReentrantLock(fair);
new Thread(() -> {
lock.lock();
try {
log.error(Thread.currentThread().getName() + " start!");
TimeUnit.SECONDS.sleep(3);
new Thread(() -> {
//注意線程池要當前線程創建的才能使用,如果傳給新開的線程會獲取不到線程池
test("后到組",lock);
}).start();
//test(executorAfter,lock);
log.error(Thread.currentThread().getName() + "end!");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "Hold Lock 4 Test Thread").start();
test("先到組",lock);
TimeUnit.SECONDS.sleep(3);
}
private static void test(String name,Lock lock){
//自定義包含策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(5),
new DemoThreadFactory(name), new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 10; i++) {
executor.execute(() -> {
lock.lock();
try {
log.error("獲取到鎖!");
} finally {
lock.unlock();
}
});
}
executor.shutdown();
}
}
輸出:
14:45:23.204 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Thread start!
14:45:26.211 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Threadend!
14:45:26.211 [From DemoThreadFactory's 先到組-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.211 [From DemoThreadFactory's 先到組-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.212 [From DemoThreadFactory's 先到組-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.212 [From DemoThreadFactory's 先到組-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.212 [From DemoThreadFactory's 先到組-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.212 [From DemoThreadFactory's 先到組-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.212 [From DemoThreadFactory's 先到組-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.212 [From DemoThreadFactory's 先到組-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.212 [From DemoThreadFactory's 后到組-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.212 [From DemoThreadFactory's 先到組-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.213 [From DemoThreadFactory's 后到組-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.218 [From DemoThreadFactory's 后到組-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.218 [From DemoThreadFactory's 先到組-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.218 [From DemoThreadFactory's 后到組-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.218 [From DemoThreadFactory's 后到組-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.219 [From DemoThreadFactory's 后到組-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.219 [From DemoThreadFactory's 后到組-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.219 [From DemoThreadFactory's 后到組-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.219 [From DemoThreadFactory's 后到組-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:26.219 [From DemoThreadFactory's 后到組-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:30.205 [main] ERROR com.self.current.ReentrantLockTest2 - -------------------------------
14:45:30.205 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Thread start!
14:45:33.206 [Hold Lock 4 Test Thread] ERROR com.self.current.ReentrantLockTest2 - Hold Lock 4 Test Threadend!
14:45:33.206 [From DemoThreadFactory's 先到組-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.206 [From DemoThreadFactory's 先到組-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.209 [From DemoThreadFactory's 先到組-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.209 [From DemoThreadFactory's 先到組-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.209 [From DemoThreadFactory's 先到組-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.209 [From DemoThreadFactory's 先到組-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.210 [From DemoThreadFactory's 先到組-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.210 [From DemoThreadFactory's 先到組-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.210 [From DemoThreadFactory's 先到組-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.210 [From DemoThreadFactory's 先到組-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.210 [From DemoThreadFactory's 后到組-Worker-2] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.210 [From DemoThreadFactory's 后到組-Worker-1] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.211 [From DemoThreadFactory's 后到組-Worker-6] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.211 [From DemoThreadFactory's 后到組-Worker-7] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.211 [From DemoThreadFactory's 后到組-Worker-5] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.211 [From DemoThreadFactory's 后到組-Worker-4] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.211 [From DemoThreadFactory's 后到組-Worker-3] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.211 [From DemoThreadFactory's 后到組-Worker-9] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.212 [From DemoThreadFactory's 后到組-Worker-10] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
14:45:33.212 [From DemoThreadFactory's 后到組-Worker-8] ERROR com.self.current.ReentrantLockTest2 - 獲取到鎖!
google提供的一些好用的并發工具類
關于并發方面的,juc已幫我們提供了很多好用的工具,而谷歌在此基礎上做了擴展,使并發編程更容易,這些工具放在guava.jar包中。
guava maven配置
com.google.guava
guava
27.0-jre
guava中常用幾個類
MoreExecutors:提供了一些靜態方法,是對juc中的Executors類的一個擴展。
Futures:也提供了很多靜態方法,是對juc中Future的一個擴展。
案例1:異步執行任務完畢之后回調——相當于CompletableFuture的whenComplete方法
ListeningExecutorService接口繼承于juc中的ExecutorService接口,對ExecutorService做了一些擴展,看其名字中帶有Listening,說明這個接口自帶監聽的功能,可以監聽異步執行任務的結果。通過MoreExecutors.listeningDecorator創建一個ListeningExecutorService對象,需傳遞一個ExecutorService參數,傳遞的ExecutorService負責異步執行任務。
ListeningExecutorService的submit方法用來異步執行一個任務,返回ListenableFuture,ListenableFuture接口繼承于juc中的Future接口,對Future做了擴展,使其帶有監聽的功能。調用submit.addListener可以在執行的任務上添加監聽器,當任務執行完畢之后會回調這個監聽器中的方法。
ListenableFuture的get方法會阻塞當前線程直到任務執行完畢。
另一種回調方式是通過調用Futures的靜態方法addCallback在異步執行的任務中添加回調,回調的對象是一個FutureCallback,此對象有2個方法,任務執行成功調用onSuccess,執行失敗調用onFailure。
失敗的情況可以將代碼中int i = 10 / 0;注釋去掉,執行一下可以看看效果。
示例:
@Slf4j
public class GuavaTest {
//相當于CompletableFuture的whenComplete方法
public static void main1(String[] args) throws ExecutionException, InterruptedException {
//創建一個線程池
ExecutorService delegate = Executors.newFixedThreadPool(5);
try {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
//異步執行一個任務
ListenableFuture submit = executorService.submit(() -> {
log.error("{}", System.currentTimeMillis());
//休眠2秒,默認耗時
TimeUnit.SECONDS.sleep(2);
log.error("{}", System.currentTimeMillis());
return 10;
});
//當任務執行完畢之后回調對應的方法
submit.addListener(() -> {
log.error("任務執行完畢了,我被回調了");
}, MoreExecutors.directExecutor());
log.error("{}", submit.get());
} finally {
delegate.shutdown();
}
}
//相當于CompletableFuture的whenComplete方法
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
TimeUnit.SECONDS, new LinkedBlockingQueue<>(10),
new DemoThreadFactory("訂單創建組"), new ThreadPoolExecutor.AbortPolicy());
ListeningExecutorService service = MoreExecutors.listeningDecorator(executor);
try {
ListenableFuture future = service.submit(() -> {
log.error("{}", System.currentTimeMillis());
//休眠2秒,默認耗時
TimeUnit.SECONDS.sleep(2);
//int i = 10 / 0;
log.error("{}", System.currentTimeMillis());
return 10;
});
Futures.addCallback(future, new FutureCallback() {
@Override
public void onSuccess(Integer integer) {
log.error("執行成功:{}", integer);
}
@Override
public void onFailure(Throwable throwable) {
log.error("執行失敗:{}", throwable.getMessage());
}
});
log.error("{}", future.get());
} finally {
service.shutdown();
}
}
}
輸出:
15:32:54.480 [From DemoThreadFactory's 訂單創建組-Worker-1] ERROR com.self.current.GuavaTest - 1599809574477
15:32:56.487 [From DemoThreadFactory's 訂單創建組-Worker-1] ERROR com.self.current.GuavaTest - 1599809576487
15:32:56.488 [main] ERROR com.self.current.GuavaTest - 10
15:32:56.488 [From DemoThreadFactory's 訂單創建組-Worker-1] ERROR com.self.current.GuavaTest - 執行成功:10
示例2:獲取一批異步任務的執行結果——Futures.allAsList(futureList).get()
結果中按順序輸出了6個異步任務的結果,此處用到了Futures.allAsList方法,看一下此方法的聲明:
public static ListenableFuture> allAsList(
Iterable extends ListenableFuture extends V>> futures)
傳遞一批ListenableFuture,返回一個ListenableFuture>,內部將一批結果轉換為了一個ListenableFuture對象。
示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.error("star");
ExecutorService delegate = Executors.newFixedThreadPool(5);
try {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
List> futureList = new ArrayList<>();
for (int i = 5; i >= 0; i--) {
int j = i;
futureList.add(executorService.submit(() -> {
TimeUnit.SECONDS.sleep(j);
return j;
}));
}
//把多個ListenableFuture轉換為一個ListenableFuture
//ListenableFuture> listListenableFuture = Futures.allAsList(futureList);
//獲取一批任務的執行結果
List resultList = Futures.allAsList(futureList).get();
//輸出
resultList.forEach(item -> {
log.error("{}", item);
});
} finally {
delegate.shutdown();
}
}
輸出:
15:45:51.160 [main] ERROR com.self.current.GuavaTest - star
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 5
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 4
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 3
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 2
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 1
15:45:56.185 [main] ERROR com.self.current.GuavaTest - 0
示例3:一批任務異步執行完畢之后回調——包裝futureList傳遞給Futures.addCallback 方法
異步執行一批任務,最后計算其和,代碼中異步執行了一批任務,所有任務完成之后,回調了上面的onSuccess方法,內部對所有的結果進行sum操作。
示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
log.error("start");
ExecutorService delegate = Executors.newFixedThreadPool(5);
try {
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(delegate);
List> futureList = new ArrayList<>();
for (int i = 5; i >= 0; i--) {
int j = i;
futureList.add(executorService.submit(() -> {
TimeUnit.SECONDS.sleep(j);
return j;
}));
}
//把多個ListenableFuture轉換為一個ListenableFuture
ListenableFuture> listListenableFuture = Futures.allAsList(futureList);
Futures.addCallback(listListenableFuture, new FutureCallback>() {
@Override
public void onSuccess(List result) {
log.error("result中所有結果之和:"+result.stream().reduce(Integer::sum).get());
}
@Override
public void onFailure(Throwable throwable) {
log.error("執行任務發生異常:" + throwable.getMessage(), throwable);
}
});
} finally {
delegate.shutdown();
}
}
輸出:
15:57:00.539 [main] ERROR com.self.current.GuavaTest - start
15:57:05.560 [pool-2-thread-1] ERROR com.self.current.GuavaTest - result中所有結果之和:15
其他疑問:
Q:運行下面這個例子結束不了,debug倒是可以,這是為什么呢?Thread[Monitor Ctrl-Break,5,main]是哪來的?
public class VolatileTest1 {
public static volatile int num = 0;
public static void add(){
num++;
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[10];
for (Thread thread : threads) {
thread = new Thread(()->{
for (int i = 0; i < 1000; i++) {
VolatileTest1.add();
}
});
thread.start();
thread.join();
}
//2
//java.lang.ThreadGroup[name=main,maxpri=10]
// Thread[main,5,main]
// Thread[Monitor Ctrl-Break,5,main]
//結束不了,debug倒是可以,這是為什么呢?Thread[Monitor Ctrl-Break,5,main]是哪來的?
while (Thread.activeCount() >1){
Thread.yield();
System.out.println(Thread.activeCount());
ThreadGroup parent = Thread.currentThread().getThreadGroup();
parent.list();
}
System.out.println("num="+num);
}
}
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的java 检视_Java高并发系列——检视阅读(五)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python3精要(3)-python对
- 下一篇: mysql insert 错误码_利用