RxJS - Observables, observers 和 operators 简介
RxJS 是響應式編程 (reactive programming) 強大的工具,今天我們將深入介紹 Observables 和 Observers 的內容,以及介紹如何創建自己的操作符 (operators)。
如果你之前已經使用過 RxJS,并希望了解 Observable 及 Operators (操作符) 的內部工作原理,那么這篇文章非常適合你。
什么是 Observable
Observable 就是一個擁有以下特性的函數:
-
它接收一個
observer對象作為參數,該對象中包含next、error和complete方法 -
它返回一個函數,用于在銷毀 Observable 時,執行清理操作
在我們實現的示例中,我們將定義一個簡單的 unsubscribe 函數來實現取消訂閱的功能。然而在 RxJS 中,返回的是 Subcription 對象,該對象中包含一個 unsubscribe 方法。
一個 Observable 對象設置觀察者 (observer),并將它與生產者關聯起來。該生產者可能是 DOM 元素產生的 click 或 input 事件,也可能是更復雜的事件,如 HTTP。
為了更好地理解 Observable,我們來自定義 Observable。首先,我們先來看一個訂閱的例子:
const node = document.querySelector('input[type=text]');const input$ = Rx.Observable.fromEvent(node, 'input');input$.subscribe({next: (event) => console.log(`You just typed ${event.target.value}!`),error: (err) => console.log(`Oops... ${err}`),complete: () => console.log(`Complete!`)
});
該示例中,Rx.Observable.formEvent() 方法接收一個 input 元素和事件名作為參數,然后返回一個 $input Observable 對象。接下來我們使用 subscribe() 方法來定于該 Observable 對象。當觸發 input 事件后,對應的值將會傳遞給 observer 對象。
什么是 Observer
Observer (觀察者) 非常簡單,在上面的示例中,觀察者是一個普通的對象,該對象會作為 subscribe() 方法的參數。此外 subscribe(next, error, complete) 也是一個有效的語法,但在本文中我們將討論對象字面量的形式。
當 Observable 對象產生新值的時候,我們可以通過調用 next() 方法來通知對應的觀察者。若出現異常,則會調用觀察者的 error() 方法。當我們訂閱 Observable 對象后,只要有新的值,都會通知對應的觀察者。但在以下兩種情況下,新的值不會再通知對應的觀察者:
-
已調用 observer 對象的
complete()方法 -
消費者對數據不再感興趣,執行取消訂閱操作
此外在執行最終的 subscribe() 訂閱操作前,我們傳遞的值可以經過一系列的鏈式處理操作。執行對應操作的東西叫操作符,每個操作符執行完后會返回一個新的 Observable 對象,然后繼續我們的處理流程。
什么是 Operator
正如上面所說的,Observable 對象能夠執行鏈式操作,具體如下所示:
const input$ = Rx.Observable.fromEvent(node, 'input').map(event => event.target.value).filter(value => value.length >= 2).subscribe(value => {// use the `value`
});
上面代碼的執行流程如下:
-
假設用戶在輸入框中輸入字符
a -
Observable 對象響應對應的
input事件,然后把值傳遞給 observer -
map()操作符返回一個新的 Observable 對象 -
filter()操作符執行過濾操作,然后又返回一個新的 Observable 對象 -
最后我們通過調用
subscribe()方法,來獲取最終的值
簡而言之,Operator 就是一個函數,它接收一個 Observable 對象,然后返回一個新的 Observable 對象。當我們訂閱新返回的 Observable 對象時,它內部會自動訂閱前一個 Observable 對象。
自定義 Observable
Observable 構造函數
function Observable(subscribe) {this.subscribe = subscribe;
}
每個 subscribe 回調函數被賦值給 this.subscribe 屬性,該回調函數將會被我們或其它 Observable 對象調用。
Observer 示例
在我們深入介紹前,我們先來看一個簡單的示例。之前我們已經創建完 Observable 函數,現在我們可以調用我們的觀察者 (observer),然后傳遞數值 1,然后訂閱它:
const one$ = new Observable((observer) => {observer.next(1);observer.complete();
});one$.subscribe({next: (value) => console.log(value) // 1
});
即我們訂閱我們創建的 Observable 實例,然后通過 subscribe() 方法調用通過構造函數設置的回調函數。
Observable.fromEvent
下面就是我們需要的基礎結構,即在 Observable 對象上需要新增一個靜態方法 fromEvent :
Observable.fromEvent = (element, name) => { };
接下來我們將參考 RxJS 為我們提供的方法來實現自定義的 fromEvent() 方法:
const node = document.querySelector('input');
const input$ = Observable.fromEvent(node, 'input');
按照上面的使用方式,我們的 fromEvent() 方法需要接收兩個參數,同時需要返回一個新的 Observable 對象,具體如下:
Observable.fromEvent = (element, name) => {return new Observable((observer) => {});
};
接下來我們來實現事件監聽功能:
Observable.fromEvent = (element, name) => {return new Observable((observer) => {element.addEventListener(name, (event) => {}, false);});
};
那么我們的 observer 參數來自哪里? 其實 observer 對象就是包含 next、error 和 complete 方法的對象字面量。
需要注意的是,我們的 observer 參數不會被傳遞,直到
subscribe()方法被調用。這意味著addEventListener()方法不會被調用,除非你訂閱該 Observable 對象。
當我們調用 subscribe() 方法,之前設置的 this.subscribe 回調函數會被調用,對應的參數是我們定義的 observer 對象字面量,接下來將使用新的值,作為 next() 方法的參數,調用該方法。
很好,那接下來我們要做什么?之前版本我們只是設置了監聽,但沒有調用 observer 對象的 next() 方法,接下來讓我們來修復這個問題:
Observable.fromEvent = (element, name) => {return new Observable((observer) => {element.addEventListener(name, (event) => {observer.next(event);}, false);});
};
如你所知,當銷毀 Observables 對象時,需要調用一個函數用來執行清理操作。針對目前的場景,在銷毀時我們需要移除事件監聽:
Observable.fromEvent = (element, name) => {return new Observable((observer) => {const callback = (event) => observer.next(event);element.addEventListener(name, callback, false);return () => element.removeEventListener(name, callback, false);});
};
我們沒有調用 complete() 方法,因為該 Observable 對象處理的 DOM 相關的事件,在時間維度上它們可能是無終止的。
現在讓我們來驗證一下最終實現的功能:
const node = document.querySelector('input');
const p = document.querySelector('p');function Observable(subscribe) {this.subscribe = subscribe;
}Observable.fromEvent = (element, name) => {return new Observable((observer) => {const callback = (event) => observer.next(event);element.addEventListener(name, callback, false);return () => element.removeEventListener(name, callback, false);});
};const input$ = Observable.fromEvent(node, 'input');const unsubscribe = input$.subscribe({next: (event) => {p.innerHTML = event.target.value;}
});// automatically unsub after 5s
setTimeout(unsubscribe, 5000);
自定義操作符
創建我們自己的操作符應該會更容易一些,現在我們了解 Observable 和 Observable 背后的概念。我們將在 Observable 的原型對象上添加一個方法:
Observable.prototype.map = function (mapFn) { };
該方法的功能與 JavaScript 中的 Array.prototype.map 方法類似:
const input$ = Observable.fromEvent(node, 'input').map(event => event.target.value);
所以我們需要應用回調函數并調用它,這用于獲取我們所需要的數據。在我們這樣做之前,我們需要流中的最新值。這里是巧妙的部分,在 map() 操作符中,我們需要訪問 Observable 實例。因為 map 方法在原型上,我們可以通過以下方式訪問 Observable 實例:
Observable.prototype.map = function (mapFn) {const input = this;
};
接下來我們在返回的 Observable 對象中執行 input 對象的訂閱操作:
Observable.prototype.map = function(mapFn) {const input = this;return new Observable((observer) => {return input.subscribe();});
};
我們返回了
input.subscribe()方法執行的結果,因為當我們執行取消訂閱操作時,將會依次調用每個 Observable 對象取消訂閱的方法。
最后我們來完善一下 map 操作符的內部代碼:
Observable.prototype.map = function (mapFn) {const input = this;return new Observable((observer) => {return input.subscribe({next: (value) => observer.next(mapFn(value)),error: (err) => observer.error(err),complete: () => observer.complete()});});
};
現在我們已經可以執行鏈式操作了:
const input$ = Observable.fromEvent(node, 'input').map(event => event.target.value);input$.subscribe({next: (value) => {p.innerHTML = value;}
});
我有話說
Observable 與 Promise 有什么區別?
Observable(可觀察對象)是基于推送(Push)運行時執行(lazy)的多值集合。
| MagicQ | 單值 | 多值 |
|---|---|---|
| 拉取(Pull) | 函數 | 遍歷器 |
| 推送(Push) | Promise | Observable |
-
Promise
-
返回單個值
-
不可取消的
-
-
Observable
-
隨著時間的推移發出多個值
-
可以取消的
-
支持 map、filter、reduce 等操作符
-
延遲執行,當訂閱的時候才會開始執行
-
什么是 SafeObserver ?
上面的示例中,我們使用一個包含了 next、error、complete 方法的普通 JavaScript 對象來定義觀察者。一個普通的 JavaScript 對象只是一個開始,在 RxJS 5 里面,為開發者提供了一些保障機制,來保證一個更安全的觀察者。以下是一些比較重要的原則:
-
傳入的
Observer對象可以不實現所有規定的方法 (next、error、complete 方法) -
在
complete或者error觸發之后再調用next方法是沒用的 -
調用
unsubscribe方法后,任何方法都不能再被調用了 -
complete和error觸發后,unsubscribe也會自動調用 -
當
next、complete和error出現異常時,unsubscribe也會自動調用以保證資源不會浪費 -
next、complete和error是可選的。按需處理即可,不必全部處理
為了完成上述目標,我們得把傳入的匿名 Observer 對象封裝在一個 SafeObserver 里以提供上述保障。
若想進一步了解詳細信息,請參考 Observable詳解 文章中 "自定義 Observable" 章節的內容。
參考資源
-
rxjs-observables-observers-operators
總結
以上是生活随笔為你收集整理的RxJS - Observables, observers 和 operators 简介的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Timeline 时间线基础用法
- 下一篇: 什么是java常量