gitlab10.x迁移_1.x到2.x的迁移:可观察与可观察:RxJava FAQ
gitlab10.x遷移
標題不是錯誤。 rx.Observable 1.x的io.reactivex.Observable與2.x的io.reactivex.Observable完全不同。 盲目升級rx依賴關系并重命名項目中的所有導入將進行編譯(稍作更改),但不能保證相同的行為。 在項目的早期, Observable in 1.x中沒有背壓的概念,但后來包含了背壓。 它實際上是什么意思? 假設我們有一個流,每1毫秒產生一個事件,但是處理一個這樣的項目需要1 秒 。 您會發現從長遠來看,它不可能以這種方式工作:
import rx.Observable; //RxJava 1.x import rx.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));MissingBackpressureException在幾百毫秒內MissingBackpressureException 。 但是這個異常是什么意思呢? 好吧,基本上,這是一個安全網(或如果有的話,請進行健全性檢查),以防止損害應用程序。 RxJava會自動發現生產者溢出了消費者,并主動終止了流以避免進一步的損害。 那么,如果我們只是在這里和那里搜索并替換少量進口商品,該怎么辦?
import io.reactivex.Observable; //RxJava 2.x import io.reactivex.schedulers.Schedulers;Observable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));例外不見了! 我們的吞吐量也是如此……一段時間后,應用程序停滯不前,一直處于無休止的GC循環中。 您會看到,RxJava 1.x中的Observable在各處都有斷言(綁定隊列,檢查等),確保您沒有在任何地方溢出。 例如,默認情況下,1.x中的observeOn()運算符的隊列限制為128個元素。 當在整個堆棧上正確實現了背壓時, observeOn()運算符會要求上游傳遞不超過128個元素來填充其內部緩沖區。 然后,與此調度程序分開的線程(工作人員)從該隊列中拾取事件。 當隊列幾乎變空時, observeOn()運算符會要求( request()方法)提供更多信息。 當生產者不遵守背壓請求并發送比允許的更多的數據時,此機制就會破裂,從而有效地消耗了消費者。 observeOn()運算符內的內部隊列已滿,但interval()運算符仍在發出新事件-因為這是interval()要做的事情。
在1.x中Observable到的發現了這種溢出,并通過MissingBackpressureException快速失敗。 字面上的意思是: 我盡了最大努力使系統保持健康狀態,但是我的上游不尊重背壓-缺少背壓實現 。 但是,2.x中的Observable沒有這樣的安全機制。 希望你會成為一個好公民,生產緩慢或消費快速的人,這是一種香草。 當系統運行Observable ,兩個Observable的行為相同。 但是,在1.x負載下,快速失敗,而2.x則緩慢而痛苦地失敗。
這是否意味著RxJava 2.x向后退? 恰恰相反! 在2.x中,有一個重要的區別:
- Observable不在乎背壓,這極大地簡化了其設計和實現。 根據定義,它應用于建模不支持背壓的流,例如用戶界面事件
- Flowable確實支持背壓,并已采取所有安全措施。 換句話說,計算管道中的所有步驟都確保您不會溢出使用者。
2.x在可以支持背壓的流(簡單地說“ 如果需要,可以減慢 ”)和不支持背壓的流之間進行了重要區分。 從類型系統的角度來看,很清楚,我們正在處理哪種源及其保證。 那么,我們應該如何將我們的interval()示例遷移到RxJava 2.x? 比您想像的容易:
Flowable.interval(1, MILLISECONDS).observeOn(Schedulers.computation()).subscribe(x -> sleep(Duration.ofSeconds(1)));這么簡單。 您可能會問自己一個問題,為什么Flowable可以具有按定義不能支持反壓的interval()運算符? 假設所有interval()均以恒定速率傳遞事件后,它就不會減速! 好吧,如果看一下interval()的聲明,您會注意到:
@BackpressureSupport(BackpressureKind.ERROR)簡而言之,這告訴我們,無論何時不再可以保證背壓,RxJava都會照顧好它并拋出MissingBackpressureException 。 這正是我們運行Flowable.interval()程序時發生的事情–它快速失敗,而不是破壞整個應用程序的穩定性。
因此,總結起來,每當您從1.x看到Observable時,您可能想要的是2.x的Flowable 。 至少,除非您的流按定義不支持背壓。 盡管名稱相同,但是這兩個主要版本中的Observable很大不同。 但是,一旦您進行了搜索并從Observable 替換為Flowable您會注意到遷移并不是那么簡單。 這與API更改無關,區別更加深刻。
沒有簡單的Flowable.create()直接與2.x中的Observable.create()等效。 我自己過去在過度使用Observable.create()工廠方法時犯了一個錯誤。 create()允許您以任意速率發出事件,完全忽略背壓。 2.x具有一些友好的功能來處理背壓請求,但它們需要仔細設計流。 這將在下一個常見問題解答中介紹。
翻譯自: https://www.javacodegeeks.com/2017/08/1-x-2-x-migration-observable-vs-observable-rxjava-faq.html
gitlab10.x遷移
總結
以上是生活随笔為你收集整理的gitlab10.x迁移_1.x到2.x的迁移:可观察与可观察:RxJava FAQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spring health_为什么Spr
- 下一篇: DDoS测试(残暴Ddos测试)