Spark2.x RPC解析
1、概述
在Spark中很多地方都涉及網(wǎng)絡(luò)通信,比如Spark各個組件間的消息互通、用戶文件與Jar包的上傳、節(jié)點間的Shuffle過程、Block數(shù)據(jù)的復(fù)制與備份等。Spark 2.0 之后,master 和worker 之間完全不使用akka 通信,改用netty實現(xiàn)。因為使用Akka要求message發(fā)送端和接收端有相同的版本,為了避免Akka造成的版本問題,給用戶的應(yīng)用更大靈活性,決定使用更通用的RPC實現(xiàn)。
spark 基于netty新的rpc框架借鑒了Akka的中的設(shè)計,它是基于Actor模型,如下圖所示:
Spark通訊框架中各個組件(Client/Master/Worker)可以認為是一個個獨立的實體,各個實體之間通過消息來進行通信。具體各個組件之間的關(guān)系圖如下:
Endpoint(Client/Master/Worker)有1個InBox和N個OutBox(N>=1,N取決于當前Endpoint與多少其他的Endpoint進行通信,一個與其通訊的其他Endpoint對應(yīng)一個OutBox),Endpoint接收到的消息被寫入InBox,發(fā)送出去的消息寫入OutBox并被發(fā)送到其他Endpoint的InBox中。
2、Spark通信架構(gòu)
(1)RpcEndpoint:RPC端點,Spark針對每個節(jié)點(Client/Master/Worker)都稱之為一個Rpc端點,且都實現(xiàn)RpcEndpoint接口,內(nèi)部根據(jù)不同端點的需求,設(shè)計不同的消息和不同的業(yè)務(wù)處理,如果需要發(fā)送(詢問)則調(diào)用Dispatcher;
(2)RpcEnv:RPC上下文環(huán)境,每個RPC端點運行時依賴的上下文環(huán)境稱為RpcEnv;
(3)Dispatcher:消息分發(fā)器,針對于RPC端點需要發(fā)送消息或者從遠程RPC接收到的消息,分發(fā)至對應(yīng)的指令收件箱/發(fā)件箱。如果指令接收方是自己則存入收件箱,如果指令接收方不是自己,則放入發(fā)件箱;
(4)Inbox:指令消息收件箱,一個本地RpcEndpoint對應(yīng)一個收件箱,Dispatcher在每次向Inbox存入消息時,都將對應(yīng)EndpointData加入內(nèi)部ReceiverQueue中,另外Dispatcher創(chuàng)建時會啟動一個單獨線程進行輪詢ReceiverQueue,進行收件箱消息消費;
(5)RpcEndpointRef:RpcEndpointRef是對遠程RpcEndpoint的一個引用。當我們需要向一個具體的RpcEndpoint發(fā)送消息時,一般我們需要獲取到該RpcEndpoint的引用,然后通過該應(yīng)用發(fā)送消息。
(6)OutBox:指令消息發(fā)件箱,對于當前RpcEndpoint來說,一個目標RpcEndpoint對應(yīng)一個發(fā)件箱,如果向多個目標RpcEndpoint發(fā)送信息,則有多個OutBox。當消息放入Outbox后,緊接著通過TransportClient將消息發(fā)送出去。消息放入發(fā)件箱以及發(fā)送過程是在同一個線程中進行;
(7)RpcAddress:表示遠程的RpcEndpointRef的地址,Host + Port。
(8)TransportClient:Netty通信客戶端,一個OutBox對應(yīng)一個TransportClient,TransportClient不斷輪詢OutBox,根據(jù)OutBox消息的receiver信息,請求對應(yīng)的遠程TransportServer;
(9)TransportServer:Netty通信服務(wù)端,一個RpcEndpoint對應(yīng)一個TransportServer,接受遠程消息后調(diào)用Dispatcher分發(fā)消息至對應(yīng)收發(fā)件箱;
3、相關(guān)源碼閱讀
3.1、RpcEnv
RpcEnv是Rpc的環(huán)境(相當于Actor中的ActorSystem),所有的RPCEndPoint都需要注冊給RpcEnv實例對象(注冊的時候會指定注冊的名稱,這樣客戶端就可以通過名稱查詢到RPCEndPoint的RPCEndPointRef引用,進而進行通信(客戶端通過操作RPCEndPointRef要給RpcEndPoint發(fā)信息,怎么發(fā)要RpcEnv去管理,RpcEnv在具體的實例看見發(fā)的信息,因為有Ref肯定有路由,就路由到遠程的具體的RpcEndPoint實體內(nèi)部的receive方法中)),如果不注冊的話收不到消息。所有的RpcEndPoint其實都是屬于RpcEnv的,只有屬于他客戶端發(fā)消息的時候才能把信息路由給RpcEndPoint。
也就是RpcEnv?是一個RPC?環(huán)境。?RpcEndpoint需要使用RpcEnv的名稱來注冊自己,以接收消息。RpcEnv將處理從RpcEndpointRef或遠程節(jié)點發(fā)送的消息,并將它們傳遞到相應(yīng)的RpcEndpoint。
RpcEnv是個抽象類,作為Rpc通信肯定要傳入SparkConf,因為是分布式的,在spark2.x版本中,使用的具體的實現(xiàn)類是NettyRpcEnv。RpcEnv的結(jié)構(gòu)如下:
在RpcEnv的伴生對象中,重要的功能是創(chuàng)建一個RpcEnv的實例,這個實例就是用于管理endpoint。
在?RpcEnv中,有一個重要且常用的方法setupEndpoint方法。該方法就是利用上面創(chuàng)建的RpcEnv實例,來初始化(注冊)一個Endpoint,并返回該endpoint的RpcEndpointRef(代理對象)。當我們調(diào)用RpcEnv中的setupEndpoint來注冊一個endpoint到rpcEnv的時候,在NettyRpcEnv內(nèi)部,會將該endpoint的名稱與其本省的映射關(guān)系,rpcEndpoint與rpcEndpointRef之間映射關(guān)系保存在dispatcher對應(yīng)的成員變量中。我們拿到Endpoint的代理對象后就能向該endpoint發(fā)送消息
還有一個setupEndpointRef 方法來獲取到指定endpoint的引用對象
下面看看master端的代碼,master在啟動的時候在其伴生對象中會有一個rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)方法創(chuàng)建一個RpcEnv的實例,這個實例就是用于管理endpoint。然后masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,?new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))獲得一個具體一個具體的endpoint的實例
3.2、RpcEndpointRef
RpcEndpointRef是RpcEndpoint的一個引用,簡稱代理。RpcEndpointRef是線程安全的。
要想向一個RpcEndpoint發(fā)送消息,必須先持有其Ref代理,通過該代理才能發(fā)送消息。RpcEnv結(jié)構(gòu)如下
常用的發(fā)送消息的方法是send,ask,askWithRetry,他們之間的區(qū)別:send發(fā)送的消息,沒有返回值,用receive接收即可;ask和askWithRetry,發(fā)送的消息有返回值,用receiveAndReply來接收。
下面看看worker端的代碼:worker獲取到master的ref引用對象,然后發(fā)送注冊消息
3.3、在standalone模式中worker向master注冊案例
(1)在worker啟動的時候有onStart方法,這里面調(diào)用了registerWithMaster,這里面用了tryRegisterAllMaster方法在具體注冊的時候向所有的master提交,是用線程池的中一個線程來提交。然后就獲得了masterEndpoint。獲得了masterEndpoint之后,將其作為參數(shù)傳入registerWithMaster方法。然后通過ask發(fā)送消息。
(2)當調(diào)用ask將消息發(fā)送出去。其實是調(diào)用NettyRpcEndpointRef中ask,在方法中當前發(fā)送地址(nettyEnv.address),目標的master地址(this)和發(fā)送的消息message被封裝成了RequestMessage消息。
(3)在NettyRpcEnv.ask中如果是遠程rpc調(diào)用的話,最終ask將調(diào)用postToOutbox函數(shù),并且此時消息會被序列化成Byte流。實現(xiàn)如下:
(4)在postToOutbox函數(shù)中,消息將經(jīng)過OutboxMessage中的sendWith方法(client.sendRpc(content)),最終通過TransportClient的sendRpc方法(client.sendRpc(content)),而在TransportClient中將消息進一步封裝,然后發(fā)送給master。
(5)在master端TransportRequestHandler的handle方法中,由于信息在worker端被分裝成了RpcRequest,所以在該handle方法中,將調(diào)用processRpcRequest進行處理。
(6)processRpcRequest函數(shù)將調(diào)用rpcHandler的實現(xiàn)類NettyRpcHandler中的receive方法。在該方法中,首先通過internalRecieve將消息解包成RequestMessage。然后該消息通過dispatcher的分發(fā)給對應(yīng)的endpoint
(7)在Dispatcher的postMessage方法中,可以看到,首先根據(jù)對應(yīng)的endpoint的EndpointData信息(主要是該endpoint及其應(yīng)用以及其信箱(inbox)),然后將消息塞到給endpoint(此例中的master)的信箱中,最后將消息塞到recievers的阻塞隊列中。
(8)在Dispatcher中有一個線程池threadpool在MessageLoop類的run方法中,將receivers中的對象取出來,交由信箱的process方法去處理。如果沒有收到任何消息,將會阻塞在take處
(9)在inbox的proces方法中,首先取出消息,然后根據(jù)消息的類型,最終將調(diào)用endpoint的receiver方法進行處理(也就是master中的receive方法)。至此,整個一次rpc調(diào)用的流程結(jié)束。
總結(jié):①當調(diào)用ask將消息發(fā)送出去。其實是調(diào)用NettyRpcEndpointRef中的ask等方法,并將消息封裝②NettyRpcEndpointRef中的ask方法調(diào)用了NettyRpcEnv.ask如果是遠程rpc調(diào)用的話,最終ask將調(diào)用postToOutbox函數(shù),并且此時消息會被序列化成Byte流。③在postToOutbox函數(shù)中調(diào)用OutboxMessage中的sendWith方法中調(diào)用TransportClient的sendRpc方法,在TransportClient中將消息進一步封裝,然后發(fā)送給master④在master端TransportRequestHandler的handle方法中進行消息類型判斷,調(diào)用processRpcRequest函數(shù)⑤processRpcRequest函數(shù)將調(diào)用rpcHandler的實現(xiàn)類NettyRpcHandler中的receive方法,然后該消息通過dispatcher的分發(fā)給對應(yīng)的endpoint⑥在Dispatcher的postMessage方法中,可以看到,首先根據(jù)對應(yīng)的endpoint的EndpointData信息放到inbox信箱中,最后將消息塞到recievers的阻塞隊列中⑦在Dispatcher中有一個線程池threadpool在MessageLoop類的run方法中,將receivers中的對象取出來,交由信箱的process方法去處理。如果沒有收到任何消息,將會阻塞在take處⑧在inbox的proces方法中,首先取出消息,然后根據(jù)消息的類型,最終將調(diào)用endpoint的receiver或receiveAndReply方法進行處理(也就是master中的receive方法)。
?
?
?
?
總結(jié)
以上是生活随笔為你收集整理的Spark2.x RPC解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java 流程控制篇 2021/02/2
- 下一篇: 数据的特征工程