RxJava2 源码解析(一)
概述
最近事情太多了,現在公司內部的變動,自己崗位的變化,以及最近決定找工作。所以博客耽誤了,準備面試中,打算看一看RxJava2的源碼,遂有了這篇文章。
不會對RxJava2的源碼逐字逐句的閱讀,只尋找關鍵處,我們平時接觸得到的那些代碼。
背壓實際中接觸較少,故只分析了Observable.
分析的源碼版本為:2.0.1
我們的目的:
??? 知道源頭(Observable)是如何將數據發送出去的。
??? 知道終點(Observer)是如何接收到數據的。
??? 何時將源頭和終點關聯起來的
??? 知道線程調度是怎么實現的
??? 知道操作符是怎么實現的
本文先達到目的1 ,2 ,3。
我個人認為主要還是適配器模式的體現,我們接觸的就只有Observable和Observer,其實內部有大量的中間對象在適配:將它們兩聯系起來,加入一些額外功能,例如考慮dispose和hook等。
從create開始。
這是一段不涉及操作符和線程切換的簡單例子:
??????? Observable.create(new ObservableOnSubscribe<String>() {
??????????? @Override
??????????? public void subscribe(ObservableEmitter<String> e) throws Exception {
??????????????? e.onNext("1");
??????????????? e.onComplete();
??????????? }
??????? }).subscribe(new Observer<String>() {
??????????? @Override
??????????? public void onSubscribe(Disposable d) {
??????????????? Log.d(TAG, "onSubscribe() called with: d = [" + d + "]");
??????????? }
??????????? @Override
??????????? public void onNext(String value) {
??????????????? Log.d(TAG, "onNext() called with: value = [" + value + "]");
??????????? }
??????????? @Override
??????????? public void onError(Throwable e) {
??????????????? Log.d(TAG, "onError() called with: e = [" + e + "]");
??????????? }
??????????? @Override
??????????? public void onComplete() {
??????????????? Log.d(TAG, "onComplete() called");
??????????? }
??????? });
拿 create來說,
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
??????? //.....
??????? return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
??? }
??
返回值是Observable,參數是ObservableOnSubscribe,定義如下:
public interface ObservableOnSubscribe<T> {
??? void subscribe(ObservableEmitter<T> e) throws Exception;
}
?
ObservableOnSubscribe是一個接口,里面就一個方法,也是我們實現的那個方法:
該方法的參數是 ObservableEmitter,我認為它是關聯起 Disposable概念的一層:
public interface ObservableEmitter<T> extends Emitter<T> {
??? void setDisposable(Disposable d);
??? void setCancellable(Cancellable c);
??? boolean isDisposed();
??? ObservableEmitter<T> serialize();
}
?
ObservableEmitter也是一個接口。里面方法很多,它也繼承了 Emitter<T> 接口。
public interface Emitter<T> {
??? void onNext(T value);
??? void onError(Throwable error);
??? void onComplete();
}
??
Emitter<T>定義了 我們在ObservableOnSubscribe中實現subscribe()方法里最常用的三個方法。
好,我們回到原點,create()方法里就一句話return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));,其中提到RxJavaPlugins.onAssembly():
??? /**
???? * Calls the associated hook function.
???? * @param <T> the value type
???? * @param source the hook's input value
???? * @return the value returned by the hook
???? */
??? @SuppressWarnings({ "rawtypes", "unchecked" })
??? public static <T> Observable<T> onAssembly(Observable<T> source) {
??????? Function<Observable, Observable> f = onObservableAssembly;
??????? if (f != null) {
??????????? return apply(f, source);
??????? }
??????? return source;
??? }
?
可以看到這是一個關于hook的方法,關于hook我們暫且不表,不影響主流程,我們默認使用中都沒有hook,所以這里就是直接返回source,即傳入的對象,也就是new ObservableCreate<T>(source).
ObservableCreate我認為算是一種適配器的體現,create()需要返回的是Observable,而我現在有的是(方法傳入的是)ObservableOnSubscribe對象,ObservableCreate將ObservableOnSubscribe適配成Observable。
其中subscribeActual()方法表示的是被訂閱時真正被執行的方法,放后面解析:
public final class ObservableCreate<T> extends Observable<T> {
??? final ObservableOnSubscribe<T> source;
??? public ObservableCreate(ObservableOnSubscribe<T> source) {
??????? this.source = source;
??? }
??? @Override
??? protected void subscribeActual(Observer<? super T> observer) {
??????? CreateEmitter<T> parent = new CreateEmitter<T>(observer);
??????? observer.onSubscribe(parent);
??????? try {
??????????? source.subscribe(parent);
??????? } catch (Throwable ex) {
??????????? Exceptions.throwIfFatal(ex);
??????????? parent.onError(ex);
??????? }
??? }
??
OK,至此,創建流程結束,我們得到了Observable<T>對象,其實就是ObservableCreate<T>.
到訂閱subscribe 結束
subscribe():
??? public final void subscribe(Observer<? super T> observer) {
??????? ...
??????? try {
??????????? //1 hook相關,略過
??????????? observer = RxJavaPlugins.onSubscribe(this, observer);
??????????? ...
??????????? //2 真正的訂閱處
??????????? subscribeActual(observer);
??????? } catch (NullPointerException e) { // NOPMD
??????????? throw e;
??????? } catch (Throwable e) {
??????????? //3 錯誤處理,
??????????? Exceptions.throwIfFatal(e);
??????????? // can't call onError because no way to know if a Disposable has been set or not
??????????? // can't call onSubscribe because the call might have set a Subscription already
??????????? //4 hook錯誤相關,略過
??????????? RxJavaPlugins.onError(e);
??????????? NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
??????????? npe.initCause(e);
??????????? throw npe;
??????? }
??? }
?
關于hook的代碼:
可以看到如果沒有hook,即相應的對象是null,則是傳入什么返回什么的。
??? /**
???? * Calls the associated hook function.
???? * @param <T> the value type
???? * @param source the hook's input value
???? * @param observer the observer
???? * @return the value returned by the hook
???? */
??? @SuppressWarnings({ "rawtypes", "unchecked" })
??? public static <T> Observer<? super T> onSubscribe(Observable<T> source, Observer<? super T> observer) {
??????? //1 默認onObservableSubscribe(可理解為一個flatmap的操作)是null
??????? BiFunction<Observable, Observer, Observer> f = onObservableSubscribe;
??????? //2 所以這句跳過,不會對其進行apply
??????? if (f != null) {
??????????? return apply(f, source, observer);
??????? }
??????? //3 返回參數2
??????? return observer;
??? }
?
我也是驗證了一下 三個Hook相關的變量,確實是null:
??????? Consumer<Throwable> errorHandler = RxJavaPlugins.getErrorHandler();
??????? BiFunction<Observable, Observer, Observer> onObservableSubscribe = RxJavaPlugins.getOnObservableSubscribe();
??????? Function<Observable, Observable> onObservableAssembly = RxJavaPlugins.getOnObservableAssembly();
??????? Log.e(TAG, "errorHandler = [" + errorHandler + "]");
??????? Log.e(TAG, "onObservableSubscribe = [" + onObservableSubscribe + "]");
??????? Log.e(TAG, "onObservableAssembly = [" + onObservableAssembly + "]");
?
所以訂閱時的重點就是:
??????????? //2 真正的訂閱處
??????????? subscribeActual(observer);
?
我們將第一節提到的ObservableCreate里的subscribeActual()方法拿出來看看:
??? @Override
??? protected void subscribeActual(Observer<? super T> observer) {
??????? //1 創建CreateEmitter,也是一個適配器
??????? CreateEmitter<T> parent = new CreateEmitter<T>(observer);
??????? //2 onSubscribe()參數是Disposable ,所以CreateEmitter可以將Observer->Disposable 。還有一點要注意的是`onSubscribe()`是在我們執行`subscribe()`這句代碼的那個線程回調的,并不受線程調度影響。
??????? observer.onSubscribe(parent);
??????? try {
??????????? //3 將ObservableOnSubscribe(源頭)與CreateEmitter(Observer,終點)聯系起來
??????????? source.subscribe(parent);
??????? } catch (Throwable ex) {
??????????? Exceptions.throwIfFatal(ex);
??????????? //4 錯誤回調
??????????? parent.onError(ex);
??????? }
??? }
??
Observer是一個接口,里面就四個方法,我們在開頭的例子中已經全部實現(打印Log)。
public interface Observer<T> {
??? void onSubscribe(Disposable d);
??? void onNext(T value);
??? void onError(Throwable e);
??? void onComplete();
}
?
重點在這一句:
?//3 將ObservableOnSubscribe(源頭)與CreateEmitter(Observer,終點)聯系起來
??????????? source.subscribe(parent);
?
source即ObservableOnSubscribe對象,在本文中是:
??????? new ObservableOnSubscribe<String>() {
??????????? @Override
??????????? public void subscribe(ObservableEmitter<String> e) throws Exception {
??????????????? e.onNext("1");
??????????????? e.onComplete();
??????????? }
??????? }
??
則會調用parent.onNext() 和parent.onComplete(),parent是CreateEmitter對象,如下:
?static final class CreateEmitter<T>
??? extends AtomicReference<Disposable>
??? implements ObservableEmitter<T>, Disposable {
??????? final Observer<? super T> observer;
??????? CreateEmitter(Observer<? super T> observer) {
??????????? this.observer = observer;
??????? }
??????? @Override
??????? public void onNext(T t) {
??????????? ...
??????????? //如果沒有被dispose,會調用Observer的onNext()方法
??????????? if (!isDisposed()) {
??????????????? observer.onNext(t);
??????????? }
??????? }
??????? @Override
??????? public void onError(Throwable t) {
??????????? ...
??????????? //1 如果沒有被dispose,會調用Observer的onError()方法
??????????? if (!isDisposed()) {
??????????????? try {
??????????????????? observer.onError(t);
??????????????? } finally {
??????????????? //2 一定會自動dispose()
??????????????????? dispose();
??????????????? }
??????????? } else {
??????????? //3 如果已經被dispose了,會拋出異常。所以onError、onComplete彼此互斥,只能被調用一次
??????????????? RxJavaPlugins.onError(t);
??????????? }
??????? }
??????? @Override
??????? public void onComplete() {
???????? //1 如果沒有被dispose,會調用Observer的onComplete()方法
??????????? if (!isDisposed()) {
??????????????? try {
??????????????????? observer.onComplete();
??????????????? } finally {
???????????????? //2 一定會自動dispose()
??????????????????? dispose();
??????????????? }
??????????? }
??????? }
??????? @Override
??????? public void dispose() {
??????????? DisposableHelper.dispose(this);
??????? }
??????? @Override
??????? public boolean isDisposed() {
??????????? return DisposableHelper.isDisposed(get());
??????? }
??? }
?
總結重點:
??? Observable和Observer的關系沒有被dispose,才會回調Observer的onXXXX()方法
??? Observer的onComplete()和onError() 互斥只能執行一次,因為CreateEmitter在回調他們兩中任意一個后,都會自動dispose()。根據第一點,驗證此結論。
??? Observable和Observer關聯時(訂閱時),Observable才會開始發送數據。
??? ObservableCreate將ObservableOnSubscribe(真正的源)->Observable.
??? ObservableOnSubscribe(真正的源)需要的是發射器ObservableEmitter.
??? CreateEmitter將Observer->ObservableEmitter,同時它也是Disposable.
??? 先error后complete,complete不顯示。 反之會crash,感興趣的可以寫如下代碼驗證。
????? e.onNext("1");
????? //先error后complete,complete不顯示。 反之 會crash
????? //e.onError(new IOException("sb error"));
????? e.onComplete();
????? e.onError(new IOException("sb error"));
?
一個好玩的地方DisposableHelper
原本到這里,最簡單的一個流程我們算是搞清了。
還值得一提的是,DisposableHelper.dispose(this);
DisposableHelper很有趣,它是一個枚舉,這是利用枚舉實現了一個單例disposed state,即是否disposed,如果Disposable類型的變量的引用等于DISPOSED,則起點和終點已經斷開聯系。
其中大多數方法 都是靜態方法,所以isDisposed()方法的實現就很簡單,直接比較引用即可.
其他的幾個方法,和AtomicReference類攪基在了一起。
這是一個實現引用原子操作的類,對象引用的原子更新,常用方法如下:
//返回當前的引用。
V get()
//如果當前值與給定的expect引用相等,(注意是引用相等而不是equals()相等),更新為指定的update值。
boolean compareAndSet(V expect, V update)
//原子地設為給定值并返回舊值。
V getAndSet(V newValue)
??
OK,鋪墊完了我們看看源碼吧:
public enum DisposableHelper implements Disposable {
??? /**
???? * The singleton instance representing a terminal, disposed state, don't leak it.
???? */
??? DISPOSED
??? ;
??? public static boolean isDisposed(Disposable d) {
??????? return d == DISPOSED;
??? }
??? public static boolean dispose(AtomicReference<Disposable> field) {
??????? //1 通過斷點查看,默認情況下,field的值是"null",并非引用是null哦!大坑大坑大坑
??????? //但是current是null引用
??????? Disposable current = field.get();
??????? Disposable d = DISPOSED;
??????? //2 null不等于DISPOSED
??????? if (current != d) {
??????????? //3 field是DISPOSED了,current還是null
??????????? current = field.getAndSet(d);
??????????? if (current != d) {
??????????? //4 默認情況下 走不到這里,這里是在設置了setCancellable()后會走到。
??????????????? if (current != null) {
??????????????????? current.dispose();
??????????????? }
??????????????? return true;
??????????? }
??????? }
??????? return false;
??? }
?
總結
??? 在subscribeActual()方法中,源頭和終點關聯起來。
??? source.subscribe(parent);這句代碼執行時,才開始從發送ObservableOnSubscribe中利用ObservableEmitter發送數據給Observer。即數據是從源頭push給終點的。
??? CreateEmitter 中,只有Observable和Observer的關系沒有被dispose,才會回調Observer的onXXXX()方法
??? Observer的onComplete()和onError() 互斥只能執行一次,因為CreateEmitter在回調他們兩中任意一個后,都會自動dispose()。根據上一點,驗證此結論。
??? 先error后complete,complete不顯示。 反之會crash
??? 還有一點要注意的是onSubscribe()是在我們執行subscribe()這句代碼的那個線程回調的,并不受線程調度影響。
---------------------
https://blog.csdn.net/zxt0601/article/details/61614799
轉載于:https://www.cnblogs.com/ldq2016/p/10484588.html
總結
以上是生活随笔為你收集整理的RxJava2 源码解析(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ant 通配符
- 下一篇: codeforces 的一些数学题