SOFA 源码分析 — 连接管理器
前言
RPC 框架需要維護(hù)客戶端和服務(wù)端的連接,通常是一個(gè)客戶端對應(yīng)多個(gè)服務(wù)端,而客戶端看到的是接口,并不是服務(wù)端的地址,服務(wù)端地址對于客戶端來講是透明的。
那么,如何實(shí)現(xiàn)這樣一個(gè) RPC 框架的網(wǎng)絡(luò)連接呢?
我們從 SOFA 中尋找答案。
連接管理器介紹
先從一個(gè)小 demo 開始看:
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>().setInterfaceId(HelloService.class.getName()) // 指定接口.setProtocol("bolt") // 指定協(xié)議.setDirectUrl("bolt://127.0.0.1:9696"); // 指定直連地址HelloService helloService = consumerConfig.refer();while (true) {System.out.println(helloService.sayHello("world"));try {Thread.sleep(2000);} catch (Exception e) {} }上面的代碼中,一個(gè) ConsumerConfig 對應(yīng)一個(gè)接口服務(wù),并指定了直連地址。
然后調(diào)用 ref 方法。每個(gè) ConsumerConfig 綁定了一個(gè) ConsumerBootstrap,這是一個(gè)非單例的類。
而每個(gè) ConsumerBootstrap 又綁定了一個(gè) Cluster,這是真正的客戶端。該類包含了一個(gè)客戶端所有的關(guān)鍵信息,例如:
這 5 個(gè)實(shí)例是 Cluster 的核心。一個(gè)客戶端的正常使用絕對離不開這 5 個(gè)元素。
我們之前分析了 5 個(gè)中的 4 個(gè),今天分析最后一個(gè) —— 連接管理器。
他可以說是 RPC 網(wǎng)絡(luò)通信的核心。
地址管理器代表的是:一個(gè)客戶端可以擁有多個(gè)接口。
連接管理器代表的是:一個(gè)客戶端可以擁有多個(gè) TCP 連接。
很明顯,地址管理器的數(shù)據(jù)肯定比連接管理器要多。因?yàn)橥ǔR粋€(gè) TCP 連接(Server 端)可以含有多個(gè)接口。
那么 SOFA 是如何實(shí)現(xiàn)連接管理器的呢?
從 AbstractCluster 的 init 方法中,我們知道,該方法初始化了 Cluster。同時(shí)也初始化了 connectionHolder。
具體代碼如下:
// 連接管理器 connectionHolder = ConnectionHolderFactory.getConnectionHolder(consumerBootstrap);使用了 SPI 的方式進(jìn)行的初始化。目前 RPC 框架的具體實(shí)現(xiàn)類只有一個(gè) AllConnectConnectionHolder。即長連接管理器。
該類需要一個(gè) ConsumerConfig 才能初始化。
該類中包含很多和連接相關(guān)的屬性,有 4 個(gè) Map,未初始化的 Map,存活的節(jié)點(diǎn)列表,存活但亞健康的列表,失敗待重試的列表。這些 Map 的元素都會隨著服務(wù)的網(wǎng)絡(luò)變化而變化。
而這些 Map 中的元素則是:ConcurrentHashMap<ProviderInfo, ClientTransport> 。
即每個(gè)服務(wù)者的信息對應(yīng)一個(gè)客戶端傳輸。那么這個(gè) ClientTransport 是什么呢?看過之前文章的都知道,這個(gè)一個(gè) RPC 和 Bolt 的膠水類。該類的默認(rèn)實(shí)現(xiàn) BoltClientTransport 包含了一個(gè) RpcClient 屬性,注意,該屬性是個(gè)靜態(tài)的。也就是說,是所有實(shí)例公用的。并且,BoltClientTransport 包含一個(gè) ProviderInfo 屬性。還有一個(gè) Url 屬性,Connection 屬性(網(wǎng)絡(luò)連接)。
我們理一下:一個(gè) ConsumerConfig 綁定一個(gè) Cluster,一個(gè) Cluster 綁定一個(gè) connectionHolder,一個(gè) connectionHolder 綁定多個(gè) ProviderInfo 和 ClientTransport。
因?yàn)橐粋€(gè)客戶端可以和多個(gè)服務(wù)進(jìn)行通信。
代碼如何實(shí)現(xiàn)?
在 Cluster 中,會對 connectionHolder 進(jìn)行初始化,在 Cluster 從注冊中心得到服務(wù)端列表后,會建立長連接。
從這里開始,地址管理器開始運(yùn)作。
Cluster 的 updateAllProviders 方法是源頭。該方法會將服務(wù)列表添加到 connectionHolder 中。即調(diào)用 connectionHolder.updateAllProviders(providerGroups) 方法。該方法會全量更新服務(wù)端列表。
如果更新的時(shí)候,發(fā)現(xiàn)有新的服務(wù),便會建立長連接。具體代碼如下:
if (!needAdd.isEmpty()) {addNode(needAdd); }addNode 方法就是添加新的節(jié)點(diǎn)。該方法會多線程建立 TCP 連接。
首先會根據(jù) ProviderInfo 信息創(chuàng)建一個(gè) ClientTransport,然后向線程池提交一個(gè)任務(wù),任務(wù)內(nèi)容是 initClientTransport(),即初始化客戶端傳輸。
該方法代碼如下(精簡過了):
private void initClientTransport(String interfaceId, ProviderInfo providerInfo, ClientTransport transport) {transport.connect();if (doubleCheck(interfaceId, providerInfo, transport)) {printSuccess(interfaceId, providerInfo, transport);addAlive(providerInfo, transport);} else {printFailure(interfaceId, providerInfo, transport);addRetry(providerInfo, transport);} }其中關(guān)鍵是調(diào)用 transport 的 connect 方法建立連接。
該方法的默認(rèn)實(shí)現(xiàn)在 BoltClientTransport 中,符合我們的預(yù)期。我們知道, BoltClientTransport 有一個(gè) RpcClient 的靜態(tài)實(shí)例。這個(gè)實(shí)例在類加載的時(shí)候,就會在靜態(tài)塊中初始化。初始化內(nèi)容則是初始化他的一些屬性,例如地址解析器,連接管理器,連接監(jiān)控等等。
我們再看 BoltClientTransport 的 connect 方法,該方法主要邏輯是初始化連接。方式則是通過 RpcClient 的 getConnection 方法來獲取,具體代碼如下:
connection = RPC_CLIENT.getConnection(url, url.getConnectTimeout());傳入一個(gè) URL 和超時(shí)時(shí)間。 RpcClient 則是調(diào)用連接管理器的 getAndCreateIfAbsent 方法獲取,同樣傳入 Url,這個(gè)方法的名字很好,根據(jù) URL 獲取連接,如果沒有,就創(chuàng)建一個(gè)。
有必要看看具體代碼:
public Connection getAndCreateIfAbsent(Url url) throws InterruptedException, RemotingException {// get and create a connection pool with initialized connections.ConnectionPool pool = this.getConnectionPoolAndCreateIfAbsent(url.getUniqueKey(),new ConnectionPoolCall(url));if (null != pool) {return pool.get();} else {logger.error("[NOTIFYME] bug detected! pool here must not be null!");return null;} }該方法會繼續(xù)調(diào)用自身的 getConnectionPoolAndCreateIfAbsent 方法,傳入 URL 的唯一標(biāo)識,和一個(gè) ConnectionPoolCall 對象(實(shí)現(xiàn)了 Callable)。
然后阻塞等待返回連接。
我們看看這個(gè) ConnectionPoolCall 的 call 方法實(shí)現(xiàn)。該方法調(diào)用了連接管理器的 doCreate 方法。傳入了 URL 和一個(gè)連接池。然后 call 方法返回連接池。
doCreate 方法中,重點(diǎn)就是 create 方法,傳入了一個(gè) url,返回一個(gè) Connection,并放入連接池。默認(rèn)池中只有一個(gè)長連接。
而 create 方法則是調(diào)用連接工廠的 createConnection 方法。然后調(diào)用 doCreateConnection 方法。該方法內(nèi)部給了我們明確的答案:調(diào)用 Netty 的 Bootstrap 的 connect 方法。
代碼如下:
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout); ChannelFuture future = bootstrap.connect(new InetSocketAddress(targetIP, targetPort));熟悉 Netty 的同學(xué)一眼便看出來了。這是一個(gè)連接服務(wù)端的操作。而這個(gè) BootStrap 的初始化則是在 RpcClient 初始化的時(shí)候進(jìn)行的。注意:BootStrap 是可以共享的。
可以看到, ConnectionPoolCall 的 call 方法就是用來創(chuàng)建 Netty 連接的。回到 getAndCreateIfAbsent 方法里,繼續(xù)看 getConnectionPoolAndCreateIfAbsent 方法的實(shí)現(xiàn)。
該方法內(nèi)部將 Callable 包裝成一個(gè) FutureTask,目的應(yīng)該是為了以后的異步運(yùn)行吧,總之,最后還是同步調(diào)用了 run 方法。然后調(diào)用 get 方法阻塞等待,等待剛剛 call 方法返回的連接池。然后返回。
得到連接池,連接池調(diào)用 get 方法,從池中根據(jù)策略選取一個(gè)連接返回。目前只有一個(gè)隨機(jī)選取的策略。
這個(gè) Connection 連接實(shí)例會保存在 BoltClientTransport 中。
在客戶端進(jìn)行調(diào)用的時(shí)候, RpcClient 會根據(jù) URL 找到對應(yīng)的連接,然后,獲取這個(gè)連接對應(yīng)的 Channel ,向服務(wù)端發(fā)送數(shù)據(jù)。具體代碼如下:
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (!f.isSuccess()) {conn.removeInvokeFuture(request.getId());future.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), f.cause()));logger.error("Invoke send failed, id={}", request.getId(), f.cause());}} });以上,就是 SOFA 的連接的原理和設(shè)計(jì)。
總結(jié)
連接管理器是我們分析 SOFA—RPC Cluster 中的最后一個(gè)模塊,他管理著一個(gè)客戶端對應(yīng)的所有服務(wù)網(wǎng)絡(luò)連接。
connectionHolder 內(nèi)部包含多個(gè) Map,Map 中的 key 是 Provider,value 是 ClientTransport,ClientTransport 是 RpcClient 和 SOFA 的膠水類,通常一個(gè) Provider 對應(yīng)一個(gè) ClientTransport。ClientTransport 其實(shí)就是一個(gè)連接的包裝。
ClientTransport 獲取連接的方式則是通過 RpcClient 的 連接管理器獲取的。該連接管理器內(nèi)部包含一個(gè)連接工廠,會根據(jù) URL 創(chuàng)建連接。創(chuàng)建連接的凡是則是通過 Netty 的 BootStrap 來創(chuàng)建。
當(dāng)我們使用 Provider 對應(yīng)的 ClientTransport 中的 RpcClient 發(fā)送數(shù)據(jù)的時(shí)候,則會根據(jù) URL 找到對應(yīng) Connection,并獲取他的 Channel ,向服務(wù)端發(fā)送數(shù)據(jù)。
好了,以上就是 SOFA—RPC 連接管理的分析。
篇幅有限,如有錯(cuò)誤,還請指正。
轉(zhuǎn)載于:https://www.cnblogs.com/stateis0/p/9006080.html
總結(jié)
以上是生活随笔為你收集整理的SOFA 源码分析 — 连接管理器的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 《程序员代码面试指南》第八章 数组和矩阵
- 下一篇: cocos2dx3.0五种屏幕适配模式,