Netty实战 IM即时通讯系统(十二)构建客户端与服务端pipeline
Netty實(shí)戰(zhàn) IM即時(shí)通訊系統(tǒng)(十二)構(gòu)建客戶端與服務(wù)端pipeline
零、 目錄
- Netty 簡(jiǎn)介
- Netty 環(huán)境配置
- 服務(wù)端啟動(dòng)流程
- 客戶端啟動(dòng)流程
- 實(shí)戰(zhàn): 客戶端和服務(wù)端雙向通信
- 數(shù)據(jù)傳輸載體ByteBuf介紹
- 客戶端與服務(wù)端通信協(xié)議編解碼
- 實(shí)現(xiàn)客戶端登錄
- 實(shí)現(xiàn)客戶端與服務(wù)端收發(fā)消息
- pipeline與channelHandler
- 構(gòu)建客戶端與服務(wù)端pipeline
- 拆包粘包理論與解決方案
- channelHandler的生命周期
- 使用channelHandler的熱插拔實(shí)現(xiàn)客戶端身份校驗(yàn)
- 客戶端互聊原理與實(shí)現(xiàn)
- 群聊的發(fā)起與通知
- 群聊的成員管理(加入與退出,獲取成員列表)
- 群聊消息的收發(fā)及Netty性能優(yōu)化
- 心跳與空閑檢測(cè)
- 總結(jié)
- 擴(kuò)展
一、 ChannelInboundHandlerAdapter 與 ChannelOutboundHandlerAdapter
首先是ChannelInboundHandlerAdapter , 這個(gè)適配器只用用于實(shí)現(xiàn)其接口ChannelInboundHandler 的所有方法,這樣我們?cè)诰帉懽约旱膆andler時(shí)就不需要實(shí)現(xiàn)handler里的每一個(gè)方法,而只需要實(shí)現(xiàn)我們關(guān)心的方法 , 默認(rèn)情況下 , 對(duì)于ChannelInboundHandlerAdapter , 我們比較關(guān)心的是他的channelRead()
ChannelInboundHandlerAdapter.java@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ctx.fireChannelRead(msg);}與ChannelinboundHandlerAdapter相似的是ChannelOutboundHandlerAdapter , 他的核心方法是 write() 方法
ChannelOutboundHandlerAdapter.java@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ctx.write(msg, promise);}我們往pipeline中添加的第一個(gè)handler中的channelRead方法中 , msg對(duì)象其實(shí)就是ByteBuf , 服務(wù)端在接收數(shù)據(jù)之后 , 應(yīng)該首先把這個(gè)ByteBuf解碼 , 然后把解碼之后的結(jié)果傳遞給下一個(gè)handler :
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf requestByteBuf = (ByteBuf) msg;// 解碼Packet packet = PacketCodeC.INSTANCE.decode(requestByteBuf);// 解碼后的對(duì)象傳遞到下一個(gè) handler 處理ctx.fireChannelRead(packet)}在開始解碼之前我們來了解另外一個(gè)特殊的handler
二 、 ByteToMessageDecoder
通常情況下 , 無論是我們?cè)诳蛻舳诉€是在服務(wù)端 , 當(dāng)我們接收到數(shù)據(jù)之后 , 首先要做的就是事情就是把二進(jìn)制數(shù)據(jù)轉(zhuǎn)換成我們所需要的java 對(duì)象 , 所以Netty 很貼心的幫我們寫了一個(gè)父類 ,來專門做這個(gè)事情 , 我們來看一下如何使用這個(gè)類來實(shí)現(xiàn)服務(wù)端二進(jìn)制數(shù)據(jù)解碼:
public class PacketDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) {out.add(PacketCodeC.INSTANCE.decode(in));}}當(dāng)我們通過解碼器把二進(jìn)制數(shù)據(jù)轉(zhuǎn)換到j(luò)ava 對(duì)象即指令數(shù)據(jù)包之后 , 就可以針對(duì)每一種指令數(shù)據(jù)包編寫邏輯了 。
三 、 SimpleChannelInboundHandler
回顧一下我們之前處理java 對(duì)象的邏輯
if (packet instanceof LoginRequestPacket) {// ...} else if (packet instanceof MessageRequestPacket) {// ...} else if ...我們通過if - else 來進(jìn)行邏輯處理 , 當(dāng)我們需要處理的指令越來越多的時(shí)候 , 代碼就會(huì)顯得越來越臃腫 ,這個(gè)時(shí)候我們可以通過給pipeline 添加多個(gè)handler(集成 ChannelInboundHandlerAdapter) 來解決多if-else 的問題
XXXHandler.javaif (packet instanceof XXXPacket) {// ...處理} else {ctx.fireChannelRead(packet); }接下來我們看一看 , 如何使用 SimpleChannelInboundHandler 簡(jiǎn)化我們的指令處理邏輯
LoginRequestHandler.javapublic class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {// 登錄邏輯}}四 、 MessageToByteEncoder
在前面的幾個(gè)小節(jié) , 我們已經(jīng)實(shí)現(xiàn)了登錄和消息處理邏輯 , 處理完請(qǐng)求之后 , 我們都會(huì)給客戶端一個(gè)響應(yīng) , 在寫響應(yīng)之前 , 我們需要把響應(yīng)對(duì)象編碼成ByteBuf , 結(jié)合本小節(jié)的內(nèi)容 , 最后的邏輯框架如下:
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {LoginResponsePacket loginResponsePacket = login(loginRequestPacket);ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginResponsePacket);ctx.channel().writeAndFlush(responseByteBuf);}}public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {MessageResponsePacket messageResponsePacket = receiveMessage(messageRequestPacket);ByteBuf responseByteBuf = PacketCodeC.INSTANCE.encode(ctx.alloc(), messageRequestPacket);ctx.channel().writeAndFlush(responseByteBuf);}}我們注意到 , 我們處理完每一種指令之后的邏輯都是相似的 , 都需要進(jìn)行解碼 , 然后調(diào)用 writeAndFlush() 將數(shù)據(jù)寫到對(duì)端 , 這個(gè)編碼的過程其實(shí)也是重復(fù)的邏輯 , 而且在編碼的過程中國(guó) , 我們還需要手動(dòng)去創(chuàng)建一個(gè)ByteBuf :
PacketCodeC.javapublic ByteBuf encode(ByteBufAllocator byteBufAllocator, Packet packet) {// 1. 創(chuàng)建 ByteBuf 對(duì)象ByteBuf byteBuf = byteBufAllocator.ioBuffer();// 2. 序列化 java 對(duì)象// 3. 實(shí)際編碼過程return byteBuf;}而Netty 提供了一個(gè)特殊的channelHandler 來專門處理這種邏輯 , 我們不需要每一次將響應(yīng)寫到對(duì)端的時(shí)候調(diào)用一次編碼邏輯進(jìn)行編碼 , 也不需要自行創(chuàng)建ByteBuf , 這個(gè)類叫做 MessageToByteEncoder , 從字面意思可以看出 , 他的功能就是將對(duì)象轉(zhuǎn)化到二進(jìn)制數(shù)據(jù) 。
我們來看一下如何使用 MessageToByteEncoder 來實(shí)現(xiàn)編碼邏輯
public class PacketEncoder extends MessageToByteEncoder<Packet> {@Overrideprotected void encode(ChannelHandlerContext ctx, Packet packet, ByteBuf out) {PacketCodeC.INSTANCE.encode(out, packet);}}PacketEncoder 集成自 MessageToByteEncoder , 泛型參數(shù) Packet 表示這個(gè)類的作用是 將Packet 類型對(duì)象到二進(jìn)制的轉(zhuǎn)化 。
這里我們只需要實(shí)現(xiàn)encode() 方法 , 我們注意到 , 這個(gè)方法的第二個(gè)參數(shù)是java對(duì)象 , 而第三個(gè)參數(shù)是ByteBuf 對(duì)象 , 我們?cè)谶@個(gè)方法里面要做的事情就是把java對(duì)象里面的字段寫到ByteBuf , 我們不在需要自行去分配ByteBuf , 因此大家注意到 , PacketCodeC 的 encode() 方法 的定義也改了 , 下面是更改前后的對(duì)比:
PacketCodeC.java// 更改前的定義public ByteBuf encode(ByteBufAllocator byteBufAllocator, Packet packet) {// 1. 創(chuàng)建 ByteBuf 對(duì)象ByteBuf byteBuf = byteBufAllocator.ioBuffer();// 2. 序列化 java 對(duì)象// 3. 實(shí)際編碼過程return byteBuf;}// 更改后的定義public void encode(ByteBuf byteBuf, Packet packet) {// 1. 序列化 java 對(duì)象// 2. 實(shí)際編碼過程}我們可以看到 , PacketCodeC 不在需要手動(dòng)創(chuàng)建ByteBuf對(duì)象 , 不在需要把創(chuàng)建完ByteBuf 的進(jìn)行返回 , 當(dāng)我們向pipeline 中添加了這個(gè)編碼器之后 , 我們?cè)谥噶钐幚硗戤呏缶椭恍枰獁riteAndFlush java 對(duì)象即可
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {ctx.channel().writeAndFlush(login(loginRequestPacket));}}public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageRequestPacket) {ctx.channel().writeAndFlush(receiveMessage(messageRequestPacket));}}通過我們前面的分析 , 可以看到 , Netty 為了讓我們邏輯更加清晰簡(jiǎn)潔 , 幫我們做了很多工作 嗎能直接用Netty 自帶的handler 來解決問題 , 不要重復(fù)制造輪子 , 在下面的小節(jié)中 , 我們會(huì)繼續(xù)探討Netty還有哪些開箱即用的handler
分析完服務(wù)端的pipeline 與 handler 組成結(jié)構(gòu) , 相信你們也不難自行分析出客戶端handler 的結(jié)構(gòu)了 , 最后我們來看一下服務(wù)端和客戶端完整的pipeline 與handler結(jié)構(gòu)
五、 構(gòu)建服務(wù)端和客戶端 pipeline 與handler
對(duì)應(yīng)我們的代碼
服務(wù)端serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {protected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new PacketDecoder());ch.pipeline().addLast(new LoginRequestHandler());ch.pipeline().addLast(new MessageRequestHandler());ch.pipeline().addLast(new PacketEncoder());}});客戶端bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new PacketDecoder());ch.pipeline().addLast(new LoginResponseHandler());ch.pipeline().addLast(new MessageResponseHandler());ch.pipeline().addLast(new PacketEncoder());}});六 、 完整代碼
七 、 總結(jié)
八 、 思考
答: channelRead0(ChannelHandlerContext ctx, MessageRequestPacket msg) 的msg是由父類SimpleChannelInboundHandler的channelRead() 方法判斷是需要類型后, 強(qiáng)轉(zhuǎn)類型后傳遞進(jìn)來的
I imsg = (I) msg;channelRead0(ctx, imsg);總結(jié)
以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(十二)构建客户端与服务端pipeline的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty实战 IM即时通讯系统(十一)
- 下一篇: Git《一》简介及安装使用