Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信
##
Netty實戰 IM即時通訊系統(六)實戰: 客戶端和服務端雙向通信零、 目錄
- Netty 簡介
- Netty 環境配置
- 服務端啟動流程
- 實戰: 客戶端和服務端雙向通信
- 數據傳輸載體ByteBuf介紹
- 客戶端與服務端通信協議編解碼
- 實現客戶端登錄
- 實現客戶端與服務端收發消息
- pipeline與channelHandler
- 構建客戶端與服務端pipeline
- 拆包粘包理論與解決方案
- channelHandler的生命周期
- 使用channelHandler的熱插拔實現客戶端身份校驗
- 客戶端互聊原理與實現
- 群聊的發起與通知
- 群聊的成員管理(加入與退出,獲取成員列表)
- 群聊消息的收發及Netty性能優化
- 心跳與空閑檢測
- 總結
- 擴展
###六、 實戰: 客戶端和服務端雙向通信
本節我們要實現的功能是客戶端連接成功后,向服務端寫出一段數據 , 服務端收到數據后打印 , 并向客戶端回復一段數據 。
我們先做一個代碼框架 , 然后在框架上面做修改
public class Test_07_客戶端和服務端雙向通信 {public static void main(String[] args) {Test_07_Server.start(8000);Test_07_Client.start("127.0.0.1" , 8000 ,5);}}class Test_07_Client{public static void start(String IP , int port ,int maxRetry){NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {}});connect(bootstrap , IP , port , maxRetry);}private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry , int... retryIndex) {bootstrap.connect(IP , port).addListener(future ->{int[] finalRetryIndex;if(future.isSuccess()) {System.out.println("連接成功");}else if(maxRetry ==0) {System.out.println("達到最大重試此時,放棄重試");}else {// 初始化 重試計數if(retryIndex.length == 0) {finalRetryIndex = new int[]{0};}else {finalRetryIndex = retryIndex;}// 計算時間間隔int delay = 1 << finalRetryIndex[0];// 執行重試System.out.println(new Date() +" 連接失敗,剩余重試次數:"+ maxRetry + ","+delay+"秒后執行重試");bootstrap.config().group().schedule(()->{connect(bootstrap , IP, port , maxRetry -1 , finalRetryIndex[0]+1);}, delay, TimeUnit.SECONDS);}});}}class Test_07_Server{public static void start(int port){NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}});bind(serverBootstrap, port);}private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println("服務端:端口【"+port+"】綁定成功!");}else {System.out.println("服務端:端口【"+port+"】綁定失敗,嘗試綁定【"+(port+1)+"】!");bind(serverBootstrap, port+1);}});}}客戶端發送數據到服務端
在《客戶端啟動流程》這一小節 , 我們提到 客戶端相關的數據讀寫邏輯是通過BootStrap的handler()方法指定
bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {}});現在我們在initChannel()中給客戶端添加一個邏輯處理器 , 這個處理器的作用就是負責向服務端寫數據
bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加業務處理邏輯 可以添加自定義的業務處理邏輯也可以添加 Netty自帶的簡單通用的處理邏輯ch.pipeline().addLast(new Test_07_ClientHandler());}});ch.pipeline()方法返回的是和這條連接相關的邏輯處理鏈 , 采用了責任鏈處理模式 , 這里不理解沒關系 , 后面會講到。
然后再調用addLast()方法添加一個邏輯處理器 , 這個邏輯處理器為的就是在客戶端建立連接成功之后向服務端寫數據 , 下面是這個邏輯處理器的代碼:
class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println(new Date() + " 客戶端寫出數據...");// 1. 獲取數據ByteBuf buffer = getByteBuf(ctx);// 2. 寫數據ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數據byte[] bs = "你好,奧特曼!".getBytes(Charset.forName("UTF-8"));// 把數據填充到 bufbuf.writeBytes(bs);return buf;}}服務端讀取客戶端數據
服務端的數據處理邏輯 是通過ServerBootStrap 的childHandler()方法指定
serverBootStrtap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// TODO Auto-generated method stub}})現在 , 我們在initChannel() 中 給服務端添加一個邏輯處理器 , 這個處理器 的作用就是負責客戶端讀數據
serverBootStrtap.childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}})這個方法里的邏輯和客戶端類似 , 獲取服務端關于這條連接的邏輯處理鏈pipeline , 然后添加一個邏輯處理器 , 負責讀取客戶端發來的數據
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務端讀到數據->"+ buf.toString(Charset.forName("UTF-8")));}}運行測試
完整代碼
import java.nio.charset.Charset;import java.util.Date;import java.util.concurrent.TimeUnit;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Test_07_客戶端和服務端雙向通信 {public static void main(String[] args) throws Exception {Test_07_Server.start(8000);Test_07_Client.start("127.0.0.1", 8000, 5);}}class Test_07_Client {public static void start(String IP, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加業務處理邏輯 可以添加自定義的業務處理邏輯也可以添加 Netty自帶的簡單通用的處理邏輯ch.pipeline().addLast(new Test_07_ClientHandler());}});connect(bootstrap, IP, port, maxRetry);}private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {bootstrap.connect(IP, port).addListener(future -> {int[] finalRetryIndex;if (future.isSuccess()) {System.out.println("客戶端連接【"+IP+":"+port+"】成功");} else if (maxRetry == 0) {System.out.println("達到最大重試此時,放棄重試");} else {// 初始化 重試計數if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 計算時間間隔int delay = 1 << finalRetryIndex[0];// 執行重試System.out.println(new Date() + " 連接失敗,剩余重試次數:" + maxRetry + "," + delay + "秒后執行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}class Test_07_Server {public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}});bind(serverBootstrap, port);}private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println("服務端:端口【"+port+"】綁定成功!");}else {System.out.println("服務端:端口【"+port+"】綁定失敗,嘗試綁定【"+(port+1)+"】!");bind(serverBootstrap, port+1);}});}}class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "你好,奧特曼!";System.out.println(new Date() + " 客戶端寫出數據:"+content);// 1. 獲取數據ByteBuf buffer = getByteBuf(ctx , content);// 2. 寫數據ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數據byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數據填充到 bufbuf.writeBytes(bs);return buf;}}class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務端讀到數據->"+ buf.toString(Charset.forName("UTF-8")));}}運行結果:
服務端回復數據給客戶端
服務端向客戶端寫數據的邏輯與客戶端向服務端寫數據的邏輯一樣 , 先創建一個ByteBuf , 然后填充二進制數據 , 最后調用writeAndFlush()方法寫出去
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務端讀到數據->"+ buf.toString(Charset.forName("UTF-8")));// 向客戶端回復數據String content = "你好,田先森!";System.out.println(new Date() +":服務端寫出數據-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {// 獲取 二進制抽象 ByteBufByteBuf byteBuf = cxt.alloc().buffer();// 準備數據byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數據填充到buf中byteBuf.writeBytes(bs);return byteBuf;}}現在輪到客戶端了 , 客戶端讀取數據的邏輯和服務端讀數據的邏輯一樣 , 同樣是覆蓋channelRead() 方法
class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println(new Date()+": 客戶端讀到數據 ->"+ byteBuf.toString(Charset.forName("UTF-8")));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "你好,奧特曼!";System.out.println(new Date() + " 客戶端寫出數據:"+content);// 1. 獲取數據ByteBuf buffer = getByteBuf(ctx , content);// 2. 寫數據ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數據byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數據填充到 bufbuf.writeBytes(bs);return buf;}}現在 客戶端和服務端就實現了雙向通信
完整代碼:
import java.nio.charset.Charset;import java.util.Date;import java.util.concurrent.TimeUnit;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class Test_07_客戶端和服務端雙向通信 {public static void main(String[] args) throws Exception {Test_07_Server.start(8000);Test_07_Client.start("127.0.0.1", 8000, 5);}}class Test_07_Client {public static void start(String IP, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加業務處理邏輯 可以添加自定義的業務處理邏輯也可以添加 Netty自帶的簡單通用的處理邏輯ch.pipeline().addLast(new Test_07_ClientHandler());}});connect(bootstrap, IP, port, maxRetry);}private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {bootstrap.connect(IP, port).addListener(future -> {int[] finalRetryIndex;if (future.isSuccess()) {System.out.println("客戶端連接【"+IP+":"+port+"】成功");} else if (maxRetry == 0) {System.out.println("達到最大重試此時,放棄重試");} else {// 初始化 重試計數if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 計算時間間隔int delay = 1 << finalRetryIndex[0];// 執行重試System.out.println(new Date() + " 連接失敗,剩余重試次數:" + maxRetry + "," + delay + "秒后執行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}class Test_07_Server {public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ch.pipeline().addLast(new Test_07_ServerHandler());}});bind(serverBootstrap, port);}private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if(future.isSuccess()) {System.out.println("服務端:端口【"+port+"】綁定成功!");}else {System.out.println("服務端:端口【"+port+"】綁定失敗,嘗試綁定【"+(port+1)+"】!");bind(serverBootstrap, port+1);}});}}class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;System.out.println(new Date()+": 客戶端讀到數據 ->"+ byteBuf.toString(Charset.forName("UTF-8")));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "你好,奧特曼!";System.out.println(new Date() + " 客戶端寫出數據:"+content);// 1. 獲取數據ByteBuf buffer = getByteBuf(ctx , content);// 2. 寫數據ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {// 獲取二進制抽象 ByteBufferByteBuf buf = ctx.alloc().buffer();// 準備數據byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數據填充到 bufbuf.writeBytes(bs);return buf;}}class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務端讀到數據->"+ buf.toString(Charset.forName("UTF-8")));// 向客戶端回復數據String content = "你好,田先森!";System.out.println(new Date() +":服務端寫出數據-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {// 獲取 二進制抽象 ByteBufByteBuf byteBuf = cxt.alloc().buffer();// 準備數據byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數據填充到buf中byteBuf.writeBytes(bs);return byteBuf;}}執行結果
總結
思考: 如何實現在新連接介入的時候 , 服務端主動向客戶端推送消息 , 客戶端回復服務端消息?
解答: 在服務器端的邏輯處理其中也實現 channelActive() 在有新的連接接入時 會回調此方法
class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {String content = "是不是你連我了?";System.out.println(new Date() +":服務端寫出數據-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(new Date() + ": 服務端讀到數據->"+ buf.toString(Charset.forName("UTF-8")));// 向客戶端回復數據String content = "你好,田先森!";System.out.println(new Date() +":服務端寫出數據-> "+content);ByteBuf byteBuf = getByteBuf(ctx , content);ctx.channel().writeAndFlush(byteBuf);}private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {// 獲取 二進制抽象 ByteBufByteBuf byteBuf = cxt.alloc().buffer();// 準備數據byte[] bs = content.getBytes(Charset.forName("UTF-8"));// 把數據填充到buf中byteBuf.writeBytes(bs);return byteBuf;}}總結
以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty实战 IM即时通讯系统(五)客
- 下一篇: Netty实战 IM即时通讯系统(七)数