Rxjava2自我·解惑
最近項目升級,從Rxjava1升級到了Rxjava2(之前一直想升級,但是去年項目實在多。。不敢造次。。),發現變化挺大的,并且最近逐漸有點強迫癥,受不了只會用API的自己。所以,廢話少說,看代碼吧。本人很菜,寫出來只是為了給自己一個交代,勿噴。 首先,不能一頭扎進入,先定個目標。我們看的目的是什么:
- Rxjava 訂閱和終止訂閱的過程(Observable,Observer,Consumer,just,create等等)
- Rxjava操作符原理(map,flatMap)
- Rxjava線程調度原理(subscribeOn、observeOn、io、main)
Rxjava 訂閱過程
這里以Observable.just為例:
Observable.just(1).subscribe(new Observer<Integer>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(Integer integer) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}}); 復制代碼讓我們以圖片流的形式開始吧:
ScalarDisposable其實就是個線程,你們應該也猜到。
7.run方法 好了,整個過程就這么完成了。 一臉懵逼。。。。 我們繼續看看Observable.create吧: Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {}}).subscribe(new Observer<String>() {@Overridepublic void onSubscribe(Disposable d) {}@Overridepublic void onNext(String s) {}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {}}); 復制代碼主要是一些取消的方法,它還有個父類Emitter:
public interface Emitter<T> {/*** Signal a normal value.* @param value the value to signal, not null*/void onNext(@NonNull T value);/*** Signal a Throwable exception.* @param error the Throwable to signal, not null*/void onError(@NonNull Throwable error);/*** Signal a completion.*/void onComplete(); } 復制代碼這些方法再熟悉不過了。。。。 7. OK,這個類就這樣看完了,接下來看它的下一個方法:
哦,接下來調用onSubscribe方法。 咦,這不就是我們傳遞進來的觀察者嗎? 慢著,慢著,有點眉目了,如果沒猜錯,應該是這樣的:observable 調用subscribe()后,最終會調用Observable.create(T)中T的subscribe()方法,在subscribe中,如果我們完成了,會調用emitter.onNext方法,如下,應該很多人都是這么寫的:
@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {//....N個邏輯后e.onNext("最終結果");}}) 復制代碼此時,由于CreateEmitter是持有我們的觀察者observer的,如圖:
如果,沒猜錯,它就是個代理,實際上它會調用observer.onNext()方法。驗證一下吧。 嗯,的確如此。 好了,訂閱的過程解決了。 總結就是:Observable.xx方法會生成一個ObservableXX類,之后,當調用Observable.subscribe時,會去調用ObservableXX類的subscribeActual方法,這個方法會調用我們在外面實現observable的方法,并把包裝類傳遞過去。當我們發送事件時,會調用包裝類的onNext等方法,然后這個包裝類會幫我們調用傳遞進去的觀察者的onNext等方法。(onError,onComplete同理,只不過我沒截圖)
接下來,取消訂閱過程。堅持一下,就快結束第一部分了。
終止訂閱的過程
Disposable disposable = Observable.create((ObservableOnSubscribe<String>) e -> {//....N個邏輯后e.onNext("最終結果");}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {}});disposable.dispose(); 復制代碼在終止訂閱時,執行dispose()會將一個單例對象DISPOSED,賦給當前的Disposable對象,由于枚舉成員本質是靜態常量,所以isDisposed(Disposable d)方法也只需要判斷當前對象的引用是否是DISPOSED即可。
Rxjava 操作符
先從map入手:
RxJava 線程調度原理
代碼如下:
Observable.create(new ObservableOnSubscribe<String>() {@Overridepublic void subscribe(ObservableEmitter<String> e) throws Exception {//....N個邏輯后e.onNext("最終結果");}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {}}); 復制代碼照理圖片流。。
看樣子,好像是個空實現。。。。好吧
s.onSubscribe(parent)//暫時啥也沒做,如果我們沒有在外面實現onSubscribe方法的話。 復制代碼看一下~ SubscribeTask:
是一個線程,當啟動時,調用上一個observable的subscribe方法。我們知道,最后就是調用observable的subscribeActual。 對我們這段代碼來說,這里的observable就是ObservableCreate。從這里可知,的確調用是自下而上調用的。
點擊:
@NonNullpublic abstract Worker createWorker(); 復制代碼抽象的,得看實現類:
OK。不要陷入太深,知道創建的worker是什么東西之后我們看下一句。 DisposeTask task = new DisposeTask(decoratedRun, w);w.schedule(task, delay, unit); 復制代碼new 了一個什么task后,調用了worker的schedule方法,這個 worker我們知道了,那我們去看它的schedule方法。
從線程池中取了一個threadWorker,然后調用scheduleActual方法: 這里的executor是什么應該不難猜到,上一下代碼: public class NewThreadWorker extends Scheduler.Worker implements Disposable {private final ScheduledExecutorService executor;volatile boolean disposed;public NewThreadWorker(ThreadFactory threadFactory) {executor = SchedulerPoolFactory.create(threadFactory);} 復制代碼create()方法:
/*** Creates a ScheduledExecutorService with the given factory.* @param factory the thread factory* @return the ScheduledExecutorService*/public static ScheduledExecutorService create(ThreadFactory factory) {final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);//創建了個線程池tryPutIntoPool(PURGE_ENABLED, exec);return exec;} 復制代碼總的來說就是: 把Observable包裝成任務,然后交給線程池調度執行。 總結:
- subscribeOn只生效一次的原因就是因為, 訂閱過程是自下而上的,當調用subscribe時,會一級一級向上調用上一級observable的方法,此時,最頂層的(最上游,最開始傳遞的)ObservableSubscribeOn會先被調用,從而確定了任務執行所在的線程,此后設置的線程都將無效。
- 訂閱過程是自下而上的,然后數據發射事件,再自上而下傳遞數據。
總結
以上是生活随笔為你收集整理的Rxjava2自我·解惑的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [译] 如何在东南亚拓展您的应用业务
- 下一篇: 【后台任务】将工作请求发送到后台服务(1