Dubbo的异步调用
文章目錄
- dubbo異步調(diào)用
- 2.6版本中dubbo異步調(diào)用的實(shí)現(xiàn)
- 2.7版本dubbo 客戶端Consumer異步調(diào)用
- 使用CompletableFuture簽名的接口
- 1、調(diào)用遠(yuǎn)程服務(wù):
- 2、 使用RpcContext
- 2.7 版本 服務(wù)提供者Provider異步執(zhí)行
- 1、定義CompletableFuture簽名的接口
- 2、使用AsyncContext
- springboot 項(xiàng)目集成異步調(diào)用
在微服務(wù)環(huán)境中,往往一個(gè)接口,是經(jīng)過多個(gè)服務(wù)間的接口調(diào)用,最后封裝成一個(gè)接口中返回。如果每個(gè)等待每個(gè)接口串并執(zhí)行結(jié)果,會(huì)比較耗時(shí),此時(shí)我們就需要異步處理。
dubbo異步調(diào)用
dubbo的異步調(diào)用,基于NIO的非阻塞實(shí)現(xiàn)并行調(diào)用,客戶端不需要啟動(dòng)多線程即可完成并行調(diào)用多個(gè)遠(yuǎn)程服務(wù),相對(duì)多線程開銷較小。
在userThread用戶線程請(qǐng)求網(wǎng)關(guān)服務(wù),調(diào)用某個(gè)方法,執(zhí)行ioThread,再去調(diào)用服務(wù)端server,在服務(wù)端返回?cái)?shù)據(jù)之前,將Future對(duì)象,復(fù)制到RpcContext中,在通過getFuture獲取Future對(duì)象。
在服務(wù)端返回?cái)?shù)據(jù)后,通知Future對(duì)象,通過Future的get方法,獲取返回結(jié)果。
2.6版本中dubbo異步調(diào)用的實(shí)現(xiàn)
在客戶端的consumer.xml 中配置如下:
<dubbo:reference id="fooService" interface="com.alibaba.dubbo.samples.async.api.FooService"><dubbo:method name="findFoo" async="true"/> </dubbo:reference><dubbo:reference id="barService" interface="com.alibaba.dubbo.samples.async.api.BarService"><dubbo:method name="findBar" async="true"/> </dubbo:reference>調(diào)用代碼。
// 此調(diào)用會(huì)立即返回null fooService.findFoo(fooId) // 拿到調(diào)用的Future引用, 當(dāng)結(jié)果返回后,會(huì)被通知和設(shè)置到此Future。 Future<Foo> fooFuture = RpcContext.getContext().getFuture();// 此調(diào)用會(huì)立即返回null barService.findBar(barId) // 拿到調(diào)用的Future引用, 當(dāng)結(jié)果返回后,會(huì)被通知和設(shè)置到此Future。 Future<Bar> fooFuture = RpcContext.getContext().getFuture();// 此時(shí)findFoo 和 findBar 的請(qǐng)求同時(shí)在執(zhí)行,客戶端不需要啟動(dòng)多線程來支持并行,而是借助NIO的非阻塞完成。// 如果foo 已返回,直接拿到返回值,否則當(dāng)前線程wait住, 等待foo返回后,線程會(huì)被notify喚醒。 Foo foo = fooFuture.get(). // 同理等待bar返回。 Bar bar = barFuture.get(); // 如果foo需要5秒返回,bar需要6秒返回,實(shí)際只需要6秒,即可獲取到foo和bar,進(jìn)行接下來的處理。一些特殊場(chǎng)景下,客戶端為了盡快調(diào)用返回值,可以設(shè)置是否等待消息發(fā)出:
- sent=“true” 等待消息發(fā)出,消息發(fā)送失敗將拋出異常;
- sent=“false” 不等待消息發(fā)出,將消息放入 IO 隊(duì)列,即刻返回。
默認(rèn)為fase。配置方式如下:
如果你只是想異步,完全忽略返回值,可以配置 return="false",以減少 Future 對(duì)象的創(chuàng)建和管理成本:
<dubbo:method name="goodbye" async="true" return="false"/>此時(shí),RpcContext.getContext().getFuture()將返回null。
2.7版本dubbo 客戶端Consumer異步調(diào)用
從v2.7.0開始,Dubbo的所有異步編程接口開始以CompletableFuture為基礎(chǔ)。
使用CompletableFuture簽名的接口
需要服務(wù)提供者事先定義CompletableFuture簽名的服務(wù),具體參見服務(wù)端異步執(zhí)行接口定義:
public interface AsyncService {CompletableFuture<String> sayHello(String name); }注意接口的返回類型是CompletableFuture<String>。
XML引用服務(wù):
<dubbo:reference id="asyncService" timeout="10000" interface="com.alibaba.dubbo.samples.async.api.As1、調(diào)用遠(yuǎn)程服務(wù):
// 調(diào)用直接返回CompletableFuture CompletableFuture<String> future = asyncService.sayHello("async call request"); // 增加回調(diào) future.whenComplete((v, t) -> {if (t != null) {t.printStackTrace();} else {System.out.println("Response: " + v);} }); // 早于結(jié)果輸出 System.out.println("Executed before response return.");### Springboot2、 使用RpcContext
在 consumer.xml 中配置:
<dubbo:reference id="asyncService" interface="org.apache.dubbo.samples.governance.api.AsyncService"><dubbo:method name="sayHello" async="true" /> </dubbo:reference>調(diào)用代碼:
// 此調(diào)用會(huì)立即返回null asyncService.sayHello("world"); // 拿到調(diào)用的Future引用,當(dāng)結(jié)果返回后,會(huì)被通知和設(shè)置到此Future CompletableFuture<String> helloFuture = RpcContext.getContext().getCompletableFuture(); // 為Future添加回調(diào) helloFuture.whenComplete((retValue, exception) -> {if (exception == null) {System.out.println(retValue);} else {exception.printStackTrace();} });或者,你也可以這樣做異步調(diào)用:
CompletableFuture<String> future = RpcContext.getContext().asyncCall(() -> {asyncService.sayHello("oneway call request1");} );future.get();2.7 版本 服務(wù)提供者Provider異步執(zhí)行
Provier端異步執(zhí)行將阻塞的業(yè)務(wù)從Dubbo內(nèi)部線程池切換到業(yè)務(wù)自定義線程,避免Dubbo線程池過度占用,有助于避免不同服務(wù)間的互相影響。異步執(zhí)行無益于節(jié)省資源或提升RPC響應(yīng)性能,因?yàn)槿绻麡I(yè)務(wù)執(zhí)行需要阻塞,則始終還是要有線程來負(fù)責(zé)執(zhí)行。
注意:Provider端異步執(zhí)行和Consumer端異步調(diào)用是相互獨(dú)立的,你可以任意正交組合兩端配置
- Consumer同步 - Provider同步
- Consumer異步 - Provider同步
- Consumer同步 - Provider異步
- Consumer異步 - Provider異步
1、定義CompletableFuture簽名的接口
服務(wù)接口定義:
public interface AsyncService {CompletableFuture<String> sayHello(String name); }服務(wù)實(shí)現(xiàn):
public class AsyncServiceImpl implements AsyncService {@Overridepublic CompletableFuture<String> sayHello(String name) {RpcContext savedContext = RpcContext.getContext();// 建議為supplyAsync提供自定義線程池,避免使用JDK公用線程池return CompletableFuture.supplyAsync(() -> {System.out.println(savedContext.getAttachment("consumer-key1"));try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "async response from provider.";});} }通過return CompletableFuture.supplyAsync(),業(yè)務(wù)執(zhí)行已從Dubbo線程切換到業(yè)務(wù)線程,避免了對(duì)Dubbo線程池的阻塞。
2、使用AsyncContext
Dubbo提供了一個(gè)類似Serverlet 3.0的異步接口AsyncContext,在沒有CompletableFuture簽名接口的情況下,也可以實(shí)現(xiàn)Provider端的異步執(zhí)行。
服務(wù)接口定義:
public interface AsyncService {String sayHello(String name); }服務(wù)暴露,和普通服務(wù)完全一致:
<bean id="asyncService" class="org.apache.dubbo.samples.governance.impl.AsyncServiceImpl"/> <dubbo:service interface="org.apache.dubbo.samples.governance.api.AsyncService" ref="asyncService"/>服務(wù)實(shí)現(xiàn):
public class AsyncServiceImpl implements AsyncService {public String sayHello(String name) {final AsyncContext asyncContext = RpcContext.startAsync();new Thread(() -> {// 如果要使用上下文,則必須要放在第一句執(zhí)行asyncContext.signalContextSwitch();try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}// 寫回響應(yīng)asyncContext.write("Hello " + name + ", response from provider.");}).start();return null;} }springboot 項(xiàng)目集成異步調(diào)用
1、啟動(dòng)來添加注解@EnableAsync
@SpringBootApplication(scanBasePackages = {"com.stylefeng.guns"}) @EnableDubboConfiguration @EnableAsync public class GatewayApplication {public static void main(String[] args) {SpringApplication.run(GatewayApplication.class, args);} }2、 客戶端注解@Reference中添加屬性async = true
@Reference(interfaceClass = FilmAsyncServiceApi.class, validation = "1.0", async = true)private FilmAsyncServiceApi filmAsyncServiceApi;3、 接口調(diào)用
/*** 影片詳情接口* @return*/@RequestMapping(value = "films/{searchParam}", method = RequestMethod.GET)public ResponseVO films(@PathVariable String searchParam,int searchType) throws ExecutionException, InterruptedException {// 根據(jù)searchType,判斷查詢類型FilmDetailVO filmDetail = filmServiceApi.getFilmDetail(searchType, searchParam);// 查詢影片的詳細(xì)信息 -> Dubbo 的異步獲取// 獲取影片描述信息if (filmDetail == null ) {return ResponseVO.serviceFail("沒有可查詢的影片");} else if (filmDetail.getFilmId() == null || filmDetail.getFilmId().trim().length() == 0) {return ResponseVO.serviceFail("沒有可查詢的影片");}String filmId = filmDetail.getFilmId();filmServiceApi.getFilmDesc(filmId);// 拿到調(diào)用的Future引用,當(dāng)結(jié)果返回后,會(huì)被通知和設(shè)置到此future。Future<FilmDescVO> filmDescVOFuture = RpcContext.getContext().getFuture();// 獲取圖片信息filmServiceApi.getImags(filmId);Future<ImgVO> imgVOFuture = RpcContext.getContext().getFuture();// 獲取導(dǎo)演信息filmServiceApi.getDectInfo(filmId);Future<ActorVO> actorVOFuture = RpcContext.getContext().getFuture();// 獲取演員信息filmServiceApi.getActors(filmId);Future<List<ActorVO>> actorsFuture = RpcContext.getContext().getFuture();FilmInfoVO filmInfoVO = new FilmInfoVO();BeanUtils.copyProperties(filmDetail, filmInfoVO);// 組織Actor屬性InfoRequestVO infoRequestVO = new InfoRequestVO();ActorRequestVO actorRequestVO = new ActorRequestVO();actorRequestVO.setActors(actorsFuture.get());actorRequestVO.setDirector(actorVOFuture.get());infoRequestVO.setActors(actorsFuture.get());infoRequestVO.setBiography(filmDescVOFuture.get().getBiography());infoRequestVO.setFilmId(filmId);infoRequestVO.setImgVO(imgVOFuture.get());filmInfoVO.setInfo04(infoRequestVO);return ResponseVO.success(IMG_PRE, filmInfoVO);}在每個(gè)服務(wù)調(diào)用后,都會(huì)拿到對(duì)應(yīng)Future,拿到值之后,設(shè)置到Future中,通過get方法獲取。
總結(jié)
以上是生活随笔為你收集整理的Dubbo的异步调用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Group Normalization(
- 下一篇: C 异步调用