教你用 Netty 实现一个简单的 RPC!
眾所周知,dubbo 底層使用了 Netty 作為網絡通訊框架,而 Netty 的高性能我們之前也分析過源碼,對他也算還是比較了解了。
今天我們就自己用 Netty 實現一個簡單的 RPC 框架。
1?需求?
模仿 dubbo,消費者和提供者約定接口和協議,消費者遠程調用提供者,提供者返回一個字符串,消費者打印提供者返回的數據。底層網絡通信使用 Netty 4.1.16。
2?設計?
創建一個接口,定義抽象方法。用于消費者和提供者之間的約定。
創建一個提供者,該類需要監聽消費者的請求,并按照約定返回數據。
創建一個消費者,該類需要透明的調用自己不存在的方法,內部需要使用 Netty 請求提供者返回數據。
3?實現
1. 創建 maven 項目,導入 Netty 4.1.16。
<groupId>cn.thinkinjava</groupId><artifactId>rpc-demo</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.16.Final</version></dependency>2. 項目目錄結構如下:
3. 設計接口
一個簡單的 hello world:
public?interface?HelloService { ?String?hello(String?msg); }4. 提供者相關實現
4.1. 首先實現約定接口,用于返回客戶端數據:
/** * 實現類 */public?class?HelloServiceImpl?implements?HelloService?{ ?public?String hello(String msg) { ? ?return?msg !=?null?? msg +?" -----> I am fine."?:?"I am fine.";} }4.2. 實現 Netty 服務端和自定義 handler
啟動 Netty Server 代碼:
private?static?void?startServer0(String hostName,?int?port)?{ ? ?try?{ServerBootstrap bootstrap =?new?ServerBootstrap();NioEventLoopGroup eventLoopGroup =?new?NioEventLoopGroup();bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class).childHandler(new?ChannelInitializer<SocketChannel>() { ? ? ? ? ? ?@Overrideprotected?void?initChannel(SocketChannel ch)?throws?Exception?{ChannelPipeline p = ch.pipeline();p.addLast(new?StringDecoder());p.addLast(new?StringEncoder());p.addLast(new?HelloServerHandler());}});bootstrap.bind(hostName, port).sync();}?catch?(InterruptedException e) {e.printStackTrace();}}上面的代碼中添加了 String類型的編解碼 handler,添加了一個自定義 handler。
自定義 handler 邏輯如下:
/** * 用于處理請求數據 */public?class?HelloServerHandler?extends?ChannelInboundHandlerAdapter { ?@Overridepublic?void?channelRead(ChannelHandlerContext ctx,?Object?msg) { ? ?// 如何符合約定,則調用本地方法,返回數據if?(msg.toString().startsWith(ClientBootstrap.providerName)) {String?result =?new?HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") +?1));ctx.writeAndFlush(result);}} }這里顯示判斷了是否符合約定(并沒有使用復雜的協議,只是一個字符串判斷),然后創建一個具體實現類,并調用方法寫回客戶端。
還需要一個啟動類:
public?class?ServerBootstrap?{??public?static?void?main(String[] args)?{NettyServer.startServer("localhost",?8088);} }好,關于提供者的代碼就寫完了,主要就是創建一個 netty 服務端,實現一個自定義的 handler,自定義 handler 判斷是否符合之間的約定(算是協議吧),如果符合,就創建一個接口的實現類,并調用他的方法返回字符串。
5. 消費者相關實現
消費者有一個需要注意的地方,就是調用需要透明,也就是說,框架使用者不用關心底層的網絡實現。這里我們可以使用 JDK 的動態代理來實現這個目的。
思路:客戶端調用代理方法,返回一個實現了 HelloService 接口的代理對象,調用代理對象的方法,返回結果。
我們需要在代理中做手腳,當調用代理方法的時候,我們需要初始化 Netty 客戶端,還需要向服務端請求數據,并返回數據。
5.1. 首先創建代理相關的類
public?class?RpcConsumer?{ ?private?static?ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); ?private?static?HelloClientHandler client; ?/*** 創建一個代理對象*/public?Object?createProxy(final Class<?> serviceClass, ? ? ?final String providerName)?{ ? ?return?Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), ? ? ? ?new?Class<?>[]{serviceClass}, (proxy, method, args) -> { ? ? ? ? ?if?(client ==?null) {initClient();} ? ? ? ? ?// 設置參數client.setPara(providerName + args[0]); ? ? ? ? ?return?executor.submit(client).get();});} ?/*** 初始化客戶端*/private?static?void?initClient()?{client =?new?HelloClientHandler();EventLoopGroup?group?=?new?NioEventLoopGroup();Bootstrap b =?new?Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,?true).handler(new?ChannelInitializer<SocketChannel>() { ? ? ? ? ?@Overridepublic?void?initChannel(SocketChannel ch) throws Exception?{ChannelPipeline p = ch.pipeline();p.addLast(new?StringDecoder());p.addLast(new?StringEncoder());p.addLast(client);}}); ? ?try?{b.connect("localhost",?8088).sync();}?catch?(InterruptedException e) {e.printStackTrace();}} }該類有 2 個方法,創建代理和初始化客戶端。
初始化客戶端邏輯:創建一個 Netty 的客戶端,并連接提供者,并設置一個自定義 handler,和一些 String 類型的編解碼器。
創建代理邏輯:使用 JDK 的動態代理技術,代理對象中的 invoke 方法實現如下:如果 client 沒有初始化,則初始化 client,這個 client 既是 handler ,也是一個 Callback。將參數設置進 client ,使用線程池調用 client 的 call 方法并阻塞等待數據返回。
看看 HelloClientHandler 的實現:
public?class?HelloClientHandler?extends?ChannelInboundHandlerAdapter?implements?Callable?{ ?private?ChannelHandlerContext context; ?private?String result; ?private?String para; ?@Overridepublic?void?channelActive(ChannelHandlerContext ctx)?{context = ctx;} ?/*** 收到服務端數據,喚醒等待線程*/@Overridepublic?synchronized?void?channelRead(ChannelHandlerContext ctx, Object msg)?{result = msg.toString();notify();} ?/*** 寫出數據,開始等待喚醒*/@Overridepublic?synchronized?Object?call()?throws?InterruptedException?{context.writeAndFlush(para);wait(); ? ?return?result;} ?void?setPara(String para)?{ ? ?this.para = para;} }該類緩存了 ChannelHandlerContext,用于下次使用,有兩個屬性:返回結果和請求參數。
當成功連接后,緩存 ChannelHandlerContext,當調用 call 方法的時候,將請求參數發送到服務端,等待。當服務端收到并返回數據后,調用 channelRead 方法,將返回值賦值個 result,并喚醒等待在 call 方法上的線程。此時,代理對象返回數據。
再看看設計的測試類:
public?class?ClientBootstrap?{ ?public?static?final?String providerName =?"HelloService#hello#"; ?public?static?void?main(String[] args)?throws?InterruptedException?{RpcConsumer consumer =?new?RpcConsumer(); ? ?// 創建一個代理對象HelloService service = (HelloService) consumer.createProxy(HelloService.class, providerName); ? ?for?(; ; ) {Thread.sleep(1000);System.out.println(service.hello("are you ok ?"));}} }測試類首先創建了一個代理對象,然后每隔一秒鐘調用代理的 hello 方法,并打印服務端返回的結果。
測試結果
成功打印。
4?總結
看了這么久的 Netty 源碼,我們終于實現了一個自己的 Netty 應用,雖然這個應用很簡單,甚至代碼寫的有些粗糙,但功能還是實現了,RPC 的目的就是允許像調用本地服務一樣調用遠程服務,需要對使用者透明,于是我們使用了動態代理。并使用 Netty 的 handler 發送數據和響應數據,完成了一次簡單的 RPC 調用。
當然,還是那句話,代碼比較簡單,主要是思路,以及了解 RPC 底層的實現。
?
作者:莫那魯道
https://www.cnblogs.com/stateis0/p/8960791.html
總結
以上是生活随笔為你收集整理的教你用 Netty 实现一个简单的 RPC!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring的核心思想,总结得非常好!
- 下一篇: Spring Boot JdbcTemp