android rxjava2 简书,RXJava2学习
什么是RxJava
一個可觀測的序列來組成異步的、基于事件的程序的庫。(簡單來說:它就是一個實現異步操作的庫)
RxJava 好在哪?
RxJava 其實就是提供一套異步編程的 API,這套 API 是基于觀察者模式的,而且是鏈式調用的,所以使用 RxJava 編寫的代碼的邏輯會非常簡潔。
觀察者模式
定義:定義對象間一種一對多的依賴關系,使得每當一個對象改變狀態,則所有依賴于它的對象都會得到通知并被自動更新
作用是:解耦 UI層與具體的業務邏輯解耦
適用場景
數據庫的讀寫、大圖片的載入、文件壓縮/解壓等各種需要放在后臺工作的耗時操作,都可以用 RxJava 來實現。
三個基本的元素
被觀察者(Observable)
觀察者(Observer)
onSubscribe() 訂閱觀察者的時候被調用
onNext() 發送該事件時,觀察者會回調 onNext() 方法
onError() 發送該事件時,觀察者會回調 onError() 方法,當發送該事件之后,其他事件將不會繼續發送
onComplete() 發送該事件時,觀察者會回調 onComplete() 方法,當發送該事件之后,其他事件將不會繼續發送
訂閱(subscribe)
連接觀察者和被觀察者
// 1. 通過creat()創建被觀察者對象
Observable.create(new ObservableOnSubscribe() {
// 2. 在復寫的subscribe()里定義需要發送的事件
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onNext(4);
emitter.onComplete();
} // 至此,一個被觀察者對象(Observable)就創建完畢
}).subscribe(new Observer() {
// 3. 通過通過訂閱(subscribe)連接觀察者和被觀察者
// 4. 創建觀察者 & 定義響應事件的行為
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "開始采用subscribe連接");
}
// 默認最先調用復寫的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "對Error事件作出響應");
}
@Override
public void onComplete() {
Log.d(TAG, "對Complete事件作出響應");
}
});
五種被觀察者
Observable Observable即被觀察者,決定什么時候觸發事件以及觸發怎樣的事件
Flowable Flowable可以看成是Observable的實現,只是它支持背壓
Single 只有onSuccess可onError事件,只能用onSuccess發射一個數據或一個錯誤通知,之后再發射數據也不會做任何處理,直接忽略
Completable 只有onComplete和onError事件,不發射數據,沒有map,flatMap操作符。常常結合andThen操作符使用
Maybe 沒有onNext方法,同樣需要onSuccess發射數據,且只能發射0或1個數據,多發也不再處理
/**
* Observable --- 被觀察者
* create ---操作符
* ObservableEmitter --- 發射器向觀察者發送事件
*/
Observable objectObservable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(ObservableEmitter emitter) throws Exception {
emitter.onNext("Observable");
emitter.onComplete();
}
});
// Flowable被觀察者(背壓)的創建
Flowable objectFlowable = Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
}
}, BackpressureStrategy.BUFFER);
//Single 被觀察者
Single.create(new SingleOnSubscribe() {
@Override
public void subscribe(SingleEmitter emitter) throws Exception {
}
}).subscribe(new SingleObserver() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Object o) {
}
@Override
public void onError(Throwable e) {
}
});
//Completable 被觀察者
Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
}
});
//Maybe 被觀察者
Maybe.create(new MaybeOnSubscribe() {
@Override
public void subscribe(MaybeEmitter emitter) throws Exception {
}
});
五種被觀察者可通過toObservable,toFlowable,toSingle,toCompletable,toMaybe相互轉換
操作符
1.創建操作符
create() : 創建一個被觀察者
just() : 創建一個被觀察者,并發送事件,發送的事件不可以超過10個以上
fromArray() : 這個方法和 just() 類似,只不過 fromArray 可以傳入多于10個的變量,并且可以傳入一個數組
fromCallable() : 這里的 Callable 是 java.util.concurrent 中的 Callable,Callable 和 Runnable 的用法基本一致,只是它會返回一個結果值,這個結果值就是發給觀察者的
fromFuture() : 參數中的 Future 是 java.util.concurrent 中的 Future,Future 的作用是增加了 cancel() 等方法操作 Callable,它可以通過 get() 方法來獲取 Callable 返回的值
fromIterable() : 直接發送一個 List 集合數據給觀察者
defer() : 這個方法的作用就是直到被觀察者被訂閱后才會創建被觀察者。
timer() : 當到指定時間后就會發送一個 0L 的值給觀察者。
interval() : 每隔一段時間就會發送一個事件,這個事件是從0開始,不斷增1的數字。
intervalRange() : 可以指定發送事件的開始值和數量,其他與 interval() 的功能一樣。
range() : 同時發送一定范圍的事件序列。
rangeLong() :作用與 range() 一樣,只是數據類型為 Long
empty() : 直接發送 onComplete() 事件
never():不發送任何事件
error():發送 onError() 事件
Observable.just(1,2,3).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object integer) {
System.out.println("just===" + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
2,轉換操作符
map() :map 可以將被觀察者發送的數據類型轉變成其他的類型
flatMap(): 這個方法可以將事件序列中的元素進行整合加工,返回一個新的被觀察者。
concatMap() :concatMap() 和 flatMap() 基本上是一樣的,只不過 concatMap() 轉發出來的事件是有序的,而 flatMap() 是無序的
buffer() : 從需要發送的事件當中獲取一定數量的事件,并將這些事件放到緩沖區當中一并發出
groupBy(): 將發送的數據進行分組,每個分組都會返回一個被觀察者
scan() :將數據以一定的邏輯聚合起來
window() :發送指定數量的事件時,就將這些事件分為一組。window 中的 count 的參數就是代表指定的數量,例如將 count 指定為2,那么每發2個數據就會將這2個數據分成一組。
Observable.just(1,2,3,4,5,6)
.map(new Function() {
@Override
public String apply(Integer value) throws Exception {
//將integer轉化成String
return "aa"+value;
}
}).subscribe(new Consumer() {
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
3,組合操作符
concat() :可以將多個觀察者組合在一起,然后按照之前發送順序發送事件。需要注意的是,concat() 最多只可以發送4個事件。
concatArray() : 與 concat() 作用一樣,不過 concatArray() 可以發送多于 4 個被觀察者。
merge() : 這個方法月 concat() 作用基本一樣,知識 concat() 是串行發送事件,而 merge() 并行發送事件。
zip() :會將多個被觀察者合并,根據各個被觀察者發送事件的順序一個個結合起來,最終發送的事件數量會與源 Observable 中最少事件的數量一樣。
reduce() :與 scan() 操作符的作用也是將發送數據以一定邏輯聚合起來,這兩個的區別在于 scan() 每處理一次數據就會將事件發送給觀察者,而 reduce() 會將所有數據聚合在一起才會發送事件給觀察者。
collect() : 將數據收集到數據結構當中
count() :返回被觀察者發送事件的數量。
startWith() & startWithArray() : 在發送事件之前追加事件,startWith() 追加一個事件,startWithArray() 可以追加多個事件。追加的事件會先發出。
combineLatest() & combineLatestDelayError() :combineLatest() 的作用與 zip() 類似,但是 combineLatest() 發送事件的序列是與發送的時間線有關的,當 combineLatest() 中所有的 Observable 都發送了事件,只要其中有一個 Observable 發送事件,這個事件就會和其他 Observable 最近發送的事件結合起來發送
concatArrayDelayError() & mergeArrayDelayError() : 在 concatArray() 和 mergeArray() 兩個方法當中,如果其中有一個被觀察者發送了一個 Error 事件,那么就會停止發送事件,如果你想 onError() 事件延遲到所有被觀察者都發送完事件后再執行的話,就可以使用 concatArrayDelayError() 和 mergeArrayDelayError()
Observable.concat(Observable.just(1,2),Observable.just(5,6),
Observable.just(3,4),Observable.just(7,8)).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
4,功能操作符
delay()
延遲一段時間發送事件。
doOnEach()
Observable 每發送一件事件之前都會先回調這個方法。
doOnNext()
Observable 每發送 onNext() 之前都會先回調這個方法。
doAfterNext()
Observable 每發送 onNext() 之后都會回調這個方法。
doOnComplete()
Observable 每發送 onComplete() 之前都會回調這個方法。
doOnError()
Observable 每發送 onError() 之前都會回調這個方法。
doOnSubscribe()
Observable 每發送 onSubscribe() 之前都會回調這個方法。
doOnDispose()
當調用 Disposable 的 dispose() 之后回調該方法
doOnLifecycle()
在回調 onSubscribe 之前回調該方法的第一個參數的回調方法,可以使用該回調方法決定是否取消訂閱
doOnTerminate() & doAfterTerminate()
doOnTerminate 是在 onError 或者 onComplete 發送之前回調,而 doAfterTerminate 則是 onError 或者 onComplete 發送之后回調
doFinally()
在所有事件發送完畢之后回調該方法。
onErrorReturn()
當接受到一個 onError() 事件之后回調,返回的值會回調 onNext() 方法,并正常結束該事件序列
onErrorResumeNext()
當接收到 onError() 事件時,返回一個新的 Observable,并正常結束事件序列
onExceptionResumeNext()
與 onErrorResumeNext() 作用基本一致,但是這個方法只能捕捉 Exception。
retry()
如果出現錯誤事件,則會重新發送所有事件序列。times 是代表重新發的次數
retryWhen()
當被觀察者接收到異常或者錯誤事件時會回調該方法,這個方法會返回一個新的被觀察者。如果返回的被觀察者發送 Error 事件則之前的被觀察者不會繼續發送事件,如果發送正常事件則之前的被觀察者會繼續不斷重試發送事件
repeat()
重復發送被觀察者的事件,times 為發送次數
repeatWhen()
這個方法可以會返回一個新的被觀察者設定一定邏輯來決定是否重復發送事件。
subscribeOn()
指定被觀察者的線程,要注意的時,如果多次調用此方法,只有第一次有效。
observeOn()
指定觀察者的線程,每指定一次就會生效一次。
retryUntil()
出現錯誤事件之后,可以通過此方法判斷是否繼續發送事件。
Observable.just(1,2,3).delay(2, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe()");
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
5,過濾操作符
filter()
通過一定邏輯來過濾被觀察者發送的事件,如果返回 true 則會發送事件,否則不會發送
ofType()
可以過濾不符合該類型事件
skip()
跳過正序某些事件,count 代表跳過事件的數量
distinct()
過濾事件序列中的重復事件。
distinctUntilChanged()
過濾掉連續重復的事件
take()
控制觀察者接收的事件的數量。
debounce()
如果兩件事件發送的時間間隔小于設定的時間間隔則前一件事件就不會發送給觀察者。
firstElement() && lastElement()
firstElement() 取事件序列的第一個元素,lastElement() 取事件序列的最后一個元素。
elementAt() & elementAtOrError()
elementAt() 可以指定取出事件序列中事件,但是輸入的 index 超出事件序列的總數的話就不會出現任何結果。這種情況下,你想發出異常信息的話就用 elementAtOrError() 。
Observable.just(1,2,3).filter(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 3;
}
}).subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
6,條件操作符
takeWhile()
可以設置條件,當某個數據滿足條件時就會發送該數據,反之則不發送
skipWhile()
可以設置條件,當某個數據滿足條件時不發送該數據,反之則發送。
takeUntil()
可以設置條件,當事件滿足此條件時,下一次的事件就不會被發送了。
skipUntil()
當 skipUntil() 中的 Observable 發送事件了,原來的 Observable 才會發送事件給觀察者。
sequenceEqual()
判斷兩個 Observable 發送的事件是否相同。
isEmpty()
判斷事件序列是否為空。
amb()
amb() 要傳入一個 Observable 集合,但是只會發送最先發送事件的 Observable 中的事件,其余 Observable 將會被丟棄
defaultIfEmpty()
如果觀察者只發送一個 onComplete() 事件,則可以利用這個方法發送一個值
all()
判斷事件序列是否全部滿足某個事件,如果都滿足則返回 true,反之則返回 false。
contains()
判斷事件序列中是否含有某個元素,如果有則返回 true,如果沒有則返回 false。
Observable.just(1,2,3,4,5).all(new Predicate() {
@Override
public boolean test(Integer integer) throws Exception {
return integer < 5;
}
}).subscribe(new Consumer() {
@Override
public void accept(Boolean aBoolean) throws Exception {
System.out.println("accept()===" + aBoolean);
}
});
線程切換
1,RxJava線程控制(調度/切換)的作用是什么?
指定 被觀察者 (Observable) / 觀察者(Observer) 的工作線程類型。
2,為什么要進行RxJava線程控制(調度/切換)?
在 RxJava模型中,被觀察者 (Observable) / 觀察者(Observer)的工作線程 = 創建自身的線程
對于一般的需求場景,需要在子線程中實現耗時的操作;然后回到主線程實現 UI操作
應用到 RxJava模型中,可理解為:
被觀察者 (Observable) 在 子線程 中生產事件(如實現耗時操作等等)
觀察者(Observer)在 主線程 接收 & 響應事件(即實現UI操作)
3,實現方式
采用 RxJava內置的線程調度器( Scheduler ),即通過 功能性操作符subscribeOn() & observeOn()實現
subscribeOn
通過接收一個Scheduler參數,來指定對數據的處理運行在特定的線程調度器Scheduler上。若多次設定,則只有一次起作用。
observeOn
接收一個Scheduler參數,來指定下游操作運行在特定的線程調度器Scheduler上。若多次設定,每次均起作用。
Scheduler種類
類型
含義
Schedulers.io(?)
用于IO密集型的操作,例如讀寫SD卡文件,查詢數據庫,訪問網絡等,具有線程緩存機制,在此調度器接收到任務后,先檢查線程緩存池中,是否有空閑的線程,如果有,則復用,如果沒有則創建新的線程,并加入到線程池中,如果每次都沒有空閑線程使用,可以無上限的創建新線程。
Schedulers.newThread(?)
在每執行一個任務時創建一個新的線程,不具有線程緩存機制,因為創建一個新的線程比復用一個線程更耗時耗力,雖然使用Schedulers.io(?)的地方,都可以使用Schedulers.newThread(?),但是,Schedulers.newThread(?)的效率沒有Schedulers.io(?)高。
Schedulers.computation():
用于CPU 密集型計算任務,即不會被 I/O 等操作限制性能的耗時操作,例如xml,json文件的解析,Bitmap圖片的壓縮取樣等,具有固定的線程池,大小為CPU的核數。不可以用于I/O操作,因為I/O操作的等待時間會浪費CPU。
Schedulers.trampoline():
在當前線程立即執行任務,如果當前線程有任務在執行,則會將其暫停,等插入進來的任務執行完之后,再將未完成的任務接著執行。
Schedulers.single():
擁有一個線程單例,所有的任務都在這一個線程中執行,當此線程中有任務執行時,其他任務將會按照先進先出的順序依次執行。
Scheduler.from(@NonNull Executor executor):
指定一個線程調度器,由此調度器來控制任務的執行策略。
AndroidSchedulers.mainThread():
在Android UI線程中執行任務,為Android開發定制。
具體使用
// Observable.subscribeOn(Schedulers.Thread):指定被觀察者 發送事件的線程(傳入RxJava內置的線程類型)
// Observable.observeOn(Schedulers.Thread):指定觀察者 接收 & 響應事件的線程(傳入RxJava內置的線程類型)
// 通過訂閱(subscribe)連接觀察者和被觀察者
observable.subscribeOn(Schedulers.newThread()) // 1. 指定被觀察者 生產事件的線程
.observeOn(AndroidSchedulers.mainThread()) // 2. 指定觀察者 接收 & 響應事件的線程
.subscribe(observer); // 3. 最后再通過訂閱(subscribe)連接觀察者和被觀察者
背壓
1,出現原因
當上下游在不同的線程中,通過Observable發射,處理,響應數據流時,如果上游發射數據的速度快于下游接收處理數據的速度,這樣對于那些沒來得及處理的數據就會造成積壓,這些數據既不會丟失,也不會被垃圾回收機制回收,而是存放在一個異步緩存池中,如果緩存池中的數據一直得不到處理,越積越多,最后就會造成內存溢出,這便是響應式編程中的背壓(backpressure)問題
944365-a8ca5dd7f71bd781.webp.jpg
2,解決方法
使用BackpressureStrategy背壓策略
944365-37ae2f5f93d9326c.webp.jpg
RxJava2.0實施背壓策略后與RxJava1.0未實施對比
944365-c01363ed15386193.webp.jpg
背壓的具體實現:Flowable
944365-ceca5a724ce25985.webp.jpg
與 RxJava1.0 中被觀察者的舊實現 Observable 的關系
944365-025e8828a7dd1fd9.webp.jpg
Flowable的基礎使用非常類似于Observable
/**
* 步驟1:創建被觀察者 = Flowable
*/
Flowable upstream = Flowable.create(new FlowableOnSubscribe() {
@Override
public void subscribe(FlowableEmitter emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}, BackpressureStrategy.ERROR);
// 需要傳入背壓參數BackpressureStrategy,下面會詳細說明
/**
* 步驟2:創建觀察者 = Subscriber
*/
Subscriber downstream = new Subscriber() {
@Override
public void onSubscribe(Subscription s) {
// 對比Observer傳入的Disposable參數,Subscriber此處傳入的參數 = Subscription
// 相同點:Subscription具備Disposable參數的作用,即Disposable.dispose()切斷連接, 同樣的調用Subscription.cancel()切斷連接
// 不同點:Subscription增加了void request(long n)
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
// 關于request()下面會繼續詳細說明
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
/**
* 步驟3:建立訂閱關系
*/
upstream.subscribe(downstream);
BackpressureStrategy背壓參數
策略
意義
MISSING
MissingEmitter
在此策略下,通過Create方法創建的Flowable相當于沒有指定背壓策略,不會對通過onNext發射的數據做緩存或丟棄處理,需要下游通過背壓操作符
ERROR
ErrorAsyncEmitter
在此策略下,如果放入Flowable的異步緩存池中的數據超限了,則會拋出MissingBackpressureException異常
BUFFER
BufferAsyncEmitter
部維護了一個緩存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默認的異步緩存池滿了,會通過此緩存池暫存數據,它與Observable的異步緩存池一樣,可以無限制向里添加數據,不會拋出MissingBackpressureException異常,但會導致OOM
DROP
DropAsyncEmitter
在此策略下,如果Flowable的異步緩存池滿了,會丟掉上游發送的數據
LATEST
LatestAsyncEmitter
與Drop策略一樣,如果緩存池滿了,會丟掉將要放入緩存池中的數據,不同的是,不管緩存池的狀態如何,LATEST都會將最后一條數據強行放入緩存池中,來保證觀察者在接收到完成通知之前,能夠接收到Flowable最新發射的一條數據
Subscription
響應式拉取方式,來設置下游對數據的請求數量,上游可以根據下游的需求量,按需發送數據,如果不顯示調用request()則默認下游的需求量為零,所以運行上面的代碼后,上游Flowable發射的數據不會交給下游Subscriber處理。
總結
以上是生活随笔為你收集整理的android rxjava2 简书,RXJava2学习的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: js将中文转换成编码 java解析_JS
- 下一篇: 【NLP】文本分类还停留在BERT?对偶