Flink的异步I/O及Future和CompletableFuture
1 概述
??Flink在做流數據計算時,經常要外部系統進行交互,如Redis、Hive、HBase等等存儲系統。系統間通信延遲是否會拖慢整個Flink作業,影響整體吞吐量和實時性。
??如需要查詢外部數據庫以關聯上用戶的額外信息,通常的實現方式是向數據庫發送用戶a的查詢請求(如在MapFunction中),然后等待結果返回,返回之后才能進行下一次查詢請求,這是一種同步訪問的模式,如下圖左邊所示,網絡等待時間極大的阻礙了吞吐和延遲。
??Flink從1.2版本開始就引入了Async I/O(https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/operators/asyncio.html)。異步模式可以并發的處理多個請求和回復,也就是說,你可以連續的向數據庫發送用戶a、b、c、d等的請求,與此同時,哪個請求的回復先返回了就處理哪個回復,從而連續的請求之間不需要阻塞等待,如上圖右邊所示,這也是Async I/O的實現原理。
2 Future和CompletableFuture
??先了解一下Future和CompletableFuture
2.1 Future
??從JDK1.5開始,提供了Future來表示異步計算的結果,一般需要結合ExecutorService(執行者)和Callable(任務)來使用。Future的get方法是阻塞的
package com.quinto.flink;import java.util.concurrent.*;public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// 核心線程池大小5 最大線程池大小10 線程最大空閑時間60 時間單位s 線程等待隊列ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));Future<Long> future = executor.submit(() -> {// 故意耗時Thread.sleep(3000);return System.currentTimeMillis();});System.out.println(future.get());System.out.println("因為get是阻塞的,所以這個消息在數據之后輸出");executor.shutdown();} }??結果為
1612337847685 因為get是阻塞的,所以這個消息在數據之后輸出??Future只是個接口,實際上返回的類是FutureTask:
public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {return new FutureTask<T>(callable);}??FutureTask的get方法如下
private volatile int state;private static final int NEW = 0;private static final int COMPLETING = 1;private static final int NORMAL = 2;private static final int EXCEPTIONAL = 3;private static final int CANCELLED = 4;private static final int INTERRUPTING = 5;private static final int INTERRUPTED = 6;public V get() throws InterruptedException, ExecutionException {int s = state;// 首先判斷FutureTask的狀態是否為完成狀態,如果是完成狀態,說明已經執行過set或setException方法,返回report(s)。任務的運行狀態。最初是NEW == 0。運行狀態僅在set、setException和cancel方法中轉換為終端狀態。if (s <= COMPLETING)//如果get時,FutureTask的狀態為未完成狀態,則調用awaitDone方法進行阻塞s = awaitDone(false, 0L);return report(s);}/*** awaitDone方法可以看成是不斷輪詢查看FutureTask的狀態。在get阻塞期間:①如果執行get的線程被中斷,則移除FutureTask的所有阻塞隊列中的線程(waiters),并拋出中斷異常;②如果FutureTask的狀態轉換為完成狀態(正常完成或取消),則返回完成狀態;③如果FutureTask的狀態變為COMPLETING, 則說明正在set結果,此時讓線程等一等;④如果FutureTask的狀態為初始態NEW,則將當前線程加入到FutureTask的阻塞線程中去;⑤如果get方法沒有設置超時時間,則阻塞當前調用get線程;如果設置了超時時間,則判斷是否達到超時時間,如果到達,則移除FutureTask的所有阻塞列隊中的線程,并返回此時FutureTask的狀態,如果未到達時間,則在剩下的時間內繼續阻塞當前線程。*/private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}}??Future的局限性:
??①可以發現雖然 Future接口可以構建異步應用,但是對于結果的獲取卻是很不方便,只能通過阻塞或者輪詢的方式得到任務的結果。阻塞的方式顯然和我們的異步編程的初衷相違背,輪詢的方式又會耗費無謂的 CPU 資源,而且也不能及時地得到計算結果。
??②它很難直接表述多個Future 結果之間的依賴性。實際開發中,經常需要將多個異步計算的結果合并成一個,或者等待Future集合中的所有任務都完成,或者任務完成以后觸發執行動作
2.2 CompletableFuture
??JDk1.8引入了CompletableFuture,它實際上也是Future的實現類。這里可以得出:
??CompletableFuture有一些新特性,能完成Future不能完成的工作。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {??首先看類定義,實現了CompletionStage接口,這個接口是所有的新特性了。
??對于CompletableFuture有四個執行異步任務的方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)??supply開頭的帶有返回值,run開頭的無返回值。如果我們指定線程池,則會使用我么指定的線程池;如果沒有指定線程池,默認使用ForkJoinPool.commonPool()作為線程池。
public static void main(String[] args) throws ExecutionException, InterruptedException {// 核心線程池大小5 最大線程池大小10 線程最大空閑時間60 時間單位s 線程等待隊列ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {return "hello";}, executor);System.out.println(future.get());executor.shutdown();}??上面只是對執行異步任務,如果要利用計算結果進一步處理使用,進行結果轉換有如下方法:①thenApply (同步)②thenApplyAsync(異步)
// 同步轉換 public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) // 異步轉換,使用默認線程池 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) // 異步轉換,使用指定線程池 public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) package com.quinto.flink;import java.util.concurrent.*;public class FutureTest {public static void main(String[] args) throws ExecutionException, InterruptedException {// 核心線程池大小5 最大線程池大小10 線程最大空閑時間60 時間單位s 線程等待隊列ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));CompletableFuture<Long> future = CompletableFuture// 執行異步任務.supplyAsync(() -> {return System.currentTimeMillis();}, executor)// 對前面的結果進行處理.thenApply(n -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}Long time = System.currentTimeMillis();System.out.println("如果是同步的,這條消息應該先輸出");return time-n;});System.out.println("等待2秒");System.out.println(future.get());executor.shutdown();} }??結果為
如果是同步的,這條消息應該先輸出 等待2秒 2017??如果把thenApply換成thenApplyAsync,結果如下
等待2秒 如果是同步的,這條消息應該先輸出 2008??處理完任務以及結果,該去消費了有如下方法:①thenAccept(能夠拿到并利用執行結果) ② thenRun(不能夠拿到并利用執行結果,只是單純的執行其它任務)③thenAcceptBoth(能傳入另一個stage,然后把另一個stage的結果和當前stage的結果作為參數去消費。)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)public CompletableFuture<Void> thenRun(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action) public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action) public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)??如果要組合兩個任務有如下方法:①thenCombine(至少兩個方法參數,一個為其它stage,一個為用戶自定義的處理函數,函數返回值為結果類型) ;② thenCompose(至少一個方法參數即處理函數,函數返回值為stage類型)
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)public <U> CompletionStage<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) public <U> CompletionStage<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)??如果有多條渠道去完成同一種任務,選擇最快的那個有如下方法:①applyToEither (有返回值)②acceptEither(沒有返回值)
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn) public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action) public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)??Future和CompletableFuture對比:
??Future:只能通過get方法或者死循環判斷isDone來獲取。異常情況不好處理。
??CompletableFuture:只要設置好回調函數即可實現:①只要任務完成,就執行設置的函數,不用考慮什么時候任務完成②如果發生異常,會執行處理異常的函數③能應付復雜任務的處理,如果有復雜任務,比如依賴問題,組合問題等,同樣可以寫好處理函數來處理
3 使用Aysnc I/O的條件
??(1)具有對外部系統進行異步IO訪問的客戶端API,如使用vertx,但是目前只支持scala 2.12的版本,可以使用java類庫來做
??(2)沒有這樣的客戶端,可以通過創建多個客戶端并使用線程池處理同步調用來嘗試將同步客戶端轉變為有限的并發客戶端,如可以寫ExecutorService來實現。但是這種方法通常比適當的異步客戶端效率低。
4 Aysnc I/O的案例
4.1 有外部系統進行異步IO訪問的客戶端API的方式
// 這個例子實現了異步請求和回調的Futures,具有Java8的Futures接口(與Flink的Futures相同)class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {// 定義連接客戶端,并且不參與序列化private transient DatabaseClient client;@Overridepublic void open(Configuration parameters) throws Exception {// 創建連接client = new DatabaseClient(host, post, credentials);}@Overridepublic void close() throws Exception {client.close();}@Overridepublic void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {// 用連接進行查詢,查詢之后返回的是future,有可能有,有可能沒有final Future<String> result = client.query(key);// 如果有結果返回的話會通知你(有個回調方法),這里可以設置超時時間,如果超過了一定的時間還沒有返回相當于從這里取一取就會拋異常,結果就會返回nullCompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// Normally handled explicitly.return null;}}//如果它已經執行完了,就會把結果放到Collections里面}).thenAccept( (String dbResult) -> {resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));});} }// create the original stream DataStream<String> stream = ...;// unorderedWait這個是不在乎請求返回的順序的,里面用到的是阻塞隊列,隊列滿了會阻塞,隊列里面一次最多可以有100個異步請求,超時時間是1000毫秒 DataStream<Tuple2<String, String>> resultStream =AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);4.2 沒有外部系統進行異步IO訪問的客戶端API的方式
package com.quinto.flink;import com.alibaba.druid.pool.DruidDataSource; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.Collections; import java.util.concurrent.*; import java.util.function.Supplier;class AsyncDatabaseRequest extends RichAsyncFunction<String,String> {// 這里用到了連接池,以前查詢是阻塞的,查詢完這個下一個還是同個連接,// 現在要發送多個請求不能用同個連接,每個請求都會返回一個結果。這里不但要用到連接池,還要用到線程池。private transient DruidDataSource druidDataSource;private transient ExecutorService executorService;@Overridepublic void open(Configuration parameters) throws Exception {executorService = Executors.newFixedThreadPool(20);druidDataSource = new DruidDataSource();druidDataSource.setDriverClassName("com.mysql.jdbc.Driver");druidDataSource.setUsername("root");druidDataSource.setPassword("root");druidDataSource.setUrl("jdbc:mysql:..localhost:3306/bigdata?characterEncoding=UTF-8");druidDataSource.setInitialSize(5);druidDataSource.setMinIdle(10);druidDataSource.setMaxActive(20);}@Overridepublic void close() throws Exception {druidDataSource.close();executorService.shutdown();}@Overridepublic void asyncInvoke(String input,final ResultFuture<String> resultFuture) {// 向線程池丟入一個線程Future<String> future = executorService.submit(() -> {String sql = "SELECT id,name FROM table WHERE id = ?";String result = null;Connection connection = null;PreparedStatement stmt = null;ResultSet rs = null;try {connection = druidDataSource.getConnection();stmt = connection.prepareStatement(sql);rs = stmt.executeQuery();while (rs.next()){result = rs.getString("name");}}finally {if (rs!=null){rs.close();}if (stmt!=null){stmt.close();}if (connection!=null){connection.close();}}return result;});// 接收任務的處理結果,并消費處理,無返回結果。CompletableFuture.supplyAsync(new Supplier<String>() {@Overridepublic String get() {try {// 從future里面把結果取出來,如果有就返回,沒有的話出異常就返回nullreturn future.get();} catch (Exception e) {return null;}}// 拿到上一步的執行結果,進行處理}).thenAccept((String result)->{// 從future里面取出數據會有一個回調,然后會把他放到resultFuture,complete中要求放的是一個集合,所以需要進行轉換resultFuture.complete(Collections.singleton(result));});} }??這樣mysql的API還是用他原來的,只不過把mysql的查詢使用把要查詢的功能丟線程池。以前查詢要好久才返回,現在來一個查詢就丟到線程池里面,不需要等待結果,返回的結果放在future里面。原來查詢是阻塞的,現在開啟一個線程查,把查詢結果丟到future里面。相當于新開一個線程讓他幫我查,原來是單線程的,現在開多個線程同時查,然后把結果放future,以后有結果了從這里面取。
總結
以上是生活随笔為你收集整理的Flink的异步I/O及Future和CompletableFuture的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SparkContext解析
- 下一篇: dz论坛php.ini设置,Discuz