生活随笔
收集整理的這篇文章主要介紹了
RxSwift之常用高阶函数(操作符Operator)的说明和使用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一、組合操作符
① startWith
startWith 方法會在 Observable 序列開始之前插入一些事件元素,即發出事件消息之前,會先發出這些預先插入的事件消息。
Observable . of ( "2" , "3" , "4" ) . startWith ( "1" ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) 1 2 3 4
Observable . of ( "2" , "3" , "4" ) . startWith ( "1" ) . startWith ( "0" ) . startWith ( "B" ) . startWith ( "A" ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) AB
0 1 2 3 4
② merge
將將多個(兩個或兩個以上的)Observable 序列合并成一個 Observable 序列,并將像每個源可觀察序列發出元素一樣發出每個元素。
let subject1
= PublishSubject < String > ( ) let subject2
= PublishSubject < String > ( ) Observable . of ( subject1
, subject2
) . merge ( ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) subject1
. onNext ( "s" ) subject1
. onNext ( "w" ) subject2
. onNext ( "i" ) subject2
. onNext ( "f" ) subject1
. onNext ( "t" ) swift
③ zip
zip 方法可以將多個(兩個或兩個以上的)Observable 序列壓縮成一個 Observable 序列,而且它會等到每個 Observable 事件一一對應地湊齊之后再合并。 可以將多達 8 個源可觀測序列組合成一個新的可觀測序列,并將從組合的可觀測序列中發射出對應索引處每個源可觀測序列的元素。
let stringSubject
= PublishSubject < String > ( ) let intSubject
= PublishSubject < Int > ( ) Observable . zip ( stringSubject
, intSubject
) { stringElement
, intElement
in "\( stringElement) \( intElement) " } . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) stringSubject
. onNext ( "Y" ) stringSubject
. onNext ( "D" ) intSubject
. onNext ( 1 ) intSubject
. onNext ( 2 ) stringSubject
. onNext ( "W" ) intSubject
. onNext ( 3 ) Y
1 D
2 W
3
④ combineLatest
combineLatest 是將多個(兩個或兩個以上的)Observable 序列元素進行合并。 但與 zip 不同的是,每當任意一個 Observable 有新的事件發出時,它會將每個 Observable 序列的最新的一個事件元素進行合并。 combineLatest 可以將 8 源可觀測序列組合成一個新的觀測序列,并將開始發出聯合觀測序列的每個源的最新元素可觀測序列,一旦所有排放源序列至少有一個元素,并且當源可觀測序列發出的任何一個新元素。
let stringSub
= PublishSubject < String > ( ) let intSub
= PublishSubject < Int > ( ) Observable . combineLatest ( stringSub
, intSub
) { strElement
, intElement
in "\( strElement) \( intElement) " } . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) stringSub
. onNext ( "D" ) stringSub
. onNext ( "W" ) intSub
. onNext ( 1 ) intSub
. onNext ( 2 ) stringSub
. onNext ( "Y" ) W
1 W
2 Y
2
⑤ switchLatest
將可觀察序列發出的元素轉換為可觀察序列,并從最近的內部可觀察序列發出元素。 switchLatest 方法可以將兩個 Observable 序列合并為一個,每當 self 隊列發射一個元素時,便從第二個序列中取出最新的一個值。
let switchLatestSub1
= BehaviorSubject ( value
: "Y" ) let switchLatestSub2
= BehaviorSubject ( value
: "1" ) let switchLatestSub
= BehaviorSubject ( value
: switchLatestSub1
) switchLatestSub
. asObservable ( ) . switchLatest ( ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) switchLatestSub1
. onNext ( "D" ) switchLatestSub1
. onNext ( "_" ) switchLatestSub2
. onNext ( "2" ) switchLatestSub2
. onNext ( "3" ) switchLatestSub
. onNext ( switchLatestSub2
) switchLatestSub1
. onNext ( "*" ) switchLatestSub1
. onNext ( "W" ) switchLatestSub2
. onNext ( "4" ) YD
_ 3 4
組合操作符的使用都比較簡單,它們的原理都類似,以 combineLatest 為例: 初始化中 self._arity = arity 就是管理的序列個數; 初始化中 self._hasValue 就是一個初始化的個數為 arity 的,里面的值都是 false; 核心邏輯 next(_ index: Int) 方法中,判斷 self._hasValue[index] 就是剛剛初始化的集合,第一次進來就是第一個序列,進來就會標記 true,并且 _numberOfValues+1,此時就是0->1; 繼續往下面走,發現 _numberOfValues < arity 就會跳過; 如果下次還是第一個序列進來,第一層判斷就通不過,后面還是跳過; 如果下次進來的是第二個序列,那么第一層判斷就會進去,進來就會標記 true,并且 _numberOfValues+1,此時就是1->2; 第二層判斷也滿足條件 self._numberOfValues == self._arity,取回 let result = try self.getResult() 響應結果,然后就發送出去:self.forwardOn(.next(result)); 綜合得出 combineLatest 必須兩個序列都響應才會響應最終的結果。
⑥ withLatestFrom
將兩個 Observables 最新的元素通過一個函數組合起來,當第一個 Observable 發出一個元素,就將組合后的元素發送出來:
withLatestFrom 操作符將兩個 Observables 中最新的元素通過一個函數組合起來,然后將這個組合的結果發出來。當第一個 Observable 發出一個元素時,就立即取出第二個 Observable 中最新的元素,通過一個組合函數將兩個最新的元素合并后發送出去。 當第一個 Observable 發出一個元素時,就立即取出第二個 Observable 中最新的元素,然后把第二個 Observable 中最新的元素發送出去,示例如下:
let disposeBag
= DisposeBag ( )
let firstSubject
= PublishSubject < String > ( )
let secondSubject
= PublishSubject < String > ( ) firstSubject
. withLatestFrom ( secondSubject
) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) firstSubject
. onNext ( "🅰?" )
firstSubject
. onNext ( "🅱?" )
secondSubject
. onNext ( "1" )
secondSubject
. onNext ( "2" )
firstSubject
. onNext ( "🆎" )
2
當第一個 Observable 發出一個元素時,就立即取出第二個 Observable 中最新的元素,將第一個 Observable 中最新的元素 first 和第二個 Observable 中最新的元素second組合,然后把組合結果 first+second發送出去,示例如下:
let disposeBag
= DisposeBag ( )
let firstSubject
= PublishSubject < String > ( )
let secondSubject
= PublishSubject < String > ( ) firstSubject
. withLatestFrom ( secondSubject
) { ( first , second
) in return first + second
} . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) firstSubject
. onNext ( "🅰?" )
firstSubject
. onNext ( "🅱?" )
secondSubject
. onNext ( "1" )
secondSubject
. onNext ( "2" )
firstSubject
. onNext ( "🆎" )
🆎
2
二、映射(變換)操作符
① buffer
buffer 操作符將緩存 Observable 中發出的新元素,當元素達到某個數量,或者經過了特定的時間,它就會將這個元素集合發送出來。 如下所示,timeSpan 緩存間隔時間、count 緩存個數、scheduler 線程,發送兩個 event 后會觸發訂閱,滿 2 秒也會觸發訂閱,如果 event 沒有發送空數組:
let sub
= PublishSubject < String > ( )
sub
. buffer ( timeSpan
: 2 , count : 2 , scheduler
: MainScheduler . asyncInstance
) . subscribe
{ event
in print ( "訂閱1" , event
) } . disposed ( by
: disposeBag
) sub
. onNext ( "發送1" )
sub
. onNext ( "發送2" )
訂閱
1 next ( [ "發送1" , "發送2" ] )
訂閱
1 next ( [ ] )
訂閱
1 next ( [ ] )
訂閱
1 next ( [ ] )
② map
轉換閉包應用于可觀察序列發出的元素,并返回轉換后的元素的新可觀察序列。
let ob
= Observable . of ( 1 , 2 , 3 , 4 ) ob
. map { ( number
) - > Int in return number
+ 2 } . subscribe
{ print ( "\( $0 ) " ) } . disposed ( by
: disposeBag
) next ( 3 ) next ( 4 ) next ( 5 ) next ( 6 ) completed
③ flatMap and flatMapLatest
將可觀測序列發射的元素轉換為可觀測序列,并將兩個可觀測序列的發射合并為一個可觀測序列。 例如,當有一個可觀察的序列,它本身發出可觀察的序列,想能夠對任何一個可觀察序列的新發射做出反應(序列中序列:比如網絡序列中還有模型序列)。 flatMap 和 flatMapLatest 的區別是,flatMapLatest 只會從最近的內部可觀測序列發射元素。 flatMapLatest 實際上是 map 和 switchLatest 操作符的組合。
let boy
= YDWPlayer ( score
: 100 ) let girl
= YDWPlayer ( score
: 90 ) let player
= BehaviorSubject ( value
: boy
) player
. asObservable ( ) . flatMap
{ $
0 . score
. asObservable ( ) } . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) boy
. score
. onNext ( 60 ) player
. onNext ( girl
) boy
. score
. onNext ( 50 ) boy
. score
. onNext ( 40 ) girl
. score
. onNext ( 10 ) girl
. score
. onNext ( 0 )
④ scan
從初始就帶有一個默認值開始,然后對可觀察序列發出的每個元素應用累加器閉包,并以單個元素可觀察序列的形式返回每個中間結果。
Observable . of ( 10 , 100 , 1000 ) . scan ( 2 ) { aggregateValue
, newValue
in aggregateValue
+ newValue
} . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
)
映射型高階函數,以 map 來進行分析: 通過 composeMap 創建中間序列:Map; 初始化中 self._source = source & self._transform = transform 保存源序列和外界傳進去的映射表達式:$0+2; MapSink 調用 on(_ event: Event)來發送信號,發送信號之前 let mappedElement = try self._transform(element) 取出要發送的結果,就是經過映射表達式處理的結果; self.forwardOn(.next(mappedElement)) 正常發送。
⑤ window
將 Observable 分解為多個子 Observable,周期性的將子 Observable 發出來:
window 操作符和 buffer 十分相似,buffer 周期性的將緩存的元素集合發送出來,而 window 周期性的將元素集合以 Observable 的形態發送出來。 buffer 要等到元素搜集完畢后,才會發出元素序列,而 window 可以實時發出元素序列。 使用示例:
let disposeBag
= DisposeBag ( )
let subject
= PublishSubject < String > ( )
subject
. window ( timeSpan
: 1 , count : 3 , scheduler
: MainScheduler . instance
) . subscribe ( onNext
: { [ weak self ] in print ( "subscribe: \( $0 ) " ) $
0 . asObservable ( ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: self ! . disposeBag
) } ) . disposed ( by
: disposeBag
) subject
. onNext ( "a" )
subject
. onNext ( "b" )
subject
. onNext ( "c" ) subject
. onNext ( "1" )
subject
. onNext ( "2" )
subject
. onNext ( "3" )
subscribe
: RxSwift . AddRef < Swift . String >
a
b
c
subscribe
: RxSwift . AddRef < Swift . String >
1
2
3
subscribe
: RxSwift . AddRef < Swift . String >
⑥ concatMap
concatMap 將 Observable 的元素轉換成其他的 Observable,然后將這些 Observables 串連起來:
concatMap 操作符將源 Observable 的每一個元素應用一個轉換方法,將它們轉換成 Observables,然后讓這些 Observables 按順序的發出元素,當前一個 Observable 元素發送完畢后,后一個 Observable 才可以開始發出元素,等待前一個 Observable 產生完成事件后,才對后一個 Observable 進行訂閱。 如下所示:
let disposeBag
= DisposeBag ( ) let subject1
= BehaviorSubject ( value
: "🍎" )
let subject2
= BehaviorSubject ( value
: "🐶" ) let variable
= Variable ( subject1
) variable
. asObservable ( ) . concatMap
{ $
0 } . subscribe
{ print ( $
0 ) } . disposed ( by
: disposeBag
) subject1
. onNext ( "🍐" )
subject1
. onNext ( "🍊" ) variable
. value
= subject2subject2
. onNext ( "I would be ignored" )
subject2
. onNext ( "🐱" ) subject1
. onCompleted ( ) subject2
. onNext ( "🐭" )
next ( 🍎
)
next ( 🍐
)
next ( 🍊
)
next ( 🐱
)
next ( 🐭
)
⑦ groupBy
將源 Observable 分解為多個子 Observable,并且每個子 Observable 將源 Observable 中“相似”的元素發送出來:
groupBy 操作符將源 Observable 分解為多個子 Observable,然后將這些子 Observable 發送出來。它會將元素通過某個鍵進行分組,然后將分組后的元素序列以 Observable 的形態發送出來。 使用示例:
let disposeBag
= DisposeBag ( )
Observable < Int > . of ( 0 , 1 , 2 , 3 , 4 , 5 ) . groupBy ( keySelector
: { ( element
) - > String in return element
% 2 == 0 ? "偶數" : "基數" } ) . subscribe
{ ( event
) in switch event
{ case . next ( let group
) : group
. asObservable ( ) . subscribe ( { ( event
) in print ( "key:\( group. key) event:\( event) " ) } ) . disposed ( by
: disposeBag
) default : print ( "" ) } }
. disposed ( by
: disposeBag
)
key:偶數 event:
next ( 0 )
key:基數 event:
next ( 1 )
key:偶數 event:
next ( 2 )
key:基數 event:
next ( 3 )
key:偶數 event:
next ( 4 )
key:基數 event:
next ( 5 )
key:偶數 event:completed
key:基數 event:completed
三、過濾條件操作符
① filter
Observable . of ( 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 0 ) . filter { $
0 % 2 == 0 } . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) 2 4 6 8 0
② distinctUntilChanged
Observable . of ( "1" , "2" , "2" , "2" , "3" , "3" , "4" ) . distinctUntilChanged ( ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) 1 2 3 4
③ elementAt
僅在可觀察序列發出的所有元素的指定索引處發出元素:
Observable . of ( "Y" , "D" , "W" , "N" , "B" ) . elementAt ( 3 ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) N
④ single
只發出可觀察序列發出的第一個元素(或滿足條件的第一個元素)。如果可觀察序列發出多個元素,將拋出一個錯誤。
Observable . of ( "D" , "W" ) . single ( ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) Observable . of ( "D" , "W" ) . single
{ $
0 == "W" } . subscribe
{ print ( $
0 ) } . disposed ( by
: disposeBag
) D
Unhandled error happened
: Sequence contains more than one element
. subscription called from
: next ( W
) completed
⑤ take
只從一個可觀察序列的開始發出指定數量的元素。 signal 只有一個序列,在實際開發會受到局限,這里引出 take 可以不受限制。
Observable . of ( "A" , "B" , "C" , "D" ) . take ( 2 ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) AB
⑥ takeLast
Observable . of ( "A" , "B" , "C" , "D" ) . takeLast ( 3 ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) BCD
⑦ takeWhile
只要指定條件的值為 true,就從可觀察序列的開始發出元素。
Observable . of ( 1 , 2 , 3 , 4 , 5 , 6 ) . takeWhile
{ $
0 < 3 } . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) 1 2
⑧ takeUntil
從源可觀察序列發出元素,直到參考可觀察序列發出元素。takeUntil 應用非常頻繁 比如頁面銷毀,就不能獲取值(cell 重用運用)。
let sourceSequence
= PublishSubject < String > ( ) let referenceSequence
= PublishSubject < String > ( ) sourceSequence
. takeUntil ( referenceSequence
) . subscribe
{ print ( $
0 ) } . disposed ( by
: disposeBag
) sourceSequence
. onNext ( "A" ) sourceSequence
. onNext ( "B" ) sourceSequence
. onNext ( "C" ) referenceSequence
. onNext ( "D" ) sourceSequence
. onNext ( "E" ) sourceSequence
. onNext ( "F" ) sourceSequence
. onNext ( "G" ) next ( A
) next ( B
) next ( C
) completed
⑨ skip
從源可觀察序列發出元素,直到參考可觀察序列發出元素。skip 應用非常頻繁,不用解釋 textfiled 都會有默認序列產生。
Observable . of ( 1 , 2 , 3 , 4 , 5 , 6 ) . skip ( 2 ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) Observable . of ( 1 , 2 , 3 , 4 , 5 , 6 ) . skipWhile
{ $
0 < 4 } . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) 3 4 5 6 4 5 6
⑩ skipUntil
抑制從源可觀察序列發出元素,直到參考可觀察序列發出元素。
let sourceSeq
= PublishSubject < String > ( ) let referenceSeq
= PublishSubject < String > ( ) sourceSeq
. skipUntil ( referenceSeq
) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) sourceSeq
. onNext ( "A" ) sourceSeq
. onNext ( "B" ) sourceSeq
. onNext ( "C" ) referenceSeq
. onNext ( "D" ) sourceSeq
. onNext ( "E" ) sourceSeq
. onNext ( "F" ) sourceSeq
. onNext ( "G" ) EFG
? ignoreElements
忽略掉所有的元素,只發出 error 或 completed 事件:
ignoreElements 操作符將阻止 Observable 發出 next 事件,但是允許他發出 error 或 completed 事件。如果并不關心 Observable 的任何元素,只想知道 Observable 在什么時候終止,那就可以使用 ignoreElements 操作符。 使用示例:
let disposeBag
= DisposeBag ( ) Observable . of ( 1 , 2 , 3 , 4 ) . ignoreElements ( ) . subscribe
{ print ( $
0 ) } . disposed ( by
: disposeBag
)
completed
四、集合控制操作符
① toArray
將一個可觀察序列轉換為一個數組,將該數組作為一個新的單元素可觀察序列發出,然后終止。
Observable . range ( start
: 1 , count : 10 ) . toArray ( ) . subscribe
{ print ( $
0 ) } . disposed ( by
: disposeBag
) success ( [ 1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ] )
② reduce
從一個設置的初始化值開始,然后對一個可觀察序列發出的所有元素應用累加器閉包,并以單個元素可觀察序列的形式返回聚合結果 ,類似 scan。
Observable . of ( 10 , 100 , 1000 ) . reduce ( 1 , accumulator
: + ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) 1111
③ concat
以順序方式連接來自一個可觀察序列的內部可觀察序列的元素,在從下一個序列發出元素之前,等待每個序列成功終止。
let subject1
= BehaviorSubject ( value
: "Y" ) let subject2
= BehaviorSubject ( value
: "1" ) let subjectsSubject
= BehaviorSubject ( value
: subject1
) subjectsSubject
. asObservable ( ) . concat ( ) . subscribe
{ print ( $
0 ) } . disposed ( by
: disposeBag
) subject1
. onNext ( "D" ) subject1
. onNext ( "W" ) subjectsSubject
. onNext ( subject2
) subject2
. onNext ( "打印不出來" ) subject2
. onNext ( "2" ) subject1
. onCompleted ( ) subject2
. onNext ( "3" ) next ( Y
) next ( D
) next ( W
) next ( 2 ) next ( 3 )
五、從可觀察對象的錯誤通知中恢復的操作符
① catchError
通過切換到提供的恢復可觀察序列,從錯誤事件中恢復:
let dwError
= NSError ( domain
: "com.xx.***" , code
: 0 , userInfo
: [ "key" : "400" ] ) as ? Error let sequenceThatFails
= PublishSubject < String > ( ) let recoverySequence
= PublishSubject < String > ( ) sequenceThatFails
. catchError
{ print ( "Error:" , $
0 ) return recoverySequence
} . subscribe
{ print ( $
0 ) } . disposed ( by
: disposeBag
) sequenceThatFails
. onNext ( "Y" ) sequenceThatFails
. onNext ( "D" ) sequenceThatFails
. onError ( dwError
! ) recoverySequence
. onNext ( "W" ) next ( Y
) next ( D
) Error : Error Domain = com
. xx
. * * * Code = 0 "(null)" UserInfo = { key
= 400 } next ( W
)
② catchErrorJustReturn
從錯誤事件中恢復,方法是返回一個可觀察到的序列,該序列發出單個元素,然后終止。
let sequenceThatFails
= PublishSubject < String > ( ) sequenceThatFails
. catchErrorJustReturn ( "A" ) . subscribe
{ print ( $
0 ) } . disposed ( by
: disposeBag
) sequenceThatFails
. onNext ( "B" ) sequenceThatFails
. onNext ( "C" ) sequenceThatFails
. onError ( self . dwError
) next ( D
) next ( W
) next ( Y
) completed
③ retry
通過無限地重新訂閱可觀察序列來恢復重復的錯誤事件。
var count = 1 let sequenceRetryErrors
= Observable < String > . create
{ observer
in observer
. onNext ( "A" ) observer
. onNext ( "B" ) observer
. onNext ( "C" ) if count == 1 { observer
. onError ( dwError
! ) print ( "錯誤序列" ) count + = 1 } observer
. onNext ( "D" ) observer
. onNext ( "E" ) observer
. onNext ( "F" ) observer
. onCompleted ( ) return Disposables . create ( ) } sequenceRetryErrors
. retry ( ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) ABC錯誤序列ABCDEF
④ retry( _ : ):
通過重新訂閱可觀察到的序列,重復地從錯誤事件中恢復,直到重試次數達到 max 未遂計數。
var count = 1 let sequenceThatErrors
= Observable < String > . create
{ observer
in observer
. onNext ( "A" ) observer
. onNext ( "B" ) observer
. onNext ( "C" ) if count < 5 { observer
. onError ( dwError
! ) print ( "錯誤序列" ) count + = 1 } observer
. onNext ( "D" ) observer
. onNext ( "E" ) observer
. onNext ( "F" ) observer
. onCompleted ( ) return Disposables . create ( ) } sequenceThatErrors
. retry ( 3 ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) ABC錯誤序列ABC錯誤序列ABC錯誤序列
Unhandled error happened
: Error Domain = com
. xx
. * * * Code = 0 "(null)" UserInfo = { key
= 400 } subscription called from
:
六、Rx 流程操作符
① debug
var count = 1 let sequenceThatErrors
= Observable < String > . create
{ observer
in observer
. onNext ( "A" ) observer
. onNext ( "B" ) observer
. onNext ( "C" ) if count < 5 { observer
. onError ( dwError
! ) print ( "錯誤序列" ) count + = 1 } observer
. onNext ( "D" ) observer
. onNext ( "E" ) observer
. onNext ( "F" ) observer
. onCompleted ( ) return Disposables . create ( ) } sequenceThatErrors
. retry ( 3 ) . debug ( ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
) ViewController . swift
: 346 ( viewDidLoad ( ) ) - > subscribed
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event next ( A
) A
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event next ( B
) B
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event next ( C
) C錯誤序列
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event next ( A
) A
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event next ( B
) B
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event next ( C
) C錯誤序列
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event next ( A
) A
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event next ( B
) B
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event next ( C
) C錯誤序列
ViewController . swift
: 346 ( viewDidLoad ( ) ) - > Event error ( Error Domain = com
. xx
. * * * Code = 0 "(null)" UserInfo = { key
= 400 } )
Unhandled error happened
: Error Domain = com
. xx
. * * * Code = 0 "(null)" UserInfo = { key
= 400 } subscription called from
: ViewController . swift
: 346 ( viewDidLoad ( ) ) - > isDisposed
② RxSwift.Resources.total:
提供所有 Rx 資源分配的計數,這對于在開發期間檢測泄漏非常有用。
print ( RxSwift . Resources . total
) let subject
= BehaviorSubject ( value
: "A" ) let subscription1
= subject
. subscribe ( onNext
: { print ( $
0 ) } ) print ( RxSwift . Resources . total
) let subscription2
= subject
. subscribe ( onNext
: { print ( $
0 ) } ) print ( RxSwift . Resources . total
) subscription1
. dispose ( ) print ( RxSwift . Resources . total
) subscription2
. dispose ( ) print ( RxSwift . Resources . total
)
七、鏈接操作符(Connectable Observable Operators)
① 可連接的序列
可連接的序列(Connectable Observable): 可連接的序列和一般序列不同在于:有訂閱時不會立刻開始發送事件消息,只有當調用 connect() 之后才會開始發送值。 可連接的序列可以讓所有的訂閱者訂閱后,才開始發出事件消息,從而保證想要的所有訂閱者都能接收到事件消息。 普通序列:
let interval
= Observable < Int > . interval ( 1 , scheduler
: MainScheduler . instance
)
_ = interval
. subscribe ( onNext
: { print ( "訂閱1: \( $0 ) " ) } )
delay ( 5 ) { _ = interval
. subscribe ( onNext
: { print ( "訂閱2: \( $0 ) " ) } )
}
public func delay ( _ delay
: Double , closure
: @escaping
( ) - > Void ) { DispatchQueue . main
. asyncAfter ( deadline
: . now ( ) + delay
) { closure ( ) }
}
運行結果如下,可以看到第一個訂閱者訂閱后,每隔 1 秒會收到一個值。而第二個訂閱者 5 秒后才收到第一個值 0,所以兩個訂閱者接收到的值是不同步的:
訂閱
1 :
0
訂閱
1 :
1
訂閱
1 :
2
訂閱
1 :
3
訂閱
1 :
4
訂閱
1 :
5
訂閱
2 :
0
訂閱
1 :
6
訂閱
2 :
1
② multicast
將源可觀察序列轉換為可連接序列,并通過指定的主題廣播其發射。
func testMulticastConnectOperators ( ) { let subject
= PublishSubject < Any > ( ) subject
. subscribe
{ print ( "00:\( $0 ) " ) } . disposed ( by
: disposeBag
) let netOB
= Observable < Any > . create
{ ( observer
) - > Disposable in sleep ( 2 ) print ( "開始請求網絡" ) observer
. onNext ( "請求到的網絡數據" ) observer
. onNext ( "請求到的本地" ) observer
. onCompleted ( ) return Disposables . create
{ print ( "銷毀回調" ) } } . publish ( ) netOB
. subscribe ( onNext
: { ( anything
) in print ( "訂閱1:" , anything
) } ) . disposed ( by
: disposeBag
) netOB
. subscribe ( onNext
: { ( anything
) in print ( "訂閱2:" , anything
) } ) . disposed ( by
: disposeBag
) _ = netOB
. connect ( ) }
說明: 底層邏輯探索中間變量 ConnectableObservableAdapter 保存了源序列 source、中間序列 makeSubject; 訂閱流程 self.lazySubject.subscribe(observer) 一個懶加載的序列,保證了中間變量 ConnectableObservableAdapter 每一次都是同一個響應序列 剩下就是 PublishSubject 的訂閱效果; 等待源序列的響應,但是源序列的訂閱是在 connect 函數里面,如果沒有調用 connect 函數,意味著就永遠不會發送響應。這樣背后的邏輯就是,前面所以的發送響應在 connect 函數之前的都沒有任何的意義。 以上也就說明 publish 就是狀態共享的:connnect 一次序列發送一次響應(響應所有訂閱)。
③ replay
將源可觀察序列轉換為可連接的序列,并將向每個新訂閱服務器重放以前排放的緩沖大小; 首先擁有和 publish 一樣的能力,共享 Observable sequence, 其次使用 replay 還需要傳入一個參數(buffer size)來緩存已發送的事件,當有新的訂閱者訂閱了,會把緩存的事件發送給新的訂閱者。
func testReplayConnectOperators ( ) { let interval
= Observable < Int > . interval ( . seconds ( 1 ) , scheduler
: MainScheduler . instance
) . replay ( 5 ) interval
. subscribe ( onNext
: { print ( Date . time
, "訂閱: 1, 事件: \( $0 ) " ) } ) . disposed ( by
: self . disposeBag
) delay ( 2 ) { _ = interval
. connect ( ) } delay ( 4 ) { interval
. subscribe ( onNext
: { print ( Date . time
, "訂閱: 2, 事件: \( $0 ) " ) } ) . disposed ( by
: self . disposeBag
) } delay ( 8 ) { interval
. subscribe ( onNext
: { print ( Date . time
, "訂閱: 3, 事件: \( $0 ) " ) } ) . disposed ( by
: self . disposeBag
) } delay ( 20 , closure
: { self . disposeBag
= DisposeBag ( ) } ) }
④ publish
publish 方法會將一個正常的序列轉換成一個可連接的序列,同時該序列不會立刻發送事件,只有在調用 connect 之后才會開始。
let interval
= Observable < Int > . interval ( 1 , scheduler
: MainScheduler . instance
) . publish ( )
_ = interval
. subscribe ( onNext
: { print ( "訂閱1: \( $0 ) " ) } )
delay ( 2 ) { _ = interval
. connect ( )
}
delay ( 5 ) { _ = interval
. subscribe ( onNext
: { print ( "訂閱2: \( $0 ) " ) } )
}
訂閱
1 :
0
訂閱
1 :
1
訂閱
1 :
2
訂閱
2 :
2
訂閱
1 :
3
訂閱
2 :
3
訂閱
1 :
4
訂閱
2 :
4
訂閱
1 :
5
⑤ refCount
將可被連接的 Observable 轉換為普通 Observable:
可被連接的 Observable 和普通的 Observable 十分相似,不過在被訂閱后不會發出元素,直到 connect 操作符被應用為止。這樣一來可以控制 Observable 在什么時候開始發出元素。 refCount 操作符將自動連接和斷開可被連接的 Observable。它將可被連接的 Observable 轉換為普通 Observable。當第一個觀察者對它訂閱時,那么底層的 Observable 將被連接;當最后一個觀察者離開時,那么底層的 Observable 將被斷開連接。 使用示例:
let interval
= Observable < Int > . interval ( 1 , scheduler
: MainScheduler . instance
) . publish ( ) . refCount ( )
_ = interval
. subscribe ( onNext
: { print ( "訂閱1: \( $0 ) " ) } )
delay ( 5 ) { _ = interval
. subscribe ( onNext
: { print ( "訂閱2: \( $0 ) " ) } )
}
訂閱
1 : 0
訂閱
1 : 1
訂閱
1 : 2
訂閱
1 : 3
訂閱
1 : 4
訂閱
1 : 5
訂閱
2 : 5
訂閱
1 : 6
訂閱
2 : 6
訂閱
1 : 7
訂閱
2 : 7
訂閱
1 : 8
訂閱
2 : 8
⑥ shareReplay
使觀察者共享 Observable,觀察者會立即收到最新的元素,即使這些元素是在訂閱前產生的:
shareReplay 操作符將使得觀察者共享源 Observable,并且緩存最新的 n 個元素,將這些元素直接發送給新的觀察者。 使用示例:
let interval
= Observable < Int > . interval ( 1 , scheduler
: MainScheduler . instance
) . share ( replay
: 5 )
_ = interval
. subscribe ( onNext
: { print ( "訂閱1: \( $0 ) " ) } )
delay ( 5 ) { _ = interval
. subscribe ( onNext
: { print ( "訂閱2: \( $0 ) " ) } ) }
public func delay ( _ delay
: Double , closure
: @escaping
( ) - > Void ) { DispatchQueue . main
. asyncAfter ( deadline
: . now ( ) + delay
) { closure ( ) }
}
訂閱
1 : 0
訂閱
1 : 1
訂閱
1 : 2
訂閱
1 : 3
訂閱
1 : 4
訂閱
2 : 0
訂閱
2 : 1
訂閱
2 : 2
訂閱
2 : 3
訂閱
2 : 4
訂閱
1 : 5
訂閱
2 : 5
訂閱
1 : 6
訂閱
2 : 6
八、條件和布爾操作符(Conditional and Boolean Operators)
① amb
在多個源 Observables 中,取第一個發出元素或產生事件的 Observable,然后只發出它的元素:
當傳入多個 Observables 到 amb 操作符時,它將取其中一個 Observable:第一個產生事件的那個 Observable,可以是一個 next,error 或者 completed 事件;amb 將忽略掉其它的 Observables。 例如:
let a
= Observable < Int > . just ( 0 )
let b
= Observable < Int > . just ( 1 )
let c
= Observable < Int > . just ( 2 ) Observable < Int > . amb ( [ a
, b
, c
] ) . subscribe
{ event
in switch event
{ case . next ( let element
) : print ( "element:" , element
) case . error ( let error
) : print ( "error:" , error
) case . completed
: print ( "completed" ) } } . disposed ( by
: bag
)
輸出:
element
: 0
completed
② skipWhile
跳過 Observable 中頭幾個元素,直到元素的判定為否:
skipWhile 操作符可以讓你忽略源 Observable 中頭幾個元素,直到元素的判定為否后,它才鏡像源 Observable。 使用示例:
let disposeBag
= DisposeBag ( ) Observable . of ( 1 , 2 , 3 , 4 , 3 , 2 , 1 ) . skipWhile
{ $
0 < 4 } . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
)
4
3
2
1
九、其它操作符
① connect
connect 通知 ConnectableObservable 可以開始發出元素:
ConnectableObservable 和普通的 Observable 十分相似,不過在被訂閱后不會發出元素,直到 connect 操作符被應用為止,這樣一來可以等所有觀察者全部訂閱完成后,才發出元素。 如下所示:
let intSequence
= Observable < Int > . interval ( 1 , scheduler
: MainScheduler . instance
) . publish ( ) _ = intSequence
. subscribe ( onNext
: { print ( "Subscription 1:, Event: \( $0 ) " ) } ) DispatchQueue . main
. asyncAfter ( deadline
: . now ( ) + 2 ) { _ = intSequence
. connect ( )
} DispatchQueue . main
. asyncAfter ( deadline
: . now ( ) + 4 ) { _ = intSequence
. subscribe ( onNext
: { print ( "Subscription 2:, Event: \( $0 ) " ) } )
} DispatchQueue . main
. asyncAfter ( deadline
: . now ( ) + 6 ) { _ = intSequence
. subscribe ( onNext
: { print ( "Subscription 3:, Event: \( $0 ) " ) } )
}
Subscription 1 : , Event : 0
Subscription 1 : , Event : 1
Subscription 2 : , Event : 1
Subscription 1 : , Event : 2
Subscription 2 : , Event : 2
Subscription 1 : , Event : 3
Subscription 2 : , Event : 3
Subscription 3 : , Event : 3
Subscription 1 : , Event : 4
Subscription 2 : , Event : 4
Subscription 3 : , Event : 4
Subscription 1 : , Event : 5
Subscription 2 : , Event : 5
Subscription 3 : , Event : 5
Subscription 1 : , Event : 6
Subscription 2 : , Event : 6
Subscription 3 : , Event : 6
. . .
② create
通過一個構建函數完整的創建一個 Observable:
create 操作符將創建一個 Observable,需要提供一個構建函數,在構建函數里面描述事件(next,error,completed)的產生過程。 通常情況下一個有限的序列,只會調用一次觀察者的 onCompleted 或者 onError 方法,并且在調用它們后,不會再去調用觀察者的其它方法。 如下所示,創建一個 [0, 1, … 8, 9] 的序列:
let id
= Observable < Int > . create
{ observer
in observer
. onNext ( 0 ) observer
. onNext ( 1 ) observer
. onNext ( 2 ) observer
. onNext ( 3 ) observer
. onNext ( 4 ) observer
. onNext ( 5 ) observer
. onNext ( 6 ) observer
. onNext ( 7 ) observer
. onNext ( 8 ) observer
. onNext ( 9 ) observer
. onCompleted ( ) return Disposables . create ( )
}
③ debounce
debounce 操作符將發出這種元素,在 Observable 產生這種元素后,一段時間內沒有新元素產生。 如下所示:
let pb
= PublishSubject < Int > ( )
pb
. debounce ( 2 , scheduler
: MainScheduler . instance
) . subscribe ( onNext
: { int
in print ( "element:" , int
) } ) . disposed ( by
: bag
)
pb
. onNext ( 1 )
pb
. onNext ( 2 )
pb
. onNext ( 3 )
pb
. onNext ( 4 )
pb
. onNext ( 5 )
指定了兩秒鐘,所以在兩秒鐘以內,只接收了到了最新的 element: 5,因此輸出如下:
element
: 5
④ deferred
直到訂閱發生,才創建 Observable,并且為每位訂閱者創建全新的 Observable:
deferred 操作符將等待觀察者訂閱它,才創建一個 Observable,它會通過一個構建函數為每一位訂閱者創建新的 Observable。看上去每位訂閱者都是對同一個 Observable 產生訂閱,實際上它們都獲得了獨立的序列。 在一些情況下,直到訂閱時才創建 Observable 是可以保證拿到的數據都是最新的。 如下所示:
let ob
= Observable < Int > . deferred
{ ( ) - > Observable < Int > in let ob1
= Observable < Int > . create ( { ov
in ov
. onNext ( 1 ) ov
. onNext ( 2 ) ov
. onCompleted ( ) return Disposables . create ( ) } ) return ob1
}
ob
. subscribe ( onNext
: { int
in print ( int
)
} ) . disposed ( by
: bag
)
1
2
⑤ delay
將 Observable 的每一個元素拖延一段時間后發出:
delay 操作符將修改一個 Observable,它會將 Observable 的所有元素都拖延一段設定好的時間, 然后才將它們發送出來。 如下所示:
Observable < Int > . just ( 1 ) . delay ( 3 , scheduler
: MainScheduler . instance
) . subscribe ( onNext
: { i
in print ( "element: " , i
) } ) . disposed ( by
: bag
)
element
: 1
⑥ delaySubscription
delaySubscription 操作符將在經過所設定的時間后,才對 Observable 進行訂閱操作。
Observable < Int > . just ( 2 ) . delaySubscription ( 3 , scheduler
: MainScheduler . instance
) . subscribe ( onNext
: { i
in print ( "element: " , i
) } ) . disposed ( by
: bag
)
element
: 2
⑦ materialize
一個有限的 Observable 將產生零個或者多個 onNext 事件,然后產生一個 onCompleted 或者 onError 事件。materialize 操作符將 Observable 產生的這些事件全部轉換成元素,然后發送出來。
let disposeBag
= DisposeBag ( )
Observable . of ( 1 , 2 , 1 ) . materialize ( ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
)
next ( 1 )
next ( 2 )
next ( 1 )
completed
⑧ dematerialize
dematerialize 操作符將 materialize 轉換后的元素還原:
let disposeBag
= DisposeBag ( ) Observable . of ( 1 , 2 , 1 ) . materialize ( ) . dematerialize ( ) . subscribe ( onNext
: { print ( $
0 ) } ) . disposed ( by
: disposeBag
)
1
2
1
⑨ do
當 Observable 的某些事件產生時,可以使用 do 操作符來注冊一些回調操作,這些回調會被單獨調用,它們會和 Observable 原本的回調分離。
let disposeBag
= DisposeBag ( ) Observable < [ Int ] > . of ( [ 1 , 2 , 3 ] ) . do ( onNext
: { element
in print ( "do element:" , element
) } , onError
: { error
in print ( "do error:" , error
) } , onCompleted
: { print ( "do completed" ) } , onSubscribe
: { print ( "do subscribe" ) } , onSubscribed
: { print ( "do subscribed" ) } , onDispose
: { print ( "do dispose" ) } ) . subscribe
{ event
in switch event
{ case . next ( let element
) : print ( "element:" , element
) case . error ( let error
) : print ( "error:" , error
) case . completed
: print ( "completed" ) } } . disposed ( by
: disposeBag
)
do 優先于 subscribe 打印,因此輸出結果:
do subscribe
do subscribed
do element
: [ 1 , 2 , 3 ]
element
: [ 1 , 2 , 3 ]
do completed
completed
do dispose
⑩ empty
empty 操作符將創建一個 Observable,這個 Observable 只有一個完成事件:
let id
= Observable < Int > . empty ( )
let id
= Observable < Int > . create
{ observer
in observer
. onCompleted ( ) return Disposables . create ( )
}
? timeout
如果 Observable 在一段時間內沒有產生元素,timeout 操作符將使它發出一個超時的 error 事件。
let disposeBag
= DisposeBag ( )
let times
= [ [ "value" : 1 , "time" : 0 ] , [ "value" : 2 , "time" : 0.5 ] , [ "value" : 3 , "time" : 1.5 ] , [ "value" : 4 , "time" : 4 ] , [ "value" : 5 , "time" : 5 ] ]
Observable . from ( times
) . flatMap
{ item
in return Observable . of ( Int ( item
[ "value" ] ! ) ) . delaySubscription ( Double ( item
[ "time" ] ! ) , scheduler
: MainScheduler . instance
) } . timeout ( 2 , scheduler
: MainScheduler . instance
) . subscribe ( onNext
: { element
in print ( element
) } , onError
: { error
in print ( error
) } ) . disposed ( by
: disposeBag
)
1
2
3
Sequence timeout
? using
通過使用 using 操作符創建 Observable 時,同時創建一個可被清除的資源,一旦 Observable 終止,那么這個資源就會被清除掉。
let infiniteInterval$
= Observable < Int > . interval ( 0.1 , scheduler
: MainScheduler . instance
) . do ( onNext
: { print ( "infinite$: \( $0 ) " ) } , onSubscribe
: { print ( "開始訂閱 infinite$" ) } , onDispose
: { print ( "銷毀 infinite$" ) }
)
let limited$
= Observable < Int > . interval ( 0.5 , scheduler
: MainScheduler . instance
) . take ( 2 ) . do ( onNext
: { print ( "limited$: \( $0 ) " ) } , onSubscribe
: { print ( "開始訂閱 limited$" ) } , onDispose
: { print ( "銷毀 limited$" ) }
)
let o
: Observable < Int > = Observable . using ( { ( ) - > AnyDisposable in return AnyDisposable ( infiniteInterval$
. subscribe ( ) )
} , observableFactory
: { _ in return limited$
}
)
o
. subscribe ( )
class AnyDisposable : Disposable { let _dispose
: ( ) - > Void init ( _ disposable
: Disposable ) { _dispose
= disposable
. dispose
} func dispose ( ) { _dispose ( ) }
}
開始訂閱 infinite$
開始訂閱 limited$
infinite$
: 0
infinite$
: 1
infinite$
: 2
infinite$
: 3
limited$
: 0
infinite$
: 4
infinite$
: 5
infinite$
: 6
infinite$
: 7
infinite$
: 8
limited$
: 1
銷毀 limited$
銷毀 infinite$
與50位技術專家面對面 20年技術見證,附贈技術全景圖
總結
以上是生活随笔 為你收集整理的RxSwift之常用高阶函数(操作符Operator)的说明和使用 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。