RPC实现Consumer 远程调用
梳理一下基本的實(shí)現(xiàn)思路,主要完成一個(gè)這樣的功能:API 模塊中的接口功能在服務(wù)端實(shí)現(xiàn)(并沒有在客戶端實(shí)現(xiàn))。因此,客戶端調(diào)用API 中定義的某一個(gè)接口方法時(shí),實(shí)際上是要發(fā)起一次網(wǎng)絡(luò)請(qǐng)求去調(diào)用服務(wù)端的某一個(gè)服務(wù)。而這個(gè)網(wǎng)絡(luò)請(qǐng)求首先被注冊(cè)中心接收,由注冊(cè)中心先確定需要調(diào)用的服務(wù)的位置,再將請(qǐng)求轉(zhuǎn)發(fā)至真實(shí)的服務(wù)實(shí)現(xiàn),最終調(diào)用服務(wù)端代碼,將返回值通過網(wǎng)絡(luò)傳輸給客戶端。整個(gè)過程對(duì)于客戶端而言是完全無感知的,就像調(diào)用本地方法一樣。具體調(diào)用過程如下圖所示:
下面來看代碼實(shí)現(xiàn),創(chuàng)建RpcProxy 類:
import java.lang.reflect.Proxy; public class RpcProxy {public static <T> T create(Class<?> clazz){//clazz 傳進(jìn)來本身就是interfaceMethodProxy proxy = new MethodProxy(clazz);Class<?> [] interfaces = clazz.isInterface() ?new Class[]{clazz} :clazz.getInterfaces();T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,proxy);return result;} }在RpcProxy 類的內(nèi)部實(shí)現(xiàn)遠(yuǎn)程方法調(diào)用的代理類,即由Netty 發(fā)送網(wǎng)絡(luò)請(qǐng)求,具體代碼如下:
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; public class RpcProxy {public static <T> T create(Class<?> clazz){}private static class MethodProxy implements InvocationHandler {private Class<?> clazz;public MethodProxy(Class<?> clazz){this.clazz = clazz;}public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {//如果傳進(jìn)來是一個(gè)已實(shí)現(xiàn)的具體類(本次演示略過此邏輯)if (Object.class.equals(method.getDeclaringClass())) {try {return method.invoke(this, args);} catch (Throwable t) {t.printStackTrace();}//如果傳進(jìn)來的是一個(gè)接口(核心)} else {return rpcInvoke(proxy,method, args);}return null;}/*** 實(shí)現(xiàn)接口的核心方法* @param method* @param args* @return*/public Object rpcInvoke(Object proxy,Method method,Object[] args){//傳輸協(xié)議封裝InvokerProtocol msg = new InvokerProtocol();msg.setClassName(this.clazz.getName());msg.setMethodName(method.getName());msg.setValues(args);msg.setParames(method.getParameterTypes());final RpcProxyHandler consumerHandler = new RpcProxyHandler();EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//自定義協(xié)議解碼器/** 入?yún)⒂? 個(gè),分別解釋如下maxFrameLength:框架的最大長(zhǎng)度。如果幀的長(zhǎng)度大于此值,則將拋出TooLongFrameException。lengthFieldOffset:長(zhǎng)度字段的偏移量:即對(duì)應(yīng)的長(zhǎng)度字段在整個(gè)消息數(shù)據(jù)中得位置lengthFieldLength:長(zhǎng)度字段的長(zhǎng)度:如:長(zhǎng)度字段是int 型表示,那么這個(gè)值就是4(long 型就是8)lengthAdjustment:要添加到長(zhǎng)度字段值的補(bǔ)償值initialBytesToStrip:從解碼幀中去除的第一個(gè)字節(jié)數(shù)*/pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//自定義協(xié)議編碼器pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));//對(duì)象參數(shù)類型編碼器pipeline.addLast("encoder", new ObjectEncoder());//對(duì)象參數(shù)類型解碼器pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)));pipeline.addLast("handler",consumerHandler);}});ChannelFuture future = b.connect("localhost", 8080).sync();future.channel().writeAndFlush(msg).sync();future.channel().closeFuture().sync();} catch(Exception e){e.printStackTrace();}finally {group.shutdownGracefully();}return consumerHandler.getResponse();}} }接收網(wǎng)絡(luò)調(diào)用的返回值
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; public class RpcProxyHandler extends ChannelInboundHandlerAdapter {private Object response;public Object getResponse() {return response;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {response = msg;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("client exception is general");} }完成客戶端調(diào)用代碼:
public class RpcConsumer {public static void main(String [] args){IRpcHelloService rpcHello = RpcProxy.create(IRpcHelloService.class);System.out.println(rpcHello.hello("Tom 老師"));IRpcService service = RpcProxy.create(IRpcService.class);System.out.println("8 + 2 = " + service.add(8, 2));System.out.println("8 - 2 = " + service.sub(8, 2));System.out.println("8 * 2 = " + service.mult(8, 2));System.out.println("8 / 2 = " + service.div(8, 2));} }?
總結(jié)
以上是生活随笔為你收集整理的RPC实现Consumer 远程调用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RPC实现Provider服务端业务逻辑
- 下一篇: Channel 与ChannelPipe