RxJava2学习笔记(3)
接上回繼續,今天來學習下zip(打包)操作
一、zip操作
@Testpublic void zipTest() {Observable.zip(Observable.create(emitter -> {for (int i = 0; i < 10; i++) {emitter.onNext(100 + i);}}), Observable.create(emitter -> {for (int i = 0; i < 5; i++) {emitter.onNext(new Character((char) (65 + i)));}}), (integer, character) -> integer + "" + character).subscribe(s -> System.out.println(s));}zip字面意義,就是打包操作,把多個Obserable合并在一起,形成一個新的Obserable,類似文件1、文件2 ... 文件n,合成一個新文件。上面這段代碼的輸出:
100A 101B 102C 103D 104E第1個生產者,發射了10個數字(100~109),第1個生產者發射了5個字符(A~E),合并處理時,會把 “數字+字符",變成一個新字符串,然后繼續發射。注意:這里有一個類似"木桶原理",即決定一個木桶能盛多少水的,永遠是最短的那塊木頭。10發A型子彈 + 5發B型子彈,按1:1來合成,最終只有得到5發新型子彈。
?
二、限流
生產者-消費者模型中,有可能會遇到這樣一種情況:生產者精力旺盛,狂生產數據,然后消費者力不從心,根本來不及處理,這樣上游就堵住了,嚴重的話,可能導致內存耗盡。最簡單的辦法,就是把來不及處理的內容給扔掉(即:丟棄策略)。剛剛提到的zip操作中的木桶原理,就可以派上用場了。
@Testpublic void zipTest1() throws InterruptedException {Observable.zip(Observable.create(emitter -> {for (int i = 0; ; i++) { //一直不停的發emitter.onNext(i);}}).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {for (int i = 0; i < 5; i++) {emitter.onNext(0); //這里技巧性的處理:發1個0過去}}).subscribeOn(Schedulers.newThread()),(BiFunction<Object, Object, Object>) (i1, i2) -> (Integer) i1 + (Integer) i2) //1個數字+0,不影響原值.subscribe(integer -> System.out.println(integer));Thread.sleep(200);}輸出:
0 1 2 3 4如果是字符串,可以參考下面這樣處理:
Observable.zip(Observable.create(emitter -> {for (int i = 0; ; i++) {emitter.onNext("A" + i);}}).subscribeOn(Schedulers.newThread()), Observable.create(emitter -> {for (int i = 0; i < 5; i++) {emitter.onNext("");}}).subscribeOn(Schedulers.newThread()),(BiFunction<Object, Object, Object>) (i1, i2) -> (String) i1 + (String) i2).subscribe(s -> System.out.println(s));Thread.sleep(200);輸出:
A0 A1 A2 A3 A4
三、Flowable
剛才用zip這種"奇淫技巧"實現了限流,但其實rxjava還有更科學的做法(Flowable)。再思考一下“限流”這種場景,生產者太猛,一下噴出來的量太多,而消費者太弱,完全吸收不下。比較溫和的方式,最好是生產者噴發前先問下消費者,你1次能接承受多大的量?我根據你的能力來調整(多么體貼)沒錯!rxjava就是這么體貼,你想到的,它也想到了。
Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {for (int i = 0; ; i++) {emitter.onNext(i);}}, BackpressureStrategy.DROP) //這里的BackpressureStrategy.DROP是一種丟棄策略.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {s.request(2); //只接收2條信息}@Overridepublic void onNext(Integer integer) {System.out.println("onNext->" + integer);}@Overridepublic void onError(Throwable t) {System.out.println("onError!");}@Overridepublic void onComplete() {System.out.println("onComplete");}});Thread.sleep(1000);?注意:?onSubscribe 里有一行s.request(2),相當于消費者在訂閱時,告訴生產者,只能處理2條記錄。然后跑起來,就真的只有2條輸出了:
onNext->0 onNext->1值得一提的是:剩下的消息,雖然消費者不再處理了,但是生產者實際上還會繼續發的,大家可以在emitter.onNext(i)這后面,輸入一行文字,自行測試。之所以這么設計,大家可以思考一下,因為一個生產者射出來的東西,可能有多個消費者在消費,如果因為某1個消費者說:哎呀,太多了,我消化不了,你趕緊停下! 然后生產者如果真的停下來,其它消費者可能就有意見了。
但是,如果只有一個消費者的情況下,我們就是想讓生產者嚴格按照消費者的處理能力來發送數據,該怎么做呢?先把上面這段代碼加幾行輸出看看:
@Testpublic void flowableTest() throws InterruptedException {Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {for (int i = 0; i <= 20; i++) {System.out.println("requested=>" + emitter.requested());emitter.onNext(i);}}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {s.request(5);}@Overridepublic void onNext(Integer integer) {System.out.println("onNext->" + integer);}@Overridepublic void onError(Throwable t) {System.out.println("onError=>" + t.getMessage());}@Overridepublic void onComplete() {System.out.println("onComplete");}});Thread.sleep(1000);}輸出:
requested=>5 onNext->0 requested=>4 onNext->1 requested=>3 onNext->2 requested=>2 onNext->3 requested=>1 onNext->4 requested=>0 onError=>create: could not emit value due to lack of requests requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0 requested=>0注意:當消費者設置了request(x)后,生產者里的requested值,就會設置成相應的x值(僅同步模式),然后每emitter.onNext()發一次數據,這個值就減少,第11,12行,當生產者emitter的requested值為0時,下游就開始報錯了,也就是說這時已經達到了消費者的處理極限。利用這一點,就可以實現我們剛才說的小目標:
@Testpublic void flowableTest() throws InterruptedException {Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {for (int i = 0; i <= 20; i++) {System.out.println("requested=>" + emitter.requested());// 想想:為什么不用if這種判斷方式? // if (emitter.requested()>0){ // emitter.onNext(i); // }while (emitter.requested() <= 0) {Thread.sleep(10);//防止cpu占用過高continue;}emitter.onNext(i);}}, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {s.request(5);}@Overridepublic void onNext(Integer integer) {System.out.println("onNext->" + integer);}@Overridepublic void onError(Throwable t) {System.out.println("onError=>" + t.getMessage());}@Overridepublic void onComplete() {System.out.println("onComplete");}});Thread.sleep(1000);}輸出:
requested=>5 onNext->0 requested=>4 onNext->1 requested=>3 onNext->2 requested=>2 onNext->3 requested=>1 onNext->4 requested=>0上面這些都是同步情況下(即:生產者與消費者都在一個線程里)Flowable的處理方法,如果是在異步多線程情況下,我們來看看是否能繼續適用:
@Testpublic void flowableTest() throws InterruptedException {Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {for (int i = 0; i <= 200; i++) {System.out.println("requested=>" + emitter.requested());while (emitter.requested() <= 0) {Thread.sleep(10);//防止cpu占用過高continue;}emitter.onNext(i);}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.newThread()) //生產者使用獨立線程.observeOn(Schedulers.newThread()) //消費者使用獨立線程.subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {s.request(5);}@Overridepublic void onNext(Integer integer) {System.out.println("onNext->" + integer);}@Overridepublic void onError(Throwable t) {System.out.println("onError=>" + t.getMessage());}@Overridepublic void onComplete() {System.out.println("onComplete");}});while (true) {Thread.sleep(1000);}}輸出:
requested=>128 requested=>127 requested=>126 requested=>125 requested=>124 requested=>123 requested=>122 requested=>121 requested=>120 requested=>119 requested=>118 requested=>117 requested=>116 requested=>115 requested=>114 requested=>113 requested=>112 requested=>111 requested=>110 requested=>109 requested=>108 requested=>107 requested=>106 requested=>105 requested=>104 onNext->0 requested=>103 onNext->1 requested=>102 onNext->2 requested=>101 onNext->3 requested=>100 onNext->4 requested=>99 requested=>98 requested=>97 requested=>96 requested=>95 requested=>94 requested=>93 requested=>92 requested=>91 requested=>90 requested=>89 requested=>88 requested=>87 requested=>86 requested=>85 requested=>84 requested=>83 requested=>82 requested=>81 requested=>80 requested=>79 requested=>78 requested=>77 requested=>76 requested=>75 requested=>74 requested=>73 requested=>72 requested=>71 requested=>70 requested=>69 requested=>68 requested=>67 requested=>66 requested=>65 requested=>64 requested=>63 requested=>62 requested=>61 requested=>60 requested=>59 requested=>58 requested=>57 requested=>56 requested=>55 requested=>54 requested=>53 requested=>52 requested=>51 requested=>50 requested=>49 requested=>48 requested=>47 requested=>46 requested=>45 requested=>44 requested=>43 requested=>42 requested=>41 requested=>40 requested=>39 requested=>38 requested=>37 requested=>36 requested=>35 requested=>34 requested=>33 requested=>32 requested=>31 requested=>30 requested=>29 requested=>28 requested=>27 requested=>26 requested=>25 requested=>24 requested=>23 requested=>22 requested=>21 requested=>20 requested=>19 requested=>18 requested=>17 requested=>16 requested=>15 requested=>14 requested=>13 requested=>12 requested=>11 requested=>10 requested=>9 requested=>8 requested=>7 requested=>6 requested=>5 requested=>4 requested=>3 requested=>2 requested=>1 requested=>0可以發現,之前的套路不管用了,生產者還是在一直持續不停的發送,但是并沒有發射滿200次,而是正好等于緩沖區大小128(關于128這個數字,可參考本文最后的參考文章)。
先來解決異步場景下,生產者為啥不能發送超過128條消息的問題,把上面的問題略改一下:
@Testpublic void flowableTest() throws InterruptedException {Flowable.create((FlowableOnSubscribe<Integer>) emitter -> {for (int i = 0; i <= 200; i++) {System.out.println("requested=>" + emitter.requested());while (emitter.requested() <= 0) {Thread.sleep(10);continue;}emitter.onNext(i);}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.newThread()).observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {@Overridepublic void onSubscribe(Subscription s) {s.request(96); //神奇的96}@Overridepublic void onNext(Integer integer) {System.out.println("onNext->" + integer);}@Overridepublic void onError(Throwable t) {System.out.println("onError=>" + t.getMessage());}@Overridepublic void onComplete() {System.out.println("onComplete");}});while (true) {Thread.sleep(1000);}}注意:19行,消費者一上來,就設置request(96),這是一個神奇的數字96,低于這個值,生產者仍然只能最多發送128個事件,達到這個值,生產者就可以繼續發送(詳情分析見本文最后的參考文章)。輸出如下:
requested=>128 requested=>127 requested=>126 requested=>125 requested=>124 requested=>123 requested=>122 requested=>121 requested=>120 requested=>119 requested=>118 requested=>117 requested=>116 requested=>115 requested=>114 requested=>113 requested=>112 requested=>111 requested=>110 requested=>109 requested=>108 requested=>107 requested=>106 requested=>105 requested=>104 requested=>103 requested=>102 requested=>101 onNext->0 requested=>100 onNext->1 requested=>99 onNext->2 requested=>98 onNext->3 requested=>97 onNext->4 onNext->5 onNext->6 onNext->7 onNext->8 onNext->9 onNext->10 onNext->11 onNext->12 requested=>96 onNext->13 requested=>95 onNext->14 requested=>94 onNext->15 requested=>93 onNext->16 requested=>92 onNext->17 requested=>91 onNext->18 requested=>90 onNext->19 requested=>89 onNext->20 requested=>88 onNext->21 requested=>87 onNext->22 requested=>86 onNext->23 requested=>85 onNext->24 requested=>84 onNext->25 requested=>83 onNext->26 requested=>82 onNext->27 requested=>81 requested=>80 requested=>79 onNext->28 requested=>78 requested=>77 requested=>76 onNext->29 requested=>75 onNext->30 requested=>74 onNext->31 requested=>73 onNext->32 requested=>72 onNext->33 requested=>71 onNext->34 requested=>70 onNext->35 requested=>69 onNext->36 requested=>68 onNext->37 requested=>67 onNext->38 requested=>66 onNext->39 requested=>65 onNext->40 requested=>64 onNext->41 requested=>63 onNext->42 requested=>62 onNext->43 requested=>61 requested=>60 requested=>59 requested=>58 onNext->44 requested=>57 requested=>56 requested=>55 requested=>54 requested=>53 onNext->45 requested=>52 onNext->46 requested=>51 onNext->47 requested=>50 onNext->48 requested=>49 onNext->49 requested=>48 onNext->50 requested=>47 onNext->51 requested=>46 onNext->52 requested=>45 onNext->53 requested=>44 onNext->54 requested=>43 onNext->55 requested=>42 onNext->56 requested=>41 onNext->57 requested=>40 onNext->58 requested=>39 onNext->59 onNext->60 requested=>38 onNext->61 requested=>37 onNext->62 requested=>36 onNext->63 requested=>35 onNext->64 requested=>34 onNext->65 requested=>33 onNext->66 requested=>32 onNext->67 requested=>31 onNext->68 requested=>30 onNext->69 requested=>29 onNext->70 requested=>28 onNext->71 requested=>27 onNext->72 requested=>26 onNext->73 requested=>25 onNext->74 requested=>24 onNext->75 requested=>23 onNext->76 requested=>22 onNext->77 requested=>21 onNext->78 requested=>20 onNext->79 requested=>19 onNext->80 requested=>18 onNext->81 requested=>17 onNext->82 requested=>16 onNext->83 requested=>15 onNext->84 requested=>14 onNext->85 requested=>13 onNext->86 requested=>12 onNext->87 requested=>11 onNext->88 requested=>10 onNext->89 requested=>9 onNext->90 requested=>8 onNext->91 requested=>7 onNext->92 requested=>6 onNext->93 requested=>5 onNext->94 requested=>4 onNext->95 requested=>3 requested=>98 requested=>97 requested=>96 requested=>95 requested=>94 requested=>93 requested=>92 requested=>91 requested=>90 requested=>89 requested=>88 requested=>87 requested=>86 requested=>85 requested=>84 requested=>83 requested=>82 requested=>81 requested=>80 requested=>79 requested=>78 requested=>77 requested=>76 requested=>75 requested=>74 requested=>73 requested=>72 requested=>71 requested=>70 requested=>69 requested=>68 requested=>67 requested=>66 requested=>65 requested=>64 requested=>63 requested=>62 requested=>61 requested=>60 requested=>59 requested=>58 requested=>57 requested=>56 requested=>55 requested=>54 requested=>53 requested=>52 requested=>51 requested=>50 requested=>49 requested=>48 requested=>47 requested=>46 requested=>45 requested=>44 requested=>43 requested=>42 requested=>41 requested=>40 requested=>39 requested=>38 requested=>37 requested=>36 requested=>35 requested=>34 requested=>33 requested=>32 requested=>31 requested=>30 requested=>29 requested=>28 requested=>27 requested=>26 requested=>25 requested=>24注意223行,requested值下降后,又開始回升了。另外一個問題,異步多線程中,消費者調用了request(n)方法試圖改變生產者的requested值,但是消費者與生產者不在同一個線程中,跨線程實時共享變量是有困難的(當然,也可以借助一些中間件,比如redis來保存共享變量,但這就超出rxjava的范疇了)。
有一些場景,生產者是能夠知道發射該什么時候完成的(比如:處理某個文本文件,讀到最后一行了,就認為完成了),這種情況下可以借助flowable來改善程序的性能。比如:處理一個超大的文本(體積>1G),如果全都加載到內存中,估計直接OOM了,下面演示了如何優雅的應對這類場景:(代碼主要來源于參考文章中的例子,略做調整)
目標:把一個文件的每一行內容,都逆序輸出。
@Testpublic void translateTest() throws Exception {Flowable.create((FlowableOnSubscribe<String>) emitter -> {try {FileReader reader = new FileReader("/data/application/test.txt");BufferedReader br = new BufferedReader(reader);String str;while ((str = br.readLine()) != null && !emitter.isCancelled()) {while (emitter.requested() == 0) {if (emitter.isCancelled()) {break;}}emitter.onNext(str);System.out.println("原文=> " + str + " =>" + emitter.requested());Thread.sleep(50); //模擬耗時處理}br.close();reader.close();emitter.onComplete();} catch (Exception e) {emitter.onError(e);}}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).subscribe(new Subscriber<String>() {Subscription s;@Overridepublic void onSubscribe(Subscription subscription) {s = subscription;s.request(1); //開始先請求1行數據}@Overridepublic void onNext(String str) {System.out.println("\t逆序<= " + StringUtils.reverse(str));s.request(1);//每處理完1行數據,再請求1行try {Thread.sleep(100); //模擬耗時處理} catch (InterruptedException e) {e.printStackTrace();}}@Overridepublic void onError(Throwable throwable) {System.out.println("出錯啦:" + throwable.getMessage());}@Overridepublic void onComplete() {System.out.println("完成!");}});while (true) {Thread.sleep(50);}}附:test.txt文本內容如下
青花瓷 詞:方文山 曲:周杰倫素胚勾勒出青花筆鋒濃轉淡 瓶身描繪的牡丹一如你初妝 冉冉檀香透過窗心事我了然 宣紙上走筆至此擱一半釉色渲染仕女圖韻味被私藏 而你嫣然的一笑如含苞待放 你的美一縷飄散 去到我去不了的地方天青色等煙雨而我在等你 炊煙裊裊升起隔江千萬里 在瓶底書漢隸仿前朝的飄逸 就當我為遇見你伏筆 天青色等煙雨而我在等你 月色被打撈起暈開了結局 如傳世的青花瓷自顧自美麗 你眼帶笑意色白花青的錦鯉躍然于碗底 臨摹宋體落款時卻惦記著你 你隱藏在窯燒里千年的秘密 極細膩猶如繡花針落地簾外芭蕉惹驟雨門環惹銅綠 而我路過那江南小鎮惹了你 在潑墨山水畫里 你從墨色深處被隱去天青色等煙雨而我在等你 炊煙裊裊升起隔江千萬里 在瓶底書漢隸仿前朝的飄逸就當我為遇見你伏筆 天青色等煙雨而我在等你 月色被打撈起暈開了結局 如傳世的青花瓷自顧自美麗 你眼帶笑意天青色等煙雨 而我在等你 炊煙裊裊升起 隔江千萬里 在瓶底書漢隸仿前朝的飄逸 就當我為遇見你伏筆 天青色等煙雨 而我在等你 月色被打撈起 暈開了結局 如傳世的青花瓷自顧自美麗 你眼帶笑意---------------------------- <茶湯>山嵐像茶杯上的云煙 顏色越來越淺 你越走越遠 《茶湯》演唱者郁可唯 《茶湯》演唱者郁可唯 有好多的話還來不及兌現,你就不見 我身后窗外那片梯田 像一段段從前 我站在茶園,抬頭望著天,想象你會在山的,那一邊 我說再喝一碗我熬的茶湯 你說你現在馬上要渡江 渡江到那遙遠的寒冷北方 就怕你的手會凍僵 你何時回來喝我熬的茶湯 這次我會多放一些老姜 你寄來的信一直擱在桌上 不知要寄還哪地方 北風它經過多少村落 來來回回繞過 分不清那年,我求天保佑,只見風聲大做,卻更寂寞 那莊稼已經幾次秋收,麥田幾次成熟 于是我焚香,安靜的難過,你還是一直沒有,回來過 我說再喝一碗我熬的茶湯 你說你現在馬上要渡江 渡江到那遙遠的寒冷北方 就怕你的手會凍僵 你何時回來喝我熬的茶湯 這次我會多放一些老姜 你寄來的信一直擱在桌上 不知要寄還哪地方 我身后窗外那片梯田 像一段段從前 我站在茶園,抬頭望著天,想象你會在山的,那一邊 我說再喝一碗我熬的茶湯 你說你現在馬上要渡江 渡江到那遙遠的寒冷北方 就怕你的手會凍僵 你何時回來喝我熬的茶湯 這次我會多放一些老姜 你寄來的信一直擱在桌上 不知要寄還哪地方 我說再喝一碗我熬的茶湯 你說你現在馬上要渡江 想問你到底是否有種藥方 讓熱湯永遠不會涼 你何時回來喝我熬的茶湯 這次我會多放一些老姜 你寄來的信一直擱在桌上 不知要寄還哪地方---------------------------------- <東風破>一盞離愁 孤單佇立在窗口 我在門后 假裝你人還沒走 舊地如重游 月圓更寂寞 夜半清醒的燭火 不忍苛責我 一壺漂泊 浪跡天涯難入喉 你走之后 酒暖回憶思念瘦 水向東流 時間怎么偷 花開就一次成熟 我卻錯過 誰在用琵琶彈奏 一曲東風破 歲月在墻上剝落 看見小時候 猶記得那年我們都還很年幼 而如今琴聲幽幽 我的等候你沒聽過 誰在用琵琶彈奏 一曲東風破 楓葉將故事染色 結局我看透 籬笆外的古道我牽著你走過 荒煙漫草的年頭 就連分手都很沉默 一壺漂泊 浪跡天涯難入喉 你走之后 酒暖回憶思念瘦 水向東流 時間怎么偷 花開就一次成熟 我卻錯過 誰在用琵琶彈奏 一曲東風破 歲月在墻上剝落 看見小時候 猶記得那年我們都還很年幼 而如今琴聲幽幽 我的等候你沒聽過 誰在用琵琶彈奏 一曲東風破 楓葉將故事染色 結局我看透 籬笆外的古道我牽著你走過 荒煙漫草的年頭 就連分手都很沉默 誰在用琵琶彈奏 一曲東風破 歲月在墻上剝落 看見小時候 猶記得那年我們都還很年幼 而如今琴聲幽幽 我的等候你沒聽過 誰在用琵琶彈奏 一曲東風破 楓葉將故事染色 結局我看透 籬笆外的古道我牽著你走過 荒煙漫草的年頭 就連分手都很沉默--------------結束!
參考文章:
https://www.jianshu.com/p/9b1304435564
https://www.jianshu.com/p/a75ecf461e02
轉載于:https://www.cnblogs.com/yjmyzz/p/rx-java-2-tutorial-3.html
總結
以上是生活随笔為你收集整理的RxJava2学习笔记(3)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 最小生成树-prim算法模板
- 下一篇: Git入门总结