Netty之自定义RPC
生活随笔
收集整理的這篇文章主要介紹了
Netty之自定义RPC
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
需求分析
使用netty實(shí)現(xiàn)方法遠(yuǎn)程調(diào)用, 在client調(diào)用本地接口中方法時(shí), 使用反射進(jìn)行遠(yuǎn)程調(diào)用, server執(zhí)行完結(jié)果后, 將執(zhí)行結(jié)果進(jìn)行封裝, 發(fā)送到client
RPC調(diào)用模型:
1. 服務(wù)消費(fèi)方(client)以本地調(diào)用方式調(diào)用服務(wù) 2. client stub 接收到調(diào)用后負(fù)責(zé)將方法、參數(shù)等封裝成能夠進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw 3. client stub 將消息進(jìn)行編碼并發(fā)送到服務(wù)端 4. server stub 收到消息后進(jìn)行解碼 5. server stub 根據(jù)解碼結(jié)果調(diào)用本地的服務(wù) 6. 本地服務(wù)執(zhí)行并將結(jié)果返回給 server stub 7. server stub 將返回導(dǎo)入結(jié)果進(jìn)行編碼并發(fā)送至消費(fèi)方 8. client stub 接收到消息并進(jìn)行解碼 9. 服務(wù)消費(fèi)方(client)得到結(jié)果自定義RPC結(jié)構(gòu)
根據(jù)上面執(zhí)行流程圖, 編寫代碼
定義request消息結(jié)構(gòu):
Client
client接口:
public interface HelloRpc {String hello(String name); }client的代理類:
public class NettyRpcProxy {/*** 創(chuàng)建代理對(duì)象* @param target* @return*/public static Object create(Class target) {//動(dòng)態(tài)代理, 在代理過程中執(zhí)行遠(yuǎn)程數(shù)據(jù)發(fā)送return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target},(proxy, method, args) -> {//定義要調(diào)用哪一個(gè)方法的信息ClassInfo classInfo = new ClassInfo(target.getName(), method.getName(), method.getParameterTypes(), args);NioEventLoopGroup workGroup = new NioEventLoopGroup();MyClientResultHandler myClientResultHandler = new MyClientResultHandler();Bootstrap bootstrap = new Bootstrap().group(workGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("encoder", new ObjectEncoder());pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));pipeline.addLast(myClientResultHandler);}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync();//client進(jìn)行發(fā)送channelFuture.channel().writeAndFlush(classInfo).sync();channelFuture.channel().closeFuture().sync();return myClientResultHandler.getResponse();});} }client的handler:
public class MyClientResultHandler extends ChannelInboundHandlerAdapter {private Object response;public Object getResponse() {return response;}//讀取從Server發(fā)送過來的執(zhí)行結(jié)果@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {response = msg;ctx.close();} }Server
server接口(該接口與client的接口是一樣的):
public interface HelloRpc {String hello(String name); }server接口實(shí)現(xiàn)類:
public class HelloRpcImpl implements HelloRpc {@Overridepublic String hello(String name) {return "hello" + name;} }server端:
/*** 網(wǎng)絡(luò)處理服務(wù)器*/ public class NettyRpcServer {private final int port;public NettyRpcServer(int port) {this.port = port;}public void start() {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();//對(duì)象編碼器, 底層使用Java序列化, 效率低下, 通常使用protobufpipeline.addLast("encoder", new ObjectEncoder());pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));pipeline.addLast(new MyInvokeHandler());}});try {ChannelFuture channelFuture = serverBootstrap.bind(port).sync();System.out.println("server is ready");channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}public static void main(String[] args) {new NettyRpcServer(9999).start();}}server的MyInvokeHandler(解析從client發(fā)送的內(nèi)容):
/*** 封裝client調(diào)用方法后執(zhí)行結(jié)果*/ public class MyInvokeHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//使用反射機(jī)制, 調(diào)用本地方法實(shí)現(xiàn)類, 將方法執(zhí)行結(jié)果封裝, 發(fā)送至clientClassInfo classInfo = (ClassInfo) msg;Object clazz = Class.forName(this.getImplClassName(classInfo)).newInstance();Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getType());Object result = method.invoke(clazz, classInfo.getObjects());ctx.writeAndFlush(result);}//根據(jù)ClassInfo反射獲取對(duì)應(yīng)method執(zhí)行結(jié)果private String getImplClassName(ClassInfo classInfo) throws ClassNotFoundException {String interfacePath = "com.regotto.test.netty_test_rpc.server.fun";int lastIndexOf = classInfo.getClassName().lastIndexOf(".");String interfaceName = classInfo.getClassName().substring(lastIndexOf);Class superClass = Class.forName(interfacePath + interfaceName);Reflections reflections = new Reflections(interfacePath);//獲得該接口下的所有實(shí)現(xiàn)類Set<Class<?>> implClassSet = reflections.getSubTypesOf(superClass);if (implClassSet.size() == 0) {System.out.println("未找到實(shí)現(xiàn)類, erro");return null;} else if (implClassSet.size() > 1) {System.out.println("實(shí)現(xiàn)類存在多個(gè), 未指明使用哪一個(gè)");return null;}return (implClassSet.toArray(new Class[0]))[0].getName();} }Client測(cè)試類
/*** 測(cè)試*/ public class TestNettyRpc {public static void main(String[] args) {//反射調(diào)用HelloNetty helloNetty = (HelloNetty) NettyRpcProxy.create(HelloNetty.class);System.out.println(helloNetty.hello());} }執(zhí)行結(jié)果:
總結(jié)
RPC機(jī)制:
1.Client使用動(dòng)態(tài)代理機(jī)制, 將本地接口信息編碼封裝發(fā)送至Server
2.Server將接收到的信息解碼, 根據(jù)反射機(jī)制獲得方法執(zhí)行結(jié)果, 然后將執(zhí)行結(jié)果編碼封裝發(fā)送至Client
3.Client解析Server發(fā)送的執(zhí)行結(jié)果
總結(jié)
以上是生活随笔為你收集整理的Netty之自定义RPC的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: centos查看网络速率_CentOS
- 下一篇: 递归神经网络