基于dubbo实现异步调用
1.前言
Java中常見的實現(xiàn)異步調(diào)用的方式:
1.ThreadPool
2.CompletableFuture
3.MQ
4.BlockingQueue
5.Fork/Join
那么作為一款優(yōu)秀的RPC框架,dubbo是如何實現(xiàn)異步調(diào)用的呢?本文將介紹2.6.x版本以來dubbo異步調(diào)用方式的演進。
1.增加consumer配置
2.參數(shù)回調(diào)(2.7.0已廢棄,本文將不展開)
3.事件通知
4.直接定義返回CompletableFuture的服務接口
5.利用AsyncFor注解實現(xiàn)客戶端的同步轉(zhuǎn)異步
6.利用RpcContext.startAsync()實現(xiàn)服務端的同步轉(zhuǎn)異步
其中前面3種方式在2.6.x版本中就已支持,但參數(shù)回調(diào)在2.7.0版本中已廢棄,后面3種則是在2.7.0版本中新增的方式。
2.基于dubbo實現(xiàn)異步調(diào)用
2.1 增加consumer配置
這種方式很簡單,只需要在服務引用時增加<dubbo:method>配置即可,如下所示,其中name為需要異步調(diào)用的方法名,async表示是否啟用異步調(diào)用。
<dubbo:reference id="asyncService" check="false" interface="com.alibaba.dubbo.demo.AsyncService" url="localhost:20880"><dubbo:method name="sayHello" async="true" /> </dubbo:reference>此時consumer端有3種調(diào)用方式:
- 由于配置了異步調(diào)用,因此此時直接調(diào)用將返回null:
- 通過RpcContext獲取Future對象,調(diào)用get方法時阻塞知道返回結(jié)果:
- 通過ResponseFuture設置回調(diào),執(zhí)行完成會回調(diào)done方法,拋異常則會回調(diào)caught方法:
如果只想異步調(diào)用,不需要返回值,則可以配置 return="false",這樣可以避免Future對象的創(chuàng)建,此時RpcContext.getContext().getFuture()將返回null;
2.2 直接定義返回CompletableFuture的服務接口
在上述方式中,想獲取異步調(diào)用的結(jié)果,需要從RpcContext中獲取,使用起來不是很方便。基于java 8中引入的CompletableFuture,dubbo在2.7.0版本中也增加了對CompletableFuture的支持,我們可以直接定義一個返回CompletableFuture類型的接口。
public interface AsyncService {String sayHello(String name);CompletableFuture<String> sayHelloAsync(String name); }服務端實現(xiàn)如下:
public class AsyncServiceImpl implements AsyncService {@Overridepublic String sayHello(String name) {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return name;}@Overridepublic CompletableFuture<String> sayHelloAsync(String name) {return CompletableFuture.supplyAsync(() -> name);} }如此一來,我們就實現(xiàn)了服務端的異步,客戶端直接調(diào)用接口即可,不需要再從RpcContext中獲取返回值:
CompletableFuture<String> completableFuture = asyncService.sayHelloAsync("async"); String result = completableFuture.get();2.3 事件通知
dubbo允許consumer 端在調(diào)用之前、調(diào)用之后或出現(xiàn)異常時,觸發(fā) oninvoke、onreturn、onthrow 三個事件。類似于Spring中的前置增強、后置增強和異常拋出增強。只需要在服務引用時,增加以下配置指定事件通知的方法即可:
<dubbo:reference id="asyncService" check="false" interface="com.alibaba.dubbo.demo.AsyncService" url="localhost:20880"><dubbo:method name="sayHello" oninvoke="notifyServiceImpl.onInvoke" onreturn="notifyServiceImpl.onReturn" onthrow="notifyServiceImpl.onThrow" /> </dubbo:reference>事件通知服務如下:
public class NotifyServiceImpl implements NotifyService {// 方法參數(shù)與調(diào)用方法參數(shù)相同@Overridepublic void onInvoke(String name) {System.out.println("onInvoke: " + name);}// 第一個參數(shù)為調(diào)用方法的返回值,其余為調(diào)用方法的參數(shù)@Overridepublic void onReturn(String retName, String name) {System.out.println("onReturn: " + name);}// 第一個參數(shù)為調(diào)用異常,其余為調(diào)用方法的參數(shù)@Overridepublic void onThrow(Throwable ex, String name) {System.out.println("onThrow: " + name);} }與Spring增強不同的是,dubbo中的事件通知也可以是異步,只需要將調(diào)用方法配置為async="true"即可,但oninvoke方法無法異步執(zhí)行。
2.4 異步調(diào)用源碼分析
dubbo中的異步調(diào)用實際上是通過引入一個FutureFilter來實現(xiàn)的,關(guān)鍵源碼如下。
2.4.1 調(diào)用前獲取方法信息
@Activate(group = Constants.CONSUMER) public class FutureFilter implements PostProcessFilter {protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);@Overridepublic Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {fireInvokeCallback(invoker, invocation);// need to configure if there's return value before the invocation in order to help invoker to judge if it's// necessary to return future.return postProcessResult(invoker.invoke(invocation), invoker, invocation);}... }在fireInvokeCallback()方法中,會首先調(diào)用getAsyncMethodInfo()獲取目標方法的方法信息,看是否有配置事件通知:
private ConsumerMethodModel.AsyncMethodInfo getAsyncMethodInfo(Invoker<?> invoker, Invocation invocation) {// 首先獲取消費者信息final ConsumerModel consumerModel = ApplicationModel.getConsumerModel(invoker.getUrl().getServiceKey());if (consumerModel == null) {return null;}// 獲取消費者對應的方法信息ConsumerMethodModel methodModel = consumerModel.getMethodModel(invocation.getMethodName());if (methodModel == null) {return null;}// 獲取消費者對應方法的事件信息,即是否有配置事件通知final ConsumerMethodModel.AsyncMethodInfo asyncMethodInfo = methodModel.getAsyncInfo();if (asyncMethodInfo == null) {return null;}return asyncMethodInfo; }2.4.2 同步觸發(fā)oninvoke事件
獲取到調(diào)用方法對應的信息后,回到fireInvokeCallback()方法:
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {final ConsumerMethodModel.AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);if (asyncMethodInfo == null) {return;}// 獲取事件配置信息final Method onInvokeMethod = asyncMethodInfo.getOninvokeMethod();final Object onInvokeInst = asyncMethodInfo.getOninvokeInstance();if (onInvokeMethod == null && onInvokeInst == null) {return;}if (onInvokeMethod == null || onInvokeInst == null) {throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());}if (!onInvokeMethod.isAccessible()) {onInvokeMethod.setAccessible(true);}// 獲取方法參數(shù)Object[] params = invocation.getArguments();try {// 觸發(fā)oninvoke事件onInvokeMethod.invoke(onInvokeInst, params);} catch (InvocationTargetException e) {// 觸發(fā)onthrow事件fireThrowCallback(invoker, invocation, e.getTargetException());} catch (Throwable e) {fireThrowCallback(invoker, invocation, e);} }2.4.3 調(diào)用結(jié)果處理
方法調(diào)用完成后,會回到postProcessResult()方法:
@Override public Result postProcessResult(Result result, Invoker<?> invoker, Invocation invocation) {// 如果是異步調(diào)用,返回結(jié)果會被封裝成AsyncRpcResult類型的對象,具體在哪里封裝的,后面會講到if (result instanceof AsyncRpcResult) {AsyncRpcResult asyncResult = (AsyncRpcResult) result;asyncResult.thenApplyWithContext(r -> {asyncCallback(invoker, invocation, r);return r;});return asyncResult;} else {syncCallback(invoker, invocation, result);return result;} }syncCallback和asyncCallback里面的邏輯比較簡單,就是根據(jù)方法是正常返回還是拋異常,觸發(fā)對應的事件。可以看到,如果被調(diào)用方法是同步的,則這兩個事件也是同步的,反之亦然。
2.4.4 方法調(diào)用核心過程
在postProcessResult()方法中,第一個參數(shù)是invoker.invoke(invocation),這里就會走到下一個Filter鏈完成filter鏈的處理,最終調(diào)到原始服務,走到DubboInvoker#doInvoke方法:
protected Result doInvoke(final Invocation invocation) throws Throwable {...try {// 讀取async配置boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);// 讀取future_generated/future_returntype配置,還沒搞明白是干啥的boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);// 讀取return配置boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);if (isOneway) {// 如果配置return="true",future對象就直接設置為nullboolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);currentClient.send(inv, isSent);RpcContext.getContext().setFuture(null);return new RpcResult();} else if (isAsync) {// 如果配置async="true",構(gòu)建future對象ResponseFuture future = currentClient.request(inv, timeout);// For compatibilityFutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);RpcContext.getContext().setFuture(futureAdapter);// 同時將返回結(jié)果包裝為AsyncResult對象Result result;if (isAsyncFuture) {// register resultCallback, sometimes we need the asyn result being processed by the filter chain.result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);} else {result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);}return result;} else {// 否則就是同步調(diào)用,future當然也是nullRpcContext.getContext().setFuture(null);return (Result) currentClient.request(inv, timeout).get();}}... }通過這個過程不難發(fā)現(xiàn),不管是同步調(diào)用還是異步調(diào)用,最終都會走到ExchangeClient#send方法,再往下會走到HeaderExchangeChannel#request方法,這個一個異步方法,返回ResponseFuture對象。
@Overridepublic ResponseFuture request(Object request, int timeout) throws RemotingException {if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setData(request);DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);try {channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future;}看到這里我才恍然大悟,原來dubbo中同步調(diào)用也是通過異步調(diào)用來實現(xiàn),只是同步調(diào)用發(fā)起后,直接調(diào)用future#get的方法來同步等待結(jié)果的返回,而異步調(diào)用只返回Future Response,在用戶需要關(guān)心其結(jié)果時才調(diào)用get方法
總結(jié)
以上是生活随笔為你收集整理的基于dubbo实现异步调用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JQuery模仿淘宝天猫魔盒抢购页面倒计
- 下一篇: eMule备份