rxjs 里的pipe operator
源代碼:
const a = of([1, 2, 3]);const b = map((data: number[]) => {for( let i = 0; i < data.length; i++){data[i] = data[i] + 1;}console.log('data: ' + data);return data;} );const c = a.pipe(b);c.subscribe((data) => console.log('Fairy:' + data));注意,傳入pipe的operations不是應用程序傳入的fn,而是執行了map operator后,被MapOperation包裹過的fn:
this還是指向Observable對象。
因為我只傳入了一個operation,所以不會調用array的reduce,而是直接返回數組里唯一的一個元素:
pipeFromArray(operations)返回operations數組里唯一的元素,即mapOperations,然后將this即當前調用pipe的Observable對象傳入mapOperations函數:
此時source就是調用pipe函數的Observable對象:
調用Observable對象的lift方法,輸入為新建的MapOperator:
這個MapOperator其實就是一個POJO - Plain Old JavaScript Object,就是回調project和this上下文的封裝。
Observable對象的lift操作也很好理解:新建一個Observable對象,將其source指向原始Observable,然后operator設置為剛才新建的MapOperator:
注意,此時我們執行subscribe的Observable對象,實際上是pipe操作返回的新Observable:
其map operator里包含了map應該執行的回調函數:
source指向的是原始的Observable對象。
Observable.subscribe里第一個重要的操作,就是創建subscriber對象。既然是subscribe,就得有subscribe對象,這個對象就是應用程序傳入的callback的封裝。
至此我們已經認識了一些對象的封裝了:
MapOperation和MapOperator是map操作符里傳入的function(稱為project)的封裝,其中MapOperator包含project和thisArg,而MapOperation內部的邏輯有二:一是確保project的類型為function,二是創建MapOperator,然后調用Observable的lift生成新的Observable.
Subscriber是應用程序傳入的callback的封裝。
Subscriber的destination指向了SafeSubscriber,后者包含的app callback:
繼續回到Observable的subscribe操作,下圖的含義是,如果新的Observable對象包含mapOperator(在我們的例子里確實包含),則調用該MapOperator,同時傳入的輸入參數為原始的Observable對象。
這里看到MapOperator的call方法了:
把subscribe調用鏈式傳遞給source了,這也是為什么MapOperation里為什么要調用原始Observable的lift方法——建立Observable與Observable之間的鏈接關系。
MapSubscriber和普通的Subscriber有何區別?上圖做了對比,就多了一個project字段。
原始對象(即ofType返回的Observable)調用trySubscribe:
sink即MapSubscriber,里面包含了app callback:
以及map project:
有了這些信息,可以執行map操作了。
始終記住這個語義:Observable.subscribe(subscriber)
2021-1-31 更新
Observable pipe方法驅動的流,每一個map函數的輸出是下一個函數的輸入。下列代碼打印:[3,4,5]
更多Jerry的原創文章,盡在:“汪子熙”:
總結
以上是生活随笔為你收集整理的rxjs 里的pipe operator的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: QQ影音投屏在哪里 投屏具体操作分享
- 下一篇: LOL信誉分过低怎么恢复 LOL信誉分过