响应式编程优点 有效_Reactive(响应式)编程
Reactor 和Rxjava是Reactive Programming范例的一個具體實現,可以概括為:
反應式編程是一種涉及數據流和變化傳播的異步編程范例。這意味著可以通過所采用的編程語言輕松地表達靜態(例如陣列)或動態(例如事件發射器)數據流。
作為反應式編程方向的第一步,Microsoft在.NET生態系統中創建了Reactive Extensions(Rx)庫。然后RxJava在JVM上實現了響應式編程。隨著時間的推移,通過Reactive Streams工作出現了Java的標準化,這一規范定義了JVM上的反應庫的一組接口和交互規則。它的接口已經在父類Flow下集成到Java 9中。
另外Java 8還引入了Stream,它旨在有效地處理數據流(包括原始類型),這些數據流可以在沒有延遲或很少延遲的情況下訪問。它是基于拉的,只能使用一次,缺少與時間相關的操作,并且可以執行并行計算,但無法指定要使用的線程池。但是它還沒有設計用于處理延遲操作,例如I / O操作。其所不支持的特性就是Reactor或RxJava等Reactive API的用武之地。
Reactor 或 Rxjava等反應性API也提供Java 8 Stream等運算符,但它們更適用于任何流序列(不僅僅是集合),并允許定義一個轉換操作的管道,該管道將應用于通過它的數據,這要歸功于方便的流暢API和使用lambdas。它們旨在處理同步或異步操作,并允許您緩沖,合并,連接或對數據應用各種轉換。
首先考慮一下,為什么需要這樣的異步反應式編程庫?現代應用程序可以支持大量并發用戶,即使現代硬件的功能不斷提高,現代軟件的性能仍然是一個關鍵問題。
人們可以通過兩種方式來提高系統的能力:
- 并行化:使用更多線程和更多硬件資源。
- 在現有資源的使用方式上尋求更高的效率。
通常,Java開發人員使用阻塞代碼編寫程序。這種做法很好,直到出現性能瓶頸,此時需要引入額外的線程。但是,資源利用率的這種擴展會很快引入爭用和并發問題。
更糟糕的是,會導致浪費資源。一旦程序涉及一些延遲(特別是I / O,例如數據庫請求或網絡調用),資源就會被浪費,因為線程(或許多線程)現在處于空閑狀態,等待數據。
所以并行化方法不是靈丹妙藥,獲得硬件的全部功能是必要的。
第二種方法,尋求現有資源的更高的使用率,可以解決資源浪費問題。通過編寫異步,非阻塞代碼,您可以使用相同的底層資源將執行切換到另一個活動任務,然后在異步處理完成后返回到當前線程進行繼續處理。
但是如何在JVM上生成異步代碼? Java提供了兩種異步編程模型:
- CallBacks:異步方法沒有返回值,但需要額外的回調參數(lambda或匿名類),在結果可用時調用它們。
- Futures:異步方法立即返回Future 。異步線程計算任務結果,但Future對象包裝對它的訪問。該值不會立即可用,并且可以輪詢對象,直到該值可用。例如,運行Callable 任務的ExecutorService使用Future對象。
但是上面兩種方法都有局限性。首先多個callback難以組合在一起,很快導致代碼難以閱讀以及難以維護(稱為“Callback Hell”):
考慮下面一個例子:在用戶的UI上展示用戶喜歡的top 5個商品的詳細信息,如果不存在的話則調用推薦服務獲取5個;這個功能的實現需要三個服務支持:一個是獲取用戶喜歡的商品的ID的接口(userService.getFavorites),第二個是獲取商品詳情信息接口(favoriteService.getDetails),第三個是推薦商品與商品詳情的服務(suggestionService.getSuggestions),基于callback模式實現上面功能代碼如下:
- 我們的三個服務接口都是基于callback的,當異步任務執行完畢后,如果結果正常則會調用callback的onSuccess方法,如果結果異常則會調用onError方法。
- 代碼1中我們調用了userService.getFavorites接口來獲取用戶userId的推薦商品id列表,如果獲取結果正常則會調用代碼2,如果失敗則會調用代碼7,通知用戶UI錯誤信息。
- 如果正常則會執行代碼3判斷推薦商品id列表是否為空,如果是的話則執行代碼4調用推薦商品與商品詳情的服務(suggestionService.getSuggestions),如果獲取商品詳情失敗則執行代碼7callback的OnError把錯誤信息顯示到用戶UI,否則如果成功則執行代碼5切換線程到UI線程,在獲取的商品詳情列表上施加jdk8 stream運算使用limit獲取5個元素,然后顯示到UI上。
- 代碼3如果判斷用戶推薦商品id列表不為空則執行代碼8,在商品id列表上使用JDK8 stream獲取流,然后使用limit獲取5個元素,然后執行代碼9調用favoriteService.getDetails服務獲取具體商品的詳情,這里多個id獲取詳情是并發進行的,當獲取到詳情成功后會執行代碼10在UI線程上繪制出商品詳情信息,如果失敗則執行代碼11顯示錯誤。
如上為了實現該功能,我們寫了很多代碼,使用了大量callback,這些代碼比較晦澀難懂,并且存在代碼重復,下面我們使用Reactor來實現等價的功能:
- 碼1調用getFavorites服務獲取userId對應的商品列表,該方法會馬上返回一個流對象,然后代碼2在流上施加flatMap運算把每個商品id轉換為商品Id對應的商品詳情信息(通過調用服務favoriteService::getDetails),由于方法getDetails是異步的,所以flatmap實際上實現了同步轉異步,然后把所有商品詳情信息組成新的流返回。
- 代碼3判斷如果返回的流中沒有元素則調用suggestionService.getSuggestions()服務獲取推薦的商品詳情列表,代碼4則從代碼2或者代碼3返回的流中獲取5個元素(5個商品詳細信息),然后執行代碼5publishOn把當前線程切換到UI調度器來執行,代碼6則通過subscribe方法激活整個流處理鏈,然后在UI線程上繪制商品詳情列表或者顯示錯誤。
如上代碼可知基于reactor編寫的代碼邏輯屬于聲明式編程,比較通俗易懂,代碼量也比較少,不含有重復的代碼。
future相比callback要好一些,但盡管CompletableFuture在Java 8上進行了改進,但它們仍然表現不佳。一起編排多個future是可行但是不容易的,它們不支持延遲計算(比如rxjava中的defer操作)和高級錯誤處理,例如下面例子。考慮另外一個例子:首先我們獲取一個id列表,然后根據id分別獲取對應的name和統計數據,然后組合每個id對應的name和統計數據為一個新的數據,最后輸出所有組合對的值,下面我們使用CompletableFuture來實現這個功能,以便保證整個過程是異步的,并且每個id對應的處理是并發的:
- 如上代碼1我們調用ifhIds方法異步返回了一個CompletableFuture對象,其內部保存了id列表
- 代碼2調用ids的thenComposeAsync方法返回一個新的CompletableFuture對象,新CompletableFuture對象的數據是代碼2中的lambda表達式執行結果,表達式內代碼3獲取id列表的流對象,然后使用map操作把id元素轉換為name與統計信息拼接的字符串,這里是通過代碼3.1根據id獲取name對應的CompletableFuture對象,代碼3.2獲取統計信息對應的CompletableFuture,然后使用代碼3.3把兩個CompletableFuture對象進行合并做到的。
- 代碼3會返回一個流對象,其中元素是所有id對應的name與統計信息組合后的結果,然后代碼4把流中元素收集保存到了combinationList列表里面。代碼5把列表轉換為了數組,這是因為代碼2的allOf操作符的參數必須為數組。
- 代碼6把combinationList列表中的所有CompletableFuture對象轉換為了一個allDone(等所有CompletableFuture對象的任務執行完畢),到這里我們調用allDone的get()方法就可以等待所有異步處理執行完畢,但是我們目的是想獲取到所有異步任務的執行結果,所以代碼7在allDone上施加了thenApply運算,意在等所有任務處理完畢后調用所有CompletableFuture的join方法獲取每個任務的執行結果,然后收集為列表后返回一個新的CompletableFuture對象,然后代碼8在新的CompletableFuture上調用join方法獲取所有執行結果列表。
Reactor本身提供了更多的開箱即用的操作符,使用Reactor來實現上面功能代碼如下:
- 如上代碼1我們調用ifhIds方法異步返回了一個Flux對象,其內部保存了id列表
- 代碼2調用ids的flatMap方法對其中元素進行轉換,代碼2.1根據id獲取name信息(返回流對象Mono),代碼2.2 根據id獲取統計信息(返回流對象Mono),代碼3
- 結合兩個流為新的流元素。
- 代碼3調用新流的collectList方法把所有的流對象轉換為列表,然后返回一個新的Mono流對象。
- 代碼4 則調用新的Mono流對象的block方法阻塞獲取所有執行結果。
如上代碼使用reactor方式編寫的代碼相比使用CompletableFuture實現相同功能來說,更簡潔,更通俗易懂。
Callback和Future的這些弊病是相似的,而響應式編程正是使用發布者 - 訂閱者方式來解決這些問題的。
諸如Reactor之類的反應庫旨在解決JVM上“經典”異步方法的這些缺點,同時還關注一些其他方面:
- 可組合性和可讀性
- 數據作為一個用豐富的運算符詞匯表操縱的流程
- 在您訂閱之前沒有任何事情發生
- 背壓或消費者向生產者發出信號反饋發出信號過快的能力
- 高級但高價值的抽象,與并發無關
可組合性,指的是編排多個異步任務的能力,使用先前任務的結果作為后續任務的輸入或以fork-join方式執行多個任務。
編排任務的能力與代碼的可讀性和可維護性緊密相關。隨著異步過程層數量和復雜性的增加,能夠編寫和讀取代碼變得越來越困難。正如我們所看到的,callback模型很簡單,但其主要缺點之一是,對于復雜的處理,您需要從回調執行回調,本身嵌套在另一個回調中,依此類推。那個混亂被稱為Callback Hell,正如你可以猜到的(或者從經驗中得知),這樣的代碼很難回歸并推理。
Reactor提供了豐富的組合選項,其中代碼反映了抽象過程的組織,并且所有內容通常都保持在同一級別(嵌套最小化)。
您可以將響應式應用程序處理的數據視為在裝配線中移動。Reactor既是傳送帶又是工作站。原材料從源(原始發布者)注入,最終作為成品準備推送給消費者(或訂閱者)。
原材料可以經歷各種轉換和其他中間步驟,或者是將中間元素聚集在一起形成較大裝配線的一部分。如果在裝配線中某一點出現堵塞,受影響的工作站可向上游發出信號以限制原材料的向下流動。
在Reactor中,運算符是我們裝配線中類比的工作站。每個運算符都會向發布者添加行為,并將上一步的發布者包裝到新實例中。因此鏈接整個鏈,使得數據源自第一個發布者并沿著鏈向下移動,由每個鏈點進行轉換。最終,訂閱者訂閱該流,然后激活完成該過程。需要注意,在訂閱者訂閱發布者之前沒有任何事情發生。
雖然Reactive Streams規范根本沒有指定運算符,但Reactor或者rxjava等反應庫的最佳附加值之一是它們提供的豐富的運算符。這些涉及很多方面,從簡單的轉換和過濾到復雜的編排和錯誤處理。
在Reactor中,當您編寫Publisher鏈時,默認情況下數據不會啟動。相反,您可以創建異步過程的抽象描述(這可以幫助重用和組合)。
通過訂閱操作,您可以將發布者綁定到訂閱者,從而觸發整個鏈中的數據流。這是通過訂閱者發出的單個請求信號在內部實現的,該請求信號在上游傳播,一直返回到源發布者。
上游傳播信號也用于實現背壓,我們在裝配線中將其描述為當工作站比上游工作站處理速度慢時向上游線路發送的反饋信號。
Reactive Streams規范定義的真正機制非常接近于下面的類比:訂閱者可以在無限制模式下工作,讓生產者以最快的速度推送所有數據,或者它可以使用請求機制向生產者發送信號通知它準備處理最多n個元素。
施加到源的中間操作符也可以在途中更改請求。想象一個緩沖區運算符,它以10個批次對元素進行分組。如果訂閱者請求1個緩沖區,則源可以生成10個元素。一些生產者還實施預取策略,這避免了往返的request(1),并且如果在請求之前生成元素并不太昂貴,則預取是很有益的。
這將推模型轉換為推拉式混合模式,如果上游生產了很多元素,則下游可以從上游拉出n個元素。但是如果元素沒有準備好,就會在上游生產出元素后推數據到下游。
更多技術分享,歡迎關注微信公眾號:技術原始積累
總結
以上是生活随笔為你收集整理的响应式编程优点 有效_Reactive(响应式)编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python控制autocad_利用py
- 下一篇: python js 效率_巧用 db.s