response获取响应内容_Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题
現在, Java 的各種基于 Reactor 模型的響應式編程庫或者框架越來越多了,像是 RxJava,Project Reactor,Vert.x 等等等等。在 Java 9, Java 也引入了自己的 響應式編程的一種標準接口,即java.util.concurrent.Flow這個類。這個類里面規定了 Java 響應式編程所要實現的接口與抽象。我們這個系列要討論的就是Project Reactor這個實現。
這里也提一下,為了能對于沒有升級到 Java 9 的用戶也能兼容,java.util.concurrent.Flow這個類也被放入了一個 jar 供 Java 9 之前的版本,依賴是:
<dependency><groupId>org.reactivestreams</groupId><artifactId>reactive-streams</artifactId><version>1.0.3</version> </dependency>本系列所講述的 Project Reactor 就是 reactive-streams 的一種實現。 首先,我們先來了解下,什么是響應式編程,Java 如何實現
什么是響應式編程,Java 如何實現
我們這里用通過唯一 id 獲取知乎的某個回答作為例子,首先我們先明確下,一次HTTP請求到服務器上處理完之后,將響應寫回這次請求的連接,就是完成這次請求了,如下:
public void request(Connection connection, HttpRequest request) {//處理request,省略代碼connection.write(response);//完成響應 }假設獲取回答需要調用兩個接口,獲取評論數量還有獲取回答信息,傳統的代碼可能會這么去寫:
//獲取評論數量 public void getCommentCount(Connection connection, HttpRequest request) {Integer commentCount = null;try {//從緩存獲取評論數量,阻塞IOcommentCount = getCommnetCountFromCache(id);} catch(Exception e) {try {//緩存獲取失敗就從數據庫中獲取,阻塞IOcommentCount = getVoteCountFromDB(id);} catch(Exception ex) {}}connection.write(commentCount); }//獲取回答 public void getAnswer(Connection connection, HttpRequest request) {//獲取點贊數量Integer voteCount = null;try {//從緩存獲取點贊數量,阻塞IOvoteCount = getVoteCountFromCache(id);} catch(Exception e) {try {//緩存獲取失敗就從數據庫中獲取,阻塞IOvoteCount = getVoteCountFromDB(id);} catch(Exception ex) {}}//從數據庫獲取回答信息,阻塞IOAnswer answer = getAnswerFromDB(id);//拼裝ResponseResultVO response = new ResultVO();if (voteCount != null) {response.setVoteCount(voteCount);}if (answer != null) {response.setAnswer(answer);}connection.write(response);//完成響應 }在這種實現下,你的進程只需要一個線程池,承載了所有請求。這種實現下,有兩個弊端:
現在,NIO 非阻塞 IO 很普及了,有了非阻塞 IO,我們可以通過響應式編程,來讓我們的線程不會阻塞,而是一直在處理請求。這是如何實現的呢?
傳統的 BIO,是線程將數據寫入 Connection 之后,當前線程進入 Block 狀態,直到響應返回,之后接著做響應返回后的動作。NIO 則是線程將數據寫入 Connection 之后,將響應返回后需要做的事情以及參數緩存到一個地方之后,直接返回。在有響應返回后,NIO 的 Selector 的 Read 事件會是 Ready 狀態,掃描 Selector 事件的線程,會告訴你的線程池數據好了,然后線程池中的某個線程,拿出剛剛緩存的要做的事情還有參數,繼續處理。
那么,怎樣實現緩存響應返回后需要做的事情以及參數的呢?Java 本身提供了兩種接口,一個是基于回調的 Callback 接口(Java 8 引入的各種Functional Interface),一種是 Future 框架。
基于 Callback 的實現:
//獲取回答 public void getAnswer(Connection connection, HttpRequest request) {ResultVO resultVO = new ResultVO();getVoteCountFromCache(id, (count, throwable) -> {//異常不為null則為獲取失敗if (throwable != null) {//讀取緩存失敗就從數據庫獲取getVoteCountFromDB(id, (count2, throwable2) -> {if (throwable2 == null) {resultVO.setVoteCount(voteCount);}//從數據庫讀取回答信息getAnswerFromDB(id, (answer, throwable3) -> {if (throwable3 == null) {resultVO.setAnswer(answer);connection.write(resultVO);} else {connection.write(throwable3);}});});} else {//獲取成功,設置voteCountresultVO.setVoteCount(voteCount);//從數據庫讀取回答信息getAnswerFromDB(id, (answer, throwable2) -> {if (throwable2 == null) {resultVO.setAnswer(answer);//返回響應connection.write(resultVO);} else {//返回錯誤響應connection.write(throwable2);}});}}); }可以看出,隨著調用層級的加深,callback 層級越來越深,越來越難寫,而且啰嗦的代碼很多。并且,基于 CallBack 想實現獲取點贊數量其實和獲取回答信息并發是很難寫的,這里還是先獲取點贊數量之后再獲取回答信息。
那么基于 Future 呢?我們用 Java 8 之后引入的 CompletableFuture 來試著實現下。
//獲取回答 public void getAnswer(Connection connection, HttpRequest request) {ResultVO resultVO = new ResultVO();//所有的異步任務都執行完之后要做的事情CompletableFuture.allOf(getVoteCountFromCache(id)//發生異常,從數據庫讀取.exceptionallyComposeAsync(throwable -> getVoteCountFromDB(id))//讀取完之后,設置VoteCount.thenAccept(voteCount -> {resultVO.setVoteCount(voteCount);}),getAnswerFromDB(id).thenAccept(answer -> {resultVO.setAnswer(answer);})).exceptionallyAsync(throwable -> {connection.write(throwable);}).thenRun(() -> {connection.write(resultVO);}); }這種實現就看上去簡單多了,并且讀取點贊數量還有讀取回答內容是同時進行的。 Project Reactor 在 Completableuture 這種實現的基礎上,增加了更多的組合方式以及更完善的異常處理機制,以及面對背壓時候的處理機制,還有重試機制。
響應式編程里面遇到的問題 - 背壓
由于響應式編程,不阻塞,所以把之前因為基本不會發生而忽視的一個問題帶了上來,就是背壓(Back Pressure)。
背壓是指,當上游請求過多,下游服務來不及響應,導致 Buffer 溢出的這樣一個問題。在響應式編程,由于線程不阻塞,遇到 IO 就會把當前參數和要做的事情緩存起來,這樣無疑增大了很多吞吐量,同時內存占用也大了起來,如果不限制的話,很可能 OutOfMemory,這就是背壓問題。
在這個問題上,Project Reactor 基于的模型,是有處理方式的,Completableuture 這個體系里面沒有。
為何現在響應式編程在業務開發微服務開發不普及
主要因為數據庫 IO,不是 NIO。
不論是Java自帶的Future框架,還是 Spring WebFlux,還是 Vert.x,他們都是一種非阻塞的基于Ractor模型的框架(后兩個框架都是利用netty實現)。
在阻塞編程模式里,任何一個請求,都需要一個線程去處理,如果io阻塞了,那么這個線程也會阻塞在那。但是在非阻塞編程里面,基于響應式的編程,線程不會被阻塞,還可以處理其他請求。舉一個簡單例子:假設只有一個線程池,請求來的時候,線程池處理,需要讀取數據庫 IO,這個 IO 是 NIO 非阻塞 IO,那么就將請求數據寫入數據庫連接,直接返回。之后數據庫返回數據,這個鏈接的 Selector 會有 Read 事件準備就緒,這時候,再通過這個線程池去讀取數據處理(相當于回調),這時候用的線程和之前不一定是同一個線程。這樣的話,線程就不用等待數據庫返回,而是直接處理其他請求。這樣情況下,即使某個業務 SQL 的執行時間長,也不會影響其他業務的執行。
但是,這一切的基礎,是 IO 必須是非阻塞 IO,也就是 NIO(或者 AIO)。官方JDBC沒有 NIO,只有 BIO 實現。這樣無法讓線程將請求寫入鏈接之后直接返回,必須等待響應。但是也就解決方案,就是通過其他線程池,專門處理數據庫請求并等待返回進行回調,也就是業務線程池 A 將數據庫 BIO 請求交給線程池B處理,讀取完數據之后,再交給 A 執行剩下的業務邏輯。這樣A也不用阻塞,可以處理其他請求。但是,這樣還是有因為某個業務 SQL 的執行時間長,導致B所有線程被阻塞住隊列也滿了從而A的請求也被阻塞的情況,這是不完美的實現。真正完美的,需要 JDBC 實現 NIO。
Java 自帶的 Future框架可以這么用JDBC:
@GetMapping public DeferredResult<Result> get() { DeferredResult<Result> deferredResult = new DeferredResult<>(); CompletableFuture.supplyAsync(() -> {return 阻塞數據庫IO;//dbThreadPool用來處理阻塞的數據庫IO}, dbThreadPool).thenComposeAsync(result -> {//spring 的 DeferredResult 來實現異步回調寫入結果返回deferredResult.setResult(result); }); return deferredResult; }WebFlux 也可以使用阻塞JDBC,但是同理:
@GetMapping public Mono<Result> get() { return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {return 阻塞數據庫IO;//dbThreadPool用來處理阻塞的數據庫IO}, dbThreadPool)); }Vert.x 也可以使用阻塞的JDBC,也是同理:
@GetMapping public DeferredResult<Result> get() { DeferredResult<Result> deferredResult = new DeferredResult<>(); getResultFromDB().setHandler(asyncResult -> {if (asyncResult.succeeded()) {deferredResult.setResult(asyncResult.result());} else {deferredResult.setErrorResult(asyncResult.cause());}}); return deferredResult; }private WorkerExecutor dbThreadPool = vertx.createSharedWorkerExecutor("DB", 16);private Future<Result> getResultFromDB() {Future<Result> result = Future.future();dbThreadPool.executeBlocking(future -> {return 阻塞數據庫IO;}, false, asyncResult -> {if (asyncResult.succeeded()) {result.complete(asyncResult.result());} else {result.fail(asyncResult.cause());}});return result; }相當于通過另外的線程池(當然也可以通過原有線程池,反正就是要用和請求不一樣的線程,才能實現回調,而不是當次就阻塞等待),封裝了阻塞 JDBC IO。
但是,這樣幾乎對數據庫IO主導的應用性能沒有提升,還增加了線程切換,得不償失。所以,需要使用真正實現了 NIO 的數據庫客戶端。目前有這些 NIO 的 JDBC 客戶端,但是都不普及:
總結
以上是生活随笔為你收集整理的response获取响应内容_Project Reactor 深度解析 - 1. 响应式编程介绍,实现以及现有问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python效率提升_Python GU
- 下一篇: 给老人选择舒适的床垫 哪个品牌的床垫最好