webflux系列--基础
文章目錄
- Spring Mvc 和 Spring WebFlux
- WebFlux 的優勢&提升性能
- WebFlux 應用場景
- 異同點
- 響應式編程
- Reactive Streams
- 頂級接口
- Reactor
- Reactor實現
- Flux
- Mono
- Flux常用函數
- **調試reactor**
- StepVerifier方法
- Reactor中的多線程
- **Schedulers**
- Schedulers.immediate()
- Schedulers.single()
- Schedulers.elastic()
- Schedulers.parallel()
- 錯誤處理
- **onErrorReturn**
- **onErrorResume**
- **onErrorMap**
- **doOnError** **記錄錯誤日志**
- **finally 確保做一些事情**
- **retry 重試機制**
- 背壓(流量控制)
Spring Mvc 和 Spring WebFlux
? Spring MVC 構建于 Servlet API 之上,使用的是同步阻塞式 I/O 模型,每一個請求對應一個線程去處理。
? Spring WebFlux 是一個異步非阻塞式的 Web 框架,它能夠充分利用多核 CPU 的硬件資源去處理大量的并發請求。
WebFlux 的優勢&提升性能
? WebFlux 內部使用的是響應式編程(Reactive Programming),以 Reactor 庫為基礎, 基于異步和事件驅動,可以讓我們在不擴充硬件資源的前提下,提升系統的吞吐量和伸縮性。
? WebFlux 并不能使接口的響應時間縮短,它僅僅能夠提升吞吐量和伸縮性。
WebFlux 應用場景
? Spring WebFlux 是一個異步非阻塞式的 Web 框架,所以,它特別適合應用在 IO 密集型的服務中,比如微服務網關這樣的應用中。
備注:
? IO 密集型包括:磁盤IO密集型, 網絡IO密集型,微服務網關就屬于網絡 IO 密集型,使用異步非阻塞式編程模型,能夠顯著地提升網關對下游服務轉發的吞吐量。
異同點
相同點:
- 都可以使用 Spring MVC 注解,如 @Controller, 方便我們在兩個 Web 框架中自由轉換;
- 均可以使用 Tomcat, Jetty, Undertow Servlet 容器(Servlet 3.1+);
注意點:
- Spring MVC 因為是使用的同步阻塞式,更方便開發人員編寫功能代碼,Debug 測試等,一般來說,如果 Spring MVC 能夠滿足的場景,就盡量不要用 WebFlux;
- WebFlux 默認情況下使用 Netty 作為服務器;
- WebFlux 不支持 MySql;
響應式編程
Reactive Streams
? 響應式流和迭代器較相似,不過迭代器是基于“拉”(pull)的,而響應式流是基于“推”(push)的。 迭代器的使用其實是命令式編程,因為由開發者決定什么時候調用next()獲取下一個元素。
? 在響應式流中,與上面等價的是發布者-訂閱者。但當有新的可用元素時,是由發布者推給訂閱者的。這個“推”就是響應式的關鍵所在。另外,對被推過來元素的操作也是以聲明的方式進行的,程序員只需表達做什么就行了,不需要管怎么做。
? 發布者使用onNext方法向訂閱者推送新元素,使用onError方法告知一個錯誤,使用onComplete方法告知已經結束。可見,錯誤處理和完成(結束)也是以一個良好的方式被處理。錯誤和結束都可以終止序列。這種方式非常靈活。這種模式支持0個(沒有)元素/1個元素/n(多)個元素(包括無限序列,如果滴答的鐘表)這些情況。
頂級接口
package org.reactivestreams;Publisher
public interface Publisher<T> {//注冊訂閱者public void subscribe(Subscriber<? super T> s); }Consumer
public interface Subscriber<T> {/*** 該方法在訂閱Publisher之后執行,在訂閱之前不會有數據流的消費*/public void onSubscribe(Subscription s);/*** 消費下一個消息,在執行request方法之后通知Publisher,可被調用多次,有request(x),參數x決定執行幾次* * @param t the element signaled*/public void onNext(T t);/*** 訂閱失敗事件* @param t the throwable signaled*/public void onError(Throwable t);/*** 訂閱成功事件*/public void onComplete(); }Subscription
生產消費對象參數 。用于發布者與訂閱者之間的通信(實現背壓:訂閱者能夠告訴生產者需要多少數據) public interface Subscription {/*** 消費請求 。request(n)來決定這次subscribe獲取元素的最大數目*/public void request(long n);/*** 取消請求。并且清除resources。cancel執行后,不一定會立即取消,可能由于前面的信號。*/public void cancel(); }Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }Reactor
Reactor實現
? Reactor引入可組合響應式的類型,實現了發布者接口,但也提供了豐富的操作符,就是Flux和Mono。
-
Flux,流動,表示0到N個元素。
-
Mono,單個,表示0或1個元素。
它們之間的不同主要在語義上,表示異步處理的粗略基數。
Flux
一個Flux<T>是一個標準的Publisher<T>,表示一個異步序列,可以發射0到N個元素,可以通過一個完成信號或錯誤信號**終止**。這3種類型的信號轉化為對一個下游訂閱者的***onNext***,***onComplete***,***onError***3個方法的調用。這3個方法也可以理解為事件/回調,且它們都是可選的。如沒有onNext但有onComplete,表示一個空的有限序列。既沒有onNext也沒有onComplete,表示一個空的無限序列(沒有什么實際用途,可用于測試)。無限序列也沒有必要是空的,如Flux.interval(Duration)產生一個Flux ,它是無限的,從鐘表里發射出的規則的“嘀嗒”。
Mono
? 一個Mono是一個特殊的Publisher,最多發射一個元素,可以使用***onComplete***信號或**onError信號來終止。
? 它提供的操作符只是Flux提供的一個子集,同樣,一些操作符(如把Mono和Publisher結合起來)可以把它切換到一個Flux。如Mono#concatWith(Publisher)返回一個Flux,然而Mono#then(Mono)返回的是另一個Mono。Mono可以用于表示沒有返回值的異步處理(與Runnable相似),用Mono表示。
Flux常用函數
調試reactor
? 在我們傳統阻塞代碼里面,調試(Debug)的時候是一件非常簡單的事情,通過打斷點,得到相關的stack 的信息,就可以很清楚的知道錯誤信息。 但是在reactor 環境下去調試代碼并不是一件簡單的事情,最常見的就是 一個 Flux流,怎么去得到每個元素信息,怎么去知道在管道里面下一個元素是什么,每個元素是否像期望的那樣做了操作。
官方推薦的工具是 StepVerifier 。
https://projectreactor.io/docs/core/release/reference/index.html#testing
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-test</artifactId> <scope>test</scope> <version>3.3.5.RELEASE</version></dependency>示例:
@Testpublic void reactorTest(){StepVerifier.create(Flux.just(1,2)) //1.expectNext(1,2) //2 .expectComplete() //3 .verify(); //4 }StepVerifier方法
-
map:同stream。
-
flatmap :同stream。
-
filter:同stream。
-
zip
“壓縮”就是將多個流一對一的合并起來,還有一個原因,因為在每個flux流或者mono流里面,各個流的速度是不一樣,zip還有個作用就是將兩個流進行同步對齊。
Reactor中的多線程
Schedulers
在reactor中處理線程調度的不叫thread pool,而是Schedulers(調度器),通過調度器就可以創建出供我們直接使用的多線程環境。
Schedulers.immediate()
在當前線程中使用
Schedulers.single()
創建一個可重用的單線程環境,該方法的所有調用者都會重復使用同一個線程。
Schedulers.elastic()
創建一個彈性線程池,會重用空閑線程,當線程池空閑時間過長就會自動廢棄掉。通常使用的場景是給一個阻塞的任務分配自己的線程,從而不會影響到其他任務的執行。
Schedulers.parallel()
創建一個固定大小的線程池,線程池大小和cpu個數相等。
錯誤處理
onErrorReturn
onErrorReturn在發生錯誤的時候,會提供一個缺省值,類似于安全取值的問題,但是這個在響應式流里面會更加實用。 這樣就可以在處理一些未知元素的時候,又不想讓未知因素中止程序的繼續運行,就可以采取這種方式。
Flux.just(1,2,0).map(v->2/v).onErrorReturn(0).map(v->v*2).subscribe(System.out::println,System.err::println);onErrorResume
在發生錯誤的時候,提供一個新的流或者值返回
Flux.just(1,2,0)//調用redis服務獲取數據.flatMap(id->redisService.getUserByid(id))//當發生異常的時候,從DB用戶服務獲取數據.onErrorResume(v->userService.getUserByCache(id));onErrorMap
上面的都是我們去提供缺省的方法或值處理錯誤,但是有的時候,我們需要拋出錯誤,但是需要將錯誤包裝一下,可讀性好一點,也就是拋出自定義異常。
Flux.just(1,2,0).flatMap(id->getUserByid(id)).onErrorMap(v->new CustomizeExcetion("服務器開小差了",v));doOnError 記錄錯誤日志
在發生錯誤的時候我們需要記錄日志,在reactor里面專門獨立出api記錄錯誤日志。 doOnError 對于流里面的元素只讀,也就是他不會對流里面的任務元素操作,記錄日志后,會將錯誤信號繼續拋到后面,讓后面去處理。
Flux.just(1,2,0).flatMap(id->getUserByid(id)).doOnError(e-> Log.error("this occur something error")).onErrorMap(v->new CustomizeExcetion("服務器開小差了",v));finally 確保做一些事情
有的時候我們想要像傳統的同步代碼那樣使用finally去做一些事情,比如關閉http連接,清理資源,那么在reactor中怎么去做finally
Flux.just(1,2,0).flatMap(id->getUserByid(id)).doOnError(e-> Log.error("this occur something error")).onErrorMap(v->new CustomizeExcetion("服務器開小差了",v)).doFinally(System.out.println("我會確保做一些事情"));retry 重試機制
當遇到一些不可控因素導致的程序失敗,但是代碼邏輯確實是正確的,這個時候需要重試機制。 但是需要注意的是**重試不是從錯誤的地方開始重試**,相當于對publisher 的重訂閱,也就是從零開始重新執行一遍,所以無法達到類似于斷點續傳的功能,所以使用場景還是有限制。
背壓(流量控制)
通過 Reactor提供的BaseSubscriber來進行自定義我們自己流量控制的subscriber
Flux.just(1,2).doOnRequest(s->System.out.println("no. "+s)).subscribe(new BaseSubscriber<Integer>() {@Overrideprotected void hookOnSubscribe(Subscription subscription) {System.out.println("訂閱開始了,我要請求幾個元素");request(1);}@Overrideprotected void hookOnNext(Integer value) {System.out.println("收到一個元素,下一次請求幾個元素");request(1);}});總結
以上是生活随笔為你收集整理的webflux系列--基础的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: skywalking环境搭建
- 下一篇: webflux系列--源码解析二