RxJava 和 RxAndroid 五(线程调度)
RxJava 和 RxAndroid 一 (基礎)
RxJava 和 RxAndroid 二(操作符的使用)
RxJava 和 RxAndroid 三(生命周期控制和內(nèi)存優(yōu)化)
RxJava 和 RxAndroid 四(RxBinding的使用)
?
本文將有幾個例子說明,rxjava線程調(diào)度的正確使用姿勢。
例1
Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ;結果
/rx_call: main ? ? ? ? ? -- 主線程
/rx_map: main ? ? ? ?-- ?主線程
/rx_subscribe: main ? -- 主線程
例2
new Thread(new Runnable() {@Overridepublic void run() {Logger.v( "rx_newThread" , Thread.currentThread().getName() );rx();}}).start();void rx(){Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ;}
? ? ? 結果
/rx_newThread: Thread-564 ? -- 子線程
/rx_call: Thread-564 ? ? ? ? ? ? ?-- 子線程
/rx_map: Thread-564 ? ? ? ? ? ?-- 子線程?
/rx_subscribe: Thread-564 ? ?-- 子線程
?
- 通過例1和例2,說明,Rxjava默認運行在當前線程中。如果當前線程是子線程,則rxjava運行在子線程;同樣,當前線程是主線程,則rxjava運行在主線程
?
例3
Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ;結果
/rx_call: RxCachedThreadScheduler-1 ? ?--io線程
/rx_map: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??--主線程
/rx_subscribe: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?--主線程
?
例4
Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ;? ? ? 結果
/rx_call: RxCachedThreadScheduler-1 ? ? --io線程
/rx_map: RxCachedThreadScheduler-1 ? --io線程
/rx_subscribe: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?--主線程
? ?
- 通過例3、例4 可以看出??.subscribeOn(Schedulers.io()) ?和?.observeOn(AndroidSchedulers.mainThread()) 寫的位置不一樣,造成的結果也不一樣。從例4中可以看出 map() 操作符默認運行在事件產(chǎn)生的線程之中。事件消費只是在 subscribe() 里面。
- 對于 create() , just() , from() ? 等 ? ? ? ? ? ? ? ? --- 事件產(chǎn)生 ??
? ? ? ? ? ? ? ?map() , flapMap() , scan() , filter() ?等 ? ?-- ?事件加工
? ? ? ? ? ? ? subscribe() ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?-- ?事件消費
- ? 事件產(chǎn)生:默認運行在當前線程,可以由 subscribeOn() ?自定義線程
? ? ? ? ?事件加工:默認跟事件產(chǎn)生的線程保持一致, 可以由 observeOn() 自定義線程
? ? ? ?事件消費:默認運行在當前線程,可以有observeOn() 自定義
?
例5 ?多次切換線程
?
Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Logger.v( "rx_call" , Thread.currentThread().getName() );subscriber.onNext( "dd");subscriber.onCompleted();}}).observeOn( Schedulers.newThread() ) //新線程.map(new Func1<String, String >() {@Overridepublic String call(String s) {Logger.v( "rx_map" , Thread.currentThread().getName() );return s + "88";}}).observeOn( Schedulers.io() ) //io線程.filter(new Func1<String, Boolean>() {@Overridepublic Boolean call(String s) {Logger.v( "rx_filter" , Thread.currentThread().getName() );return s != null ;}}).subscribeOn(Schedulers.io()) //定義事件產(chǎn)生線程:io線程.observeOn(AndroidSchedulers.mainThread()) //事件消費線程:主線程.subscribe(new Action1<String>() {@Overridepublic void call(String s) {Logger.v( "rx_subscribe" , Thread.currentThread().getName() );}}) ;結果
/rx_call: RxCachedThreadScheduler-1 ? ? ? ? ? -- io 線程
/rx_map: RxNewThreadScheduler-1 ? ? ? ? ? ??-- new出來的線程
/rx_filter: RxCachedThreadScheduler-2 ? ? ? ?-- io線程
/rx_subscribe: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? -- 主線程
?
例6:只規(guī)定了事件產(chǎn)生的線程
Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Log.v( "rx--create " , Thread.currentThread().getName() ) ;subscriber.onNext( "dd" ) ;}}).subscribeOn(Schedulers.io()).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;}}) ;結果
/rx--create: RxCachedThreadScheduler-4 ? ? ? ? ? ? ? ? ? ? ?// io 線程
/rx--subscribe: RxCachedThreadScheduler-4 ? ? ? ? ? ? ? ? // io 線程
? ? ?
例:7:只規(guī)定事件消費線程
Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {Log.v( "rx--create " , Thread.currentThread().getName() ) ;subscriber.onNext( "dd" ) ;}}).observeOn( Schedulers.newThread() ).subscribe(new Action1<String>() {@Overridepublic void call(String s) {Log.v( "rx--subscribe " , Thread.currentThread().getName() ) ;}}) ;結果
/rx--create: main ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? -- 主線程
/rx--subscribe: RxNewThreadScheduler-1 ? ? ? ?-- ?new 出來的子線程?
? ? ??
? ? 從例6可以看出,如果只規(guī)定了事件產(chǎn)生的線程,那么事件消費線程將跟隨事件產(chǎn)生線程。
? ? 從例7可以看出,如果只規(guī)定了事件消費的線程,那么事件產(chǎn)生的線程和 當前線程保持一致。
?
例8:線程調(diào)度封裝
?在Android 常常有這樣的場景,后臺處理處理數(shù)據(jù),前臺展示數(shù)據(jù)。
一般的用法:
Observable.just( "123" ).subscribeOn( Schedulers.io()).observeOn( AndroidSchedulers.mainThread() ).subscribe(new Action1() {@Overridepublic void call(Object o) {}}) ;但是項目中這種場景有很多,所以我們就想能不能把這種場景的調(diào)度方式封裝起來,方便調(diào)用。
簡單的封裝
public Observable apply( Observable observable ){return observable.subscribeOn( Schedulers.io() ).observeOn( AndroidSchedulers.mainThread() ) ;}使用
apply( Observable.just( "123" ) ).subscribe(new Action1() {@Overridepublic void call(Object o) {}}) ;弊端:雖然上面的這種封裝可以做到線程調(diào)度的目的,但是它破壞了鏈式編程的結構,是編程風格變得不優(yōu)雅。
改進:Transformers 的使用(就是轉(zhuǎn)化器的意思,把一種類型的Observable轉(zhuǎn)換成另一種類型的Observable?)
改進后的封裝
Observable.Transformer schedulersTransformer = new Observable.Transformer() {@Override public Object call(Object observable) {return ((Observable) observable).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());}};使用
Observable.just( "123" ).compose( schedulersTransformer ).subscribe(new Action1() {@Overridepublic void call(Object o) {}}) ;弊端:雖然保持了鏈式編程結構的完整,但是每次調(diào)用?.compose( schedulersTransformer ) 都是 new 了一個對象的。所以我們需要再次封裝,盡量保證單例的模式。
改進后的封裝
package lib.app.com.myapplication;import rx.Observable; import rx.android.schedulers.AndroidSchedulers; import rx.schedulers.Schedulers;/*** Created by ${zyj} on 2016/7/1.*/ public class RxUtil {private final static Observable.Transformer schedulersTransformer = new Observable.Transformer() {@Override public Object call(Object observable) {return ((Observable) observable).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());}};public static <T> Observable.Transformer<T, T> applySchedulers() {return (Observable.Transformer<T, T>) schedulersTransformer;}}使用
Observable.just( "123" ).compose( RxUtil.<String>applySchedulers() ).subscribe(new Action1() {@Overridepublic void call(Object o) {}}) ;
?
?
?
總結
以上是生活随笔為你收集整理的RxJava 和 RxAndroid 五(线程调度)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android Builder模式在开发
- 下一篇: Android 数据库 LiteOrm