从JDK9的Flow接口说起
一、JDK9響應式編程
Java是一個“古老”并且廣泛應用的編程語言,但Java9中引入了一些新鮮有趣的特性。這篇文章主要介紹FlowAPI這個新特性,通過FlowAPI我們僅僅使用JDK就能夠搭建響應式應用程序,而不需要其他額外的類庫,如RxJava或Project Reactor。
盡管如此,當你看到過接口文檔后你就會明白到正如字面所說,這只是一個API而已。她僅僅包含了一些Interface和一個實現類:
1.Interface Flow.Publisher<T>定義了生產數據和控制事件的方法。 2.Interface Flow.Subscriber<T>定義了消費數據和事件的方法。 3.Interface Flow.Subscription 定義了鏈接Publisher和Subscriber的方法。 4.Interface Flow.Processor<T,R>定義了轉換Publisher到Subscriber的方法 5.最后,class SubmissionPublisher<T>是Flow.Publisher<T>的實現,她可以靈活的生產數據,同時與Reactive Stream兼容。雖然Java9中沒有很多FlowAPI的實現類可供我們使用,但是依靠這些接口第三方可以提供的響應式編程得到了規范和統一,比如從JDBC driver到RabbitMQ的響應式實現。
其中Publisher為數據發布者,Subscriber為數據訂閱者,Subscription為發布者和訂閱者之間的訂閱關系,Processor為數據處理器。
- 關系圖:
二、Pull,Push,Pull-Push
我對響應式編程的理解是, 這是一種數據消費者控制數據流的編程方式。需要指出是,當消費速度低于生產速度時,消費者要求生產者降低速度以完全消費數據(這個現象稱作back-pressure(背壓))。這種處理方式不是在制造混亂,你可能已經使用過這種模式,只是最近因為在主要框架和平臺上使用才變得更流行,比如Java9,Spring5。另外在分布式系統中處理大規模數據傳輸時也使用到了這種模式。
回顧過去可以幫我們更好的理解這種模式。
- pull模式
幾年前,最常見的消費數據模式是pull-based。client端不斷輪詢服務端以獲取數據。這種模式的優點是當client端資源有限時可以更好的控制數據流(停止輪詢),而缺點是當服務端沒有數據時輪詢是對計算資源和網絡資源的浪費。
- push模式
隨著時間推移,處理數據的模式轉變為push-based,生產者不關心消費者的消費能力,直接推送數據。這種模式的缺點是當消費資源低于生產資源時會造成緩沖區溢出從而數據丟失,當丟失率維持在較小的數值時還可以接受,但是當這個比率變大時我們會希望生產者降速以避免大規模數據丟失。
- pull-push模式
響應式編程是一種pull-push混合模式以綜合他們的優點,這種模式下消費者負責請求數據以控制生產者數據流,同時當處理資源不足時也可以選擇阻斷或者丟棄數據,接下來我們會看到一個典型案例。
三、Flow與Stream
響應式編程并不是為了替換傳統編程,其實兩者相互兼容而且可以互相協作完成任務。Java8中引入的StreamAPI通過map,reduce以及其他操作可以完美的處理數據集,而FlowAPI則專注于處理數據的流通,比如對數據的請求,減速,丟棄,阻塞等。同時你可以使用Streams作為數據源(publisher),當必要時阻塞丟棄其中的數據。你也可以在Subscriber中使用Streams以進行數據的歸并操作。更值得一提的是:reactive streams不僅兼容傳統編程方式,而且還支持函數式編程以極大的提高可讀性和可維護性。有一點可能會使我們感到困惑:如果你需要在兩個系統間傳輸數據,同時進行轉形操作,如何使用Flows和Streams來完成?這種情況下,我們使用Java8的Function來做數據轉換,但是如何在Publisher和Subscriber之間使用StreamAPI呢?答案是我們可以在Publisher和Subscriber之間再加一個subscriber,她可以從最初的publisher獲取數據,轉換,然后再作為一個新的publisher,而使最初的subscriber訂閱這個新的publisher,也是Java9中的接口Flow.Processor<T,R>,我們只需要實現這個接口并編寫轉換數據的functions。從技術上講,我們完全可以使用Flows來替換Streams,但任何時候都這么做就顯得過于偏激。比如,我們創建一個Publisher來作為int數組的數據源,然后在Processor中轉換Integer為String,最后創建一個Subscriber來歸并到一個String中。這個時候就完全沒有必要使用Flows,因為這不是在控制兩個模塊或兩個線程間的數據通信,這個時候使用Streams更為合理。
四、例子
Publisher部分的源碼如下所示:
它是一個函數式接口,只包含一個subscribe方法,通過這個方法將數據發布出去。
Subscriber部分的源碼如下所示:
該接口包含了四個方法:
Subscription部分的源碼如下所示:
Processor部分的代碼如下所示:
它是一個空接口,但是它繼承了Publisher和Subscriber,所以它既能發布數據也能訂閱數據。基于這個特性,它可以充當數據轉換的角色,先從數據發布者那接收數據項,然后經過處理后再發布給最終的數據訂閱者。
接下來我們舉個數據發布和數據訂閱的簡單示例,以此了解Java 9 Flow API的使用。先入為主,直接貼出整個示例代碼:
public class FlowApiTest {public static void main(String[] args) throws InterruptedException {// 1. 定義 String 類型的數據發布者,JDK 9自帶的// SubmissionPublisher 實現了 PublisherSubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 創建一個訂閱者,用于接收發布者的消息Subscriber<String> subscriber = new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 通過 Subscription 和發布者保持訂閱關系,并用它來給發布者反饋this.subscription = subscription;// 請求一個數據this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接收發布者發布的消息System.out.println("【訂閱者】接收消息 <------ " + item);// 接收后再次請求一個數據this.subscription.request(1);// 如果不想再接收數據,也可以直接調用 cancel,表示不再接收了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 過程中出現異常會回調這個方法System.out.println("【訂閱者】數據接收出現異常," + throwable);// 出現異常,取消訂閱,告訴發布者我不再接收數據了// 實際測試發現,只要訂閱者接收消息出現異常,進入了這個回調// 訂閱者就不會再繼續接收消息了this.subscription.cancel();}@Overridepublic void onComplete() {// 當發布者發出的數據都被接收了,// 并且發布者關閉后,會回調這個方法System.out.println("【訂閱者】數據接收完畢");}};// 3. 發布者和訂閱者需要建立關系publisher.subscribe(subscriber);// 4. 發布者開始發布數據for (int i = 0; i < 10; i++) {String message = "hello flow api " + i;System.out.println("【發布者】發布消息 ------> " + message);publisher.submit(message);}// 5. 發布結束后,關閉發布者publisher.close();// main線程延遲關閉,不然訂閱者還沒接收完消息,線程就被關閉了Thread.currentThread().join(2000);} }上面使用JDK 自帶的Publisher實現類SubmissionPublisher來發布 String類型的數據,然后用匿名實現類的方式創建了一個Subscriber實現類。接著使用SubmissionPublisher的subscribe方法來為發布者和訂閱者建立關系。建立關系后,發布者就可以發布數據,接收者也開始接收數據。詳細的說明注釋里都寫了,這里就不再贅述代碼的邏輯了。
所謂的背壓(Backpressure)通俗的講就是數據接收者的壓力,傳統模式下,發布者只關心數據的創造與發布,而當數據發布速率遠高于數據接收速率的時候,數據接收者緩沖區將被填滿,無法再接收數據。發布者并不關心這些,依舊不斷地發送數據,所以就造成了IO阻塞。基于響應式模型實現的Flow API可以很好地解決這個問題。在Java 9的Flow API定義中,Subscriber會將Publisher發布的數據緩沖在Subscription中,其長度默認為256:
假如當這個緩沖區都被填滿后,Publisher將會停止發送數據,直到Subscriber接收了數據Subscription有空閑位置的時候,Publisher才會繼續發布數據,而非一味地發個不停。下面用代碼來演示這個情況:
上面代碼中,我們在Subscriber的onNext方法中用下面的代碼模擬延遲,讓數據處理過程維持在2秒左右:
try {TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) {e.printStackTrace(); }然后數據發布量調整到了500,當程序啟動的時候,由于數據發布的速度非常快(普通for循環),所以數據訂閱者的數據緩沖區瞬間被填滿,于是你會看到下面這個情況,只有當數據訂閱者處理了一個數據的時候,數據發布者才會相應地再次發布一個新數據:
Processor的使用也很簡單,其實它就是Publisher和Subscriber的結合體,充當數據處理的角色,通常的做法是用它來接收發布者發布的消息,然后進行相應的處理,再將數據發布出去,供消息訂閱者接收。下面是一個Processor用法的簡單示例:
public class ProcessorTest {static class MyProcessor extends SubmissionPublisher<String> implements Processor<String, String> {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {// 通過 Subscription 和發布者保持訂閱關系,并用它來給發布者反饋this.subscription = subscription;// 請求一個數據this.subscription.request(1);}@Overridepublic void onNext(String item) {// 接收發布者發布的消息System.out.println("【處理器】接收消息 <------ " + item);// 處理器將消息進行轉換String newItem = "【處理器加工后的數據: " + item + "】";this.submit(newItem);// 接收后再次請求一個數據,表示我已經處理完了,你可以再發數據過來了this.subscription.request(1);// 如果不想再接收數據,也可以直接調用cancel,表示不再接收了// this.subscription.cancel();}@Overridepublic void onError(Throwable throwable) {// 過程中出現異常會回調這個方法System.out.println("【處理器】數據接收出現異常," + throwable);// 出現異常,取消訂閱,告訴發布者我不再接收數據了this.subscription.cancel();}@Overridepublic void onComplete() {System.out.println("【處理器】數據處理完畢");// 處理器處理完數據后關閉this.close();}}public static void main(String[] args) throws InterruptedException {// 1. 定義String類型的數據發布者,JDK 9自帶的// SubmissionPublisher實現了 PublisherSubmissionPublisher<String> publisher = new SubmissionPublisher<>();// 2. 創建處理器,用于接收發布者發布的消息,// 轉換后再發送給訂閱者MyProcessor processor = new MyProcessor();// 3. 發布者和處理器建立訂閱的關系publisher.subscribe(processor);// 4.創建一個訂閱者,用于接收處理器的消息Subscriber<String> subscriber = new Subscriber<>() {private Subscription subscription;@Overridepublic void onSubscribe(Subscription subscription) {this.subscription = subscription;this.subscription.request(1);}@Overridepublic void onNext(String item) {System.out.println("【訂閱者】接收消息 <------ " + item + "");this.subscription.request(1);}@Overridepublic void onError(Throwable throwable) {System.out.println("【訂閱者】數據接收出現異常," + throwable);this.subscription.cancel();}@Overridepublic void onComplete() {System.out.println("【訂閱者】數據接收完畢");}};// 5. 處理器和訂閱者建立訂閱關系processor.subscribe(subscriber);// 6. 發布者開始發布數據for (int i = 0; i < 10; i++) {String message = "hello flow api " + i;System.out.println("【發布者】發布消息 ------> " + message);publisher.submit(message);}// 7. 發布結束后,關閉發布者publisher.close();// main線程延遲關閉,不然訂閱者還沒接收完消息,線程就被關閉了Thread.currentThread().join(2000);} }
參考文章
參考文章
總結
以上是生活随笔為你收集整理的从JDK9的Flow接口说起的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 一道京东的面试题
- 下一篇: 互联网日报 | 新东方二次上市通过港交所