Hand-Writing RPC, Part 2
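What happens when we go from a single client to several?
After a few clients connect successfully, the run fails with an error while a packet is being unpacked. Why? Consider how Netty handles multiple client connections.
When the server program reads data out of the kernel buffer and when a client writes data into that buffer are two independent actions. With multiple clients (multiple sockets), a read therefore does not line up neatly with packet boundaries, and the decoder reports an error. IO is bidirectional, so the decoding problem seen on the server appears on the client as well. How do we deal with it?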
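To make the misalignment concrete, here is a minimal sketch using only the JDK. It assumes two messages were written back to back and that the receiver happens to read 12 bytes at a time; the class name `StreamChunkDemo`, the method `readInChunks`, and the chunk size are illustrative and not part of the article's code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo: names are illustrative, not taken from the article.
class StreamChunkDemo {
    // Splits a byte stream into reads of at most chunkSize bytes, the way a
    // receiver may see it regardless of how the sender issued its writes.
    static List<byte[]> readInChunks(byte[] stream, int chunkSize) {
        List<byte[]> reads = new ArrayList<>();
        for (int from = 0; from < stream.length; from += chunkSize) {
            int to = Math.min(from + chunkSize, stream.length);
            reads.add(Arrays.copyOfRange(stream, from, to));
        }
        return reads;
    }

    public static void main(String[] args) {
        byte[] first = "request-1".getBytes(StandardCharsets.UTF_8);
        byte[] second = "request-2".getBytes(StandardCharsets.UTF_8);
        // The sender wrote two separate messages...
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        wire.write(first, 0, first.length);
        wire.write(second, 0, second.length);
        // ...but the receiver's reads cut across the message boundary.
        for (byte[] read : readInChunks(wire.toByteArray(), 12)) {
            System.out.println(new String(read, StandardCharsets.UTF_8));
        }
    }
}
```

The first read ends in the middle of the second message, so a decoder that treats every read as exactly one packet will fail.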
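Fixing decoding of the communication stream
The underlying cause is that several sockets write into one buffer while TCP delivers a plain byte stream with no message boundaries. The program therefore has to make sure that every read starts aligned with the beginning of a packet (the header, followed by the body), and that a request reaches channelRead() only after its packet has arrived in full. Netty provides ByteToMessageDecoder for exactly this: add a decoding handler to the pipeline ahead of the business handler.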
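The accumulate-then-decode idea behind such a decoder can be sketched without Netty. The class below is a hypothetical stand-in and not Netty's API: it assumes a simple 4-byte length prefix instead of the serialized header used in this article, keeps undecoded bytes between reads, and emits only complete payloads.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the accumulate-then-decode idea; not Netty's API.
class FrameAccumulator {
    private byte[] pending = new byte[0]; // bytes received but not yet decoded

    // Prefixes a payload with a 4-byte big-endian length (an assumed wire format).
    static byte[] frame(byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    // Appends one read and returns every complete payload now available.
    List<byte[]> feed(byte[] chunk) {
        byte[] merged = Arrays.copyOf(pending, pending.length + chunk.length);
        System.arraycopy(chunk, 0, merged, pending.length, chunk.length);
        ByteBuffer buf = ByteBuffer.wrap(merged);
        List<byte[]> frames = new ArrayList<>();
        while (buf.remaining() >= 4) {
            buf.mark();
            int len = buf.getInt();
            if (buf.remaining() < len) {
                // Body incomplete: rewind to the length field and wait for more bytes.
                buf.reset();
                break;
            }
            byte[] body = new byte[len];
            buf.get(body);
            frames.add(body);
        }
        // Keep whatever was not consumed for the next read.
        pending = Arrays.copyOfRange(merged, buf.position(), merged.length);
        return frames;
    }
}
```

Feeding the first 6 bytes of two framed messages returns nothing; feeding the rest returns both payloads. A ByteToMessageDecoder subclass follows the same pattern, with Netty managing the accumulated buffer.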
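Other issues addressed in this version
1. The server uses 20 threads to handle both the listening socket's connections and the IO sockets.
2. For multiple client connections, a pool (ClientPool) is maintained to manage the connections.
3. Netty's ByteToMessageDecoder is used to fix the incorrect decoding.
4. CompletableFuture is used to obtain the return value of the method the client calls.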
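Item 4 relies on matching each response to its caller by request id. The following is a minimal JDK-only sketch of that correlation; `PendingCalls` and its methods are hypothetical names and only mirror the idea, not the article's ResponseMappingHandler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical names; mirrors the request-id -> future mapping idea only.
class PendingCalls {
    private final ConcurrentHashMap<Long, CompletableFuture<String>> pending =
            new ConcurrentHashMap<>();

    // Caller side: register before sending so a fast reply cannot be missed.
    CompletableFuture<String> register(long requestId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // IO side: remove and complete in one step; unknown ids are ignored.
    boolean complete(long requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        return future != null && future.complete(result);
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) throws Exception {
        PendingCalls calls = new PendingCalls();
        CompletableFuture<String> reply = calls.register(42L);
        // This thread stands in for the Netty IO thread delivering the response.
        new Thread(() -> calls.complete(42L, "pong")).start();
        // A timeout keeps the caller from blocking forever if no reply arrives.
        System.out.println(reply.get(1, TimeUnit.SECONDS));
    }
}
```

Removing the entry before completing it avoids a null dereference when a response arrives for an id that was never registered or was already handled.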
package rpc;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.junit.Test;

import java.io.*;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class MyRpcTest {

    @Test
    public void startServer() {
        // One group of 20 threads serves as both boss (accept) and worker (IO).
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup(20);
        NioEventLoopGroup worker = eventExecutors;
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        ChannelFuture localhost = serverBootstrap.group(eventExecutors, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) {
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        // Decode first, then hand complete packages to the business handler.
                        pipeline.addLast(new MyDecoder());
                        pipeline.addLast(new ServerRequestHandler());
                    }
                })
                .bind(new InetSocketAddress("localhost", 9090));
        try {
            localhost.sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void get() {
        new Thread(() -> {
            startServer();
        }).start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("server start ....");

        AtomicInteger atomicInteger = new AtomicInteger();
        int size = 30;
        Thread[] threads = new Thread[size];
        for (int i = 0; i < size; i++) {
            threads[i] = new Thread(() -> {
                Car car = proxyGet(Car.class);
                int args = atomicInteger.incrementAndGet();
                String s = car.race("client: " + args);
                System.out.println("client got args: " + s + " --- " + args);
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        // Block so the test does not exit before the responses arrive.
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static <T> T proxyGet(Class<T> interfaceInfo) {
        ClassLoader classLoader = interfaceInfo.getClassLoader();
        Class<?>[] interfaces = {interfaceInfo};
        return (T) Proxy.newProxyInstance(classLoader, interfaces, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // 1. Describe the call: interface, method, parameter types, arguments.
                String name = interfaceInfo.getName();
                String methodName = method.getName();
                Class<?>[] parameterTypes = method.getParameterTypes();
                MyContent content = new MyContent();
                content.setName(name);
                content.setMethodName(methodName);
                content.setParameterTypes(parameterTypes);
                content.setArgs(args);

                // 2. Serialize the body, then the header that carries the body length.
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutputStream oos = new ObjectOutputStream(bos);
                oos.writeObject(content);
                byte[] msgBody = bos.toByteArray();

                MyHeader myHeader = createHeader(msgBody);
                bos.reset();
                oos = new ObjectOutputStream(bos);
                oos.writeObject(myHeader);
                byte[] msgHeader = bos.toByteArray();

                // 3. Take a connection from the pool.
                ClientFactory factory = ClientFactory.getFactory();
                NioSocketChannel client = factory.getClient(new InetSocketAddress("localhost", 9090));

                // 4. Register the future before sending, then write header + body.
                ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(msgHeader.length + msgBody.length);
                CompletableFuture<String> future = new CompletableFuture<>();
                ResponseMappingHandler.addCallBack(myHeader.getRequestId(), future);
                byteBuf.writeBytes(msgHeader);
                byteBuf.writeBytes(msgBody);
                ChannelFuture channelFuture = client.writeAndFlush(byteBuf);
                channelFuture.sync();

                // 5. Block until the response handler completes the future.
                return future.get();
            }
        });
    }

    static MyHeader createHeader(byte[] msgBytes) {
        MyHeader header = new MyHeader();
        int size = msgBytes.length;
        int f = 0x14141414; // flag for a request
        long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
        header.setFlag(f);
        header.setRequestId(requestId);
        header.setDataLen(size);
        return header;
    }
}

interface Car {
    String race(String msg);
}

class MyHeader implements Serializable {
    int flag;
    long requestId;
    long dataLen;

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    public long getRequestId() {
        return requestId;
    }

    public void setRequestId(long requestId) {
        this.requestId = requestId;
    }

    public long getDataLen() {
        return dataLen;
    }

    public void setDataLen(long dataLen) {
        this.dataLen = dataLen;
    }
}

class MyContent implements Serializable {
    String name;
    String methodName;
    Class<?>[] parameterTypes;
    Object[] args;
    String res;

    public String getRes() {
        return res;
    }

    public void setRes(String res) {
        this.res = res;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getArgs() {
        return args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }
}

class ClientFactory {
    int pollSize = 50; // number of connection slots per server address
    NioEventLoopGroup clientWorker;
    Random random = new Random();
    private static final ClientFactory factory;

    private ClientFactory() {
    }

    static {
        factory = new ClientFactory();
    }

    public static ClientFactory getFactory() {
        return factory;
    }

    // One connection pool per server address.
    ConcurrentHashMap<InetSocketAddress, ClientPool> outboxes = new ConcurrentHashMap<InetSocketAddress, ClientPool>();

    public synchronized NioSocketChannel getClient(InetSocketAddress address) {
        ClientPool clientPool = outboxes.get(address);
        if (clientPool == null) {
            outboxes.putIfAbsent(address, new ClientPool(pollSize));
            clientPool = outboxes.get(address);
        }
        // Pick a random slot; reuse its connection if it is still active.
        int i = random.nextInt(pollSize);
        if (clientPool.clients[i] != null && clientPool.clients[i].isActive()) {
            return clientPool.clients[i];
        }
        // Otherwise create the connection for this slot under the slot's lock.
        synchronized (clientPool.locks[i]) {
            return clientPool.clients[i] = create(address);
        }
    }

    private NioSocketChannel create(InetSocketAddress address) {
        clientWorker = new NioEventLoopGroup(1);
        Bootstrap bs = new Bootstrap();
        ChannelFuture connect = bs.group(clientWorker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) {
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        // Same decoder as the server, because responses are framed the same way.
                        pipeline.addLast(new MyDecoder());
                        pipeline.addLast(new ClientResponses());
                    }
                })
                .connect(address);
        try {
            NioSocketChannel client = (NioSocketChannel) connect.sync().channel();
            return client;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}

class ClientPool {
    NioSocketChannel[] clients;
    Object[] locks;

    ClientPool(int size) {
        clients = new NioSocketChannel[size];
        locks = new Object[size];
        for (int i = 0; i < size; i++) {
            locks[i] = new Object();
        }
    }
}

class ClientResponses extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MyDataPackage data = (MyDataPackage) msg;
        ResponseMappingHandler.runCallBack(data);
    }
}

class ResponseMappingHandler {
    static ConcurrentHashMap<Long, CompletableFuture> mapping = new ConcurrentHashMap<>();

    public static void addCallBack(long requestId, CompletableFuture cb) {
        mapping.putIfAbsent(requestId, cb);
    }

    public static void runCallBack(MyDataPackage data) {
        mapping.get(data.getHeader().getRequestId()).complete(data.getContent().getRes());
        removeCallBack(data.getHeader().getRequestId());
    }

    public static void removeCallBack(long requestId) {
        mapping.remove(requestId);
    }
}

class ServerRequestHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MyDataPackage data = (MyDataPackage) msg;
        String ioThreadName = Thread.currentThread().getName();
        // Run the business logic on another event loop of the same group,
        // so the IO thread that read the request is not tied up.
        ctx.executor().parent().next().execute(() -> {
            String execThreadName = Thread.currentThread().getName();
            MyContent content = new MyContent();
            String s = "io thread: " + ioThreadName
                    + " exec thread: " + execThreadName
                    + " from args: " + data.getContent().getArgs()[0];
            content.setRes(s);
            // MySerializeUtil is not shown in this article; it is assumed to
            // serialize an object to a byte[] with ObjectOutputStream.
            byte[] contentByte = MySerializeUtil.serialize(content);

            MyHeader header = new MyHeader();
            header.setFlag(0x14141424); // flag for a response
            header.setRequestId(data.getHeader().getRequestId());
            header.setDataLen(contentByte.length);
            byte[] headerByte = MySerializeUtil.serialize(header);

            ByteBuf byteBuf = PooledByteBufAllocator.DEFAULT.directBuffer(headerByte.length + contentByte.length);
            byteBuf.writeBytes(headerByte);
            byteBuf.writeBytes(contentByte);
            ctx.writeAndFlush(byteBuf);
        });
    }
}

class MyDecoder extends ByteToMessageDecoder {
    // Serialized size of MyHeader in package rpc. The value changes if the
    // class or its package is renamed, because the class name is part of the stream.
    private static final int HEADER_LEN = 82;

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        while (byteBuf.readableBytes() >= HEADER_LEN) {
            // Peek at the header without moving readerIndex.
            byte[] bytes = new byte[HEADER_LEN];
            byteBuf.getBytes(byteBuf.readerIndex(), bytes);
            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
            ObjectInputStream objectInputStream = new ObjectInputStream(bis);
            MyHeader myHeader = (MyHeader) objectInputStream.readObject();

            // The header is still unread at this point, so the buffer must
            // hold the header plus the whole body before anything is consumed.
            if (byteBuf.readableBytes() - HEADER_LEN >= myHeader.getDataLen()) {
                // skipBytes avoids allocating the ByteBuf that readBytes(int) would return.
                byteBuf.skipBytes(HEADER_LEN);
                byte[] data = new byte[(int) myHeader.getDataLen()];
                byteBuf.readBytes(data);
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data);
                ObjectInputStream ois = new ObjectInputStream(byteArrayInputStream);
                if (myHeader.getFlag() == 0x14141414) {
                    // Request package (client to server).
                    MyContent myContent = (MyContent) ois.readObject();
                    list.add(new MyDataPackage(myHeader, myContent));
                } else if (myHeader.getFlag() == 0x14141424) {
                    // Response package (server to client).
                    MyContent myContent = (MyContent) ois.readObject();
                    list.add(new MyDataPackage(myHeader, myContent));
                }
            } else {
                // Body incomplete: leave the bytes in place for the next read.
                break;
            }
        }
    }
}

class MyDataPackage {
    private MyHeader header;
    private MyContent content;

    public MyDataPackage(MyHeader myHeader, MyContent myContent) {
        this.header = myHeader;
        this.content = myContent;
    }

    public MyHeader getHeader() {
        return header;
    }

    public void setHeader(MyHeader header) {
        this.header = header;
    }

    public MyContent getContent() {
        return content;
    }

    public void setContent(MyContent content) {
        this.content = content;
    }
}
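The server handler calls MySerializeUtil.serialize(...), but the listing never shows that class. One possible shape for it is sketched below. This is an assumption, not the author's implementation: only `serialize` is implied by the listing, while `deserialize` and `DemoHeader` (a stand-in with the same three fields as MyHeader) are added here for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Assumed shape of the helper the server handler calls; the article does not show it.
class MySerializeUtil {
    // Serializes any Serializable object with plain JDK serialization.
    static byte[] serialize(Object obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        return bos.toByteArray();
    }

    // Hypothetical counterpart; the listing only ever calls serialize(...).
    static Object deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("deserialization failed", e);
        }
    }
}

// Stand-in with the same three primitive fields as the article's MyHeader.
class DemoHeader implements Serializable {
    int flag;
    long requestId;
    long dataLen;
}
```

Because the header holds only primitive fields, its serialized length is the same for every instance of a given class, which is what lets the decoder read a fixed number of header bytes. That length still depends on the fully qualified class name, so measuring it once with `serialize(new MyHeader()).length` is likely safer than hardcoding a number.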