HDFS2.x之RPC流程分析
HDFS2.x之RPC流程分析
1 概述
??? Hadoop提供了一個(gè)統(tǒng)一的RPC機(jī)制來處理client-namenode, namenode-dataname,client-dataname之間的通信。RPC是整個(gè)Hadoop中通信框架的核心,目前采用ProtocolBuf作為RPC的默認(rèn)實(shí)現(xiàn)。RPC的整體調(diào)用流程如下:
?
2 Protobuf
??? Protocol buffer(以下簡(jiǎn)稱PB),PB是Google開源的一種輕便高效的結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)格式,可以用于結(jié)構(gòu)化數(shù)據(jù)的序列化和反序列化,很適合做數(shù)據(jù)存儲(chǔ)或 RPC 數(shù)據(jù)交換格式,目前提供了 C++、Java、Python 三種語(yǔ)言的 API。序列化/反序列化速度快,網(wǎng)絡(luò)或者磁盤IO傳輸?shù)臄?shù)據(jù)少。
RPC就是一臺(tái)機(jī)器上的某個(gè)進(jìn)程要調(diào)用另外一臺(tái)機(jī)器上的某個(gè)進(jìn)程的方法,中間通信傳輸?shù)木褪穷愃朴凇胺椒?shù)1、參數(shù)2……”這樣的信息,是結(jié)構(gòu)化的。
我們要定義一種PB實(shí)現(xiàn)的RPC傳輸格式,首先要定義相應(yīng)的.proto文件,在Hadoop common工程里,這些文件放在hadoop-common\src\main\proto目錄下;在Hadoop HDFS工程里這些文件放在hadoop-hdfs\src\main\proto目錄下,以此類推。Hadoop編譯腳本會(huì)調(diào)用相應(yīng)的protoc二進(jìn)制程序來編譯這些以.proto結(jié)尾的文件,生成相應(yīng)的.java文件。
?
由proto文件生成的類,均提供了讀寫二進(jìn)制數(shù)據(jù)的方法:
(1)byte[] toByteArray():序列化message并且返回一個(gè)原始字節(jié)類型的字節(jié)數(shù)組;
(2)static Person parseFrom(byte[] data): 將給定的字節(jié)數(shù)組解析為message;
(3)void writeTo(OutputStream output): 將序列化后的message寫入到輸出流;
(4)static Person parseFrom(InputStream input): 讀入并且將輸入流解析為一個(gè)message;
??? 另外,PB類中都有一些Builder子類,利用其中的build方法,可以完成對(duì)象的創(chuàng)建。PB的具體應(yīng)用會(huì)在下面的RPC的Client和Server的分析中說明。
3 RPC Client端
以create方法為例,來說明RPC的具體執(zhí)行流程。首先看下在Client端的執(zhí)行過程。
?
??? 由HDFS客戶端發(fā)起的create操作,在經(jīng)過一系列的前置步驟之后,會(huì)通過DFSClient類中的namenode代理來完成,其定義如下:
final ClientProtocol namenode; ……NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class);this.dtService = proxyInfo.getDelegationTokenService();this.namenode = proxyInfo.getProxy();?
這說明此處的namenode實(shí)現(xiàn)的接口是ClientProtocol,也就是Client與NameNode之間RPC通信的協(xié)議。
HDFS2.x引入了NameNode的HA,這就使得Client端的底層代理是有多個(gè)的,分別連接Active NN和Standby NN。但是在實(shí)際運(yùn)行過程中需要對(duì)Client調(diào)用呈現(xiàn)統(tǒng)一的接口,那么就出現(xiàn)了一個(gè)上層代理來統(tǒng)一上述這兩個(gè)底層代理。所有由Clientfa來的方法調(diào)用都是先到達(dá)上層代理,通過上層代理轉(zhuǎn)發(fā)到下層代理。并且,上層代理還會(huì)根據(jù)底層代理返回的Exception來決定是否進(jìn)行Failover或者Retry等操作。
在使用HA模式時(shí),客戶端創(chuàng)建代理的總體流程是:
?
其中,
(1)RetryProxy.create方法會(huì)創(chuàng)建上層代理,用于接收客戶端的請(qǐng)求,并根據(jù)情況調(diào)用連接到當(dāng)前兩個(gè)NameNode的底層代理。
Proxy.newProxyInstance(proxyProvider.getInterface().getClassLoader(),new Class<?>[] { iface },new RetryInvocationHandler(proxyProvider, retryPolicy) );?
生成的這個(gè)代理對(duì)象實(shí)現(xiàn)了ClientProtocol接口,Client可以通過這個(gè)代理對(duì)象調(diào)用ClientProtocol接口中相應(yīng)的方法。根據(jù)Java的動(dòng)態(tài)代理機(jī)制,用戶對(duì)這個(gè)代理對(duì)象的方法調(diào)用都轉(zhuǎn)換為對(duì)RetryInvocationHandler(proxyProvider, retryPolicy)對(duì)象中invoke()方法的調(diào)用了。RetryInvocationHandler是與FailoverProxyProvider密切相關(guān)的,因?yàn)樗枰狥ailoverProxyProvider提供底層代理的支持。
(2)當(dāng)代理對(duì)象接收到請(qǐng)求的時(shí)候,會(huì)調(diào)用invoke方法來進(jìn)行處理,這里的invoke方法是上層代理中的RetryInvocationHanlder.invoke方法。
首先要獲取一個(gè)RetryPolicy,默認(rèn)的策略是在構(gòu)造RetryInvocationHandler時(shí)的參數(shù)。在Client與NameNode之間的ClientProtocol的RetryPolicy是:
RetryPolicies.failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL, config.maxFailoverAttempts, config.failoverSleepBaseMillis,
????????????? config.failoverSleepMaxMillis)
??? 接著,會(huì)調(diào)用invokeMethod方法調(diào)用底層的代理進(jìn)行實(shí)際的處理:
Object ret = invokeMethod(method, args);
? ? ?????????? -> method.invoke(currentProxy, args);
currentProxy是現(xiàn)在正在使用的底層代理。當(dāng)NN發(fā)生主從切換的時(shí)候,這個(gè)currentProxy也會(huì)發(fā)生相應(yīng)的變化。
??? 如果在調(diào)用過程中出現(xiàn)了異常,則針對(duì)不同的異常會(huì)做出不同的處理,這里的判斷是根據(jù)生成動(dòng)態(tài)代理(上層代理)的時(shí)候給定的RetryPolicy策略,默認(rèn)的RetryPolicy是FailoverOnNetworkExceptionRetry,所以調(diào)用對(duì)應(yīng)的shouldRetry()函數(shù)。
(2.1)如果Retry的次數(shù)已經(jīng)超過最大嘗試的次數(shù)了,那么就返回一個(gè)
RetryAction.RetryDecision.FAIL的RetryAction。
(2.2) 如果拋出的異常是ConnectionException、NoRouteToHostException、UnKnownHostException、StandbyException、RemoteException中的一個(gè),說明底層代理在RPC過程中Active NN連不上或者宕機(jī)或者已經(jīng)發(fā)生主從切換了,那么就需要返回一個(gè)RetryAction.RetryDecision.FAILOVER_AND_RETRY的RetryAction,需要執(zhí)行performFailover()操作,然后用另外一個(gè)NN的底層代理重試。
(2.3)如果拋出的異常是SocketException、 IOException或者其他非RemoteException的異常,那么就無(wú)法判斷這個(gè)RPC命令到底是不是執(zhí)行成功了。可能是本地的Socket或者IO出問題,也可能是NN端的Socket或者IO問題。那就進(jìn)行進(jìn)一步的判斷:如果被調(diào)用的方法是idempotent的,也就是多次執(zhí)行是沒有副作用的,那么就連接另外的一個(gè)底層代理重試;否則直接返回RetryAction.RetryDecision.FAIL。
(3)FailoverProxyProvider類的當(dāng)前實(shí)現(xiàn)類為ConfiguredFailoverProxyProvider。它負(fù)責(zé)管理那兩個(gè)activeNN和standbyNN的代理,當(dāng)上層代理接收到來自用戶的一個(gè)RPC命令之后,轉(zhuǎn)發(fā)給當(dāng)前正在使用的底層代理(由ConfiguredFailoverProxyProvider.currentProxyIndex決定,表示當(dāng)前的代理對(duì)象的序號(hào))執(zhí)行,然后看是否拋出異常。如果拋出了異常,根據(jù)異常的種類來判斷是執(zhí)行failover,還是retry,或者兩者都不做。如果需要切換NameNode代理的話,則會(huì)執(zhí)行:
currentProxyIndex = (currentProxyIndex + 1) % proxies.size();
??? 底層代理的實(shí)現(xiàn)是用的非HA模式:
current.namenode = NameNodeProxies.createNonHAProxy(conf,
??????????? current.address, xface, ugi, false).getProxy();
進(jìn)一步調(diào)用->NameNodeProxies.createNNProxyWithClientProtocol
??????????? ->RPC.getProtocolProxy
方法,并把生成的ClientNamenodeProtocolPB類型的代理對(duì)象proxy封裝成ClientNamenodeProtocolTranslatorPB類型。
??? 這里又會(huì)涉及到Java的動(dòng)態(tài)代理,是在RPC.getProtocolProxy方法生成proxy對(duì)象的時(shí)候,RPC.getProtocolProxy的實(shí)現(xiàn)代碼為:
return getProtocolEngine(protocol,conf).getProxy(protocol, clientVersion, addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy);
這里的引擎就是protocolbuf,所以,所有的RPC請(qǐng)求最終都會(huì)調(diào)用ProtobufRpcEngine類中的invoke方法進(jìn)行和RPC的Server端通信以及數(shù)據(jù)的序列化和反序列化操作。
??? 把Client的請(qǐng)求封裝成call的操作返回也是在invoke中進(jìn)行的:
val = (RpcResponseWritable) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
??????????? new RpcRequestWritable(rpcRequest), remoteId);
??? 封裝的具體實(shí)現(xiàn)是調(diào)用的Client類中的call方法:
//封裝成call
Call call = new Call(rpcKind, rpcRequest);
//建立和NameNode的連接
Connection connection = getConnection(remoteId, call);
//向NameNode發(fā)送數(shù)據(jù)
connection.sendParam(call);
RPC客戶端的執(zhí)行流程(HA模式)為:
?
4 RPC Server端
RPC的Server端的初始化方法是NameNode中被調(diào)用的:
rpcServer = createRpcServer(conf);
實(shí)際上初始化NameNodeRpcServer對(duì)象,調(diào)用其構(gòu)造函數(shù):
return new NameNodeRpcServer(conf, this);
??? 在構(gòu)造方法中,會(huì)初始化兩個(gè)RPCServer,一個(gè)是serviceRpcServer,用來處理數(shù)據(jù)節(jié)點(diǎn)發(fā)來的RPC請(qǐng)求,另一個(gè)是clientRpcServer,用于處理客戶端發(fā)來的RPC請(qǐng)求。
??? NameNodeRpcServer的構(gòu)造方法會(huì)初始化RPC的Server端所需要的handler的數(shù)目(默認(rèn)為10個(gè)),設(shè)置好處理引擎為Protocolbuf,初始化ClientNamenodeProtocolServerSideTranslatorPB類型的對(duì)象clientProtocolServerTranslator用來對(duì)傳來的數(shù)據(jù)進(jìn)行反序列化,對(duì)發(fā)送的數(shù)據(jù)進(jìn)行序列化。
??? 另外,會(huì)初始化提供不同RPC服務(wù)的對(duì)象BlockingService,針對(duì)客戶端、數(shù)據(jù)節(jié)點(diǎn)端的有:
BlockingService clientNNPbService = ClientNamenodeProtocol.newReflectiveBlockingService(clientProtocolServerTranslator);
BlockingService dnProtoPbService = DatanodeProtocolService.newReflectiveBlockingService(dnProtoPbTranslator);
??? 緊接著,會(huì)獲取RPC的Server對(duì)象:
this.clientRpcServer = RPC.getServer(org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class,clientNNPbService, socAddr.getHostName(),socAddr.getPort(), handlerCount,false,
conf,
namesystem.getDelegationTokenSecretManager());
?
此對(duì)象主要負(fù)責(zé)接收網(wǎng)絡(luò)連接,讀取數(shù)據(jù),調(diào)用處理數(shù)據(jù)函數(shù),返回結(jié)果。前兩個(gè)參數(shù)表示如果RPC發(fā)送過來的是ClientNamenodeProtocolPB協(xié)議,那么負(fù)責(zé)處理這個(gè)協(xié)議的服務(wù)(com.google.protobuf.BlockingService類型的對(duì)象)就是clientNNPbService。
這個(gè)Server對(duì)象里有Listener, Handler, Responder內(nèi)部類:
(1) Listener Thread:Server端會(huì)啟一個(gè)Listener線程主要用于監(jiān)聽Client發(fā)送過來的Request,Listene會(huì)啟動(dòng)一個(gè)Reader的線程組,并把客戶端發(fā)來的Connection對(duì)象通過NIO的SelectionKey傳遞給Reader, Listener相當(dāng)于只作了一層轉(zhuǎn)發(fā);
(2) Reader Thread Pool:主要用于讀取Listener傳過來的Connection,并調(diào)用Connection的readAndProcess方法來讀取Request,并封裝成一個(gè)Call放到Call Queue中;
(3) Hanlder Thread Pool:Server會(huì)啟動(dòng)一組線程組來處理Call Queue中Call,并把處理的Respone中放到response queue中;
(4) Responder Thread:主要處理response queue中的response,并把response發(fā)送給client,如果當(dāng)前response queue為空,則第一個(gè)新增的response會(huì)馬上發(fā)送給client端,不會(huì)通過responer thread來發(fā)送。
這個(gè)RPC.getServer()會(huì)經(jīng)過層層調(diào)用,因?yàn)楝F(xiàn)在默認(rèn)的RPCEngine是ProtobufRpcEngine(ProtobufRpcEngine.java),就會(huì)調(diào)用到ProtobufRpcEngine.getServer這個(gè)函數(shù),在這生成了一個(gè)Server對(duì)象,就是用于接收client端RPC請(qǐng)求,處理,回復(fù)的Server。這個(gè)Server對(duì)象是一個(gè)純粹的網(wǎng)絡(luò)服務(wù)的Server,在RPC中起到基礎(chǔ)網(wǎng)絡(luò)IO服務(wù)的作用。
RPC的Server端創(chuàng)建的總體流程是:
?
4.1 Reader處理
Server里的Reader線程也是基于Selector的異步IO模式,每次Select選出一個(gè)SelectionKey之后,會(huì)調(diào)用SelectionKey.attachment()把這個(gè)SelectionKey所attach的Connection對(duì)象獲取(在Listener的run方法中進(jìn)行的attatch),然后執(zhí)行對(duì)應(yīng)的readAndProcess()方法,把這個(gè)SelectionKey所對(duì)應(yīng)的管道上的網(wǎng)絡(luò)IO數(shù)據(jù)讀入緩沖區(qū)。readAndProcess()方法會(huì)層層調(diào)用到Server.processData()方法,在這個(gè)方法內(nèi)部,會(huì)把剛才從網(wǎng)絡(luò)IO中讀取的數(shù)據(jù)反序列化成對(duì)象rpcRequest對(duì)象。
rpcRequest對(duì)象的類型是繼承自Writable類型的子類的對(duì)象,也就是說可以序列化/反序列化的類。這里rpcRequest對(duì)象里包含的RPC請(qǐng)求的內(nèi)容對(duì)象是由.proto文件中Message生成的類,也就是說PB框架自動(dòng)編譯出來的類,后面可以通過調(diào)用這個(gè)類的get方法獲取RPC中真正傳輸?shù)臄?shù)據(jù)。之后把生成的rpcRequest對(duì)象放到一個(gè)Call對(duì)象里面,再把Call對(duì)象放到隊(duì)列Server.callQueue里面。
Reader的處理流程圖如下:
?
4.2 Handler處理
Handler線程默認(rèn)有10個(gè),所以處理邏輯是多線程的。每個(gè)Handler線程會(huì)從剛才提到的callQueue中取一個(gè)Call對(duì)象,然后調(diào)用Server.call()方法執(zhí)行這個(gè)Call對(duì)象中蘊(yùn)含的RPC請(qǐng)求。Server.call()->RPC.Server.call()->Server.getRpcInvoker()->ProtobufRpcInvoker.call()在最后這個(gè)call()函數(shù)里面真正執(zhí)行。
call方法會(huì)首先校驗(yàn)這個(gè)請(qǐng)求發(fā)過來的數(shù)據(jù)是不是合理的。然后就是獲取實(shí)現(xiàn)這個(gè)協(xié)議的服務(wù)。實(shí)現(xiàn)協(xié)議的服務(wù)在初始化的時(shí)候已經(jīng)注冊(cè)過了,就是前面說的那個(gè)com.google.protobuf.BlockingService類型的對(duì)象clientNNPbService。
這個(gè)就是實(shí)現(xiàn)Client和NameNode之間的ClientNamenodeProtocol協(xié)議的服務(wù),通過調(diào)用這句代碼:
result = service.callBlockingMethod(methodDescriptor, null, param);
就會(huì)執(zhí)行這個(gè)RPC請(qǐng)求的邏輯。service對(duì)象會(huì)把相應(yīng)的方法調(diào)用轉(zhuǎn)移到一個(gè)繼承自BlockingInterface接口的實(shí)現(xiàn)類上。Service的真正實(shí)現(xiàn)類就是clientProtocolServerTranslator,是newReflectiveBlockingService()這個(gè)函數(shù)的參數(shù)。并且此類是ClientNamenodeProtocolProtos中的子類,是在HDFS編譯的時(shí)候根據(jù)proto文件創(chuàng)建的。由于clientProtocolServerTranslator的構(gòu)造方法中傳遞的參數(shù)是NameNodeRpcServer,因此進(jìn)一步的方法調(diào)用都在NameNodeRpcServer中實(shí)現(xiàn)的。
??? Handler處理流程如下:
?
如果元數(shù)據(jù)操作邏輯NameNodeRpcServer里面拋出IOException,那么它都會(huì)把它封裝成ServiceException,然后一路傳遞給client端。在client端,會(huì)通過ProtobufHelper.getRemoteException()把封裝在ServiceException中的IOException獲取出來。
??? RPC的Server端總體處理流程如下:
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/Scott007/p/3273352.html
總結(jié)
以上是生活随笔為你收集整理的HDFS2.x之RPC流程分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 通过 ANE(Adobe Native
- 下一篇: NSWindow上添加NSView