Hadoop RPC机制的使用
一、RPC基礎(chǔ)概念
1.1 RPC的基礎(chǔ)概念
RPC,即Remote Procdure Call,中文名:遠(yuǎn)程過程調(diào)用;
(1)它允許一臺(tái)計(jì)算機(jī)程序遠(yuǎn)程調(diào)用另外一臺(tái)計(jì)算機(jī)的子程序,而不用去關(guān)心底層的網(wǎng)絡(luò)通信細(xì)節(jié),對我們來說是透明的。因此,它經(jīng)常用于分布式網(wǎng)絡(luò)通信中。
RPC協(xié)議假定某些傳輸協(xié)議的存在,如TCP或UDP,為通信程序之間攜帶信息數(shù)據(jù)。在OSI網(wǎng)絡(luò)通信模型中,RPC跨越了傳輸層和應(yīng)用層。RPC使得開發(fā)包括網(wǎng)絡(luò)分布式多程序在內(nèi)的應(yīng)用程序更加容易。
(2)Hadoop的進(jìn)程間交互都是通過RPC來進(jìn)行的,比如Namenode與Datanode直接,Jobtracker與Tasktracker之間等。
因此,可以說:Hadoop的運(yùn)行就是建立在RPC基礎(chǔ)之上的。
1.2 RPC的顯著特點(diǎn)
(1)透明性:遠(yuǎn)程調(diào)用其他機(jī)器上的程序,對用戶來說就像是調(diào)用本地方法一樣;
(2)高性能:RPC Server能夠并發(fā)處理多個(gè)來自Client的請求;
(3)可控性:jdk中已經(jīng)提供了一個(gè)RPC框架—RMI,但是該P(yáng)RC框架過于重量級并且可控之處比較少,所以Hadoop RPC實(shí)現(xiàn)了自定義的PRC框架。
1.3 RPC的基本流程
(1)RPC采用了C/S的模式;
(2)Client端發(fā)送一個(gè)帶有參數(shù)的請求信息到Server;
(3)Server接收到這個(gè)請求以后,根據(jù)發(fā)送過來的參數(shù)調(diào)用相應(yīng)的程序,然后把自己計(jì)算好的結(jié)果發(fā)送給Client端;
(4)Client端接收到結(jié)果后繼續(xù)運(yùn)行;
1.4 Hadoop中的RPC機(jī)制
同其他RPC框架一樣,Hadoop RPC分為四個(gè)部分:
(1)序列化層:Clent與Server端通信傳遞的信息采用了Hadoop里提供的序列化類或自定義的Writable類型;
(2)函數(shù)調(diào)用層:Hadoop RPC通過動(dòng)態(tài)代理以及java反射實(shí)現(xiàn)函數(shù)調(diào)用;
(3)網(wǎng)絡(luò)傳輸層:Hadoop RPC采用了基于TCP/IP的socket機(jī)制;
(4)服務(wù)器端框架層:RPC?Server利用java NIO以及采用了事件驅(qū)動(dòng)的I/O模型,提高RPC Server的并發(fā)處理能力;
Hadoop RPC在整個(gè)Hadoop中應(yīng)用非常廣泛,Client、DataNode、NameNode之間的通訊全靠它了。例如:我們平時(shí)操作HDFS的時(shí)候,使用的是FileSystem類,它的內(nèi)部有個(gè)DFSClient對象,這個(gè)對象負(fù)責(zé)與NameNode打交道。在運(yùn)行時(shí),DFSClient在本地創(chuàng)建一個(gè)NameNode的代理,然后就操作這個(gè)代理,這個(gè)代理就會(huì)通過網(wǎng)絡(luò),遠(yuǎn)程調(diào)用到NameNode的方法,也能返回值。
1.5 Hadoop RPC設(shè)計(jì)技術(shù)
(1)動(dòng)態(tài)代理
About:動(dòng)態(tài)代理可以提供對另一個(gè)對象的訪問,同時(shí)隱藏實(shí)際對象的具體事實(shí),代理對象對客戶隱藏了實(shí)際對象。目前Java開發(fā)包中提供了對動(dòng)態(tài)代理的支持,但現(xiàn)在只支持對接口的實(shí)現(xiàn)。
(2)反射——?jiǎng)討B(tài)加載類
(3)序列化
(4)非阻塞的異步IO(NIO)
Java NIO原理請參考閱讀:http://weixiaolu.iteye.com/blog/1479656
二、如何使用RPC
2.1 Hadoop RPC對外提供的接口
Hadoop RPC對外主要提供了兩種接口(見類org.apache.hadoop.ipc.RPC),分別是:
(1)public static <T> ProtocolProxy <T> getProxy/waitForProxy(…)
構(gòu)造一個(gè)客戶端代理對象(該對象實(shí)現(xiàn)了某個(gè)協(xié)議),用于向服務(wù)器發(fā)送RPC請求。
(2)public static Server RPC.Builder (Configuration).build()
為某個(gè)協(xié)議(實(shí)際上是Java接口)實(shí)例構(gòu)造一個(gè)服務(wù)器對象,用于處理客戶端發(fā)送的請求。
2.2 使用Hadoop RPC的四大步湊
(1)定義RPC協(xié)議
RPC協(xié)議是客戶端和服務(wù)器端之間的通信接口,它定義了服務(wù)器端對外提供的服務(wù)接口。
(2)實(shí)現(xiàn)RPC協(xié)議
Hadoop RPC協(xié)議通常是一個(gè)Java接口,用戶需要實(shí)現(xiàn)該接口。
(3)構(gòu)造和啟動(dòng)RPC SERVER
直接使用靜態(tài)類Builder構(gòu)造一個(gè)RPC Server,并調(diào)用函數(shù)start()啟動(dòng)該Server。
(4)構(gòu)造RPC Client并發(fā)送請求
使用靜態(tài)方法getProxy構(gòu)造客戶端代理對象,直接通過代理對象調(diào)用遠(yuǎn)程端的方法。
三、RPC應(yīng)用實(shí)例
3.1 定義RPC協(xié)議
如下所示,我們定義一個(gè)IProxyProtocol?通信接口,聲明了一個(gè)Add()方法。
public interface IProxyProtocol extends VersionedProtocol {static final long VERSION = 23234L; //版本號,默認(rèn)情況下,不同版本號的RPC Client和Server之間不能相互通信int Add(int number1,int number2); }需要注意的是:
(1)Hadoop中所有自定義RPC接口都需要繼承VersionedProtocol接口,它描述了協(xié)議的版本信息。
(2)默認(rèn)情況下,不同版本號的RPC Client和Server之間不能相互通信,因此客戶端和服務(wù)端通過版本號標(biāo)識。
3.2 實(shí)現(xiàn)RPC協(xié)議
Hadoop RPC協(xié)議通常是一個(gè)Java接口,用戶需要實(shí)現(xiàn)該接口。對IProxyProtocol接口進(jìn)行簡單的實(shí)現(xiàn)如下所示:
public class MyProxy implements IProxyProtocol {public int Add(int number1,int number2) {System.out.println("我被調(diào)用了!");int result = number1+number2;return result;}public long getProtocolVersion(String protocol, long clientVersion)throws IOException {System.out.println("MyProxy.ProtocolVersion=" + IProxyProtocol.VERSION);// 注意:這里返回的版本號與客戶端提供的版本號需保持一致return IProxyProtocol.VERSION;} }這里實(shí)現(xiàn)的Add方法很簡單,就是一個(gè)加法操作。為了查看效果,這里通過控制臺(tái)輸出一句:“我被調(diào)用了!”
3.3 構(gòu)造RPC Server并啟動(dòng)服務(wù)
這里通過RPC的靜態(tài)方法getServer來獲得Server對象,如下代碼所示:
public class MyServer {public static int PORT = 5432;public static String IPAddress = "127.0.0.1";public static void main(String[] args) throws Exception {MyProxy proxy = new MyProxy();final Server server = RPC.getServer(proxy, IPAddress, PORT, new Configuration());server.start();} }這段代碼的核心在于第5行的RPC.getServer方法,該方法有四個(gè)參數(shù),第一個(gè)參數(shù)是被調(diào)用的java對象,第二個(gè)參數(shù)是服務(wù)器的地址,第三個(gè)參數(shù)是服務(wù)器的端口?。獲得服務(wù)器對象后,啟動(dòng)服務(wù)器。這樣,服務(wù)器就在指定端口監(jiān)聽客戶端的請求。到此為止,服務(wù)器就處于監(jiān)聽狀態(tài),不停地等待客戶端請求到達(dá)。
3.4 構(gòu)造RPC Client并發(fā)出請求
這里使用靜態(tài)方法getProxy或waitForProxy構(gòu)造客戶端代理對象,直接通過代理對象調(diào)用遠(yuǎn)程端的方法,具體如下所示:
public class MyClient {public static void main(String[] args) {InetSocketAddress inetSocketAddress = new InetSocketAddress(MyServer.IPAddress, MyServer.PORT);try {// 注意:這里傳入的版本號需要與代理保持一致IProxyProtocol proxy = (IProxyProtocol) RPC.waitForProxy(IProxyProtocol.class, IProxyProtocol.VERSION, inetSocketAddress,new Configuration());int result = proxy.Add(10, 25);System.out.println("10+25=" + result);RPC.stopProxy(proxy);} catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}以上代碼中核心在于RPC.waitForProxy(),該方法有四個(gè)參數(shù),第一個(gè)參數(shù)是被調(diào)用的接口類,第二個(gè)是客戶端版本號,第三個(gè)是服務(wù)端地址。返回的代理對象,就是服務(wù)端對象的代理,內(nèi)部就是使用java.lang.Proxy實(shí)現(xiàn)的。
經(jīng)過以上四步,我們便利用Hadoop RPC搭建了一個(gè)非常高效的客戶機(jī)–服務(wù)器網(wǎng)絡(luò)模型。
3.5 查看運(yùn)行結(jié)果
(1)啟動(dòng)服務(wù)端,開始監(jiān)聽客戶端請求
(2)啟動(dòng)客戶端,開始向服務(wù)端發(fā)請求
(3)查看服務(wù)端狀態(tài),是否被調(diào)用
SUMMARY:從上面的RPC調(diào)用中,可以看出:在客戶端調(diào)用的業(yè)務(wù)類的方法是定義在業(yè)務(wù)類的接口中的。該接口實(shí)現(xiàn)了VersionedProtocal接口。
(4)現(xiàn)在我們在命令行執(zhí)行jps命令,查看輸出信息,會(huì)出現(xiàn)如下圖所示的:
從上圖中可以看到一個(gè)java進(jìn)程,是“MyServer”,該進(jìn)程正是我們剛剛運(yùn)行的RPC的服務(wù)端類MyServer。因此,大家可以聯(lián)想到我們搭建Hadoop環(huán)境時(shí),也執(zhí)行過該命令用來判斷Hadoop的相關(guān)進(jìn)程是否全部啟動(dòng)。
SUMMARY:那么可以判斷,Hadoop啟動(dòng)時(shí)產(chǎn)生的5個(gè)java進(jìn)程也應(yīng)該是RPC的服務(wù)端?! ?/p>
下面我們觀察NameNode的源代碼,如下圖所示,可以看到NameNode確實(shí)創(chuàng)建了RPC的服務(wù)端。
private void initialize(Configuration conf) throws IOException {......// create rpc serverInetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);if (dnSocketAddr != null) {int serviceHandlerCount =conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(), dnSocketAddr.getPort(), serviceHandlerCount,false, conf, namesystem.getDelegationTokenSecretManager());this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();setRpcServiceServerAddress(conf);}this.server = RPC.getServer(this, socAddr.getHostName(),socAddr.getPort(), handlerCount, false, conf, namesystem.getDelegationTokenSecretManager());...... }?
轉(zhuǎn)載于:https://www.cnblogs.com/csguo/p/7568594.html
總結(jié)
以上是生活随笔為你收集整理的Hadoop RPC机制的使用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Vue的数据双向绑定和Object.de
- 下一篇: 升讯威微信营销系统开发实践:(3)功能介