JDK11的新特性:HTTP API和reactive streams
文章目錄
- 簡(jiǎn)介
- 怎么在java中使用reactive streams
- POST請(qǐng)求的例子
- 總結(jié)
簡(jiǎn)介
在JDK11的新特性:新的HTTP API中,我們介紹了通過(guò)新的HTTP API,我們可以發(fā)送同步或者異步的請(qǐng)求,并獲得的返回的結(jié)果。
今天我們想探討一下這些同步或者異步請(qǐng)求和響應(yīng)和reactive streams的關(guān)系。
更多內(nèi)容請(qǐng)?jiān)L問(wèn)www.flydean.com
怎么在java中使用reactive streams
reactive streams的介紹大家可以參考reactive stream協(xié)議詳解,使用reactive streams的目的就是為了解決發(fā)送者和消費(fèi)者之間的通信問(wèn)題,發(fā)送者不會(huì)發(fā)送超出消費(fèi)者能力的信息。
我們?cè)倩仡櫼幌聄eactive streams中的幾個(gè)關(guān)鍵概念:
-
Publisher 負(fù)責(zé)產(chǎn)生消息或者事件,并提供了一個(gè)subscribed接口來(lái)和Subscriber進(jìn)行連接。
-
Subscriber 用來(lái)subscribe一個(gè)Publisher,并提供了onNext方法來(lái)處理新的消息,onError來(lái)處理異常,onComplete提供給Publisher調(diào)用來(lái)結(jié)束監(jiān)聽。
-
Subscription 負(fù)責(zé)連接Publisher和Subscriber,可以用來(lái)請(qǐng)求消息或者取消收聽。
更進(jìn)一步,如果我們想要自己實(shí)現(xiàn)一個(gè)reactive streams,我們需要做這些事情:
- 創(chuàng)建Publisher和Subscriber。
- 調(diào)用Publisher.subscribe(Subscriber)建立Publisher和Subscriber之間的連接。
- Publisher創(chuàng)建一個(gè)Subscription,并調(diào)用Subscriber.onSubscription(Subscription)方法。
- Subscriber將Subscription保存起來(lái),供后面使用。
- Subscriber調(diào)用Subscription.request(n) 方法請(qǐng)求n個(gè)消息。
- Publisher調(diào)用Subscriber.onNext(item) 將請(qǐng)求的消息發(fā)送給Subscriber。
- 按照需要重復(fù)上訴過(guò)程。
- Publisher調(diào)用Subscriber.OnError(err) 或者 Subscriber.onComplete()方法。
- Subscriber調(diào)用Subscription.cancel()方法。
POST請(qǐng)求的例子
還記得上篇文章我們講HTTP API新特性的時(shí)候,我們使用的例子嗎?
例子中,我們使用了一個(gè)HttpRequest.BodyPublisher,用來(lái)構(gòu)建Post請(qǐng)求,而BodyPublisher就是一個(gè)Flow.Publisher:
public interface BodyPublisher extends Flow.Publisher<ByteBuffer>也就是說(shuō)從BodyPublisher開始,就已經(jīng)在使用reactive streams了。
為了能夠更好的了解reactive streams的工作原理,我們創(chuàng)建幾個(gè)wrapper類將Publisher,Subscriber,Subscription包裝起來(lái),輸出相應(yīng)的日志。
代碼有點(diǎn)多我們就不一一列出來(lái)了,這里只列一個(gè)CustBodyPublisher的具體實(shí)現(xiàn):
public class CustBodyPublisher implements HttpRequest.BodyPublisher {private final HttpRequest.BodyPublisher bodyPublisher;public CustBodyPublisher(HttpRequest.BodyPublisher bodyPublisher){this.bodyPublisher=bodyPublisher;}@Overridepublic long contentLength() {long contentLength=bodyPublisher.contentLength();log.info("contentLength:{}",contentLength);return contentLength;}@Overridepublic void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {log.info("CustBodyPublisher subscribe {}",subscriber);bodyPublisher.subscribe(new CustSubscriber(subscriber));} }wrapper類很簡(jiǎn)單,通過(guò)構(gòu)造函數(shù)傳入要wrapper的類,然后在相應(yīng)的方法中調(diào)用實(shí)際wrapper類的方法。
最后,我們將之前使用的調(diào)用HTTP API的例子改造一下:
public void testCustPost() throws IOException, InterruptedException {HttpClient client = HttpClient.newBuilder().build();HttpRequest.BodyPublisher requestBody = HttpRequest.BodyPublishers.ofString("{ 我是body }");CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody);HttpRequest postRequest = HttpRequest.newBuilder().POST(custBodyPublisher).uri(URI.create("http://www.flydean.com")).build();HttpResponse<String> response = client.send(postRequest, HttpResponse.BodyHandlers.ofString());log.info("response {}",response);}注意這里CustBodyPublisher custBodyPublisher= new CustBodyPublisher(requestBody),我們創(chuàng)建了一個(gè)新的wrapper類。
運(yùn)行它,觀察輸出結(jié)果:
[HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - contentLength:14 [HttpClient-1-Worker-0] INFO com.flydean.CustBodyPublisher - CustBodyPublisher subscribe jdk.internal.net.http.Http1Request$FixedContentSubscriber@672776b6 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onSubscribe jdk.internal.net.http.PullPublisher$Subscription@580ce038 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onNext length 14 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscription - CustSubscription request 1 [HttpClient-1-Worker-0] INFO com.flydean.CustSubscriber - CustSubscriber onComplete [main] INFO com.flydean.ReactiveHttpUsage - response (POST http://www.flydean.com) 200可以看到reactive stream的具體工作流程。首先創(chuàng)建了CustBodyPublisher,然后調(diào)用了subscribe方法。
接著CustSubscriber調(diào)用onSubscribe創(chuàng)建了Subscription。
每次CustSubscription的request方法都會(huì)導(dǎo)致CustSubscriber的onNext方法被調(diào)用。
最后當(dāng)CustSubscription再次請(qǐng)求無(wú)結(jié)果的時(shí)候,CustSubscriber調(diào)用onComplete方法結(jié)束整個(gè)流程。
注意,上面的例子中,我們wrapper調(diào)用的是BodyPublishers.ofString,其實(shí)BodyPublishers中內(nèi)置了多種BodyPublisher的實(shí)現(xiàn)。感興趣的朋友可以自行探索。
總結(jié)
本文講解了新的HTTP API中reactive Streams的使用。
本文的例子https://github.com/ddean2009/
learn-java-base-9-to-20
更多精彩內(nèi)容且看:
- 區(qū)塊鏈從入門到放棄系列教程-涵蓋密碼學(xué),超級(jí)賬本,以太坊,Libra,比特幣等持續(xù)更新
- Spring Boot 2.X系列教程:七天從無(wú)到有掌握Spring Boot-持續(xù)更新
- Spring 5.X系列教程:滿足你對(duì)Spring5的一切想象-持續(xù)更新
- java程序員從小工到專家成神之路(2020版)-持續(xù)更新中,附詳細(xì)文章教程
本文作者:flydean程序那些事
本文鏈接:http://www.flydean.com/jdk11-http-api-reactive-streams/
本文來(lái)源:flydean的博客
歡迎關(guān)注我的公眾號(hào):程序那些事,更多精彩等著您!
總結(jié)
以上是生活随笔為你收集整理的JDK11的新特性:HTTP API和reactive streams的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 什么?注释里面的代码居然能够执行
- 下一篇: JDK10的新特性:本地变量类型var