javascript
使用 RxJS 实现 JavaScript 的 Reactive 编程
簡介
作為有經驗的JavaScript開發者,我們會在代碼中采用一定程度的異步代碼。我們不斷地處理用戶的輸入請求,也從遠程獲取數據,或者同時運行耗時的計算任務,所有這些都不能讓瀏覽器崩潰。可以說,這些都不是瑣碎的任務,它是確切的需求,我們學著去避開同步計算,讓模型的時間和延時成為問題的關鍵。對于簡單的應用程序,直接使用JavaScript的主事件系統,甚至使用jQuery庫幫助也很常見。然而,還沒有適當的模式來擴展的簡單代碼,解決這些異步問題,滿足更豐富的應用特性,滿足現代web用戶的需求,這些仍然是困難的。我們越來越發現我們的應用代碼正變得復雜,難以維護,難以測試。問題的本質是異步計算本身就是難以管理的,而RxJS可以解決這個問題。
RxJS解決的問題
任何應用最重要的一個目標之一就是在所有時刻保持響應。這意味著對于一個應用來說當它在處理用戶輸入或者憑借AJAX從服務器接受一些額外的數據時停止是一件不可接受的事情。通常來說,主要的問題是IO(輸入/輸出)運行(從磁盤或者網絡讀取)比CPU執行指令慢太多。這同時實用于客戶端和服務器端。讓我門來看看客戶端。在JavaScript中,解決問題的方案始終是充分利用瀏覽器的多重連接并且用回調函數來大量產生一個獨立的用來照顧一些長期運行的處理。這是一種反轉控制控制形式,因為程序的控制不是被你操縱的(因為你不能預知某一處理什么時候會完成),而是在運行時間的責任下交還給你的。雖然對于小應用程序非常有用,但回調的使用使內容豐富的大型應用變得凌亂,它需要同時處理的數據來自用戶以及遠程HTTP的調用。我們都有過這樣的經歷:一旦你需要多塊數據時你就陷入了流行的”末日金字塔“或者回調地獄。
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | makeHttpCall( '/items' ,? ??? items?=>?{ ?????? for? (itemId?of?items)?{ ????????? makeHttpCall(`/items/${itemId}/info`, ??????????? itemInfo?=>?{???????? ?????????????? makeHttpCall(`/items/${itemInfo.pic}`, ???????????????? img?=>?{ ???????????????????? showImg(img); ?????????????? });??? ??????????? }); ?????? } }); beginUiRendering(); |
這段代碼有很多的問題。 其中之一就是風格。當你在這些嵌套的回調函數中添加越來越多的邏輯,這段代碼就會變得很復雜很難理解。因為循環還產生了一個更加細微的問題。for循環是同步的控制流語句,這并不能很好的配合異步調用,因為會有延遲,這可能會產生很奇怪的bug。
這個問題一直都是JavaScript開發者的大麻煩,所以JavaScript在SE6中引入了Promises。?Promises幫助開發者解決這些類似的問題,它提供了一個非常流暢的接口來捕獲時間并且提供一個回調方法then()。上面的代碼就變成了:
?
| 1 2 3 4 | makeHttpCall( '/items' ) ???? .then(itemId?=>?makeHttpCall(`/items/${itemId}/info`)) ???? .then(itemInfo?=>?makeHttpCall(`/items/${itemInfo}.pic}`)) ???? .then(showImg); |
這毫無疑問是一個進步。理解這段代碼的難度顯著下降。然而,盡管Promises在處理這種單值(或單個錯誤)時非常高效,它有也一些局限性。Promimses在處理用戶連續輸入的數據流時效率怎么樣呢? 這時Promises處理起來也并不高效,因為它沒有事件的刪除、分配、重試等等的語法定義。接下來開始講解RxJS。
RxJS 初探
RxJS是一個解決異步問題的JS開發庫.它起源于?Reactive Extensions?項目,它帶來了觀察者模式和函數式編程的相結合的最佳實踐。 觀察者模式是一個被實踐證明的模式,基于生產者(事件的創建者)和消費者(事件的監聽者)的邏輯分離關系.
況且函數式編程方式的引入,如說明性編程,不可變數據結構,鏈式方法調用會使你極大的簡化代碼量。(和回調代碼方式說再見吧)。
若想仔細了解函數式編程,請訪問這里(the Functional?Programming in JavaScript RefCard?)。
如果你熟悉了函數式編程,請把RxJS理解為異步化的Underscore.js。
RxJS 引入了一個重要的數據類型——流(stream)。
理解流( Streams)流(Streams)無非是隨時間流逝的一系列事件。流(Streams)可以用來處理任何類型的事件,如:鼠標點擊,鍵盤按下,網絡位數據,等等。你可以把流作為變量,它有能力從數據角度對發生的改變做出反應。 變量和流都是動態的,但表現有些不同;為了理解它,讓我們看一個簡單的例子。考慮以下簡單的算術運算: ?
| 無若 |
盡管變量a變為了10,但這是一種通過設計方式保證所依賴的變量不變。這就是最大的不同。事件引發的改變總是從事件源(生產者) ?
這樣你看到了,流的方式重新定義的變量值的動態行為。(作為習慣,我喜歡使用$符號在流變量命名里) |
可觀察數據類型
或許RxJS庫最重要的部分是可觀察數據類型的定義。這種類型被用于包裝一個數據片段(按鈕事件,鍵盤事件,鼠標事件,數字,字符串或者隊列),這樣它就有了流式數據類型的優點。最簡單的觀察對象是這種單變量形式,例如:
?
| 1 | var? streamA$?=?Rx.Observable.of(2); |
我們重新使用上面的例子,這次是真正的RxJS語法。這回使用了新的API,我要詳細的講一下:
?
| 1 2 3 4 5 6 | const?streamA$?=?Rx.Observable.of(2); const?streamB$?=?Rx.Observable.of(4); const?streamC$?=?Rx.Observable.concat(streamA$,?streamB$) ?? .reduce((x,?y)?=>?x??+?y); streamC$.subscribe(console.log);? //prints?6 |
運行這個例子輸出值為6。不像之前的偽代碼,在變量被定義后實際上不能對流對象重新賦值。如何必須那樣做的話就要重新創建一個新的流變量,因為流變量是不可變的數據類型。既然是不可變的,通常我們可以安全的使用ES6規范里的不可變關鍵字const使代碼更清晰明確。
為了給streamA$推一系列新值,你必須改變streamA$定義的方式:
?
| 1 2 3 | const?streamA$?=?Rx.Observable.of(2,?10) ... streamC$.subscribe(console.log);? //prints?16 |
現在訂閱streamC$將會得到值16。就像我之前提到的,流只是一個在時間軸上的事件傳輸序列。以下是可視化圖例。
創建可觀察序列對象
很多不同的方法都可以創建可觀察序列對象。這里是一些普通使用的例子:
| of(arg) | 把參數轉換成可觀察序列對象 |
| from(iterable) | 把可迭代的隊列參數轉換成可觀察序列對象 |
| fromPromise(promise) | 把promise對象參數轉換成可觀察序列對象 |
| fromEvent(element, eventName) | 通過增加一個事件監聽器用于監聽匹配的Dom元素,jQuery元素,Zepto元素,Angular元素,Ember.js元素或者EventEmitter等, 來創建可觀察序列對象 |
流式編程的另一個不同點是觸發機制。可觀察序列類型對象是后觸發的(lazy data types),就是說當有訂閱者訂閱的時候什么也不執行(這種方式不會有事件發出來)。它的訂閱機制是被觀察者(Observer)觸發的。
觀察者(Observer)
觀察者代表模型的消費者一端。它負責被可觀察序列對象發送過來的值進行處理和反饋。觀察者的API簡單,基于迭代者模式它定義了
next方法。當事件執行結果推向到可觀察對象的時候,這個方法就會被調用。之前streamC$.subscribe(console.log) 這種簡寫的方式,其實就是背后創建了觀察者對象(Observer)。創建的過程如下:
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | const?observer?=?Rx.Observer.create( ???? function? next(val)?{?? ???????? console.log(val); ???? }, ???? function? error(err)?{? ???????? ;? //?事件異常情況執行 ???? }, ???? function? complete()?{ ???????? ;? // 事件完成后執行 ???? } ); |
觀察者也定義了處理異常的API,意味著會發出執行過程中的異常信號通知。所有的觀察者對象里的方法都是可選擇的,其實你只需要
訂閱一下就可以(調用subscribe方法)。最普遍的方式是需要提供一下對應的業務方法映射到next方法里。這個方法里需要一個具體的業務邏輯,例如寫文件,屏幕打印日志,追加到DOM里,不管怎么說需要你來完成的。
訂閱
只要訂閱一個Observable對象就會返回一個訂閱對象,另外當完成后你可以使用unsubscribe方法釋放掉對象流。這種釋放機制真的很優美,它解決了原生JS在事件處理完成后正確釋放資源的缺點。原生JS在這一塊之前總是會出問題。
為了說明這一點,我創建一個Observable對象來監聽所有的點擊事件:
很明顯這是一個無限觸發的點擊事件流(事件完成方法永遠不會調用)。如果我要停止監聽事件,我只需要簡單的調用unsubscribe方法。這個方法也會清理和釋放事件的句柄資源或者臨時對象資源。
現在你知道了如何創建和銷毀事件流,那么再看看怎么使用它來解決具體的問題吧。在我的模型里采用數字來說明這些API的使用,當然你可以把他們應用到任何的業務邏輯。
流式序列化
RxJS框架的核心思想是提供一個統一的數據處理的編程模型,不考慮數據類型,以及是否同步異步(遠程HTTP調用)等因素。RxJS使用簡單又熟悉的API。通過函數式方法擴展了原生JavaScript的數組集合(被認為是擴展的組數),這些方法是map,filter和reduce等。
| ? ? ? ? ? ? ? ? map(fn) | ? ? ? ? ? ? ? ? 重新構建observable序列對象為新的形式 |
| ? ? ? ? ? ? ? ? filter(predicate) | ? ? ? ? ? ? ? ? 通過給定的predicate函數,過濾掉匹配的observable序列對象 |
| ? ? ? ? ? ? ? ? reduce(accumulator, [seed]) | ? ? ? ? ? ? ? ? 通過收集器函數accumulator返回一個單一結果。seed是收集器的初始化數據。 |
使用從數字1到5的數組,我將要過濾掉奇數,計算它們的平方和總和。按照傳統的方式至少會使用到循環和條件判斷。使用函數式方法就會這樣:
其實這是很小的改變,就是通過Observable對象實例方法來進行數據處理。就像數組一樣,被處理的對象來自于前一個方法的返回結果。目的在當前輸入對象的范疇內執行特定的操作方法(map,filter,reduce),處理所需的業務邏輯。
下面的圖展示了這個場景下的執行過程:
數組是一種可預測的流,因為執行期間所用數據都放在內存中。無論數據是以怎樣的形式,假如這些數據是通過HTTP請求方式獲得后被執行(或者封裝成Promise對象),同樣的代碼仍然有效:
當然,我可以創建獨立的無副作用的函數,并注入到可觀測的序列中。這可以讓你的代碼看起來更直觀:
這是RxJS的美麗之處:單個編程模型可以支持所有的情況。此外,可觀測能讓你序列化你的串行操作,使它們在一起運作良好,并讓抽象化延遲遠離你的代碼。還需要注意的是,這種寫代碼的方式消除了循環和條件語句的復雜性,這是一種更高層的函數。
處理時間
了解RxJS 如何有效處理時間以及通過基于時間的延遲可以做運算符。這里有一個簡短的列表用于隨著時間推移創建最常見的狀態:
| Rx.Observable.interval(period) | 在每一個時期返回一個可觀測序列產生值 |
| Rx.Observable.timer(dueTime) | 在dueTime運行后產生一個值,然后在每一個時期返回一個可觀測序列 |
這些都是很好的模擬時間事件:
每半秒鐘間隔(500)將排放值。因為這是一個無限流,我只有前5個,將可觀測到一個有限的流,發送完成一個信號。
處理用戶輸入
你也可以使用Observable對象與DOM事件進行交互。使用Rx.Observable.fromEvent,我能監聽到任何DOM事件信息。這有一個小例子:
在這個事例中,我可以處理點擊事件,并能夠在觀察者(Observer)里執行任何操作。這里的map方法是轉換接收到的點擊事件,從點擊事件里提取底層元素的href屬性。
處理異步調用
處理用戶輸入不只是異步調用的唯一方式。RxJS框架也優雅地整合了ES6 Promise API,這樣就可以獲取遠程數據。假如要從Github上獲取用戶并且提取用戶名。RxJS框架的強大,讓我們處理這些邏輯只用5行代碼就夠了。
這段代碼引入了一對新方法,這里要解釋一下。首先我把Github的用戶列表REST API包裝成Observable對象。在flatMap方法里通過makeHttpCall函數來對URL地址進行請求,去獲得一個Promise化的AJAX返回對象。這時RxJS要等待返回結果以及Promise化處理。一旦完成,Github的響應數據就會在函數里被包裝成數組存放進Observable對象里。接下來就是調用Observable對象的方法來處理數據。最后我構造了一個簡單的函數來從數組集合里提取用戶名屬性。
Map與FlatMap對比
正如前面所說,Observable對象的map方法是對自身對象的數據值進行映射處理,返回是一個包含映射結果的新Observale對象。調用的方法能返回任意類型對象,甚至另外的Observable類型對象。上面的例子是我把帶有URL信息的lambda表達式作為參數輸入,返回結果是把promise類型對象又包裝成Observalbe對象輸出:
映射方法把Observalbe對象里數據結果進行映射產生一個新Observable對象。(在函數式編程里這種方式很普遍)我們要做的就是處理映射關系并把結果存放到一個Observalbe對象里,這有點像給洋蔥剝皮。這正是flatMap方法的優點。上面的方法返回結果是Rx.Observable.fromPromise(...),所以需要處理promise對象。從一般經驗來看,當你從其他類型對象中構建Observalbe對象時,需要使用flatMap方法。下面看一個例子就好理解了:
這段代碼是以0.5秒間隔生成5個連續數字的界面。首先輸出數字1到5,然后2到6,3到7等等。如下圖所示:
釋放可觀察序列
前面提到了,RxJS框架的好處之一是對JavaScript事件進行了統一抽象,這樣可以更好的釋放和銷毀事件。這就是基于觀察者(Observer)里提供了這個功能去執行自己的清理方法。訂閱Observable對象就會獲得一個Subscription實例,通過這個實例就能得到觀察者(Observer)對象實例。
這段代碼創建一個簡單的Observable對象。但這次不是對事件數據源或AJAX數據源進行包裝,而是創建一個自定義的事件,會不間斷的每隔一秒產生數字。因為是自定義事件所以需要通過訂閱返回的函數調用來創建自己的資源釋放程序。RxJS框架通過Subscription.unsubcribe()的調用執行去釋放資源。這個例子中我的清理動作只是時間間隔函數。7秒之后,我釋放了Observable對象所以引起輸出數字暫停而不是無限制的打印數字。
合并流
你似乎認為這些Observalbe對象很重,實際上他們創建和釋放是很容易的。就像變量一樣,它們可以被合并在一起。下面看看如何進行合并操作。
合并多個流
merge方法負責執行合并操作,合成多個Observalble序列變成一個Observalble對象。這個操作方法就是把多個事件流合并成一個,在時間的維度里順序一致。例如在一個HTML組件里有三個按鈕,分別執行以下方式進行計數:上,下和清除。
另一種流的合并方式是可以通過concat()和concatAll()方法進行。
一個流與另一個合并
withLatestFrom 這個操作者是非常有用的,因為它允許您將一個observable序列合并到另一個正在使用選擇器方法的序列中,除非原始的observable序列產生一個元素。為了展示這一點,假設我想在每一秒鐘打印出GitHub的用戶列表。直觀上,我需要將一個基于時間的流與一個HTTP的流合并。
緩存
正如更早之前提到的,流是一種無狀態的數據結構,這就意味著其狀態從未滯留其中而是立即從生產者流向消費者。然而有時重要的是它能夠臨時存儲一些數據,并且還可以再次基礎上做出決定。想到的一個例子是關于跟蹤雙擊一個元素。你是如何在不存儲第一次點擊的情況下監測到第二次點擊?為此,我們可以使用緩存。有多種情況:
緩沖一段時間
你可以暫時保存一定數量的數據到內部數組,一旦滿足計數閾值時,它將作為一個整體得到釋放.
?
| 1 2 3 4 5 | Rx.Observable.range(1,?9)?.bufferCount(3) .subscribe(console.log); //->?prints?[1,?2,?3] //??????[4,?5,?6] //??????[7,?8,?9] |
基于時間的緩沖數據
你也可以為一個預定義的時間緩沖. ?我將創建一個簡單的函數來模擬每秒鐘從一組可用的郵箱地址發送郵件來顯示. 如果我每秒鐘發一封郵件, 和緩沖, 說, 五秒鐘,一旦緩沖時間運行緩沖將發出一組郵件:
錯誤處理
上面的例子中,我們已經學習了在處理DOM事件或者獲取遠程數據時,幾種關于 streams (流)?的異步操作方式。但是,這些例子里面并沒有告訴我們,如果處理 stream 的過程中出現了錯誤或者異常,我們應該怎樣處理。DOM事件handler里面比較簡單,因為他們實際上不拋出錯誤。但是AJAX不在此列。如果你不是處理簡單的數組,那么 stream 有很大的不可預測性,你必須考慮到會有錯誤或者異常發生。
對于錯誤,如果你傳遞了自己的 observer(觀察器),你需要在內部使用try...catch 并調用 observer.onError(error)。這將允許你捕捉和處理相關錯誤,并最終 dispose。
當然,你也可以使用 .onErrorResumeNext。
捕捉異常(Catch )
所幸你可以像以前一樣使用 catch 語句 (現在是一個操作符 operator) 。為了演示,我將在stream到達值5的時候手動創建一個錯誤。
當條件滿足時,異常將被拋出并一路傳播到 stream 事先注冊的 Observer。你可能想優雅地捕獲異常并顯示一個比較友好的消息。
catch操作符允許你處理指定的錯誤并防止該錯誤傳播到其他已注冊的下游observer。這個操作符可以搭配另一個?Observable 將相應的錯誤傳遞出去,所以你可以通過這個方式在出錯時建議某些默認值。注意:此時關聯到那個 observer 的錯誤處理函數將不會被觸發。
現在,如果我們想要標記一個不可恢復的情況,你可以?catch?且 throw?錯誤。在?catch 塊內,這段代碼將會導致異常解除。我注意到,其拋出異常時的其副作用將會波及其他期待處理的地方。在真正關鍵的部分這應該少用。
另一個選擇是嘗試重試。
重試
在可監控的范圍內,你可以重試上一行操作確定的次數,之前的失敗將會被解除。
最后,作為JavaScript 開發者,我們整天都跟事件或者異步計算打交道。我們在實現復雜UI或者復雜狀態機時,為了保證在失敗時能夠保持響應,這些代碼將變得越來越復雜。RxJS 真正實現了?Reactive Manifesto?的兩個最重要的基本原則,也就是Responsive (響應式)?和?Resilient(自適應性).
此外,RxJS 將這些計算指令提升為語言的第一等級特性,由此組成了JavaScript里面最先進的事件處理系統。這一切組成了一個統一的計算模型來處理這些異步計算指令。該模型包含了可讀性高,易于組合的API,并剝離了一些零碎的細節(如延遲和等待時間等)。
更多參考資料
http://rxmarbles.com/
http://reactivex.io/
http://callbackhell.com/
http://xgrommx.github.io/rx-book/
https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise
https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
https://github.com/Reactive-Extensions/RxJS/tree/master/doc/designguidelines
原文地址:http://www.oschina.net/translate/rxjs-streams
.NET社區新聞,深度好文,微信中搜索dotNET跨平臺或掃描二維碼關注
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的使用 RxJS 实现 JavaScript 的 Reactive 编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: .NET Core 1.1 Previe
- 下一篇: 基于Bootstrap 3.x的免费高级