Alibaba Dubbo框架同步调用原理分析-2
2019獨角獸企業重金招聘Python工程師標準>>>
接上一篇,看一下Dubbo的相關代碼關鍵代碼:
| com.taobao.remoting.impl.DefaultClient.java //同步調用遠程接口 public Object?invokeWithSync(Object appRequest, RequestControl control) throws RemotingException, InterruptedException { ??????? byte protocol = getProtocol(control); ??????? if (!TRConstants.isValidProtocol(protocol)) { ??????????? throw new RemotingException("Invalid serialization protocol [" + protocol + "] on invokeWithSync."); ??????? } ??????? ResponseFuture future =?invokeWithFuture(appRequest, control); ??????? return future.get();??//獲取結果時讓當前線程等待,ResponseFuture其實就是前面說的callback } public ResponseFuture?invokeWithFuture(Object appRequest, RequestControl control) { ???????? byte protocol = getProtocol(control); ???????? long timeout = getTimeout(control); ???????? ConnectionRequest request = new ConnectionRequest(appRequest); ???????? request.setSerializeProtocol(protocol); ???????? Callback2FutureAdapter adapter = new Callback2FutureAdapter(request); ???????? connection.sendRequestWithCallback(request, adapter, timeout); ???????? return adapter; } |
?
|
Callback2FutureAdapter implements ResponseFuture public Object?get()?throws RemotingException, InterruptedException { synchronized (this) { ?// 旋鎖 ?? ?while (!isDone) { ?// 是否有結果了 wait();?//沒結果是釋放鎖,讓當前線程處于等待狀態 ?? ?} } if (errorCode == TRConstants.RESULT_TIMEOUT) { ?? ?throw new TimeoutException("Wait response timeout, request[" ?? ?+ connectionRequest.getAppRequest() + "]."); } else if (errorCode > 0) { ?? ?throw new RemotingException(errorMsg); } else { ?? ?return appResp; } } 客戶端收到服務端結果后,回調時相關方法,即設置isDone = true并notifyAll() public void?handleResponse(Object _appResponse) { ???????? appResp = _appResponse;?//將遠程調用結果設置到callback中來 ???????? setDone(); } public void?onRemotingException(int _errorType, String _errorMsg) { ???????? errorCode = _errorType; ???????? errorMsg = _errorMsg; ???????? setDone(); } private void?setDone() { ???????? isDone = true; ???????? synchronized (this) {?//獲取鎖,因為前面wait()已經釋放了callback的鎖了 ???????? ??? notifyAll();?//?喚醒處于等待的線程 ???????? } } |
?
| com.taobao.remoting.impl.DefaultConnection.java ? //?用來存放請求和回調的MAP private final ConcurrentHashMap<Long, Object[]>?requestResidents; ? //發送消息出去 void?sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) { ???????? long requestId = connRequest.getId(); ???????? long waitBegin = System.currentTimeMillis(); ???????? long waitEnd = waitBegin + timeoutMs; ???????? Object[] queue = new Object[4]; ???????? int idx = 0; ???????? queue[idx++] = waitEnd; ???????? queue[idx++] = waitBegin;?? //用于記錄日志 ???????? queue[idx++] = connRequest; //用于記錄日志 ???????? queue[idx++] = callback; ?????????requestResidents.put(requestId, queue);?//?記錄響應隊列 ?????????write(connRequest); ? ???????? //?埋點記錄等待響應的Map的大小 ???????? StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(), ?????????????????? 1L); } public void?write(final Object connectionMsg) { //mina里的IoSession.write()發送消息 ???????? WriteFuture writeFuture = ioSession.write(connectionMsg); ???????? //?注冊FutureListener,當請求發送失敗后,能夠立即做出響應 ???????? writeFuture.addListener(new MsgWrittenListener(this, connectionMsg)); } ? /** *?在得到響應后,刪除對應的請求隊列,并執行回調 *?調用者:MINA線程 */ public void?putResponse(final ConnectionResponse connResp) { ???????? final long?requestId?= connResp.getRequestId(); ???????? Object[] queue =?requestResidents.remove(requestId); ???????? if (null == queue) { ???????? ??? Object appResp = connResp.getAppResponse(); ???????? ??? String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName(); ???????? ??? StringBuilder sb = new StringBuilder(); ???????? ??? sb.append("Not found response receiver for requestId=[").append(requestId).append("],"); ???????? ??? sb.append("from [").append(connResp.getHost()).append("],"); ???????? ??? sb.append("response type [").append(appRespClazz).append("]."); ???????? ??? LOGGER.warn(sb.toString()); ???????? ??? return; ???????? } ???????? int idx = 0; ???????? idx++; ???????? long waitBegin = (Long) queue[idx++]; ???????? ConnectionRequest connRequest = (ConnectionRequest) queue[idx++]; ???????? ResponseCallback?callback?= (ResponseCallback) queue[idx++]; ???????? // **?把回調任務交給業務提供的線程池執行?** ???????? Executor callbackExecutor = callback.getExecutor(); ???????? callbackExecutor.execute(new?CallbackExecutorTask(connResp, callback)); ? ???????? long duration = System.currentTimeMillis() - waitBegin; //?實際讀響應時間 ???????? logIfResponseError(connResp, duration, connRequest.getAppRequest()); } |
?
| CallbackExecutorTask static private class CallbackExecutorTask implements Runnable { ???????? final ConnectionResponse resp; ???????? final ResponseCallback callback; ???????? final Thread createThread; ? ???????? CallbackExecutorTask(ConnectionResponse _resp, ResponseCallback _cb) { ???????? ??? resp = _resp; ???????? ??? callback = _cb; ???????? ??? createThread = Thread.currentThread(); ???????? } ? ???????? public void run() { ???????? ??? //?預防這種情況:業務提供的Executor,讓調用者線程來執行任務 ???????? ??? if (createThread == Thread.currentThread() ?????????????????? ??? && callback.getExecutor() != DIYExecutor.getInstance()) { ?????????????????? StringBuilder sb = new StringBuilder(); ?????????????????? sb.append("The network callback task [" + resp.getRequestId() + "] cancelled, cause:"); ?????????????????? sb.append("Can not callback task on the network io thhread."); ?????????????????? LOGGER.warn(sb.toString()); ?????????????????? return; ???????? ??? } ? ???????? ??? if (TRConstants.RESULT_SUCCESS == resp.getResult()) { ???????????????????callback.handleResponse(resp.getAppResponse());?//設置調用結果 ???????? ??? } ???????? ??? else { ???????????????????callback.onRemotingException(resp.getResult(), resp ??????????????????????????? .getErrorMsg());??//處理調用異常 ???????? ??? } ???????? } } |
?
另外:
1,?服務端在處理客戶端的消息,然后再處理時,使用了線程池來并行處理,不用一個一個消息的處理
同樣,客戶端接收到服務端的消息,也是使用線程池來處理消息,再回調
轉載于:https://my.oschina.net/91jason/blog/374170
總結
以上是生活随笔為你收集整理的Alibaba Dubbo框架同步调用原理分析-2的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: OD调试9—实例:深入分析代码完成软件破
- 下一篇: Alibaba Dubbo框架同步调用原