给Future一个Promise
對java開發(fā)者來說,經(jīng)常需要在一個線程中另起一個線程來異步干其他事,就涉及到熟悉的Thread和Runnable。使用方式如下:
System.out.println("Do something ..."); new Thread(new Runnable() {@Overridepublic void run() {System.out.println("Async do something ....");} });System.out.println("Done ...");或者java8使用線程池的方式:
ExecutorService executorService = Executors.newFixedThreadPool(1); executorService.submit(() -> {System.out.println("Async do something ...."); });executorService.shutdown();有時候,我們希望知道執(zhí)行結(jié)果或者拿到線程執(zhí)行的返回值,Future就是為了解決這個問題,可以判斷任務(wù)(線程)是否完成,可以中斷任務(wù),可以獲取執(zhí)行結(jié)果。如下:
JDK實現(xiàn)
ExecutorService executor = Executors.newCachedThreadPool(); Future<Integer> future = executor.submit(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 100; }); System.out.println("Do something ..."); try {while (true) {if (future.isDone()) {Integer integer = future.get();System.out.println("Future return value " + integer);break;}System.out.println("wait.....");Thread.sleep(500);} } catch (InterruptedException | ExecutionException e) {e.printStackTrace(); } executor.shutdown();}Do something …
wait…..
wait…..
wait…..
wait…..
Future return value 100
或者ScheduledExecutorService:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime()); ScheduledFuture<?> future = executor.schedule(task, 5, TimeUnit.SECONDS); while (true) {done = future.isDone();if (done) {break;}System.out.printf("Remaining Delay: %sms ",future.getDelay(TimeUnit.MILLISECONDS));Thread.sleep(1000); }executor.shutdown();Remaining Delay: 4999ms
Remaining Delay: 3973ms
Remaining Delay: 2969ms
Remaining Delay: 1966ms
Remaining Delay: 962ms
Scheduling: 148060026198219
但代碼中也體現(xiàn)出一個問題,如果獲取返回值,只能阻塞等待,或者定時查詢是否執(zhí)行完,增加編碼的復(fù)雜度。如何做到異步非阻塞呢(線程執(zhí)行完后,主動通知調(diào)用者)。
Guava 實現(xiàn)
GUAVA提供了一個ListenableFuture,繼承JDK的Future接口,可以實現(xiàn)完成后出發(fā)回調(diào),如下:
ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());ListenableFuture<Integer> future = executor.submit(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return 100; }); System.out.println("Do something ...");Futures.addCallback(future, new FutureCallback<Integer>() {@Overridepublic void onSuccess(Integer result) {System.out.println("Future return value " + result);}@Overridepublic void onFailure(Throwable t) {} });System.out.println("... Main thread Done ...");executor.shutdown();Do something …
… Main thread Done …
Future return value 100
可以看到,要獲取執(zhí)行者的放回值,不需要get()阻塞獲取,繼續(xù)執(zhí)行后續(xù)邏輯,線程結(jié)束后會通知調(diào)用者。
Netty 實現(xiàn)
Netty util包中也提供了異步非阻塞的實現(xiàn):
EventExecutorGroup group = new DefaultEventExecutorGroup(1); System.out.println("Do something ...");Future<Integer> future = group.submit(() -> {System.out.println("Sleep......");Thread.sleep(1000);return 99; });future.addListener(future1 -> System.out.println("result:" + future1.get())); System.out.println("Done ...");Do something …
Done …
Sleep……
result:99
而Scala語言本身就支持Future-promise模型,可以看看這篇文章:Scala Future and Promise
總結(jié)
以上是生活随笔為你收集整理的给Future一个Promise的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我逛了一下JDK一条街,发现了不少好东西
- 下一篇: 深入Redis 主从复制原理