手撕 RPC 1
rpc要素
rpc 最 low 的描述:調用遠程服務像調用本地方法一樣,也就是面向 interface 開發,最基本的,要做到像本地方法一樣調用,意味著 consumer 要知道 provider 有什么服務( interface 名字)是什么,方法( 接口里的方法)是什么,參數(方法的傳參)是什么,返回(方法的返回類型)什么。要解決這些問題,需要的知識點有:
- provider 和 consumer 的通信
- provider(至少一個)和 consumer(一般是多個)之間連接的數量,管理(多個 consumer 不能相互打擾),
- 拆包:連接建立后,怎么拆成正確的對象以獲取正確的信息
- 動態代理:服務要用代理通過數據包分發
- 協議封裝:不同的服務可以自己封裝不同的協議
- 連接池
- 其他 ....
摘要
這是第一版,所有代碼寫在一個類里。單機模擬客戶端和服務器,只有一個 consumer 和一個 provider。線程池管理只有一個線程,沒有服務分發,注冊和發現,通信用netty,代理用jdk的動態代理,序列化用jdk的Serializable,用header模擬協議,拆包寫死。
本來想用門閂模擬線程阻塞,進而控制客戶端的連接,但是 netty 的事件是異步的,無論在獲取客戶端之前還是之后聲明門閂,程序總能正確解開門閂,模擬失敗,不知道原因,所以啊,在寫多線程的程序時,線程的調度要千萬千萬小心啊,程序很可能不按自己預想的運行.
package rpc;import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import org.junit.Test;import java.io.*; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.util.Random; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch;/*** 1.假設一個需求,寫一個 rpc* 2.來回通信,連接數量,拆包* 3.動態代理,序列化,協議封裝* 4.連接池*/ public class MyRpcTest {public void startServer() {NioEventLoopGroup eventExecutors = new NioEventLoopGroup(1);NioEventLoopGroup worker = eventExecutors;ServerBootstrap serverBootstrap = new ServerBootstrap();ChannelFuture localhost = serverBootstrap.group(eventExecutors, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new ServerRequestHandler());}}).bind(new InetSocketAddress("localhost", 9090));try {localhost.sync().channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}@Testpublic void get() {// 開啟一個線程模擬服務器new Thread(() -> {startServer();}).start();// 線程等待一下,讓服務端啟動起來先try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("server start ....");// 獲取服務Car car = ProxyGet(Car.class);// 使用服務car.race("BMW 530Li");}private static <T> T ProxyGet(Class<T> interfaceInfo) {// 實現各個版本的動態代理ClassLoader classLoader = interfaceInfo.getClassLoader();Class<?>[] interfaces = {interfaceInfo};// 用 jdk 的動態代理實現return (T) Proxy.newProxyInstance(classLoader, interfaces, new InvocationHandler() {@Overridepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 1.調用,服務、方法、參數,封裝成 contentString name = interfaceInfo.getName(); // 服務名String methodName = method.getName(); // 方法名Class<?>[] parameterTypes = method.getParameterTypes(); // 方法的返回類型// 2.把調用服務的信息封裝成一個可以序列化的對象// 先封裝 bodyMyContent content = new MyContent();content.setName(name);content.setMethodName(methodName);content.setParameterTypes(parameterTypes);content.setArgs(args);// 把 content 做成字節數組準備寫出去ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(content);byte[] msgBody = bos.toByteArray();// 再封裝 header,header 需要 body 信息,MyHeader myHeader = createHeader(msgBody);bos.reset();oos = new ObjectOutputStream(bos);oos.writeObject(myHeader);byte[] msgHeader = bos.toByteArray();// 3.服務準備好了,接下來準備連接,模擬一個 size 是1的連接池ClientFactory factory = ClientFactory.getFactory();// 想模擬主線程不指令沖排序執行,但是 netty 的時間是異步的,試了好多次,無論門閂在獲取客戶端之前創建還是之后創建,不知道為什么每次都能正確解開門閂,模擬不成功synchronized (this) {NioSocketChannel client = factory.getClient(new InetSocketAddress("localhost", 9090));// 加個門閂,阻塞住,一個 client 一個 client 的處理,在處理返回的后再打開門閂,使程序繼續運行,門閂和創建CountDownLatch countDownLatch = new CountDownLatch(1);// 4.發送,走 IO,走 nettylong requestId = myHeader.getRequestId();ResponseHandler.addCallBack(requestId, () -> {countDownLatch.countDown();});System.out.println("shuan zhu ....");ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);byteBuf.writeBytes(msgHeader);byteBuf.writeBytes(msgBody);ChannelFuture channelFuture = client.writeAndFlush(byteBuf);channelFuture.sync();System.out.println("before await ....");countDownLatch.await();System.out.println("after await ....");}return null;}});}static MyHeader createHeader(byte[] msgBytes) {MyHeader header = new MyHeader();int size = msgBytes.length;// 用16進制的,32 位可以做很多事情int f = 0x14141414;long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());header.setFlag(f);header.setRequestId(requestId);header.setDataLen(size);return header;} }/*** 模擬服務*/ interface Car {void race(String msg); }/*** 頭部定義三個標志* 1.方法的標記,用32位的位標記* 2.請求的 id* 3.請求體的長度*/ class MyHeader implements Serializable {int flag;long requestId;long dataLen;public int getFlag() {return flag;}public void setFlag(int flag) {this.flag = flag;}public long getRequestId() {return requestId;}public void setRequestId(long requestId) {this.requestId = requestId;}public long getDataLen() {return dataLen;}public void setDataLen(long dataLen) {this.dataLen = dataLen;} }/*** 模擬請求體*/ class MyContent implements Serializable {// 服務名String name;// 方法名String methodName;// 返回值類型Class<?>[] parameterTypes;// 參數Object[] args;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMethodName() {return methodName;}public void setMethodName(String methodName) {this.methodName = methodName;}public Class<?>[] getParameterTypes() {return parameterTypes;}public void setParameterTypes(Class<?>[] parameterTypes) {this.parameterTypes = parameterTypes;}public Object[] getArgs() {return args;}public void setArgs(Object[] args) {this.args = args;} }/*** 模擬客戶端的創建,用單例*/ class ClientFactory {int pollSize = 1;NioEventLoopGroup clientWorker;Random random = new Random();private static final ClientFactory factory;private ClientFactory() {}static {factory = new ClientFactory();}public static ClientFactory getFactory() {return factory;}ConcurrentHashMap<InetSocketAddress, ClientPool> outboxes = new ConcurrentHashMap<InetSocketAddress, ClientPool>();public synchronized NioSocketChannel getClient(InetSocketAddress address) {ClientPool clientPool = outboxes.get(address);if (clientPool == null) {outboxes.putIfAbsent(address, new ClientPool(pollSize));clientPool = outboxes.get(address);}int i = random.nextInt(pollSize);// 如果有就返回if (clientPool.clients[i] != null && clientPool.clients[i].isActive()) {return clientPool.clients[i];}// 沒有就創建synchronized (clientPool.locks[i]) {return clientPool.clients[i] = create(address);}}private NioSocketChannel create(InetSocketAddress address) {// 基于 netty 的客戶端創建方式clientWorker = new NioEventLoopGroup(1);Bootstrap bs = new Bootstrap();ChannelFuture connect = bs.group(clientWorker).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {nioSocketChannel.pipeline().addLast(new ClientResponses()); // 根據 requestId 找到給誰}}).connect(address);try {NioSocketChannel client = (NioSocketChannel) connect.sync().channel();return client;} catch (InterruptedException e) {e.printStackTrace();}return null;} }/*** 模擬線連接池*/ class ClientPool {NioSocketChannel[] clients;Object[] locks;ClientPool(int size) {clients = new NioSocketChannel[size];locks = new Object[size];for (int i = 0; i < size; i++) {locks[i] = new Object();}} }/*** 客戶端注冊,連接成功后放開門閂,讓主線程繼續運行*/ class ClientResponses extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;if (byteBuf.readableBytes() >= 82) {byte[] bytes = new byte[82];byteBuf.readBytes(bytes); // 指針偏移到 msgHeader 的末尾ByteArrayInputStream bos = new ByteArrayInputStream(bytes);ObjectInputStream oin = new ObjectInputStream(bos);MyHeader header = (MyHeader) oin.readObject();System.out.println("client reponse id : " + header.getRequestId());System.out.println("channelRead ....");// 然后放開門閂ResponseHandler.runCallBack(header.getRequestId());}} }/*** 用于主線程的阻塞的控制*/ class ResponseHandler {static ConcurrentHashMap<Long, Runnable> mapping = new ConcurrentHashMap<>();public static void addCallBack(long requestId, Runnable cb) {mapping.putIfAbsent(requestId, cb);}public static void runCallBack(long requestId) {Runnable runnable = mapping.get(requestId);runnable.run();removeCallBack(requestId);}public static void removeCallBack(long requestId) {mapping.remove(requestId);} }/*** 服務端注冊的事件* 沒有具體的業務邏輯,只接收客戶端連接并打印出客戶端要請求的服務(方法都不打印)*/ class ServerRequestHandler extends ChannelInboundHandlerAdapter {// provider@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf b = (ByteBuf) msg;ByteBuf sendBuf = b.copy();// 82 是打斷點跟蹤得出來的 header 的長度if (sendBuf.readableBytes() >= 82) {byte[] bytes = new byte[82];// 指針移動到 header 的末尾b.readBytes(bytes);ByteArrayInputStream bis = new ByteArrayInputStream(bytes);ObjectInputStream objectInputStream = new ObjectInputStream(bis);MyHeader myHeader = (MyHeader) objectInputStream.readObject();System.out.println("server response id : " + myHeader.getRequestId());// 讀取請求體,并答應服務名if (b.readableBytes() >= myHeader.getDataLen()) {byte[] data = new byte[(int) myHeader.getDataLen()];b.readBytes(data);ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);MyContent myContent = (MyContent) ois.readObject();System.out.println("server response name : " + myContent.getName());}}// 把數據包打印回客戶端ChannelFuture channelFuture = ctx.writeAndFlush(sendBuf);channelFuture.sync();} }很明顯這是不能用的,第一,客戶端只有一個,那么客戶端變成多個呢?緊接著手撕 RPC 2
總結
- 上一篇: 系统调用回答为什么要用buffer写
- 下一篇: 手撕 RPC 2