RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject
本文轉載自公眾號" zcx的工作室", 作者: zcx本人
原文鏈接:https://mp.weixin.qq.com/s/i14brW_Ok8JYGoBIcfhs5Q
原文標題:RxJS 源碼解析(二): Multicast Observable
上一篇,我們分析了 Oberservable 和 Subscription 的具體實現方法。這一篇,將會了解一系列不同的 Muticasted Observable(多播觀察源)。這些 Observable 在 RxJS 中主要是以 Subject 命名,它們有以下幾種不同的實現:
-
Subject
-
AnonymousSubject
-
BehaviorSubject
-
ReplaySubject
-
AsyncSubject
所謂 Muticasted Observable,就是這個 Observable 可以持續的發送數據給到訂閱它的訂閱者們。
Subject
Subject 是最基礎的 Muticasted Observable,訂閱者對其進行訂閱后,將會拿到 Subject 之后發送的數據。但是,如果訂閱者在數據發送后再訂閱,那么它將永遠都拿不到這條數據。用一下例子簡單說明一下:
const subject = new Subject<number>(); // 訂閱之前調用是不會打印 subject.next(1);// 訂閱數據 cost subscription = subject.subscribe((value) => {console.log('訂閱數據A:' + value); });// 訂閱后調用會打印數據。 subject.next(2);// 打印結果 // 訂閱數據A:2Subject 的實現通過將觀察員們放入數組中,如果有事件即將到來,通知當前所有已經在位的觀察員們。
class Subject<T> extends Observable<T> { observers: Observer<T>[] = [];// 省略了一些內容next(value?: T) {if (!this.isStopped) {...const { observers } = this;const len = observers.length;const copy = observers.slice();for (let i = 0; i < len; i++) {copy[i].next(value);}}}// error 類似于 nexterror(err: any) {...this.hasError = true;this.thrownError = err;this.isStopped = true;const { observers } = this;const len = observers.length;const copy = observers.slice();for (let i = 0; i < len; i++) {copy[i].error(err);}this.observers.length = 0;}// complete 類似于 nextcomplete() {...this.isStopped = true;const { observers } = this;const len = observers.length;const copy = observers.slice();for (let i = 0; i < len; i++) {copy[i].complete();}this.observers.length = 0;} }通過重寫了 _subscribe ,將觀察員在訂閱時保存到 observers 數組中。
_subscribe(subscriber: Subscriber<T>): Subscription {if (this.hasError) {subscriber.error(this.thrownError);return Subscription.EMPTY;} else if (this.isStopped) {subscriber.complete();return Subscription.EMPTY;} else {// 如果都沒有問題,在這里將觀察員保存到 observers 數組。this.observers.push(subscriber);// 提供一個指向于當前觀察者的訂閱對象。return new SubjectSubscription(this, subscriber)} }Subject 通過創建一個新的指向于它的 observable,完成和 Observable 之間的轉換。
asObservable(): Observable<T> {const observable = new Observable<T>();(<any>observable).source = this;return observable; }AnonymousSubject
AnonymousSubject 是一個 Subject 的 wrapper,它擁有一個 名為 destination 的 Observer 成員。Observer 提供了三個方法接口,分別是 next,error 和 complete。
export interface Observer<T> {closed?: boolean;next: (value: T) => void;error: (err: any) => void;complete: () => void; }AnonymousSubject 通過重載 Subject 的 next,error,complete 將調用轉發到 destination 。由于其重載這三個重要的方法,其本身并不具備 Subject 所提供的功能。
AnonymousSubject 重載這些方法的主要作用是為了將調用轉發到 destination ,也就是提供了一個
export class AnonymousSubject<T> extends Subject<T> {constructor(protected destination?: Observer<T>, source?: Observable<T>) {super();this.source = source;}next(value: T) {const { destination } = this;if (destination && destination.next) {destination.next(value);}}error(err: any) {const { destination } = this;if (destination && destination.error) {this.destination.error(err);}}complete() {const { destination } = this;if (destination && destination.complete) {this.destination.complete();}} }它也重載 _subscribe,那么也就不具備 Subject 的保存訂閱者的功能了。
_subscribe(subscriber: Subscriber<T>): Subscription {const { source } = this;if (source) {return this.source.subscribe(subscriber);} else {return Subscription.EMPTY;} }通過閱讀源碼使用到 AnonymousSubject 的地方,我認為 AnonymousSubject 主要的功能還是為 Subject 的 lift 方法提供一個封裝,lift 需要返回的是一個符合當前類的同構對象。
export class Subject<T> extends Observable<T> {lift<R>(operator: Operator<T, R>): Observable<R> {const subject = new AnonymousSubject(this, this);subject.operator = <any>operator;return <any>subject;} }如果直接重新構造一個 Subject 雖然符合同構,但是存儲了過多的冗余數據,比如,訂閱的時候就會重復把訂閱者添加到 observers 中;如果直接使用 Observable ,那么又不符合同構,因為 Observable 并不具備 next,error 和 complete 等功能,那么這就是一種比較穩妥的做法,通過重載復寫 Subject 的一些方法,使得其既具備同構,也不會重復保存冗余數據。
BehaviorSubject
BehaviorSubject 為 Subject 提供了數據持久化(相對于 Subject 本身)功能,它本身存儲了已經到來的數據,可以看看以下例子。
const subject = new BehaviorSubject<number>(0);// 初始化后直接訂閱 const subscriptionA = subject.subscribe((value) => {console.log('訂閱數據A:' + value); });// 訂閱之前調用是不會打印 subject.next(1);const subscriptionB = subject.subscribe((value) => {console.log('訂閱數據B:' + value); });// 訂閱后調用會打印數據。 subject.next(2);// 打印結果 // 訂閱數據A:0 // 訂閱數據A:1 // 訂閱數據B:1 // 訂閱數據A:2 //BehaviorSubject 擁有一個 _value 成員,每次調用 next 發送數據的時候,BehaviorSubject 都會將數據保存到 _value 中。
export class BehaviorSubject<T> extends Subject<T> {constructor(private _value: T) {super();}get value(): T {return this.getValue();}getValue(): T {if (this.hasError) {throw this.thrownError;} else if (this.closed) {throw new ObjectUnsubscribedError();} else {return this._value;}} }調用 next 的時候,會把傳入的 value 保存起來,并交由 Subject 的 next 來處理。
next(value: T): void {super.next(this._value = value); }當 BehaviorSubject 被訂閱的時候,也會把當前存儲的數據發送給訂閱者,通過重寫 _subscribe 實現這個功能。
_subscribe(subscriber: Subscriber<T>): Subscription {const subscription = super._subscribe(subscriber);// 只要訂閱器沒有關閉,那么就將當前存儲的數據發送給訂閱者。if (subscription && !(<SubscriptionLike>subscription).closed) {subscriber.next(this._value);}return subscription; }AsyncSubject
AsyncSubject 并沒有提供相應的異步操作,而是把控制最終數據到來的權力交給調用者,訂閱者只會接收到 AsyncSubject 最終的數據。正如官方例子所展示的的,當它單獨調用 next 的時候,訂閱者并不會接收到數據,而只有當它調用 complete 的時候,訂閱者才會接收到最終到來的消息。以下例子可以說明 AsyncSubject 的運作方式。
const subject = new AsyncSubject<number>();const subscriptionA = subject.subscribe((value) => {console.log('訂閱數據A:' + value); });// 此處不會觸發訂閱 subject.next(1); subject.next(2); subject.next(3); subject.next(4);const subscriptionB = subject.subscribe((value) => {console.log('訂閱數據B:' + value); });// 同樣,這里不會觸發訂閱 subject.next(5); // 但是完成方法會觸發訂閱 subject.complete();// 打印結果 // 訂閱數據A:5 // 訂閱數據B:5AsyncSubject 通過保留發送狀態和完成狀態,來達到以上目的。
export class AsyncSubject<T> extends Subject<T> {private value: T = null;private hasNext: boolean = false;private hasCompleted: boolean = false; }AsyncSubject 的 next 不會調用 Subject 的 next,而是保存未完成狀態下最新到來的數據。
next(value: T): void {if (!this.hasCompleted) {this.value = value;this.hasNext = true;} }那么 Subject 的 next 會在 AsyncSubject 的 complete 方法中調用。
complete(): void {this.hasCompleted = true;if (this.hasNext) {super.next(this.value);}super.complete(); }ReplaySubject
ReplaySubject 的作用是在給定的時間內,發送所有的已經收到的緩沖區數據,當時間過期后,將銷毀之前已經收到的數據,重新收集即將到來的數據。所以在構造的時候,需要給定兩個值,一個是緩沖區的大小(bufferSize),一個是給定緩沖區存活的窗口時間(windowTime),需要注意的是 ReplaySubject 所使用的緩沖區的策略是 FIFO。
下面舉出兩個例子,可以先感受一下 ReplaySubject 的行為。第一個如下:
const subject = new ReplaySubject<string>(3);const subscriptionA = subject.subscribe((value) => {console.log('訂閱數據A:' + value); });subject.next(1); subject.next(2); subject.next(3); subject.next(4);const subscriptionB = subject.subscribe((value) => {console.log('訂閱數據B:' + value); });// 打印結果: // 訂閱數據A: 1 // 訂閱數據A: 2 // 訂閱數據A: 3 // 訂閱數據A: 4 // 訂閱數據B:2 // 訂閱數據B:3 // 訂閱數據B:4下面是第二個例子,這個 ReplaySubject 帶有一個窗口時間。
const subject = new ReplaySubject<string>(10, 1000);const subscriptionA = subject.subscribe((value) => {console.log('訂閱數據A:' + value); });subject.next('number'); subject.next('string'); subject.next('object'); subject.next('boolean');setTimeout(() => {subject.next('undefined');const subscriptionB = subject.subscribe((value) => {console.log('訂閱數據B:' + value);}); }, 2000);// 打印結果 // 訂閱數據A:number // 訂閱數據A:string // 訂閱數據A:object // 訂閱數據A:boolean // 訂閱數據A:undefined // 訂閱數據B:undefined其實 ReplaySubject 跟 BehaviorSubject 很類似,但是不同的點在于,ReplaySubject 多了緩沖區和窗口時間,也算是擴展了 BehaviorSubject 的使用場景。
在源碼中,還有第三個參數,那就是調度器(scheduler),一般來說,使用默認調度器已經可以覆蓋大部分需求,關于調度器的部分會在之后講到。
export class ReplaySubject<T> extends Subject<T> {private _events: (ReplayEvent<T> | T)[] = [];private _bufferSize: number;private _windowTime: number;private _infiniteTimeWindow: boolean = false;constructor(bufferSize: number = Number.POSITIVE_INFINITY,windowTime: number = Number.POSITIVE_INFINITY,private scheduler?: SchedulerLike) {super();this._bufferSize = bufferSize < 1 ? 1 : bufferSize;this._windowTime = windowTime < 1 ? 1 : windowTime;if (windowTime === Number.POSITIVE_INFINITY) {this._infiniteTimeWindow = true;this.next = this.nextInfiniteTimeWindow;} else {this.next = this.nextTimeWindow;}} }上面的源碼中,ReplaySubject 在構造時會根據不同的窗口時間來設置 next 具體的運行內容,主要以下兩種方式。
-
nextInfiniteTimeWindow
-
nextTimeWindow
nextInfiniteTimeWindow
如果窗口時間是無限的,那么就意味著緩沖區數據的約束條件只會是將來的數據。
private nextInfiniteTimeWindow(value: T): void {const _events = this._events;_events.push(value);// 根據數據長度和緩沖區大小,決定哪些數據留在緩沖區。if (_events.length > this._bufferSize) {_events.shift();}super.next(value); }nextTimeWindow
如果窗口時間是有限的,那么緩沖區的約束條件就由兩條組成:窗口時間和將來的數據。這時,緩沖區數據就由 ReplayEvent 組成。ReplayEvent 保存了到來的數據的內容和其當前的時間戳。
class ReplayEvent<T> {constructor(readonly public time: number,readonly public value: T ) {} }那么通過 _trimBufferThenGetEvents 對緩沖區數據進行生死判斷后,再把完整的數據交由 Subject 的 next 發送出去。
private nextTimeWindow(value: T): void {this._events.push(new ReplayEvent(this._getNow(), value));this._trimBufferThenGetEvents();super.next(value); }_trimBufferThenGetEvents 這個方法是根據不同的 event 對象中的時間戳與當前的時間戳進行判斷,同時根據緩沖區的大小,從而得到這個對象中的數據是否能夠保留的憑證。
private _trimBufferThenGetEvents(): ReplayEvent<T>[] {const now = this._getNow();const _bufferSize = this._bufferSize;const _windowTime = this._windowTime;const _events = <ReplayEvent<T>[]>this._events;const eventsCount = _events.length;let spliceCount = 0;// 由于緩沖區的是 FIFO,所以時間的排// 序一定是從小到大那么,只需要找到分// 割點,就能決定緩沖數據的最小數據長// 度。while (spliceCount < eventsCount) {if ((now - _events[spliceCount].time) < _windowTime) {break;}spliceCount++;}// 緩沖區長度對切割的優先級會更高,// 所以如果超出了緩沖區長度,那么切// 割點要由更大的一方決定。if (eventsCount > _bufferSize) {spliceCount = Math.max(spliceCount, eventsCount - _bufferSize);}if (spliceCount > 0) {_events.splice(0, spliceCount);}return _events; }訂閱過程
ReplaySubject 的訂閱過程比較特殊,因為訂閱的時候需要發送緩沖區數據,而且在不同時間進行訂閱也會使得緩沖區中的數據變化,所以訂閱是需要考慮的問題會比較多。那么,抓住 _infiniteTimeWindow 這個變量來看代碼會變得很容易。
// 以下源碼省略了調度器相關的代碼 _subscribe(subscriber: Subscriber<T>): Subscription {const _infiniteTimeWindow = this._infiniteTimeWindow;// 窗口時間是無限的則不用考慮// 窗口時間是有限的則更新緩沖區const _events = _infiniteTimeWindow ? this._events : this._trimBufferThenGetEvents();const len = _events.length;// 創建 subscriptionlet subscription: Subscription;if (this.isStopped || this.hasError) {subscription = Subscription.EMPTY;} else {this.observers.push(subscriber);subscription = new SubjectSubscription(this, subscriber);}// 分類討論不同的約束條件if (_infiniteTimeWindow) {// 窗口時間不是無限的,緩沖區存儲直接就是數據for (let i = 0; i < len && !subscriber.closed; i++) {subscriber.next(<T>_events[i]);}} else {// 窗口時間不是無限的,緩沖區存儲的是 ReplayEventfor (let i = 0; i < len && !subscriber.closed; i++) {subscriber.next((<ReplayEvent<T>>_events[i]).value);}}if (this.hasError) {subscriber.error(this.thrownError);} else if (this.isStopped) {subscriber.complete();}return subscription; }最后
本章我主要簡單分析了 5 種主要的 Subject,這些 Subject 實現了不同類型的 Muticasted Observable,對 Observable 進行了擴展。
總結
以上是生活随笔為你收集整理的RxJs Subject, AnonymousSubject, BehaviorSubject, ReplaySubject和AsyncSubject的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 科学家创造出世界首个木质晶体管:运行频率
- 下一篇: 代码泄露事件后,三星暂时禁止员工使用 C