响应式编程RxJava (一)
1.什么是RxJava? 1.1什么是響應式編程? 是一種基于異步數據流概念的編程模式(異步數據流編程) 數據流 ->河流(被觀測、被過濾、被操作)
1.2響應式編程的設計原則是: 保持數據的不變性 沒有共享 阻塞是有害的
1.3在我們的Java里面提供了解決方案 - RxJava? RxJava:Reactive Extensions Java(Java響應式編程) 響應式編程最初誕生.Net里面 iOS開發中也有響應式編程(block)
// 傳統寫法:加載文件 // new Thread() { // @Override // public void run() { // super.run(); // for (File folder : folders) { // File[] files = folder.listFiles(); // for (File file : files) { // if (file.getName().endsWith(".png")) { // final Bitmap bitmap = getBitmapFromFile(file); // // 更新UI線程 // runOnUiThread(new Runnable() { // @Override // public void run() { // imageCollectorView.addImage(bitmap); // } // }); // } // } // } // } // }.start(); 復制代碼RxJava寫法
File[] folders = new File[10];Observable.from(folders)//便利.flatMap(new Func1<File, Observable<File>>() {@Overridepublic Observable<File> call(File file) {return Observable.from(file.listFiles());}})//過濾.filter(new Func1<File, Boolean>() {@Overridepublic Boolean call(File file) {//條件return file.getName().endsWith(".png");}})//加載圖片.map(new Func1<File, Bitmap>() {@Overridepublic Bitmap call(File file) {return getBitmapFromFile(file);}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())//更新UI.subscribe(new Action1<Bitmap>() {@Overridepublic void call(Bitmap bitmap) {imageCollectorView.addImage(bitmap);}}); 復制代碼文件數組 flatMap:相當于我們手動的起嵌套循環 隊列數據結構 你會發現以下這個簡單的案例有哪些優勢 第一點:你不需要考慮線程問題 第二點:你不要關心如何更新UI線程,如何調用
2.RxJava整體架構設計?
整體架構設計 -> 主要觀察者模式同時里面還采用其他的設計模式 代理模式、迭代器模式、Builder設計模式(構建者模式)整體RxJava框架,角色劃分:Observable :被觀察者Observer : 觀察者Subscrible : 訂閱Subjects : 科目Observable 和 Subjects 是兩個“生產“實體,Observer和Subscrible是兩個“消費”實體熱Observables 和冷Observables從發射物的角度來看,有兩種不同的Observables:熱的和冷的。一個“熱”的Observable典型的只要一創建完就開始發射數據。因此所有后續訂閱它的觀察者可能從序列中間得某個位置開始接收數據(有一些數據錯過了)。一個“冷”的Observable會一直等待,知道由觀察者訂閱它才開始發射數據,因此這個觀察者可以確保會收到整個數據序列。熱和冷熱:主動場景:容器中目前只有一個觀察者,向所有的觀察者發送3條數據,因為熱Observables一旦創建就立馬發送消息,假設我現在發送到了第二條數據,突然之后增加了一個觀察者,這個時候,第二個觀察者就收不到之前的消息。 冷:被動場景:容器中目前只有1個觀察者,因為冷Observables一旦創建就會等待觀察者訂閱,一定有觀察者訂閱了,我立馬將所有的消息發送給這個觀察者(訂閱人) 復制代碼3.RxJava基本API? 第一個案例:如何創建Observables?
subscribe 相關源碼:
public final Subscription subscribe(final Observer<? super T> observer) {if (observer instanceof Subscriber) {return subscribe((Subscriber<? super T>)observer);}if (observer == null) {throw new NullPointerException("observer is null");}return subscribe(new ObserverSubscriber<T>(observer));}static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {// validate and proceedif (subscriber == null) {throw new IllegalArgumentException("subscriber can not be null");}if (observable.onSubscribe == null) {throw new IllegalStateException("onSubscribe function can not be null.");/** the subscribe function can also be overridden but generally that's not the appropriate approach* so I won't mention that in the exception*/}// new Subscriber so onStart itsubscriber.onStart();/** See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls* to user code from within an Observer"*/// if not already wrappedif (!(subscriber instanceof SafeSubscriber)) {// assign to `observer` so we return the protected versionsubscriber = new SafeSubscriber<T>(subscriber);}// The code below is exactly the same an unsafeSubscribe but not used because it would// add a significant depth to already huge call stacks.try {// allow the hook to intercept and/or decorateRxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);return RxJavaHooks.onObservableReturn(subscriber);} catch (Throwable e) {// special handling for certain Throwable/Error/Exception typesExceptions.throwIfFatal(e);// in case the subscriber can't listen to exceptions anymoreif (subscriber.isUnsubscribed()) {RxJavaHooks.onError(RxJavaHooks.onObservableError(e));} else {// if an unhandled error occurs executing the onSubscribe we will propagate ittry {subscriber.onError(RxJavaHooks.onObservableError(e));} catch (Throwable e2) {Exceptions.throwIfFatal(e2);// if this happens it means the onError itself failed (perhaps an invalid function implementation)// so we are unable to propagate the error correctly and will just throwRuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);// TODO could the hook be the cause of the error in the on error handling.RxJavaHooks.onObservableError(r);// TODO why aren't we throwing the hook's return value.throw r; // NOPMD}}return Subscriptions.unsubscribed();}}public class SafeSubscriber<T> extends Subscriber<T> {private final Subscriber<? super T> actual;boolean done;public SafeSubscriber(Subscriber<? super T> actual) {super(actual);this.actual = actual;}/*** Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.* <p>* The {@code Observable} will not call this method if it calls {@link #onError}.*/@Overridepublic void onCompleted() {if (!done) {done = true;try {actual.onCompleted();} catch (Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwIfFatal(e);RxJavaHooks.onError(e);throw new OnCompletedFailedException(e.getMessage(), e);} finally { // NOPMDtry {// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken// and we throw an UnsubscribeFailureException.unsubscribe();} catch (Throwable e) {RxJavaHooks.onError(e);throw new UnsubscribeFailedException(e.getMessage(), e);}}}}/*** Notifies the Subscriber that the {@code Observable} has experienced an error condition.* <p>* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or* {@link #onCompleted}.** @param e* the exception encountered by the Observable*/@Overridepublic void onError(Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwIfFatal(e);if (!done) {done = true;_onError(e);}}/*** Provides the Subscriber with a new item to observe.* <p>* The {@code Observable} may call this method 0 or more times.* <p>* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or* {@link #onError}.** @param t* the item emitted by the Observable*/@Overridepublic void onNext(T t) {try {if (!done) {actual.onNext(t);}} catch (Throwable e) {// we handle here instead of another method so we don't add stacks to the frame// which can prevent it from being able to handle StackOverflowExceptions.throwOrReport(e, this);}}/*** The logic for {@code onError} without the {@code isFinished} check so it can be called from within* {@code onCompleted}.** @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>*/@SuppressWarnings("deprecation")protected void _onError(Throwable e) { // NOPMDRxJavaPlugins.getInstance().getErrorHandler().handleError(e);try {actual.onError(e);} catch (OnErrorNotImplementedException e2) { // NOPMD/** onError isn't implemented so throw** https://github.com/ReactiveX/RxJava/issues/198** Rx Design Guidelines 5.2** "when calling the Subscribe method that only has an onNext argument, the OnError behavior* will be to rethrow the exception on the thread that the message comes out from the observable* sequence. The OnCompleted behavior in this case is to do nothing."*/try {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD}throw e2;} catch (Throwable e2) {/** throw since the Rx contract is broken if onError failed** https://github.com/ReactiveX/RxJava/issues/198*/RxJavaHooks.onError(e2);try {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));}throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));}// if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catchtry {unsubscribe();} catch (Throwable unsubscribeException) {RxJavaHooks.onError(unsubscribeException);throw new OnErrorFailedException(unsubscribeException);}}/*** Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.** @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}*/public Subscriber<? super T> getActual() {return actual;} }復制代碼subscriber 實際上就是Observer
RxJava基本使用 源碼分析 Observable創建原理分析: 第一步:調用Observable.create()方法 第二步:添加觀察者訂閱監聽Observable.OnSubscrible 第三步:在Observable.create方法中創建被觀察者new Observable(hook.onCreate(f)); 第四步:在Observable類構造方法中保存了觀察者訂閱監聽
訂閱觀察者原理分析: 第一步:注冊觀察者監聽observable.subscribe(new Observer()) 第二步:在Observable類中調用了 public final Subscription subscribe(final Observer<? super T> observer) { if (observer instanceof Subscriber) { return subscribe((Subscriber<? super T>)observer); } if (observer == null) { throw new NullPointerException("observer is null"); } return subscribe(new ObserverSubscriber(observer)); } 方法中注冊觀察者 第三步:在Observable類中調用了 public final Subscription subscribe(Subscriber<? super T> subscriber) { return Observable.subscribe(subscriber, this); }方法 第四步:調用了Observable.subscribe(subscriber, this);方法 第五步:在 Observable.subscribe方法中調用了監聽觀察者訂閱的回調接口 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
private Observable<String> observableString;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_simple2);// 創建一個被觀察者// 配置回調接口---OnSubscribe// 為什么要配置?// 監聽觀察者訂閱,一旦有觀察者訂閱了,立馬回調改接口observableString = Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> observer) {Log.i("main", "回到了");//訪問請求// 所以在這個方法里面我們可以干一些事情// 進行數據通信(說白了就是通知觀察者)for (int i = 0; i < 5; i++) {observer.onNext("第" + i + "個數據");}//訪問完成// 當我們的數據傳遞完成observer.onCompleted();}});}public void click(View v) {// 觀察者訂閱// 回調原理:// 核心代碼:// hook.onSubscribeStart(observable,// observable.onSubscribe).call(subscriber);observableString.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {Log.i("main", "---onCompleted---");}@Overridepublic void onError(Throwable e) {System.out.println("Oh,no! Something wrong happened!");}@Overridepublic void onNext(String item) {// 接受數據Log.i("main", "觀察者接收到了數據: " + item);}});}結果輸出 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 回到了 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數據: 第0個數據 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數據: 第1個數據 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數據: 第2個數據 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數據: 第3個數據 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: 觀察者接收到了數據: 第4個數據 08-02 09:53:45.057 16613-16613/com.haocai.architect.rxjava I/main: ---onCompleted--- 復制代碼observableString.subscribe 中 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber); 調用call方法
另一種方式自動發送
private Observable<String> observableString;@Overrideprotected void onCreate(Bundle savedInstanceState) {super.onCreate(savedInstanceState);setContentView(R.layout.activity_simple2);List<String> items = new ArrayList<String>();items.add("Kpioneer");items.add("Xpioneer");items.add("haocai");items.add("Huhu");// 框架本身提供了這樣的API// from: 一旦當你有觀察者注冊,立馬發送消息序列// 框架內部實現// 框架內部調用create方法// 迭代器模式// OnSubscribeFromIterable類專門用于遍歷集合// OnSubscribeFromArray類專門用于遍歷數組observableString = Observable.from(items);}public void click(View v) {observableString.subscribe(new Observer<String>() {@Overridepublic void onCompleted() {Log.i("main", "---onCompleted---");}@Overridepublic void onError(Throwable e) {System.out.println("Oh,no! Something wrong happened!");}@Overridepublic void onNext(String item) {// 接受數據Log.i("main", "觀察者接收到了數據: " + item);}});}結果輸出08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數據: Kpioneer 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數據: Xpioneer 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數據: haocai 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: 觀察者接收到了數據: Huhu 08-02 14:38:14.517 32289-32289/com.haocai.architect.rxjava I/main: ---onCompleted--- 復制代碼/*** Copyright 2014 Netflix, Inc.** Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package rx.internal.operators;import java.util.Iterator; import java.util.concurrent.atomic.AtomicLong;import rx.*; import rx.Observable.OnSubscribe; import rx.exceptions.Exceptions;/*** Converts an {@code Iterable} sequence into an {@code Observable}.* <p>* * <p>* You can convert any object that supports the Iterable interface into an Observable that emits each item in* the object, with the {@code toObservable} operation.* @param <T> the value type of the items*/ public final class OnSubscribeFromIterable<T> implements OnSubscribe<T> {final Iterable<? extends T> is;public OnSubscribeFromIterable(Iterable<? extends T> iterable) {if (iterable == null) {throw new NullPointerException("iterable must not be null");}this.is = iterable;}@Overridepublic void call(final Subscriber<? super T> o) {Iterator<? extends T> it;boolean b;try {it = is.iterator();b = it.hasNext();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}if (!o.isUnsubscribed()) {if (!b) {o.onCompleted();} else {o.setProducer(new IterableProducer<T>(o, it));}}}static final class IterableProducer<T> extends AtomicLong implements Producer {/** */private static final long serialVersionUID = -8730475647105475802L;// 具體的觀察者private final Subscriber<? super T> o;// 具體的數據private final Iterator<? extends T> it;IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {this.o = o;this.it = it;}@Overridepublic void request(long n) {if (get() == Long.MAX_VALUE) {// already started with fast-pathreturn;}if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {fastPath();} elseif (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {slowPath(n);}}void slowPath(long n) {// backpressure is requestedfinal Subscriber<? super T> o = this.o;final Iterator<? extends T> it = this.it;long r = n;long e = 0;for (;;) {while (e != r) {if (o.isUnsubscribed()) {return;}T value;try {value = it.next();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}o.onNext(value);if (o.isUnsubscribed()) {return;}boolean b;try {b = it.hasNext();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}if (!b) {if (!o.isUnsubscribed()) {o.onCompleted();}return;}e++;}r = get();if (e == r) {r = BackpressureUtils.produced(this, e);if (r == 0L) {break;}e = 0L;}}}void fastPath() {// fast-path without backpressurefinal Subscriber<? super T> o = this.o;final Iterator<? extends T> it = this.it;for (;;) {if (o.isUnsubscribed()) {return;}T value;try {value = it.next();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}o.onNext(value);if (o.isUnsubscribed()) {return;}boolean b;try {b = it.hasNext();} catch (Throwable ex) {Exceptions.throwOrReport(ex, o);return;}if (!b) {if (!o.isUnsubscribed()) {o.onCompleted();}return;}}}}} 復制代碼總結
以上是生活随笔為你收集整理的响应式编程RxJava (一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JavaScript之使用AJAX(适合
- 下一篇: 线程状态与控制结构