Dubbo 3.0 预览版解读,6到飞起~
Dubbo 自 2011 年 10 月 27 日開源后,已被許多非阿里系的公司使用,其中既有當當網(wǎng)、網(wǎng)易考拉等互聯(lián)網(wǎng)公司,也不乏中國人壽、青島海爾等大型傳統(tǒng)企業(yè)。
自去年 12 月開始,Dubbo 3.0?便已正式進入開發(fā)階段《重大利好,Dubbo 3.0要來了》,并備受社區(qū)和廣大 Dubbo 用戶的關(guān)注,本文將為您詳細解讀?3.0 預(yù)覽版的新特性和新功能。
下面先解答一下兩個有意思的與 Dubbo 相關(guān)的疑問。
-
為什么 Dubbo 一開源就是 2.0 版本?之前是否存在 1.0 版本?
筆者曾做過 Dubbo 協(xié)議的適配兼容,Dubbo 確實存在過 1.x 版本,而且從協(xié)議設(shè)計和模型設(shè)計上都與 2.0 的開源版本協(xié)議是完全不一樣的。下圖是關(guān)于 Dubbo 的發(fā)展路徑:
-
阿里內(nèi)部正在使用 Dubbo 開源版本嗎?
是的,非常確定,當前開源版本的 Dubbo 在阿里巴巴被廣泛使用,而阿里的電商核心部門是用的 HSF2.2 版本,這個版本是兼容了 Dubbo 使用方式和 Remoting 協(xié)議。當然,我們現(xiàn)在正在做 HSF2.2 的升級,直接依賴開源版本的 Dubbo 來做內(nèi)核的統(tǒng)一。所以,Dubbo 是得到大規(guī)模線上系統(tǒng)驗證的分布式服務(wù)框架,這一點毋容置疑。
Dubbo 3.0 預(yù)覽版的要點
Dubbo 3.0 在設(shè)計和功能上的新增支持和改進,主要是以下四方面:
-
Dubbo 內(nèi)核之 Filter 鏈的異步化
這里要指出的是,3.0 中規(guī)劃的異步去阻塞和 2.7 中提供的異步是兩個層面的特性。2.7 中的異步是建立在傳統(tǒng) RPC 中 request – response 會話模型上的,而 3.0 中的異步將會從通訊協(xié)議層面由下向上構(gòu)建,關(guān)注的是跨進程、全鏈路的異步問題。通過底層協(xié)議開始支持 streaming 方式,不單單可以支持多種會話模型,還可以在協(xié)議層面開始支持反壓、限流等特性,使得整個分布式體系更具有彈性。綜上所述,2.7 關(guān)注的異步更局限在點對點的異步(一個 consumer 調(diào)用一個 provider),3.0 關(guān)注的異步化,寬度上則關(guān)注整個調(diào)用鏈上的異步,高度上則向上又可以包裝成 Rx 的編程模型。有趣的是,Spring 5.0 發(fā)布了對 Flux 的支持,隨后開始解決跨進程的異步問題。
-
功能方面是 reactive(響應(yīng)式)支持
最近幾年,?reactive programming這個詞語的熱度迅速提升,Wikipedia 上的 reactive programming 解釋是 reactive programming is a programming paradigm oriented around data flows and the propagation of change. Dubbo3.0會實現(xiàn)Reactive Stream 的 rx 接口,從而能讓用戶享受到RP帶來的響應(yīng)性提升,甚至面向 RP 的架構(gòu)升級。當然,我們希望 reactive 不單單能夠帶來事件(event)驅(qū)動的應(yīng)用集成方式的升級,也希望在 Load Balance(選擇最優(yōu)的服務(wù)節(jié)點),fault tolerance(限流降級時最好做到自適應(yīng))等方面發(fā)揮其積極價值。
-
云原生/ ServiceMesh 方向的探索
我們定下的策略是進入 Envoy 社區(qū)來實現(xiàn) Dubbo 融入 mesh 的理念思想,目前 Dubbo 協(xié)議已經(jīng)被 Envoy 支持。當然,Dubbo Mesh 離真正可用還有很長一段距離,其在選址、負載均衡和服務(wù)治理方面的工作需要繼續(xù)在數(shù)據(jù)面建設(shè),另外,控制面板的建設(shè)在社區(qū)也沒有提上日程。
-
融合并支持阿里內(nèi)部
Dubbo 3.0 定下了內(nèi)外融合的策略,也就是說 3.0 的核心最終會在阿里巴巴的生產(chǎn)系統(tǒng)中部署,相信通過大流量、大規(guī)模的考驗,Dubbo 用戶可以獲得一個性能、穩(wěn)定、服務(wù)治理實踐各方面俱佳的核心,用戶在生產(chǎn)系統(tǒng)中采用 3.0 也會更加放心。這一點也是 Dubbo 3.0 最重要的使命。
Filter 鏈的異步化設(shè)計
Dubbo 最強大的一處設(shè)計是其在 Filter 鏈上的抽象設(shè)計,通過其擴展機制的開放性支持,用戶可以對 Dubbo 做功能增強,并允許各個擴展點被定制來是否保留。
Dubbo?的?Filter?定義如下:
按照“調(diào)用一個遠程服務(wù)的方法就像調(diào)用本地的方法一樣”這種說法,這個直接返回 Result 響應(yīng)的方式是非常好的,用起來是簡單直接,問題是時代變換到了需要關(guān)注體驗,需要走 Reactive 響應(yīng)式的時代,也回到基本點:invoke一個 invocation 需要經(jīng)過網(wǎng)絡(luò)在不同的進程處理,天然就是異步的過程,也就是發(fā)送請求(invocation)與接收響應(yīng)(Result)本身是兩個不同的事件,是需要兩個過程方法來在 Filter 鏈處理。那么如何改造這個關(guān)鍵的 SPI 呢?有兩種方案:
第一種,把 invoke 的返回值改成 CompletableFuture, 好處是一目了然,Result 不在建議同步獲取了;但基礎(chǔ)接口的簽名一改會導致代碼改造量巨大,同時也會讓原有的 SPI 擴展不在支持。
第二種,Result 接口直接繼承 CompletationStage,是代表了響應(yīng)的異步計算。這樣能進避免第一種的劣勢。所以,3.0.0 Preview 版本對內(nèi)部調(diào)用鏈路實現(xiàn)做了一次重構(gòu):基于 CompletableFuture 實現(xiàn)了框架內(nèi)部的全異步調(diào)用,而在外圍編程上,同時支持同步、異步調(diào)用模式。
值得注意的是,此次重構(gòu)僅限于框架內(nèi)部實現(xiàn),對使用方?jīng)]有任何影響即接口上保持完全兼容。要了解 Dubbo 異步 API 如何使用,請參考《如何基于 Dubbo 實現(xiàn)全異步的調(diào)用鏈》(地址:http://dubbo.apache.org/zh-cn/blog/dubbo-new-async.html),這篇文章將著重對實現(xiàn)思路和原理做一些簡單介紹。此次重構(gòu)的要點有:
-
框架內(nèi)部采用全異步調(diào)用模型,僅在外圍做同步、異步適配;
-
內(nèi)置Filter鏈支持異步回調(diào);
基本工作流程
首先我們來看一個通用的跨網(wǎng)絡(luò)異步調(diào)用的線程模型:
通信框架異步發(fā)送請求消息,請求消息發(fā)送成功后,返回代表業(yè)務(wù)結(jié)果的 CompletableFuture 給業(yè)務(wù)線程。之后對于 Future 的處理,根據(jù)調(diào)用類型會有所區(qū)別:
對于同步請求(如上圖體現(xiàn)的場景),業(yè)務(wù)線程會調(diào)用 future.get 同步阻塞等待結(jié)果,當收到網(wǎng)絡(luò)層返回的業(yè)務(wù)結(jié)果后,future.get 返回并最終將結(jié)果傳遞給調(diào)用發(fā)起方。
對于異步請求,業(yè)務(wù)線程不會調(diào)用 future.get,而是將 future 保存在調(diào)用上下文或者直接返回給調(diào)用者,同時會為 future 注冊回調(diào)監(jiān)聽器,以便當真正的業(yè)務(wù)結(jié)果從通信層返回時監(jiān)聽器可以對結(jié)果做進一步的處理。
接下來具體看一下一次異步 Dubbo RPC 請求的調(diào)用流程:
消費方面向 Proxy 代理編程,發(fā)出調(diào)用請求,請求經(jīng)過 Filter 鏈向下傳遞。
Invoker.invoke() 將請求異步轉(zhuǎn)發(fā)給網(wǎng)絡(luò)層,并收到代表返回結(jié)果的 Future。
Future 被包裝到 Result,轉(zhuǎn)而由 Result 代表這次遠程調(diào)用的結(jié)果(由于 Result 的異步屬性,此時它可能并不包含真正的返回值)。
Result 繼續(xù)沿著調(diào)用鏈返回,在經(jīng)過每個 Filter 時,Filter 可選擇注冊 Listener 監(jiān)聽器,以便在業(yè)務(wù)結(jié)果返回時執(zhí)行結(jié)果預(yù)處理。
最終 Proxy 調(diào)用 result.recreate() 將結(jié)果返回給消費者:
-
如果方法是 CompletableFuture 簽名,則返回 Future;
-
如果方法是普通同步簽名,則返回對象默認值,Future 可通過 RpcContext 拿到;
? 6.?調(diào)用方在拿到代表異步業(yè)務(wù)結(jié)果的 Future 后,可選擇注冊回調(diào)監(jiān)聽器,以監(jiān)聽真正的業(yè)務(wù)結(jié)果返回。
同步調(diào)用和異步調(diào)用基本上是一致的,并且也是走的回調(diào)模式,只是在鏈路返回之前做了一次阻塞 get 調(diào)用,以確保在收到實際結(jié)果時再返回。Filter 在注冊 Listener 時由于 Future 已處于 complete 狀態(tài),因此會同時觸發(fā)回調(diào) onResponse()/onError()。
關(guān)于流程圖中提到的 Result,Result 在 Dubbo 的一次 RPC 調(diào)用中代表返回結(jié)果,在 3.0 中 Result 自身增加了代表狀態(tài)的接口,類似 Future 現(xiàn)在 Result 可以代表一次未完成的調(diào)用。
要讓 Result 具備代表異步返回結(jié)果的能力,有兩中方式來實現(xiàn):
1. Result is a Future,在 Java 8 中更合理的方式是繼承 CompletionStage 接口。
public interface Result extends CompletionStage {}2. 讓 Result 實例持有 Future 實例,與 1 的區(qū)別即是設(shè)計中選用“繼承”還是“組合”。
public class AsyncRpcResult implements Result {private CompletableFuture<RpcResult> resultFuture;}同時,為了讓 Result 更直觀的體現(xiàn)其異步結(jié)果的特性,也為了方便面向 Result 接口編程,我們可以考慮為Result增加一些異步接口:
public interface Result extends Serializable {Result thenApplyWithContext(Function<Result, Result> fn);<U> CompletableFuture<U> thenApply(Function<Result,???extends U> fn);Result get() throws InterruptedException, ExecutionException;}Filter SPI
Filter 是 Dubbo 預(yù)置的攔截器擴展 SPI,用來做請求的預(yù)處理、結(jié)果的后處理,框架本身內(nèi)置了一些攔截器實現(xiàn),而從用戶層面,我相信這個 SPI 也應(yīng)該是被擴展最多的一個。在 3.0 版本中,Filter 回歸單一職責的設(shè)計模式,將回調(diào)接口單獨提取到 Listener 中。
@SPI public interface Filter {Result invoke(Invoker<?>?invoker,?Invocation invocation) throws RpcException;interface Listener {void onResponse(Result result, Invoker<?>?invoker,?Invocation invocation);void onError(Throwable t, Invoker<?>?invoker,?Invocation invocation);} }以上是 Filter 的 SPI 定義,Filter 的核心定義中只有一個 invoke() 方法用來傳遞調(diào)用請求。
同時,增加了一個新的回調(diào)接口 Listener,每個 Filter 實現(xiàn)可以定義自己的 Listenr 回調(diào)器,從而實現(xiàn)對返回結(jié)果的異步監(jiān)聽,參考以下是為 MonitorFilter 增加的 Listener 回調(diào)實現(xiàn):
class MonitorListener implements Listener {@Overridepublic void onResponse(Result result, Invoker<?>?invoker,?Invocation invocation) {if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(),Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false);getConcurrent(invoker, invocation).decrementAndGet(); // count down}}@Overridepublic void onError(Throwable t, Invoker<?>?invoker,?Invocation invocation) {if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true);getConcurrent(invoker, invocation).decrementAndGet(); // count down}} }泛化調(diào)用異步接口支持
為了更直觀的做異步調(diào)用,泛化接口新增了?
CompletableFuture<Object>$invokeAsync(Stringmethod,String[]parameterTypes,Object[]args)接口:
public interface GenericService {/*** Generic invocation** @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is* required, e.g. findPerson(java.lang.String)* @param parameterTypes Parameter types* @param args Arguments* @return invocation return value* @throws GenericException potential exception thrown from the invocation*/Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {Object object = $invoke(method, parameterTypes, args);if (object instanceof CompletableFuture) {return (CompletableFuture<Object>) object;}return CompletableFuture.completedFuture(object);} }這樣,當我們想做異步調(diào)用時,就可以直接這樣使用:
CompletableFuture<Object> genericService.$invokeAsync(method, parameterTypes, args);更具體用例請參見《泛化調(diào)用示例》
https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-generic/dubbo-samples-generic-call
異步與性能
組要注意的是,框架內(nèi)部的異步實現(xiàn)本身并不能提高單次調(diào)用的性能,相反,由于線程切換和回調(diào)邏輯的存在,異步反而可能會導致單次調(diào)用性能的下降,但是異步帶來的優(yōu)勢是能減少對資源的占用,提升整個系統(tǒng)的并發(fā)程度和吞吐量,這點對于 RPC 這種需要處理網(wǎng)絡(luò)延遲的場景非常適用。更多關(guān)于異步化設(shè)計的好處,請參考其他異步化原理介紹相關(guān)文章。
響應(yīng)式編程支持
響應(yīng)式編程讓開發(fā)者更方便地編寫高性能的異步代碼,很可惜,在之前很長一段時間里,dubbo 并不支持響應(yīng)式編程,簡單來說,dubbo 不支持在 rpc 調(diào)用時使用 Mono/Flux 這種流對象(reative-stream 里流的概念),給用戶使用帶來了不便。(關(guān)于響應(yīng)式編程更詳細的信息請參見這里:http://reactivex.io/)。
RSocket 是一個開源的支持 reactive-stream 語義的網(wǎng)絡(luò)通信協(xié)議,他將 reative 語義的復(fù)雜邏輯封裝起來了,使得上層可以方便實現(xiàn)網(wǎng)絡(luò)程序。(RSocket詳細資料請參見這里:http://rsocket.io/)。
dubbo 在 3.0.0-SNAPSHOT ?版本里基于 RSocket 對響應(yīng)式編程進行了簡單的支持,用戶可以在請求參數(shù)和返回值里使用 Mono 和 Flux 類型的對象。下面我們給出使用范例,(范例源碼可以在這里獲取:https://github.com/apache/incubator-dubbo-samples/tree/3.x/dubbo-samples-rsocket)。
首先定義接口如下:
public interface DemoService {Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2);Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2); }然后實現(xiàn)該 demo 接口:
public class DemoServiceImpl implements DemoService {@Overridepublic Mono<String> requestMonoWithMonoArg(Mono<String> m1, Mono<String> m2) {return m1.zipWith(m2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) {return s+" "+s2;}});}@Overridepublic Flux<String> requestFluxWithFluxArg(Flux<String> f1, Flux<String> f2) {return f1.zipWith(f2, new BiFunction<String, String, String>() {@Overridepublic String apply(String s, String s2) {return s+" "+s2;}});} }然后配置并啟動服務(wù)端,注意協(xié)議名字填寫 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsdhttp://dubbo.apache.org/schema/dubbo?http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!--?provider's?application?name,?used?for?tracing?dependency?relationship?--> <dubbo:application name="demo-provider"/> <!--?use?registry?center?to?export?service?--> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <!--?use?dubbo?protocol?to?export?service?on?port?20880?--> <dubbo:protocol name="rsocket" port="20890"/> <!--?service?implementation,?as?same?as?regular?local?bean?--> <bean id="demoService" class="org.apache.dubbo.samples.basic.impl.DemoServiceImpl"/> <!--?declare?the?service?interface?to?be?exported?--> <dubbo:service interface="org.apache.dubbo.samples.basic.api.DemoService" ref="demoService"/> </beans> public class RsocketProvider {public static void main(String[] args) throws Exception {new EmbeddedZooKeeper(2181, false).start();ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-provider.xml"});context.start();System.in.read(); // press any key to exit} }然后配置并啟動消費者消費者如下, 注意協(xié)議名填寫 rsocket:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://dubbo.apache.org/schema/dubbo" xmlns="http://www.springframework.org/schema/beans" xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsdhttp://dubbo.apache.org/schema/dubbo?http://dubbo.apache.org/schema/dubbo/dubbo.xsd"> <!--?consumer's?application?name,?used?for?tracing?dependency?relationship?(not?a?matching?criterion), don't set it same as provider --> <dubbo:application name="demo-consumer"/> <!--?use?registry?center?to?discover?service?--> <dubbo:registry address="zookeeper://127.0.0.1:2181"/> <!--?generate?proxy?for?the?remote?service,?then?demoService?can?be?used?in?the?same?way?as?the local regular interface --> <dubbo:reference id="demoService" check="true" interface="org.apache.dubbo.samples.basic.api.DemoService"/> </beans> public class RsocketConsumer {public static void main(String[] args) {ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"spring/rsocket-consumer.xml"});context.start();DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxywhile (true) {try {Mono<String> monoResult = demoService.requestMonoWithMonoArg(Mono.just("A"), Mono.just("B"));monoResult.doOnNext(new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println(s);}}).block();Flux<String> fluxResult = demoService.requestFluxWithFluxArg(Flux.just("A","B","C"), Flux.just("1","2","3"));fluxResult.doOnNext(new Consumer<String>() {@Overridepublic void accept(String s) {System.out.println(s);}}).blockLast();} catch (Throwable throwable) {throwable.printStackTrace();}}} }可以看到配置上除了協(xié)議名使用 rsocket 以外其他并沒有特殊之處。
實現(xiàn)原理
以前用戶并不能在參數(shù)或者返回值里使用 Mono/Flux 這種流對象(reative-stream 里的流的概念)。因為流對象自帶異步屬性,當業(yè)務(wù)把流對象作為參數(shù)或者返回值傳遞給框架之后,框架并不能將流對象正確的進行序列化。
dubbo 基于 RSocket 實現(xiàn)了 reative 支持。RSocket 將 reative 語義的復(fù)雜邏輯封裝起來了,給上層提供了簡潔的抽象如下:
/** * Fire and Forget interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} that completes when the passed {@code payload} is successfully * handled, otherwise errors. */ Mono<Void> fireAndForget(Payload payload); /** * Request-Response interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing at most a single {@code Payload} representing the * response. */ Mono<Payload> requestResponse(Payload payload); /** * Request-Stream interaction model of {@code RSocket}. * * @param payload Request payload. * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. */ Flux<Payload> requestStream(Payload payload); /** * Request-Channel interaction model of {@code RSocket}. * * @param payloads Stream of request payloads. * @return Stream of response payloads. */ Flux<Payload> requestChannel(Publisher<Payload> payloads);我們只需要在此基礎(chǔ)上添加我們的 rpc 邏輯即可。
-
從客戶端視角看,框架建立連接之后,只需要將請求信息編碼到 Payload 里,然后通過 requestStream 方法即可向服務(wù)端發(fā)起請求。
-
從服務(wù)端視角看,rsocket 收到請求之后,會調(diào)用我們實現(xiàn)的 requestStream 方法,我們從 Payload 里解碼得到請求信息之后,調(diào)用業(yè)務(wù)方法,然后拿到 Flux 類型的返回值即可。
-
需要注意的是業(yè)務(wù)返回值一般是 Flux,而 RSocket 要求的是 Flux,所以我們需要通過 map operator 攔截業(yè)務(wù)數(shù)據(jù),將 BizDO 編碼為 Payload 才可以遞交給我 RSocket。而 RSocket 會負責數(shù)據(jù)的傳輸和 reative 語義的實現(xiàn)。
經(jīng)過上面的分析,我們知道了 Dubbo 如何基于 RSocket 實現(xiàn)了響應(yīng)式編程的支持。有了響應(yīng)式編程支持,業(yè)務(wù)可以更加方便的實現(xiàn)異步邏輯。
小結(jié)
當前 Dubbo 3.0 將提供具備當代特性(如響應(yīng)性編程)的相關(guān)支持,同時汲取阿里內(nèi)部 HSF 的設(shè)計長處來實現(xiàn)兩者的融合,當前預(yù)覽版的很多地方還在探討中,希望大家能夠積極反饋,我們都會虛心學習并參考。
Dubbo 3.0 sample @GitHub:
https://github.com/apache/incubator-dubbo-samples/tree/3.x
覃柳杰(花名:未宇) Github ID: qinliujie,阿里巴巴中間件開發(fā),Dubbo 開源項目 PMC,參與 HSF2.2和 Dubbo3.0 的設(shè)計和開發(fā)。
呂仁琦(花名:空冥) Github ID: jefflv,阿里巴巴中間件開發(fā),Dubbo 開源項目 commiter,參與了內(nèi)部 HSF2.0 的設(shè)計和開發(fā)。
劉軍(花名:陸龜) 阿里巴巴中間件高級開發(fā)工程師,Apache Dubbo (Incubating)PPMC,深度參與 Dubbo 項目開發(fā),主要貢獻者之一。
謝育能(花名:思邪)阿里巴巴中間件開發(fā),Dubbo 3.0 開源項目的響應(yīng)式模塊的負責人,參與了內(nèi)部 HSF2.2 的設(shè)計和開發(fā)。
總結(jié)
以上是生活随笔為你收集整理的Dubbo 3.0 预览版解读,6到飞起~的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 欠阿里云一分钱,会是什么样的后果。。。
- 下一篇: Java 程序员必备的 Intellij