RxJava学习入门
RxJava是什么
一個詞:異步。
RxJava 在 GitHub 主頁上的自我介紹是 “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”(一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫)。這就是 RxJava ,概括得非常精準。
然而,對于初學者來說,這太難看懂了。因為它是一個『總結』,而初學者更需要一個『引言』。
其實, RxJava 的本質可以壓縮為異步這一個詞。說到根上,它就是一個實現(xiàn)異步操作的庫,而別的定語都是基于這之上的。
引入RxJava
在build.gradle在dependencies加入
dependencies {compile 'io.reactivex:rxandroid:1.1.0'compile 'io.reactivex:rxjava:1.1.0' }異步網(wǎng)絡請求:
場景:異步網(wǎng)絡請求一個User數(shù)據(jù),并在TextView展示。
平常代碼:
TextView textView = ...;Map<String, String> params = new HashMap<>(); params.put("user_id", userid);// 請求參數(shù)UserHttp client = new UserHttp(); client.post("http://server.com/user", params, new CallBack<String>() { // 異步請求@Overrideprotected void onSuccess(String result) { // 在UI線程回調// 返回的字符串(通常是一個json),解析成User對象User user = parse(result); textView.setText(user.getName());} });大概就是這樣子了吧,當然一般都會再封裝一下。
用RxJava大概是這樣子:
TextView textView = ...;Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(final Subscriber<? super String> subscriber) {// 下面subscribeOn(Schedulers.newThread()) 把這方法設定在新線程回調Map<String, String> params = new HashMap<>();params.put("user_id", userid);// 請求參數(shù)UserHttp client = new UserHttp();Response response = client.post("http://kkmike999.com/user", params);// 同步請求if (response.status == 200) { // 請求成功String result = response.getResult();subscriber.onNext(result);subscriber.onCompleted();} else {// 請求失敗subscriber.onError(new Throwable(response.getMessage()));}} }) .subscribeOn(Schedulers.newThread()) // 設置call(...)方法,在新線程回調;// 可封裝得更美觀 Observable<String> observable = UserHttp.create(userid); observable.observeOn(AndroidSchedulers.mainThread())// 讓下面onNext()、onError()、onComplete()在UI線程(主線程)回調.subscribe(new Subscriber<String>() {@Overridepublic void onNext(String result) { // 上面 subscriber.onNext(result)在這里回調// 返回的字符串(通常是一個json),解析成User對象User user = parse(result);textView.setText(user.getName());}@Overridepublic void onError(Throwable e) {} // 上面subscriber.onError(new Throwable(msg))在這里回調@Overridepublic void onCompleted() {} });雖然代碼增多了,RxJava 好在哪?就好在簡潔,好在那把什么復雜邏輯都能穿成一條線的簡潔。
API 介紹和原理簡析
RxJava 的異步實現(xiàn),是通過一種擴展的觀察者模式來實現(xiàn)的
RxJava 的觀察者模式
RxJava 有四個基本概念:Observable (可觀察者,即被觀察者)、 Observer (觀察者)、 subscribe (訂閱)、事件。Observable 和 Observer 通過 subscribe() 方法實現(xiàn)訂閱關系,從而 Observable 可以在需要的時候發(fā)出事件來通知 Observer。
與傳統(tǒng)觀察者模式不同, RxJava 的事件回調方法除了普通事件 onNext() (相當于 onClick() / onEvent())之外,還定義了兩個特殊的事件:onCompleted() 和 onError()。
- onCompleted(): 事件隊列完結。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規(guī)定,當不會再有新的 onNext() 發(fā)出時,需要觸發(fā) onCompleted() 方法作為標志。
- onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發(fā),同時隊列自動終止,不允許再有事件發(fā)出。
- 在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,并且是事件序列中的最后一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調用了其中一個,就不應該再調用另一個。
RxJava 的觀察者模式大致如下圖:
基本實現(xiàn)
基于以上的概念, RxJava 的基本實現(xiàn)主要有三點:
1) 創(chuàng)建 Observer
Observer 即觀察者,它決定事件觸發(fā)的時候將有怎樣的行為。 RxJava 中的 Observer 接口的實現(xiàn)方式:
Observer<String> observer = new Observer<String>() {@Overridepublic void onNext(String s) {Log.d(tag, "Item: " + s);}@Overridepublic void onCompleted() {Log.d(tag, "Completed!");}@Overridepublic void onError(Throwable e) {Log.d(tag, "Error!");} };除了 Observer 接口之外,RxJava 還內(nèi)置了一個實現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onNext(String s) {Log.d(tag, "Item: " + s);}@Overridepublic void onCompleted() {Log.d(tag, "Completed!");}@Overridepublic void onError(Throwable e) {Log.d(tag, "Error!");} };不僅基本使用方式一樣,實質上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區(qū)別對于使用者來說主要有兩點:
- onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發(fā)送之前被調用,可以用于做一些準備工作,例如數(shù)據(jù)的清零或重置。這是一個可選方法,默認情況下它的實現(xiàn)為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行), onStart() 就不適用了,因為它總是在 subscribe 所發(fā)生的線程被調用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在后面的文中看到。
- unsubscribe(): 這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法,用于取消訂閱。在這個方法被調用后,Subscriber 將不再接收事件。一般在這個方法調用前,可以使用 isUnsubscribed() 先判斷一下狀態(tài)。 unsubscribe() 這個方法很重要,因為在 subscribe() 之后, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有內(nèi)存泄露的風險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調用 unsubscribe() 來解除引用關系,以避免內(nèi)存泄露的發(fā)生。
2) 創(chuàng)建 Observable
Observable 即被觀察者,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件。 RxJava 使用 create() 方法來創(chuàng)建一個 Observable ,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {subscriber.onNext("Hello");subscriber.onNext("Hi");subscriber.onNext("Aloha");subscriber.onCompleted();} });可以看到,這里傳入了一個 OnSubscribe 對象作為參數(shù)。OnSubscribe 會被存儲在返回的 Observable 對象中,它的作用相當于一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調用,事件序列就會依照設定依次觸發(fā)(對于上面的代碼,就是觀察者Subscriber 將會被調用三次 onNext() 和一次 onCompleted())。這樣,由被觀察者調用了觀察者的回調方法,就實現(xiàn)了由被觀察者向觀察者的事件傳遞,即觀察者模式。
這個例子很簡單:事件的內(nèi)容是字符串,而不是一些復雜的對象;事件的內(nèi)容是已經(jīng)定好了的,而不像有的觀察者模式一樣是待確定的(例如網(wǎng)絡請求的結果在請求返回之前是未知的);所有事件在一瞬間被全部發(fā)送出去,而不是夾雜一些確定或不確定的時間間隔或者經(jīng)過某種觸發(fā)器來觸發(fā)的??傊?#xff0c;這個例子看起來毫無實用價值。但這是為了便于說明,實質上只要你想,各種各樣的事件發(fā)送規(guī)則你都可以自己來寫。至于具體怎么做,后面都會講到,但現(xiàn)在不行。只有把基礎原理先說明白了,上層的運用才能更容易說清楚。
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法。基于這個方法, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列,例如:
just(T…): 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha"); // 將會依次調用: // onNext("Hello"); // onNext("Hi"); // onNext("Aloha"); // onCompleted();from(T[]) / from(Iterable
String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words); // 將會依次調用: // onNext("Hello"); // onNext("Hi"); // onNext("Aloha"); // onCompleted();上面 just(T…) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等價的。
3) Subscribe (訂閱)
創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結起來,整條鏈子就可以工作了。代碼形式很簡單:
observable.subscribe(observer); // 或者: observable.subscribe(subscriber);有人可能會注意到, subscribe() 這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關系。這讓人讀起來有點別扭,不過如果把 API 設計成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯,但對流式 API 的設計就造成影響了,比較起來明顯是得不償失的。
Observable.subscribe(Subscriber) 的內(nèi)部實現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴展性有關的代碼剔除后的核心代碼。 // 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。 public Subscription subscribe(Subscriber subscriber) {subscriber.onStart();onSubscribe.call(subscriber);return subscriber; }可以看到,subscriber() 做了3件事:
調用 Subscriber.onStart() 。這個方法在前面已經(jīng)介紹過,是一個可選的準備方法。
調用 Observable 中的 OnSubscribe.call(Subscriber) 。在這里,事件發(fā)送的邏輯開始運行。從這也可以看出,在 RxJava 中, Observable 并不是在創(chuàng)建的時候就立即開始發(fā)送事件,而是在它被訂閱的時候,即當 subscribe() 方法執(zhí)行的時候。
將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().
整個過程中對象間的關系如下圖:
除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調,RxJava 會自動根據(jù)定義創(chuàng)建出 Subscriber 。形式如下:
Action1<String> onNextAction = new Action1<String>() {// onNext()@Overridepublic void call(String s) {Log.d(tag, s);} }; Action1<Throwable> onErrorAction = new Action1<Throwable>() {// onError()@Overridepublic void call(Throwable throwable) {// Error handling} }; Action0 onCompletedAction = new Action0() {// onCompleted()@Overridepublic void call() {Log.d(tag, "completed");} };// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext() observable.subscribe(onNextAction); // 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError() observable.subscribe(onNextAction, onErrorAction); // 自動創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted() observable.subscribe(onNextAction, onErrorAction, onCompletedAction);簡單解釋一下這段代碼中出現(xiàn)的 Action1 和 Action0。 Action0 是 RxJava 的一個接口,它只有一個方法 call(),這個方法是無參無返回值的;由于 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當成一個包裝對象,將 onCompleted() 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe() 以實現(xiàn)不完整定義的回調。這樣其實也可以看做將 onCompleted() 方法作為參數(shù)傳進了 subscribe(),相當于其他某些語言中的『閉包』。 Action1 也是一個接口,它同樣只有一個方法 call(T param),這個方法也無返回值,但有一個參數(shù);與 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無返回值的,因此 Action1 可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現(xiàn)不完整定義的回調。事實上,雖然 Action0 和 Action1 在 API 中使用最廣泛,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法。
注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 過程中最終會被轉換成 Subscriber 對象,因此,從這里開始,后面的描述我將用 Subscriber 來代替 Observer ,這樣更加嚴謹。
4) 場景示例
下面舉兩個例子:
為了把原理用更清晰的方式表述出來,本文中挑選的都是功能盡可能簡單的例子,以至于有些示例代碼看起來會有『畫蛇添足』『明明不用 RxJava 可以更簡便地解決問題』的感覺。當你看到這種情況,不要覺得是因為 RxJava 太啰嗦,而是因為在過早的時候舉出真實場景的例子并不利于原理的解析,因此我刻意挑選了簡單的情景。
a. 打印字符串數(shù)組
將字符串數(shù)組 names 中的所有字符串依次打印出來:
String[] names = ...; Observable.from(names).subscribe(new Action1<String>() {@Overridepublic void call(String name) {Log.d(tag, name);}});b. 由 id 取得圖片并顯示
由指定的一個 drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中,并在出現(xiàn)異常的時候打印 Toast 報錯:
int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() {@Overridepublic void call(Subscriber<? super Drawable> subscriber) {Drawable drawable = getTheme().getDrawable(drawableRes));subscriber.onNext(drawable);subscriber.onCompleted();} }).subscribe(new Observer<Drawable>() {@Overridepublic void onNext(Drawable drawable) {imageView.setImageDrawable(drawable);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();} });正如上面兩個例子這樣,創(chuàng)建出 Observable 和 Subscriber ,再用 subscribe() 將它們串起來,一次 RxJava 的基本使用就完成了。非常簡單。
線程控制 —— Scheduler (一)
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調用 subscribe(),就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (調度器)。
1) Scheduler 的 API (一)
在RxJava 中,Scheduler ——調度器,相當于線程控制器,RxJava 通過它來指定每一段代碼應該運行在什么樣的線程。RxJava 已經(jīng)內(nèi)置了幾個 Scheduler ,它們已經(jīng)適合大多數(shù)的使用場景:
- Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程。這是默認的 Scheduler。
- Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
- Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
- Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。
有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。 * subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即 Observable.OnSubscribe 被激活時所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。 * observeOn(): 指定 Subscriber 所運行在的線程。或者叫做事件消費的線程。
文字敘述總歸難理解,上代碼:
Observable.just(1, 2, 3, 4).subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發(fā)生在主線程.subscribe(new Action1<Integer>() {@Overridepublic void call(Integer number) {Log.d(tag, "number:" + number);}});上面這段代碼中,由于 subscribeOn(Schedulers.io()) 的指定,被創(chuàng)建的事件的內(nèi)容 1、2、3、4 將會在 IO 線程發(fā)出;而由于 observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數(shù)字的打印將發(fā)生在主線程 。事實上,這種在 subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用于多數(shù)的 『后臺線程取數(shù)據(jù),主線程顯示』的程序策略。
而前面提到的由圖片 id 取得圖片并顯示的例子,如果也加上這兩句:
int drawableRes = ...; ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() {@Overridepublic void call(Subscriber<? super Drawable> subscriber) {Drawable drawable = getTheme().getDrawable(drawableRes));subscriber.onNext(drawable);subscriber.onCompleted();} }) .subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調發(fā)生在主線程 .subscribe(new Observer<Drawable>() {@Overridepublic void onNext(Drawable drawable) {imageView.setImageDrawable(drawable);}@Overridepublic void onCompleted() {}@Overridepublic void onError(Throwable e) {Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();} });那么,加載圖片將會發(fā)生在 IO 線程,而設置圖片則被設定在了主線程。這就意味著,即使加載圖片耗費了幾十甚至幾百毫秒的時間,也不會造成絲毫界面的卡頓。
2) Scheduler 的原理 (一)
RxJava 的 Scheduler API 很方便,也很神奇(加了一句話就把線程切換了,怎么做到的?而且 subscribe() 不是最外層直接調用的方法嗎,它竟然也能被指定線程?)。然而 Scheduler 的原理需要放在后面講,因為它的原理是以下一節(jié)《變換》的原理作為基礎的。
好吧這一節(jié)其實我屁也沒說,只是為了讓你安心,讓你知道我不是忘了講原理,而是把它放在了更合適的地方。
變換
終于要到牛逼的地方了,不管你激動不激動,反正我是激動了。
RxJava 提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉換成不同的事件或事件序列。概念說著總是模糊難懂的,來看 API。
1) API
首先看一個 map() 的例子:
Observable.just("images/logo.png") // 輸入類型 String.map(new Func1<String, Bitmap>() {@Overridepublic Bitmap call(String filePath) { // 參數(shù)類型 Stringreturn getBitmapFromPath(filePath); // 返回類型 Bitmap}}).subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) { // 參數(shù)類型 BitmapshowBitmap(bitmap);}});這里出現(xiàn)了一個叫做 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數(shù)的方法。 Func1 和 Action 的區(qū)別在于, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用于不同參數(shù)個數(shù)的方法。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
可以看到,map() 方法將參數(shù)中的 String 對象轉換成一個 Bitmap 對象后返回,而在經(jīng)過 map() 方法后,事件的參數(shù)類型也由 String 轉為了 Bitmap。這種直接變換對象并返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件對象,還可以針對整個事件隊列,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:
map(): 事件對象的直接變換,具體功能上面已經(jīng)介紹過。它是 RxJava 最常用的變換。 map() 的示意圖:
flatmap
這是一個很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它。 首先假設這么一種需求:假設有一個數(shù)據(jù)結構『學生』,現(xiàn)在需要打印出一組學生的名字。實現(xiàn)方式很簡單:
Student[] students = ...; Subscriber<String> subscriber = new Subscriber<String>() {@Overridepublic void onNext(String name) {Log.d(tag, name);}... }; Observable.from(students).map(new Func1<Student, String>() {@Overridepublic String call(Student student) {return student.getName();}}).subscribe(subscriber);很簡單。那么再假設:如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區(qū)別在于,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現(xiàn):
Student[] students = ...; Subscriber<Student> subscriber = new Subscriber<Student>() {@Overridepublic void onNext(Student student) {List<Course> courses = student.getCourses();for (int i = 0; i < courses.size(); i++) {Course course = courses.get(i);Log.d(tag, course.getName());}}... }; Observable.from(students).subscribe(subscriber);依然很簡單。那么如果我不想在 Subscriber 中使用 for 循環(huán),而是希望 Subscriber 中直接傳入單個的 Course 對象呢(這對于代碼復用很重要)?用 map() 顯然是不行的,因為 map() 是一對一的轉化,而我現(xiàn)在的要求是一對多的轉化。那怎么才能把一個 Student 轉化成多個 Course 呢?
這個時候,就需要用 flatMap() 了:
Student[] students = ...; Subscriber<Course> subscriber = new Subscriber<Course>() {@Overridepublic void onNext(Course course) {Log.d(tag, course.getName());}... }; Observable.from(students).flatMap(new Func1<Student, Observable<Course>>() {@Overridepublic Observable<Course> call(Student student) {return Observable.from(student.getCourses());}}).subscribe(subscriber);從上面的代碼可以看出, flatMap() 和 map() 有一個相同點:它也是把傳入的參數(shù)轉化之后返回另一個對象。但需要注意,和 map() 不同的是, flatMap() 中返回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調方法中。 flatMap() 的原理是這樣的:1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象;2. 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件;3. 每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統(tǒng)一交給 Subscriber 的回調方法。這三個步驟,把事件拆成了兩級,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。
flatMap() 示意圖:
3) 延伸:doOnSubscribe()
然而,雖然超過一個的 subscribeOn() 對事件處理的流程沒有影響,但在流程之前卻是可以利用的。
在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 可以用作流程開始前的初始化。然而 onStart() 由于在 subscribe() 發(fā)生時就被調用了,因此不能指定線程,而是只能執(zhí)行在 subscribe() 被調用時的線程。這就導致如果 onStart() 中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar,這必須在主線程執(zhí)行),將會有線程非法的風險,因為有時你無法預測 subscribe() 將會在什么線程執(zhí)行。
而與 Subscriber.onStart() 相對應的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在 subscribe() 調用后而且在事件發(fā)送前執(zhí)行,但區(qū)別在于它可以指定線程。默認情況下, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。
進度條示例代碼:
ProgressDialog progress = ...;....observeOn(AndroidSchedulers.mainThread()) // 規(guī)定Subscriber在主線程回調.doOnSubscribe(new Action0() { // 主線程@Overridepublic void call() {progress.show();}}).doOnCompleted(new Action0() { // 主線程@Overridepublic void call() {progress.dismiss();}}).subscribe(new Subscriber<List<String>>() { // 主線程@Overridepublic void onNext(List<String> strings) {...}@Overridepublic void onComplete() {...}}參考鏈接
給 Android 開發(fā)者的 RxJava 詳解
RxJava快速入門 - 簡書
總結
以上是生活随笔為你收集整理的RxJava学习入门的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【每日SQL打卡】
- 下一篇: 【每日SQL打卡】