dubbo源码解析(十)远程通信——Exchange层
遠程通訊——Exchange層
目標:介紹Exchange層的相關設計和邏輯、介紹dubbo-remoting-api中的exchange包內的源碼解析。前言
上一篇文章我講的是dubbo框架設計中Transport層,這篇文章我要講的是它的上一層Exchange層,也就是信息交換層。官方文檔對這一層的解釋是封裝請求響應模式,同步轉異步,以 Request, Response為中心,擴展接口為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer。
這一層的設計意圖是什么?它應該算是在信息傳輸層上又做了部分裝飾,為了適應rpc調用的一些需求,比如rpc調用中一次請求只關心它所對應的響應,這個時候只是一個message消息傳輸過來,是無法區分這是新的請求還是上一個請求的響應,這種類似于冪等性的問題以及rpc異步處理返回結果、內置事件等特性都是在Transport層無法解決滿足的,所有在Exchange層講message分成了request和response兩種類型,并且在這兩個模型上增加一些系統字段來處理問題。具體我會在下面講到。而dubbo把一條消息分為了協議頭和內容兩部分:協議頭包括系統字段,例如編號等,內容包括具體請求的參數和響應的結果等。在exchange層中大量邏輯都是基于協議頭的。
現在對這一層的設計意圖大致應該有所了解了吧,現在來看看exchange的類圖:
我講解的順序還是按照類圖從上而下,分塊講解,忽略綠色的test類。
源碼解析
(一)ExchangeChannel
public interface ExchangeChannel extends Channel {ResponseFuture request(Object request) throws RemotingException;ResponseFuture request(Object request, int timeout) throws RemotingException;ExchangeHandler getExchangeHandler();@Overridevoid close(int timeout);}該接口是信息交換通道接口,有四個方法,前兩個是發送請求消息,區別就是第二個發送請求有超時的參數,getExchangeHandler方法就是返回一個信息交換處理器,第四個是需要覆寫父類的方法。
(二)HeaderExchangeChannel
該類實現了ExchangeChannel,是基于協議頭的信息交換通道。
1.屬性
private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeChannel.class);/*** 通道的key值*/ private static final String CHANNEL_KEY = HeaderExchangeChannel.class.getName() + ".CHANNEL";/*** 通道*/ private final Channel channel;/*** 是否關閉*/ private volatile boolean closed = false;上述屬性比較簡單,還是放一下這個類的屬性是因為該類中有channel屬性,也就是說HeaderExchangeChannel是Channel的裝飾器,每個實現方法都會調用channel的方法。
2.靜態方法
static HeaderExchangeChannel getOrAddChannel(Channel ch) {if (ch == null) {return null;}// 獲得通道中的HeaderExchangeChannelHeaderExchangeChannel ret = (HeaderExchangeChannel) ch.getAttribute(CHANNEL_KEY);if (ret == null) {// 創建一個HeaderExchangeChannel實例ret = new HeaderExchangeChannel(ch);// 如果通道連接if (ch.isConnected()) {// 加入屬性值ch.setAttribute(CHANNEL_KEY, ret);}}return ret; }static void removeChannelIfDisconnected(Channel ch) {// 如果通道斷開連接if (ch != null && !ch.isConnected()) {// 移除屬性值ch.removeAttribute(CHANNEL_KEY);} }該靜態方法做了HeaderExchangeChannel的創建和銷毀,并且生命周期隨channel銷毀而銷毀。
3.send
@Override public void send(Object message) throws RemotingException {send(message, getUrl().getParameter(Constants.SENT_KEY, false)); }@Override public void send(Object message, boolean sent) throws RemotingException {// 如果通道關閉,拋出異常if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");}// 判斷消息的類型if (message instanceof Request|| message instanceof Response|| message instanceof String) {// 發送消息channel.send(message, sent);} else {// 新建一個request實例Request request = new Request();// 設置信息的版本request.setVersion(Version.getProtocolVersion());// 該請求不需要響應request.setTwoWay(false);// 把消息傳入request.setData(message);// 發送消息channel.send(request, sent);} }該方法是在channel的send方法上加上了request和response模型,最后再調用channel.send,起到了裝飾器的作用。
4.request
@Override public ResponseFuture request(Object request) throws RemotingException {return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); }@Override public ResponseFuture request(Object request, int timeout) throws RemotingException {// 如果通道關閉,則拋出異常if (closed) {throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");}// create request.創建請求Request req = new Request();// 設置版本號req.setVersion(Version.getProtocolVersion());// 設置需要響應req.setTwoWay(true);// 把請求數據傳入req.setData(request);// 創建DefaultFuture對象,可以從future中主動獲得請求對應的響應信息DefaultFuture future = new DefaultFuture(channel, req, timeout);try {// 發送請求消息channel.send(req);} catch (RemotingException e) {future.cancel();throw e;}return future; }該方法是請求方法,用Request模型把請求內容裝飾起來,然后發送一個Request類型的消息,并且返回DefaultFuture實例,DefaultFuture我會在后面講到。
cloes方法也重寫了,我就不再多說,因為比較簡單,沒有重點,其他方法都是直接調用channel屬性的方法。
(三)ExchangeClient
該接口繼承了Client和ExchangeChannel,是信息交換客戶端接口,其中沒有定義多余的方法。
(四)HeaderExchangeClient
該類實現了ExchangeClient接口,是基于協議頭的信息交互客戶端類,同樣它是Client、Channel的適配器。在該類的源碼中可以看到所有的實現方法都是調用了client和channel屬性的方法。該類主要的作用就是增加了心跳功能,為什么要增加心跳功能呢,對于長連接,一些拔網線等物理層的斷開,會導致TCP的FIN消息來不及發送,對方收不到斷開事件,那么就需要用到發送心跳包來檢測連接是否斷開。consumer和provider斷開,處理措施不一樣,會分別做出重連和關閉通道的操作。
1.屬性
private static final Logger logger = LoggerFactory.getLogger(HeaderExchangeClient.class);/*** 定時器線程池*/ private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true)); /*** 客戶端*/ private final Client client; /*** 信息交換通道*/ private final ExchangeChannel channel; // heartbeat timer /*** 心跳定時器*/ private ScheduledFuture<?> heartbeatTimer; // heartbeat(ms), default value is 0 , won't execute a heartbeat. /*** 心跳周期,間隔多久發送心跳消息檢測一次*/ private int heartbeat; /*** 心跳超時時間*/ private int heartbeatTimeout;該類的屬性除了需要適配的屬性外,其他都是跟心跳相關屬性。
2.構造函數
public HeaderExchangeClient(Client client, boolean needHeartbeat) {if (client == null) {throw new IllegalArgumentException("client == null");}this.client = client;// 創建信息交換通道this.channel = new HeaderExchangeChannel(client);// 獲得dubbo版本String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);//獲得心跳周期配置,如果沒有配置,并且dubbo是1.0版本的,則這只為1分鐘,否則設置為0this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);// 獲得心跳超時配置,默認是心跳周期的三倍this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);// 如果心跳超時時間小于心跳周期的兩倍,則拋出異常if (heartbeatTimeout < heartbeat * 2) {throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");}if (needHeartbeat) {// 開啟心跳startHeartbeatTimer();} }構造函數就是對一些屬性初始化設置,優先從url中獲取。心跳超時時間小于心跳周期的兩倍就拋出異常,意思就是至少重試兩次心跳檢測。
3.startHeartbeatTimer
private void startHeartbeatTimer() {// 停止現有的心跳線程stopHeartbeatTimer();// 如果需要心跳if (heartbeat > 0) {// 創建心跳定時器heartbeatTimer = scheduled.scheduleWithFixedDelay(// 新建一個心跳線程new HeartBeatTask(new HeartBeatTask.ChannelProvider() {@Overridepublic Collection<Channel> getChannels() {// 返回一個只包含HeaderExchangeClient對象的不可變列表return Collections.<Channel>singletonList(HeaderExchangeClient.this);}}, heartbeat, heartbeatTimeout),heartbeat, heartbeat, TimeUnit.MILLISECONDS);} }該方法就是開啟心跳。利用心跳定時器來做到定時檢測心跳。因為這是信息交換客戶端類,所有這里的只是返回包含HeaderExchangeClient對象的不可變列表,因為客戶端跟channel是一一對應的,只有這一個該客戶端本身的channel需要心跳。
4.stopHeartbeatTimer
private void stopHeartbeatTimer() {if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {try {// 取消定時器heartbeatTimer.cancel(true);// 取消大量已排隊任務,用于回收空間scheduled.purge();} catch (Throwable e) {if (logger.isWarnEnabled()) {logger.warn(e.getMessage(), e);}}}heartbeatTimer = null; }該方法是停止現有心跳,也就是停止定時器,釋放空間。
其他方法都是調用channel和client屬性的方法。
(五)HeartBeatTask
該類實現了Runnable接口,實現的是心跳任務,里面包含了核心的心跳策略。
1.屬性
/*** 通道管理*/ private ChannelProvider channelProvider;/*** 心跳間隔 單位:ms*/ private int heartbeat;/*** 心跳超時時間 單位:ms*/ private int heartbeatTimeout;后兩個屬性跟HeaderExchangeClient中的屬性含義一樣,第一個是該類自己內部的一個接口:
interface ChannelProvider {// 獲得所有的通道集合,需要心跳的通道數組Collection<Channel> getChannels(); }該接口就定義了一個方法,獲得需要心跳的通道集合。可想而知,會對集合內的通道都做心跳檢測。
2.run
@Override public void run() {try {long now = System.currentTimeMillis();// 遍歷所有通道for (Channel channel : channelProvider.getChannels()) {// 如果通道關閉了,則跳過if (channel.isClosed()) {continue;}try {// 最后一次接收到消息的時間戳Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);// 最后一次發送消息的時間戳Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);// 如果最后一次接收或者發送消息到時間到現在的時間間隔超過了心跳間隔時間if ((lastRead != null && now - lastRead > heartbeat)|| (lastWrite != null && now - lastWrite > heartbeat)) {// 創建一個requestRequest req = new Request();// 設置版本號req.setVersion(Version.getProtocolVersion());// 設置需要得到響應req.setTwoWay(true);// 設置事件類型,為心跳事件req.setEvent(Request.HEARTBEAT_EVENT);// 發送心跳請求channel.send(req);if (logger.isDebugEnabled()) {logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");}}// 如果最后一次接收消息的時間到現在已經超過了超時時間if (lastRead != null && now - lastRead > heartbeatTimeout) {logger.warn("Close channel " + channel+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");// 如果該通道是客戶端,也就是請求的服務器掛掉了,客戶端嘗試重連服務器if (channel instanceof Client) {try {// 重新連接服務器((Client) channel).reconnect();} catch (Exception e) {//do nothing}} else {// 如果不是客戶端,也就是是服務端返回響應給客戶端,但是客戶端掛掉了,則服務端關閉客戶端連接channel.close();}}} catch (Throwable t) {logger.warn("Exception when heartbeat to remote channel " + channel.getRemoteAddress(), t);}}} catch (Throwable t) {logger.warn("Unhandled exception when heartbeat, cause: " + t.getMessage(), t);} }該方法中是心跳機制的核心邏輯。注意以下幾個點:
(六)ResponseFuture
public interface ResponseFuture {Object get() throws RemotingException;Object get(int timeoutInMillis) throws RemotingException;void setCallback(ResponseCallback callback);boolean isDone();}該接口是響應future接口,該接口的設計意圖跟java.util.concurrent.Future很類似。發送出去的消息,潑出去的水,只有等到對方主動響應才能得到結果,但是請求方需要去主動回去該請求的結果,就顯得有些艱難,所有產生了這樣一個接口,它能夠獲取任務執行結果、可以核對請求消息是否被響應,還能設置回調來支持異步。
(七)DefaultFuture
該類實現了ResponseFuture接口,其中封裝了處理響應的邏輯。你可以把DefaultFuture看成是一個中介,買房和賣房都通過這個中介進行溝通,中介擁有著買房者的信息request和賣房者的信息response,并且促成他們之間的買賣。
1.屬性
private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);/*** 通道集合*/ private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();/*** Future集合,key為請求編號*/ private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();// invoke id. /*** 請求編號*/ private final long id; /*** 通道*/ private final Channel channel; /*** 請求*/ private final Request request; /*** 超時*/ private final int timeout; /*** 鎖*/ private final Lock lock = new ReentrantLock(); /*** 完成情況,控制多線程的休眠與喚醒*/ private final Condition done = lock.newCondition(); /*** 創建開始時間*/ private final long start = System.currentTimeMillis(); /*** 發送請求時間*/ private volatile long sent; /*** 響應*/ private volatile Response response; /*** 回調*/ private volatile ResponseCallback callback;可以看到,該類的屬性包含了request、response、channel三個實例,在該類中,把請求和響應通過唯一的id一一對應起來。做到異步處理返回結果時能給準確的返回給對應的請求。可以看到屬性中有兩個集合,分別是通道集合和future集合,也就是該類本身也是所有 DefaultFuture 的管理容器。
2.構造函數
public DefaultFuture(Channel channel, Request request, int timeout) {this.channel = channel;this.request = request;// 設置請求編號this.id = request.getId();this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);// put into waiting map.,加入到等待集合中FUTURES.put(id, this);CHANNELS.put(id, channel); }構造函數比較簡單,每一個DefaultFuture實例都跟每一個請求一一對應,被存入到集合中管理起來。
3.closeChannel
public static void closeChannel(Channel channel) {// 遍歷通道集合for (long id : CHANNELS.keySet()) {if (channel.equals(CHANNELS.get(id))) {// 通過請求id獲得futureDefaultFuture future = getFuture(id);if (future != null && !future.isDone()) {// 創建一個關閉通道的響應Response disconnectResponse = new Response(future.getId());disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);disconnectResponse.setErrorMessage("Channel " +channel +" is inactive. Directly return the unFinished request : " +future.getRequest());// 接收該關閉通道并且請求未完成的響應DefaultFuture.received(channel, disconnectResponse);}}} }該方法是關閉不活躍的通道,并且返回請求未完成。也就是關閉指定channel的請求,返回的是請求未完成。
4.received
public static void received(Channel channel, Response response) {try {// future集合中移除該請求的future,(響應id和請求id一一對應的)DefaultFuture future = FUTURES.remove(response.getId());if (future != null) {// 接收響應結果future.doReceived(response);} else {logger.warn("The timeout response finally returned at "+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))+ ", response " + response+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()+ " -> " + channel.getRemoteAddress()));}} finally {// 通道集合移除該請求對應的通道,代表著這一次請求結束CHANNELS.remove(response.getId());} }該方法是接收響應,也就是某個請求得到了響應,那么代表這次請求任務完成,所有需要把future從集合中移除。具體的接收響應結果在doReceived方法中實現。
5.doReceived
private void doReceived(Response res) {// 獲得鎖lock.lock();try {// 設置響應response = res;if (done != null) {// 喚醒等待done.signal();}} finally {// 釋放鎖lock.unlock();}if (callback != null) {// 執行回調invokeCallback(callback);} }可以看到,當接收到響應后,會把等待的線程喚醒,然后執行回調來處理該響應結果。
6.invokeCallback
private void invokeCallback(ResponseCallback c) {ResponseCallback callbackCopy = c;if (callbackCopy == null) {throw new NullPointerException("callback cannot be null.");}c = null;Response res = response;if (res == null) {throw new IllegalStateException("response cannot be null. url:" + channel.getUrl());}// 如果響應成功,返回碼是20if (res.getStatus() == Response.OK) {try {// 使用響應結果執行 完成 后的邏輯callbackCopy.done(res.getResult());} catch (Exception e) {logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e);}//超時,回調處理成超時異常} else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {try {TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());// 回調處理異常callbackCopy.caught(te);} catch (Exception e) {logger.error("callback invoke error ,url:" + channel.getUrl(), e);}// 其他情況處理成RemotingException異常} else {try {RuntimeException re = new RuntimeException(res.getErrorMessage());callbackCopy.caught(re);} catch (Exception e) {logger.error("callback invoke error ,url:" + channel.getUrl(), e);}} }該方法是執行回調來處理響應結果。分為了三種情況:
具體的處理都在ResponseCallback接口的實現類里執行,后面我會講到。
7.get
@Override public Object get() throws RemotingException {return get(timeout); }@Override public Object get(int timeout) throws RemotingException {// 超時時間默認為1sif (timeout <= 0) {timeout = Constants.DEFAULT_TIMEOUT;}// 如果請求沒有完成,也就是還沒有響應返回if (!isDone()) {long start = System.currentTimeMillis();// 獲得鎖lock.lock();try {// 輪詢 等待請求是否完成while (!isDone()) {// 線程阻塞等待done.await(timeout, TimeUnit.MILLISECONDS);// 如果請求完成或者超時,則結束if (isDone() || System.currentTimeMillis() - start > timeout) {break;}}} catch (InterruptedException e) {throw new RuntimeException(e);} finally {// 釋放鎖lock.unlock();}// 如果沒有收到響應,則拋出超時的異常if (!isDone()) {throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));}}// 返回響應return returnFromResponse(); }該方法是實現了ResponseFuture定義的方法,是獲得該future對應的請求對應的響應結果,其實future、請求、響應都是一一對應的。其中如果還沒得到響應,則會線程阻塞等待,等到有響應結果或者超時,才返回。返回的邏輯在returnFromResponse中實現。
8.returnFromResponse
private Object returnFromResponse() throws RemotingException {Response res = response;if (res == null) {throw new IllegalStateException("response cannot be null");}// 如果正常返回,則返回響應結果if (res.getStatus() == Response.OK) {return res.getResult();}// 如果超時,則拋出超時異常if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());}// 其他 拋出RemotingException異常throw new RemotingException(channel, res.getErrorMessage()); }這代碼跟invokeCallback方法中差不多,都是把響應分了三種情況。
9.cancel
public void cancel() {// 創建一個取消請求的響應Response errorResult = new Response(id);errorResult.setErrorMessage("request future has been canceled.");response = errorResult;// 從集合中刪除該請求FUTURES.remove(id);CHANNELS.remove(id); }該方法是取消一個請求,可以直接關閉一個請求,也就是值創建一個響應來回應該請求,把response值設置到該請求對于到future中,做到了中斷請求的作用。該方法跟closeChannel的區別是closeChannel中對response的狀態設置了CHANNEL_INACTIVE,而cancel方法是中途被主動取消的,雖然有response值,但是并沒有一個響應狀態。
10.RemotingInvocationTimeoutScan
private static class RemotingInvocationTimeoutScan implements Runnable {@Overridepublic void run() {while (true) {try {for (DefaultFuture future : FUTURES.values()) {// 已經完成,跳過掃描if (future == null || future.isDone()) {continue;}// 超時if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {// create exception response.,創建一個超時的響應Response timeoutResponse = new Response(future.getId());// set timeout status.,設置超時狀態,是服務端側超時還是客戶端側超時timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);// 設置錯誤信息timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));// handle response.,接收創建的超時響應DefaultFuture.received(future.getChannel(), timeoutResponse);}}// 睡眠Thread.sleep(30);} catch (Throwable e) {logger.error("Exception when scan the timeout invocation of remoting.", e);}}} }該方法是掃描調用超時任務的線程,每次都會遍歷future集合,檢測請求是否超時了,如果超時則創建一個超時響應來回應該請求。
static {// 開啟一個后臺掃描調用超時任務Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");th.setDaemon(true);th.start(); }開啟一個后臺線程進行掃描的邏輯寫在了靜態代碼塊里面,只開啟一次。
(八)SimpleFuture
該類實現了ResponseFuture,目前沒有用到,很簡單的實現,我就不多說了。
(九)ExchangeHandler
該接口繼承了ChannelHandler, TelnetHandler接口,是信息交換處理器接口。
public interface ExchangeHandler extends ChannelHandler, TelnetHandler {/*** reply.* 回復請求結果* @param channel* @param request* @return response* @throws RemotingException*/Object reply(ExchangeChannel channel, Object request) throws RemotingException;}該接口只定義了一個回復請求結果的方法,返回的是請求結果。
(十)ExchangeHandlerDispatcher
該類實現了ExchangeHandler接口, 是信息交換處理器調度器類,也就是對應不同的事件,選擇不同的處理器去處理。該類中有三個屬性,分別對應了三種事件:
/*** 回復者調度器*/ private final ReplierDispatcher replierDispatcher;/*** 通道處理器調度器*/ private final ChannelHandlerDispatcher handlerDispatcher;/*** Telnet 命令處理器*/ private final TelnetHandler telnetHandler;如果事件是跟通道處理器有關的,就調用通道處理器來處理,比如:
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Object reply(ExchangeChannel channel, Object request) throws RemotingException {return ((Replier) replierDispatcher).reply(channel, request); }@Override public void connected(Channel channel) {handlerDispatcher.connected(channel); } @Override public String telnet(Channel channel, String message) throws RemotingException {return telnetHandler.telnet(channel, message); }可以看到以上三種事件,回復請求結果需要回復者調度器來處理,連接需要通道處理器調度器來處理,telnet消息需要Telnet命令處理器來處理。
(十一)ExchangeHandlerAdapter
該類繼承了TelnetHandlerAdapter,實現了ExchangeHandler,是信息交換處理器的適配器類。
public abstract class ExchangeHandlerAdapter extends TelnetHandlerAdapter implements ExchangeHandler {@Overridepublic Object reply(ExchangeChannel channel, Object msg) throws RemotingException {// 直接返回nullreturn null;}}該類直接讓ExchangeHandler定義的方法reply返回null,交由它的子類選擇性的去實現具體的回復請求結果。
(十二)ExchangeServer
該接口繼承了Server接口,定義了兩個方法:
public interface ExchangeServer extends Server {/*** get channels.* 獲得通道集合* @return channels*/Collection<ExchangeChannel> getExchangeChannels();/*** get channel.* 根據遠程地址獲得對應的信息通道* @param remoteAddress* @return channel*/ExchangeChannel getExchangeChannel(InetSocketAddress remoteAddress);}該接口比較好理解,并且在Server接口基礎上新定義了兩個方法。直接來看看它的實現類吧。
(十三)HeaderExchangeServer
該類實現了ExchangeServer接口,是基于協議頭的信息交換服務器實現類,HeaderExchangeServer是Server的裝飾器,每個實現方法都會調用server的方法。
1.屬性
protected final Logger logger = LoggerFactory.getLogger(getClass());/*** 線程池*/ private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,new NamedThreadFactory("dubbo-remoting-server-heartbeat",true)); /*** 服務器*/ private final Server server; // heartbeat timer /*** 心跳定時器*/ private ScheduledFuture<?> heartbeatTimer; // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat. /*** 心跳周期*/ private int heartbeat; /*** 心跳超時時間*/ private int heartbeatTimeout; /*** 信息交換服務器是否關閉*/ private AtomicBoolean closed = new AtomicBoolean(false);該類里面的很多實現跟HeaderExchangeClient差不多,包括心跳檢測等邏輯。看得懂上述我講的HeaderExchangeClient的屬性,想必這里的屬性應該也很簡單了。
2.構造函數
public HeaderExchangeServer(Server server) {if (server == null) {throw new IllegalArgumentException("server == null");}this.server = server;//獲得心跳周期配置,如果沒有配置,默認設置為0this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);// 獲得心跳超時配置,默認是心跳周期的三倍this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);// 如果心跳超時時間小于心跳周期的兩倍,則拋出異常if (heartbeatTimeout < heartbeat * 2) {throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");}// 開始心跳startHeartbeatTimer(); }public Server getServer() {return server; }構造函數就是對屬性的設置,心跳的機制以及默認值都跟HeaderExchangeClient中的一模一樣。
3.isRunning
private boolean isRunning() {Collection<Channel> channels = getChannels();// 遍歷所有連接該服務器的通道for (Channel channel : channels) {/*** If there are any client connections,* our server should be running.*/// 只要有任何一個客戶端連接,則服務器還運行著if (channel.isConnected()) {return true;}}return false; }該方法是檢測服務器是否還運行,只要有一個客戶端連接著,就算服務器運行著。
4.close
@Override public void close() {// 關閉線程池和心跳檢測doClose();// 關閉服務器server.close(); }@Override public void close(final int timeout) {// 開始關閉startClose();if (timeout > 0) {final long max = (long) timeout;final long start = System.currentTimeMillis();if (getUrl().getParameter(Constants.CHANNEL_SEND_READONLYEVENT_KEY, true)) {// 發送 READONLY_EVENT事件給所有連接該服務器的客戶端,表示 Server 不可讀了。sendChannelReadOnlyEvent();}// 當服務器還在運行,并且沒有超時,睡眠,也就是等待timeout左右時間在進行關閉while (HeaderExchangeServer.this.isRunning()&& System.currentTimeMillis() - start < max) {try {Thread.sleep(10);} catch (InterruptedException e) {logger.warn(e.getMessage(), e);}}}// 關閉線程池和心跳檢測doClose();// 延遲關閉server.close(timeout); }兩個close方法,第二個close方法是優雅的關閉,有一定的延時來讓一些響應或者操作做完。關閉分兩個步驟,第一個就是關閉信息交換服務器中的線程池和心跳檢測,然后才是關閉服務器。
5.sendChannelReadOnlyEvent
private void sendChannelReadOnlyEvent() {// 創建一個READONLY_EVENT事件的請求Request request = new Request();request.setEvent(Request.READONLY_EVENT);// 不需要響應request.setTwoWay(false);// 設置版本request.setVersion(Version.getProtocolVersion());Collection<Channel> channels = getChannels();// 遍歷連接的通道,進行通知for (Channel channel : channels) {try {// 通過通道還連接著,則發送通知if (channel.isConnected())channel.send(request, getUrl().getParameter(Constants.CHANNEL_READONLYEVENT_SENT_KEY, true));} catch (RemotingException e) {logger.warn("send cannot write message error.", e);}} }在關閉服務器中有一個操作就是發送事件READONLY_EVENT,告訴客戶端該服務器不可讀了,就是該方法實現的,逐個通知連接的客戶端該事件。
6.doClose
private void doClose() {if (!closed.compareAndSet(false, true)) {return;}// 停止心跳檢測stopHeartbeatTimer();try {// 關閉線程池scheduled.shutdown();} catch (Throwable t) {logger.warn(t.getMessage(), t);} }該方法就是close方法調用到的停止心跳檢測和關閉線程池。
7.getExchangeChannels
@Override public Collection<ExchangeChannel> getExchangeChannels() {Collection<ExchangeChannel> exchangeChannels = new ArrayList<ExchangeChannel>();// 獲得連接該服務器通道集合Collection<Channel> channels = server.getChannels();if (channels != null && !channels.isEmpty()) {// 遍歷通道集合,為每個通道都創建信息交換通道,并且加入信息交換通道集合for (Channel channel : channels) {exchangeChannels.add(HeaderExchangeChannel.getOrAddChannel(channel));}}return exchangeChannels; }該方法是返回連接該服務器信息交換通道集合。邏輯就是先獲得通道集合,在根據通道來創建信息交換通道,然后返回信息通道集合。
8.reset
@Override public void reset(URL url) {// 重置屬性server.reset(url);try {// 重置的邏輯跟構造函數一樣設置if (url.hasParameter(Constants.HEARTBEAT_KEY)|| url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);if (t < h * 2) {throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");}if (h != heartbeat || t != heartbeatTimeout) {heartbeat = h;heartbeatTimeout = t;// 重新開始心跳startHeartbeatTimer();}}} catch (Throwable t) {logger.error(t.getMessage(), t);} }該方法就是重置屬性,重置后,重新開始心跳,設置心跳屬性的機制跟構造函數一樣。
9.startHeartbeatTimer
private void startHeartbeatTimer() {// 先停止現有的心跳檢測stopHeartbeatTimer();if (heartbeat > 0) {// 創建心跳定時器heartbeatTimer = scheduled.scheduleWithFixedDelay(new HeartBeatTask(new HeartBeatTask.ChannelProvider() {@Overridepublic Collection<Channel> getChannels() {// 返回一個不可修改的連接該服務器的信息交換通道集合return Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());}}, heartbeat, heartbeatTimeout),heartbeat, heartbeat, TimeUnit.MILLISECONDS);} }該方法是開始心跳,跟HeaderExchangeClient類中的開始心跳方法唯一區別是獲得的通道不一樣,客戶端跟通道是一一對應的,所有只要對一個通道進行心跳檢測,而服務端跟通道是一對多的關系,所有需要對該服務器連接的所有通道進行心跳檢測。
10.stopHeartbeatTimer
private void stopHeartbeatTimer() {if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) {try {// 取消定時器heartbeatTimer.cancel(true);// 取消大量已排隊任務,用于回收空間scheduled.purge();} catch (Throwable e) {if (logger.isWarnEnabled()) {logger.warn(e.getMessage(), e);}}}heartbeatTimer = null; }該方法是停止當前的心跳檢測。
(十四)ExchangeServerDelegate
該類實現了ExchangeServer接口,是信息交換服務器裝飾者,是ExchangeServer的裝飾器。該類就一個屬性ExchangeServer server,所有實現方法都調用了server屬性的方法。目前只有在p2p中被用到,代碼為就不貼了,很簡單。
(十五)Exchanger
@SPI(HeaderExchanger.NAME) public interface Exchanger {/*** bind.* 綁定一個服務器* @param url 服務器url* @param handler 數據交換處理器* @return message server 數據交換服務器*/@Adaptive({Constants.EXCHANGER_KEY})ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException;/*** connect.* 連接一個服務器,也就是創建一個客戶端* @param url 服務器url* @param handler 數據交換處理器* @return message channel 返回數據交換客戶端*/@Adaptive({Constants.EXCHANGER_KEY})ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException;}該接口是數據交換者接口,該接口是一個可擴展接口默認實現的是HeaderExchanger類,并且用到了dubbo SPI的Adaptive機制,優先實現url攜帶的配置。如果不了解dubbo SPI機制的可以看《dubbo源碼解析(二)Dubbo擴展機制SPI》。那么回到該接口定義的方法,定義了綁定和連接兩個方法,分別返回信息交互服務器和客戶端實例。
(十六)HeaderExchanger
public class HeaderExchanger implements Exchanger {public static final String NAME = "header";@Overridepublic ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {// 用傳輸層連接返回的client 創建對應的信息交換客戶端,默認開啟心跳檢測return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);}@Overridepublic ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {// 用傳輸層綁定返回的server 創建對應的信息交換服務端return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));}}該類繼承了Exchanger接口,是Exchanger接口的默認實現,實現了Exchanger接口定義的兩個方法,分別調用的是Transporters的連接和綁定方法,再利用這這兩個方法返回的客戶端和服務端實例來創建信息交換的客戶端和服務端。
(十七)Replier
我們知道Request對應的是ExchangeHandler接口實現對象來處理,但有些時候我們需要不同數據類型對應不同的處理器,該類就是為了支持這一需求所設計的。
public interface Replier<T> {/*** reply.* 回復請求結果* @param channel* @param request* @return response* @throws RemotingException*/Object reply(ExchangeChannel channel, T request) throws RemotingException;}可以看到該接口跟ExchangeHandler定義的方法也一一,只有請求的類型改為了范型。
(十八)ReplierDispatcher
該類實現了Replier接口,是回復者調度器實現類。
/*** 默認回復者*/ private final Replier<?> defaultReplier;/*** 回復者集合*/ private final Map<Class<?>, Replier<?>> repliers = new ConcurrentHashMap<Class<?>, Replier<?>>();這是該類的兩個屬性,緩存了回復者集合和默認的回復者。
/*** 從回復者集合中找到該類型的回復者,并且返回* @param type* @return*/ private Replier<?> getReplier(Class<?> type) {for (Map.Entry<Class<?>, Replier<?>> entry : repliers.entrySet()) {if (entry.getKey().isAssignableFrom(type)) {return entry.getValue();}}if (defaultReplier != null) {return defaultReplier;}throw new IllegalStateException("Replier not found, Unsupported message object: " + type); }/*** 回復請求* @param channel* @param request* @return* @throws RemotingException*/ @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Object reply(ExchangeChannel channel, Object request) throws RemotingException {return ((Replier) getReplier(request.getClass())).reply(channel, request); }上述是該類中關鍵的兩個方法,reply還是調用實現類的reply。根據請求的數據類型來使用指定的回復者進行回復。
(十九)MultiMessage
該類實現了實現 Iterable 接口,是多消息的封裝,我們直接看它的屬性:
/*** 消息集合*/ private final List messages = new ArrayList();該類要和《dubbo源碼解析(九)遠程通信——Transport層》的(八)MultiMessageHandler聯合著看。
(二十)HeartbeatHandler
該類繼承了AbstractChannelHandlerDelegate類,是心跳處理器。是用來處理心跳事件的,也接收消息上增加了對心跳消息的處理。該類是
@Override public void received(Channel channel, Object message) throws RemotingException {// 設置接收時間的時間戳屬性值setReadTimestamp(channel);// 如果是心跳請求if (isHeartbeatRequest(message)) {Request req = (Request) message;// 如果需要響應if (req.isTwoWay()) {// 創建一個響應Response res = new Response(req.getId(), req.getVersion());// 設置為心跳事件的響應res.setEvent(Response.HEARTBEAT_EVENT);// 發送消息,也就是返回響應channel.send(res);if (logger.isInfoEnabled()) {int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);if (logger.isDebugEnabled()) {logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress()+ ", cause: The channel has no data-transmission exceeds a heartbeat period"+ (heartbeat > 0 ? ": " + heartbeat + "ms" : ""));}}}return;}// 如果是心跳響應,則直接returnif (isHeartbeatResponse(message)) {if (logger.isDebugEnabled()) {logger.debug("Receive heartbeat response in thread " + Thread.currentThread().getName());}return;}handler.received(channel, message); }該方法是就是在handler處理消息上增加了處理心跳消息的功能,做到了功能增強。
(二十一)Exchangers
該類跟Transporters的設計意圖是一樣的,Transporters我在《dubbo源碼解析(八)遠程通信——開篇》的(十)Transporters已經講到了。Exchangers也用到了外觀模式。代碼為就不貼了,可以對照著Transporters來看,很簡單。
(二十二)Request
請求模型類,最重要的肯定是模型的屬性,我們來看看屬性:
/*** 心跳事件*/ public static final String HEARTBEAT_EVENT = null;/*** 只讀事件*/ public static final String READONLY_EVENT = "R";/*** 請求編號自增序列*/ private static final AtomicLong INVOKE_ID = new AtomicLong(0);/*** 請求編號*/ private final long mId;/*** dubbo版本*/ private String mVersion;/*** 是否需要響應*/ private boolean mTwoWay = true;/*** 是否是事件*/ private boolean mEvent = false;/*** 是否是異常的請求*/ private boolean mBroken = false;/*** 請求數據*/ private Object mData;(二十三)Response
響應模型,來看看它的屬性:
/*** 心跳事件*/ public static final String HEARTBEAT_EVENT = null;/*** 只讀事件*/ public static final String READONLY_EVENT = "R";/*** ok.* 成功狀態碼*/ public static final byte OK = 20;/*** clien side timeout.* 客戶端側的超時狀態碼*/ public static final byte CLIENT_TIMEOUT = 30;/*** server side timeout.* 服務端側超時的狀態碼*/ public static final byte SERVER_TIMEOUT = 31;/*** channel inactive, directly return the unfinished requests.* 通道不活躍,返回未完成請求的狀態碼*/ public static final byte CHANNEL_INACTIVE = 35;/*** request format error.* 請求格式錯誤狀態碼*/ public static final byte BAD_REQUEST = 40;/*** response format error.* 響應格式錯誤狀態碼*/ public static final byte BAD_RESPONSE = 50;/*** service not found.* 服務找不到狀態碼*/ public static final byte SERVICE_NOT_FOUND = 60;/*** service error.* 服務錯誤狀態碼*/ public static final byte SERVICE_ERROR = 70;/*** internal server error.* 內部服務器錯誤狀態碼*/ public static final byte SERVER_ERROR = 80;/*** internal server error.* 客戶端錯誤狀態碼*/ public static final byte CLIENT_ERROR = 90;/*** server side threadpool exhausted and quick return.* 服務器端線程池耗盡并快速返回狀態碼*/ public static final byte SERVER_THREADPOOL_EXHAUSTED_ERROR = 100;/*** 響應編號*/ private long mId = 0;/*** dubbo 版本*/ private String mVersion;/*** 狀態*/ private byte mStatus = OK;/*** 是否是事件*/ private boolean mEvent = false;/*** 錯誤信息*/ private String mErrorMsg;/*** 返回結果*/ private Object mResult;很多屬性跟Request模型的屬性一樣,并且含義也一樣,不過該模型多了很多的狀態碼。關鍵的是id跟請求一一對應。
(二十四)ResponseCallback
public interface ResponseCallback {/*** done.* 處理請求* @param response*/void done(Object response);/*** caught exception.* 處理異常* @param exception*/void caught(Throwable exception);}該接口是回調的接口,定義了兩個方法,分別是處理正常的響應結果和處理異常。
(二十五)ExchangeCodec
該類繼承了TelnetCodec,是信息交換編解碼器。在本文的開頭,我就寫到,dubbo將一條消息分成了協議頭和協議體,用來解決粘包拆包問題,但是頭跟體在編解碼上有區別,我們先來看看dubbo 的協議頭的配置:
上圖是官方文檔的圖片,能夠清晰的看出協議中各個數據所占的位數:
可以看到一個該協議中前65位是協議頭,后面的都是協議體數據。那么在編解碼中,協議頭是通過 Codec 編解碼,而body部分是用Serialization序列化和反序列化的。下面我們就來看看該類對協議頭的編解碼。
1.屬性
// header length. /*** 協議頭長度:16字節 = 128Bits*/ protected static final int HEADER_LENGTH = 16; // magic header. /*** MAGIC二進制:1101101010111011,十進制:55995*/ protected static final short MAGIC = (short) 0xdabb; /*** Magic High,也就是0-7位:11011010*/ protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0]; /*** Magic Low 8-15位 :10111011*/ protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1]; // message flag. /*** 128 二進制:10000000*/ protected static final byte FLAG_REQUEST = (byte) 0x80; /*** 64 二進制:1000000*/ protected static final byte FLAG_TWOWAY = (byte) 0x40; /*** 32 二進制:100000*/ protected static final byte FLAG_EVENT = (byte) 0x20; /*** 31 二進制:11111*/ protected static final int SERIALIZATION_MASK = 0x1f;可以看到 MAGIC是個固定的值,用來判斷是不是dubbo協議的數據包,并且MAGIC_LOW和MAGIC_HIGH分別是MAGIC的低位和高位。其他的屬性用來干嘛后面會講到。
2.encode
@Override public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {if (msg instanceof Request) {// 如果消息是Request類型,對請求消息編碼encodeRequest(channel, buffer, (Request) msg);} else if (msg instanceof Response) {// 如果消息是Response類型,對響應消息編碼encodeResponse(channel, buffer, (Response) msg);} else {// 直接讓父類( Telnet ) 處理,目前是 Telnet 命令的結果。super.encode(channel, buffer, msg);} }該方法是根據消息的類型來分別進行編碼,分為三種情況:Request類型、Response類型以及其他
3.encodeRequest
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {Serialization serialization = getSerialization(channel);// header.// 創建16字節的字節數組byte[] header = new byte[HEADER_LENGTH];// set magic number.// 設置前16位數據,也就是設置header[0]和header[1]的數據為Magic High和Magic LowBytes.short2bytes(MAGIC, header);// set request and serialization flag.// 16-23位為serialization編號,用到或運算10000000|serialization編號,例如serialization編號為11111,則為00011111header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());// 繼續上面的例子,00011111|1000000 = 01011111if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;// 繼續上面的例子,01011111|100000 = 011 11111 可以看到011代表請求標記、雙向、是事件,這樣就設置了16、17、18位,后面19-23位是Serialization 編號if (req.isEvent()) header[2] |= FLAG_EVENT;// set request id.// 設置32-95位請求idBytes.long2bytes(req.getId(), header, 4);// encode request data.// // 編碼 `Request.data` 到 Body ,并寫入到 Bufferint savedWriteIndex = buffer.writerIndex();buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);// 對body數據序列化ObjectOutput out = serialization.serialize(channel.getUrl(), bos);// 如果該請求是事件if (req.isEvent()) {// 特殊事件編碼encodeEventData(channel, out, req.getData());} else {// 正常請求編碼encodeRequestData(channel, out, req.getData(), req.getVersion());}// 釋放資源out.flushBuffer();if (out instanceof Cleanable) {((Cleanable) out).cleanup();}bos.flush();bos.close();int len = bos.writtenBytes();//檢驗消息長度checkPayload(channel, len);// 設置96-127位:Body值Bytes.int2bytes(len, header, 12);// write// 把header寫入到bufferbuffer.writerIndex(savedWriteIndex);buffer.writeBytes(header); // write header.buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); }該方法是對Request類型的消息進行編碼,仔細閱讀上述我寫的注解,結合協議頭各個位數的含義,好好品味我舉的例子。享受二進制位運算帶來的快樂,也可以看到前半部分邏輯是對協議頭的編碼,后面還有對body值的序列化。
4.encodeResponse
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {Serialization serialization = getSerialization(channel);// header.// 創建16字節的字節數組byte[] header = new byte[HEADER_LENGTH];// set magic number.// 設置前16位數據,也就是設置header[0]和header[1]的數據為Magic High和Magic LowBytes.short2bytes(MAGIC, header);// set request and serialization flag.// 16-23位為serialization編號,用到或運算10000000|serialization編號,例如serialization編號為11111,則為00011111header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());// 繼續上面的例子,00011111|1000000 = 01011111if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;// 繼續上面的例子,01011111|100000 = 011 11111 可以看到011代表請求標記、雙向、是事件,這樣就設置了16、17、18位,后面19-23位是Serialization 編號if (req.isEvent()) header[2] |= FLAG_EVENT;// set request id.// 設置32-95位請求idBytes.long2bytes(req.getId(), header, 4);// encode request data.// // 編碼 `Request.data` 到 Body ,并寫入到 Bufferint savedWriteIndex = buffer.writerIndex();buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);// 對body數據序列化ObjectOutput out = serialization.serialize(channel.getUrl(), bos);// 如果該請求是事件if (req.isEvent()) {// 特殊事件編碼encodeEventData(channel, out, req.getData());} else {// 正常請求編碼encodeRequestData(channel, out, req.getData(), req.getVersion());}// 釋放資源out.flushBuffer();if (out instanceof Cleanable) {((Cleanable) out).cleanup();}bos.flush();bos.close();int len = bos.writtenBytes();//檢驗消息長度checkPayload(channel, len);// 設置96-127位:Body值Bytes.int2bytes(len, header, 12);// write// 把header寫入到bufferbuffer.writerIndex(savedWriteIndex);buffer.writeBytes(header); // write header.buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); }protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {int savedWriteIndex = buffer.writerIndex();try {Serialization serialization = getSerialization(channel);// header.// 創建16字節大小的字節數組byte[] header = new byte[HEADER_LENGTH];// set magic number.// 設置前0-15位為魔數Bytes.short2bytes(MAGIC, header);// set request and serialization flag.// 設置響應標志和序列化idheader[2] = serialization.getContentTypeId();// 如果是心跳事件,則設置第18位為事件if (res.isHeartbeat()) header[2] |= FLAG_EVENT;// set response status.// 設置24-31位為狀態碼byte status = res.getStatus();header[3] = status;// set request id.// 設置32-95位為請求idBytes.long2bytes(res.getId(), header, 4);// 寫入數據buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);// 對body進行序列化ObjectOutput out = serialization.serialize(channel.getUrl(), bos);// encode response data or error message.if (status == Response.OK) {if (res.isHeartbeat()) {// 對心跳事件編碼encodeHeartbeatData(channel, out, res.getResult());} else {// 對普通響應編碼encodeResponseData(channel, out, res.getResult(), res.getVersion());}} else out.writeUTF(res.getErrorMessage());// 釋放out.flushBuffer();if (out instanceof Cleanable) {((Cleanable) out).cleanup();}bos.flush();bos.close();int len = bos.writtenBytes();checkPayload(channel, len);Bytes.int2bytes(len, header, 12);// writebuffer.writerIndex(savedWriteIndex);buffer.writeBytes(header); // write header.buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);} catch (Throwable t) {// clear bufferbuffer.writerIndex(savedWriteIndex);// send error message to Consumer, otherwise, Consumer will wait till timeout.//如果在寫入數據失敗,則返回響應格式錯誤的返回碼if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {Response r = new Response(res.getId(), res.getVersion());r.setStatus(Response.BAD_RESPONSE);if (t instanceof ExceedPayloadLimitException) {logger.warn(t.getMessage(), t);try {r.setErrorMessage(t.getMessage());// 發送響應channel.send(r);return;} catch (RemotingException e) {logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);}} else {// FIXME log error message in Codec and handle in caught() of IoHanndler?logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);try {r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));channel.send(r);return;} catch (RemotingException e) {logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);}}}// Rethrow exceptionif (t instanceof IOException) {throw (IOException) t;} else if (t instanceof RuntimeException) {throw (RuntimeException) t;} else if (t instanceof Error) {throw (Error) t;} else {throw new RuntimeException(t.getMessage(), t);}} }該方法是對Response類型的消息進行編碼,該方法里面我沒有舉例子演示如何進行編碼,不過過程跟encodeRequest類似。
5.decode
@Override public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {int readable = buffer.readableBytes();// 讀取前16字節的協議頭數據,如果數據不滿16字節,則讀取全部byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];buffer.readBytes(header);// 解碼return decode(channel, buffer, readable, header); }@Override protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {// check magic number.// 核對魔數(該數字固定)if (readable > 0 && header[0] != MAGIC_HIGH|| readable > 1 && header[1] != MAGIC_LOW) {int length = header.length;// 將 buffer 完全復制到 `header` 數組中if (header.length < readable) {header = Bytes.copyOf(header, readable);buffer.readBytes(header, length, readable - length);}for (int i = 1; i < header.length - 1; i++) {if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {buffer.readerIndex(buffer.readerIndex() - header.length + i);header = Bytes.copyOf(header, i);break;}}return super.decode(channel, buffer, readable, header);}// check length.// Header 長度不夠,返回需要更多的輸入,解決拆包現象if (readable < HEADER_LENGTH) {return DecodeResult.NEED_MORE_INPUT;}// get data length.int len = Bytes.bytes2int(header, 12);// 檢查信息頭長度checkPayload(channel, len);int tt = len + HEADER_LENGTH;// 總長度不夠,返回需要更多的輸入,解決拆包現象if (readable < tt) {return DecodeResult.NEED_MORE_INPUT;}// limit input stream.ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);try {// 對body反序列化return decodeBody(channel, is, header);} finally {// 如果不可用if (is.available() > 0) {try {// 打印錯誤日志if (logger.isWarnEnabled()) {logger.warn("Skip input stream " + is.available());}// 跳過未讀完的流StreamUtils.skipUnusedStream(is);} catch (IOException e) {logger.warn(e.getMessage(), e);}}} }該方法就是解碼前的一些核對過程,包括檢測是否為dubbo協議,是否有拆包現象等,具體的解碼在decodeBody方法。
6.decodeBody
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {// 用并運算符byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);// get request id.// 獲得請求idlong id = Bytes.bytes2long(header, 4);// 如果第16位為0,則說明是響應if ((flag & FLAG_REQUEST) == 0) {// decode response.Response res = new Response(id);// 如果第18位不是0,則說明是心跳事件if ((flag & FLAG_EVENT) != 0) {res.setEvent(Response.HEARTBEAT_EVENT);}// get status.byte status = header[3];res.setStatus(status);try {ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);// 如果響應是成功的if (status == Response.OK) {Object data;if (res.isHeartbeat()) {// 如果是心跳事件,則心跳事件的解碼data = decodeHeartbeatData(channel, in);} else if (res.isEvent()) {// 如果是事件,則事件的解碼data = decodeEventData(channel, in);} else {// 否則執行普通解碼data = decodeResponseData(channel, in, getRequestData(id));}// 重新設置響應結果res.setResult(data);} else {res.setErrorMessage(in.readUTF());}} catch (Throwable t) {res.setStatus(Response.CLIENT_ERROR);res.setErrorMessage(StringUtils.toString(t));}return res;} else {// decode request.// 對請求類型解碼Request req = new Request(id);// 設置版本號req.setVersion(Version.getProtocolVersion());// 如果第17位不為0,則是雙向req.setTwoWay((flag & FLAG_TWOWAY) != 0);// 如果18位不為0,則是心跳事件if ((flag & FLAG_EVENT) != 0) {req.setEvent(Request.HEARTBEAT_EVENT);}try {// 反序列化ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);Object data;if (req.isHeartbeat()) {// 如果請求是心跳事件,則心跳事件解碼data = decodeHeartbeatData(channel, in);} else if (req.isEvent()) {// 如果是事件,則事件解碼data = decodeEventData(channel, in);} else {// 否則,用普通解碼data = decodeRequestData(channel, in);}// 把重新設置請求數據req.setData(data);} catch (Throwable t) {// bad request// 設置是異常請求req.setBroken(true);req.setData(t);}return req;} }該方法就是解碼的過程,并且對協議頭和協議體分開解碼,協議頭編碼是做或運算,而解碼則是做并運算,協議體用反序列化的方式解碼,同樣也是分為了Request類型、Response類型進行解碼。
后記
該部分相關的源碼解析地址:https://github.com/CrazyHZM/i...該文章講解了Exchange層的相關設計和邏輯、介紹dubbo-remoting-api中的exchange包內的源碼解,其中關鍵的是設計了Request和Response模型,整個信息交換都圍繞這兩大模型,并且設計了dubbo協議,解決拆包粘包問題,在信息交換中協議頭攜帶的信息起到了關鍵作用,也滿足了rpc調用的一些需求。下一篇我會講解遠程通信的buffer部分。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,我的私人微信號碼:HUA799695226。
總結
以上是生活随笔為你收集整理的dubbo源码解析(十)远程通信——Exchange层的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 退居二线VS在深圳发展,一个十年IT人的
- 下一篇: arcgis 坐标系 2000坐标系_干