flux java_FluxJava 新增 RxJava2 的支援功能
FluxJava 最初的設計就是以 Add-on 的方式來提供對于 RxJava 的支持,所以這次增加 RxJava2 的部份也依照相同的模式,在 Project 中加上了 fluxjava-rx2 的 Module。新的 Module 功能上與 fluxjava-rx 大致上相同,只是原本以 RxJava 規格運作的部份,改為使用 RxJava2。
由于 RxJava 與 RxJava2 不太有機會共存在同一個 Module 里,所以 fluxjava-rx2 沿用了 fluxjava-rx 的 Package 名稱,在使用上這二個 Add-on 必須要擇一引用。不過也帶來了一個額外的好處,如果想要由 fluxjava-rx 升級到 fluxjava-rx2 時,只要修改成 RxJava2 的調用規格,不用再特別調整 Import 的內容。
在 fluxjava-rx2 中與 fluxjava-rx 最大的差異,主要是因應 RxJava2 把原本的 Observable 分成了有背壓版本的 Flowable 與沒有背壓版本的 Observable。因此 RxBus 與 RxStore 中都分別再提供了 toFlowable 的 Method 來取得 Flowable,不過 Observable 本來就可以再轉換為 Flowable,此處的功能只是為了增加便利性、簡化源代碼之用。
同時,利用這篇文章補充說明一下一些使用上的技術細節。在 RxStore 中使用的是沒有背壓版本的 Observable 來接收外部傳來的信息,只是接收到之后就會被分派到不同的 Thread 上去處理后續的工作,所以在這個部份是不大有機會遇上 MissingBackpressureException 問題的。但是,這并不代表背壓所造成的情況就不存在了,只是瓶頸移到了 ThreadPool 的承受能力或是執行的環境可以產生 Thread 的數量上。
就算是使用有背壓功能的 Flowable,也不代表就可以高枕無憂了。說穿了,背壓本身不是什么神奇的黑科技,原理上只是在上下游中間加了個水池,讓下游有喘息的空間。水池畢竟還是有一定的物理限制,沒有控制好,依舊會讓水池承載不下而出現錯誤。
因此,不論使用哪一種方法,前端發送的數量仍然應該要被謹慎地控制,避免海量的信息把接收端給淹沒了。像是把 RecyclerView 滾動時產生位移的 Event 毫無選擇地往 RxStore 送。
在發送端就要篩選信息,除了要減少 MissingBackpressureException 可能會出現的機會外,還有一個目的是要節省執行成本。當信息數量大到無法處理時,能做的只有挑選值得處理的部份來進行。當挑選的工作被移到發送后,RxJava 的傳送機制表面上看起來就是簡單地轉了一手,但是如果去追蹤其源代碼,就可以發現其實底下做了不少的工作。而每一次傳送都要運行這些內容,可以想見在數量到達一定的程度之后,就會顯現出可觀的效能差距。
由于 Observable 在定義上所形成的限制使然,同一個發送源無法把的信息分配至不同的 Thread 上送出。RxStore 為了要讓每個資料處理要求可以獨立、同步地進行,所以才會在接收到信息后,以不同的 Thread 進行后續的工作。
在 RxStore 里提供了一個 getExecutor Method,可以使用 ThreadPool 來做為背壓的替代方案,但是并沒有像背壓一樣有可以控制上游的功能。在實作 getExecutor 時要注意,不要直接在回傳時 New 一個 ThreadPool 的 Instance。因為 getExecutor 是在每一次收到信息后調用一次。以上的做法,也等同于每一次收到信息就拿到一個 ThreadPool 的 Instance,每一個 ThreadPool 都只會產生一個 Thread,這就失去了使用 ThreadPool 的用意。
最容易出現以上問題的情況是使用 Executors 來取得 ThreadPool,一般的情況下很容易忽略 Executors 其實是每次調用就產生一個 Instance。所以當以 return Executors.newFixedThreadPool(poolSize); 的方式回傳 getExecutor 時,一開始也許顯示不出問題而被略過,但是一但信息爆量后,就會因為產生 Thread 的數量到達上限而中止運作。
以上,是這次補充的內容,讓使用 FluxJava 的朋友做為參考。
總結
以上是生活随笔為你收集整理的flux java_FluxJava 新增 RxJava2 的支援功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 示开头的成语有哪些啊?
- 下一篇: 圣冕麒麟要不要给王者无敌