Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息
Netty實戰 IM即時通訊系統(十)實現客戶端和服務端收發消息
零、 目錄
- Netty 簡介
- Netty 環境配置
- 服務端啟動流程
- 客戶端啟動流程
- 實戰: 客戶端和服務端雙向通信
- 數據傳輸載體ByteBuf介紹
- 客戶端與服務端通信協議編解碼
- 實現客戶端登錄
- 實現客戶端與服務端收發消息
- pipeline與channelHandler
- 構建客戶端與服務端pipeline
- 拆包粘包理論與解決方案
- channelHandler的生命周期
- 使用channelHandler的熱插拔實現客戶端身份校驗
- 客戶端互聊原理與實現
- 群聊的發起與通知
- 群聊的成員管理(加入與退出,獲取成員列表)
- 群聊消息的收發及Netty性能優化
- 心跳與空閑檢測
- 總結
- 擴展
一、 實現需求
二、 代碼框架
在代碼框架中我們已經實現了 服務端啟動 、 客戶端啟動 、 客戶端與服務端雙向通信 、 客戶端與服務端通信協議編解碼 、 客戶端登錄的邏輯 , 接下來你可以把代碼框架粘貼到你的編輯器中跟我來一起實現客戶端與服務端收發消息
import java.lang.reflect.Method;import java.util.Arrays;import java.util.Date;import java.util.HashMap;import java.util.Map;import java.util.concurrent.TimeUnit;import com.alibaba.fastjson.JSONObject;import com.tj.NIO_test_maven.Test_11_LoginResponsePacket.Code;import io.netty.bootstrap.Bootstrap;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandlerAdapter;import io.netty.channel.ChannelInitializer;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import lombok.Data;/*** 2019年1月3日* * @author outman* * 實現客戶端和服務端收發消息**/public class Test_11_實現客戶端與服務端收發消息 {public static void main(String[] args) {// 啟動服務端Test_11_server.start(8000);// 啟動客戶端Test_11_client.start("127.0.0.1", 8000, 5);}}/*** 2019年1月3日* * @author outman** 服務端*/class Test_11_server {/*** @desc 服務端啟動* @param port*/public static void start(int port) {NioEventLoopGroup bossgroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossgroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服務端處理邏輯ch.pipeline().addLast(new Test_11_serverHandler());}});bind(serverBootstrap, port);}/*** @desc 自動綁定遞增并啟動服務端* @param serverBootstrap* @param port*/private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {System.out.println("服務端:" + new Date() + "綁定端口【" + port + "】成功");} else {System.out.println("服務端:" + new Date() + "綁定端口【" + port + "】失敗,執行遞增綁定");bind(serverBootstrap, port + 1);}});}}/*** 2019年1月3日* * @author outman** 客戶端*/class Test_11_client {/*** 客戶端啟動* * @param ip* 連接ip* @param port* 服務端端口* @param maxRetry* 最大重試次數*/public static void start(String ip, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客戶端處理邏輯ch.pipeline().addLast(new Test_11_clientHandler());}});// 連接服務端connect(bootstrap, ip, port, maxRetry);}/*** @desc 連接服務端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex* 重試計數*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重連計數if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判斷連接狀態if (future.isSuccess()) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】成功");} else if (maxRetry <= 0) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗,達到重連最大次數放棄重連");} else {// 重連使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗," + delay + "秒后執行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}/*** 客戶端處理邏輯* * @author outman*/class Test_11_clientHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客戶端:" + new Date() + "開始登陸");// 創建登陸對象Test_11_LoginRequestPacket loginRequestPacket = new Test_11_LoginRequestPacket();// 隨機取ID 1~999loginRequestPacket.setUserId((int) (Math.random() * 1000) + 1);loginRequestPacket.setUserName("outman");loginRequestPacket.setPassword("123456");// 編碼ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket);// 寫出數據ctx.channel().writeAndFlush(byteBuf);}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數據包解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據不同的指令選擇對應的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_RESPONSE:Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet;System.out.println("客戶端:" + new Date() + "收到服務端響應【" + loginResponsePacket.getMsg() + "】");break;default:break;}}}/*** 服務端處理邏輯* * @author outman*/class Test_11_serverHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據指令執行對應的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_REQUEST:Test_11_LoginRequestPacket loginRequestPacket = (Test_11_LoginRequestPacket) packet;// 模擬校驗成功System.out.println("服務端:" + new Date() + "【" + loginRequestPacket.getUserName() + "】 登陸成功");// 給服務端響應Test_11_LoginResponsePacket loginResponsePacket = new Test_11_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陸成功!");// 編碼byteBuf = Test_11_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);// 寫出數據ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服務端:" + new Date() + "收到未知的指令【" + packet.getCommand() + "】");break;}}}/*** 數據包抽象類* * @author outman*/@Dataabstract class Test_11_Packet {// 協議版本號private byte version = 1;// 獲取指定標識public abstract byte getCommand();// 指令集合public interface Command {// 登錄指令public static final byte LOGIN_REQUEST = 1;// 登陸響應指令public static final byte LOGIN_RESPONSE = 2;}}/*** 序列化抽象接口* * @author outman*/interface Test_11_Serializer {// 獲取序列化算法標識byte getSerializerAlgorithm();// 序列化算法標識集合interface SerializerAlgorithm {// JSON 序列化算法標識public static final byte JSONSerializerAlgrothm = 1;}// 默認的序列化算法public Test_11_Serializer DEFAULT = new Test_11_JSONSerializer();// 序列化byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet);// 反序列化<T> T deSerialize(byte[] bs, Class<T> clazz);}/*** 數據包編解碼類* * @author outman*/class Test_11_PacketCodec {// 魔數private static final int MAGIC_NUMBER = 0x12345678;// 單例public static Test_11_PacketCodec INSTANCE = new Test_11_PacketCodec();// 注冊 序列化類private Class[] serializerArray = new Class[] { Test_11_JSONSerializer.class };// 注冊抽象數據包類private Class[] packetArray = new Class[] { Test_11_LoginRequestPacket.class, Test_11_LoginResponsePacket.class };// 序列化算法標識 和對應的序列化類映射private static Map<Byte, Class<? super Test_11_Serializer>> serializerMap;// 指令標識和對應的數據包抽象類映射private static Map<Byte, Class<? super Test_11_Packet>> packetMap;// 初始化 兩個映射private Test_11_PacketCodec() {serializerMap = new HashMap<>();Arrays.asList(serializerArray).forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");byte serializerAlgorthm = (byte) method.invoke((Test_11_Serializer) clazz.newInstance());serializerMap.put(serializerAlgorthm, clazz);} catch (Exception e) {e.printStackTrace();}});packetMap = new HashMap<>();Arrays.asList(packetArray).forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");method.setAccessible(true);byte command = (byte) method.invoke((Test_11_Packet) clazz.newInstance());packetMap.put(command, clazz);} catch (Exception e) {e.printStackTrace();}});}// 編碼public ByteBuf enCode(ByteBuf byteBuf, Test_11_Packet packet) {// 序列化數據包byte[] bs = Test_11_Serializer.DEFAULT.enSerialize(byteBuf, packet);// 寫入魔數byteBuf.writeInt(MAGIC_NUMBER);// 寫入協議版本號byteBuf.writeByte(packet.getVersion());// 寫入指令標識byteBuf.writeByte(packet.getCommand());// 寫入序列化算法標識byteBuf.writeByte(Test_11_Serializer.DEFAULT.getSerializerAlgorithm());// 寫入數據長度byteBuf.writeInt(bs.length);// 寫入數據byteBuf.writeBytes(bs);return byteBuf;}// 解碼public Test_11_Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過魔數校驗byteBuf.skipBytes(4);// 跳過版本號校驗byteBuf.skipBytes(1);// 獲取指令標識byte command = byteBuf.readByte();// 獲取序列化算法標識byte serializerAlgorthm = byteBuf.readByte();// 獲取數據長度int len = byteBuf.readInt();// 獲取數據byte[] bs = new byte[len];byteBuf.readBytes(bs);// 獲取對應的序列化算法類Test_11_Serializer serializer = getSerializer(serializerAlgorthm);// 獲取對應的數據包類Test_11_Packet packet = getPacket(command);if (serializer != null && packet != null) {// 反序列化數據包return serializer.deSerialize(bs, packet.getClass());} else {throw new RuntimeException("沒有找到對應的序列化實現或數據包實現");}}private static Test_11_Packet getPacket(byte command) throws Exception {if(packetMap.get(command) == null) {throw new RuntimeException("未注冊的數據包類型");}return (Test_11_Packet) packetMap.get(command).newInstance();}private static Test_11_Serializer getSerializer(byte serializerAlgorthm) throws Exception {return (Test_11_Serializer) serializerMap.get(serializerAlgorthm).newInstance();}}/*** 登錄請求數據包實體類* * @author outman*/@Dataclass Test_11_LoginRequestPacket extends Test_11_Packet {private int userId;private String userName;private String password;@Overridepublic byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 登錄響應數據包實體類* * @author outman*/@Dataclass Test_11_LoginResponsePacket extends Test_11_Packet {private int code;private String msg;@Overridepublic byte getCommand() {return Command.LOGIN_RESPONSE;}/*** 響應碼集合*/interface Code {// 成功的響應碼public static final int SUCCESS = 10000;// 失敗的響應碼public static final int FAIL = 10001;}}/*** Json序列化實現類* * @author outman*/class Test_11_JSONSerializer implements Test_11_Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSONSerializerAlgrothm;}@Overridepublic byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet) {return JSONObject.toJSONBytes(packet);}@Overridepublic <T> T deSerialize(byte[] bs, Class<T> clazz) {return JSONObject.parseObject(bs, clazz);}}三、 收發消息對象
首先 ,Test_11_packet的指令集合中添加 發送消息指令為3
/*** 數據包抽象類* * @author outman*/@Dataabstract class Test_11_Packet {// 協議版本號private byte version = 1;// 獲取指定標識public abstract byte getCommand();// 指令集合public interface Command {// 登錄指令public static final byte LOGIN_REQUEST = 1;// 登陸響應指令public static final byte LOGIN_RESPONSE = 2;// 發送消息指令public static final byte MESSAGE_REQUEST = 3;// 回復消息指令public static final byte MESSAGE_RESPONSE = 4;}}我們來定義一下客戶端與服務端收發消息對象 , 我們把客戶端發送至服務端的消息對象定義為Test_11_MessageRequestPacket
/*** 2019年1月3日* @author outman** 發送消息對象*/@Dataclass Test_11_MessageRequestPacket extends Test_11_Packet{private String message;@Overridepublic byte getCommand() {return Command.MESSAGE_REQUEST;}}我們把服務端發送至客戶端的消息對象定義為 Test_11_messageResponsePacket
/*** 2019年1月3日* @author outman* 回復消息對象*/@Dataclass Test_11_MessageResponsePacket extends Test_11_Packet{private String message;@Overridepublic byte getCommand() {return Command.MESSAGE_RESPONSE;}}四、 判斷登錄是否成功
在前面一小節 , 我們在文末出了一道思考題: 如何判斷客戶端是否已經登錄?
在客戶端啟動流程這一章節 , 我們有提到可以給客戶端連接也就是channel 綁定屬性 , 通過channel.attr(XXX).set(xxx)的方式 , 那么我們是否可以在登錄成功之后 , 給channel綁定一個登錄成功的標志 , 然后在判斷是否登錄成功的時候取出這個標志? 答案十肯定的。
我們來定義一下登錄成功的標志
/*** 2019年1月21日* @author outman* * 連接 屬性**/interface Test_11_ChannelAttributes {// 連接登錄標識屬性AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");}然后我們在登錄成功之后給連接綁定登錄成功標識
Test_11_clientHandler.java
/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數據包解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據不同的指令選擇對應的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_RESPONSE:Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet;System.out.println("客戶端:" + new Date() + "收到服務端響應【" + loginResponsePacket.getMsg() + "】");// 給 連接綁定登錄成功標識if(Test_11_LoginUtil.isSuccess(loginResponsePacket)) {// 登錄成功Test_11_LoginUtil.markAsLogin(ctx.channel());System.out.println(new Date() + "登錄成功");}else {// 登錄失敗System.out.println(new Date() + "登錄失敗,原因-->"+loginResponsePacket.getMsg());}break;default:break;}}登錄相關工具類
/*** 2019年1月21日* @author outman* * 登錄相關工具類**/class Test_11_LoginUtil{/*** @desc 判斷登錄成功* @param loginResponsePacket* @return 是否登錄成功*/public static boolean isSuccess(Test_11_LoginResponsePacket loginResponsePacket) {boolean flag = false;if(loginResponsePacket.getCode() == Test_11_LoginResponsePacket.Code.SUCCESS) {flag = true;}return flag;}/*** @desc 標識連接登錄成功* @param channel* @return*/public static void markAsLogin(Channel channel) {channel.attr(Test_11_ChannelAttributes.LOGIN).set(true);}/*** @desc 判斷是否登錄* @param channel* @return*/public static boolean hasLogin(Channel channel) {boolean flag = false;Boolean attr = channel.attr(Test_11_ChannelAttributes.LOGIN).get();if(attr == null) return flag;return attr;}}如以上代碼所示 , 我們出去出LoginUtils用于設置登錄成功標志位已經判斷是否有標志位
五、 控制臺輸入消息并發送
在客戶端啟動這小節中 , 我們已經學到了客戶端啟動流程 , 現在 , 我們在客戶端連接上服務端之后啟動控制臺線程 , 從控制臺讀取消息然后發送到服務端。
Test_11_client.java /*** @desc 連接服務端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex* 重試計數*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重連計數if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判斷連接狀態if (future.isSuccess()) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】成功");// 啟動控制臺線程Channel channel = ((ChannelFuture) future).channel();startConsoleThread(channel);} else if (maxRetry <= 0) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗,達到重連最大次數放棄重連");} else {// 重連使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗," + delay + "秒后執行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}/*** @desc 啟動控制臺線程* @param channel*/private static void startConsoleThread(Channel channel) {System.out.println("客戶端:啟動控制臺線程");new Thread(() -> {while (Thread.interrupted()) {if (Test_11_LoginUtil.hasLogin(channel)) {System.out.println("輸入消息發送至服務端");Scanner sc = new Scanner(System.in);String msg = sc.nextLine();Test_11_MessageRequestPacket messageRequestPacket = new Test_11_MessageRequestPacket();messageRequestPacket.setMessage(msg);ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(channel.alloc().buffer(),messageRequestPacket);channel.writeAndFlush(byteBuf);} else {System.out.println("您還未登錄,請登錄...");}}}).start();}六、 服務端收到消息處理
Test_11_serverHandler.java/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據指令執行對應的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_REQUEST:Test_11_LoginRequestPacket loginRequestPacket = (Test_11_LoginRequestPacket) packet;// 模擬校驗成功System.out.println("服務端:" + new Date() + "【" + loginRequestPacket.getUserName() + "】 登陸成功");// 給客戶端響應Test_11_LoginResponsePacket loginResponsePacket = new Test_11_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陸成功!");// 編碼byteBuf = Test_11_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);// 寫出數據ctx.channel().writeAndFlush(byteBuf);break;case Test_11_Packet.Command.MESSAGE_REQUEST :// 處理消息Test_11_MessageRequestPacket messageRequestPacket = (Test_11_MessageRequestPacket)packet;System.out.println("服務端:"+ new Date() + "收到客戶端消息 --> "+ messageRequestPacket.getMessage());Test_11_MessageResponsePacket messageResponsePacket = new Test_11_MessageResponsePacket();String msg = messageRequestPacket.getMessage();msg = msg.replace("?", "!");msg = msg.replace("?", "!");messageResponsePacket.setMessage("服務端回復:【"+msg+"】");Test_11_PacketCodec.INSTANCE.enCode(byteBuf, messageResponsePacket);ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服務端:" + new Date() + "收到未知的指令【" + packet.getCommand() + "】");break;}}七、 客戶端消息處理
Test_11_clientHandler.java@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數據包解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據不同的指令選擇對應的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_RESPONSE:Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet;System.out.println("客戶端:" + new Date() + "收到服務端響應【" + loginResponsePacket.getMsg() + "】");// 給 連接綁定登錄成功標識if (Test_11_LoginUtil.isSuccess(loginResponsePacket)) {// 登錄成功Test_11_LoginUtil.markAsLogin(ctx.channel());System.out.println("客戶端:" + new Date() + "登錄成功");} else {// 登錄失敗System.out.println("客戶端:" + new Date() + "登錄失敗,原因-->" + loginResponsePacket.getMsg());}break;case Test_11_Packet.Command.MESSAGE_RESPONSE :Test_11_MessageResponsePacket messageResponsePacket = (Test_11_MessageResponsePacket)packet;System.out.println("客戶端:"+new Date()+ "收到服務端消息 --> "+ messageResponsePacket.getMessage());break;default:break;}}八、 執行結果
九、 完整代碼
import java.lang.reflect.Method; import java.util.Arrays; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Scanner; import java.util.concurrent.TimeUnit;import com.alibaba.fastjson.JSONObject; import com.tj.NIO_test_maven.Test_11_LoginResponsePacket.Code;import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.AttributeKey; import lombok.Data;/*** 2019年1月3日* * @author outman* * 實現客戶端和服務端收發消息**/ public class Test_11_實現客戶端與服務端收發消息 {public static void main(String[] args) {// 啟動服務端Test_11_server.start(8000);// 啟動客戶端Test_11_client.start("127.0.0.1", 8000, 5);}}/*** 2019年1月3日* * @author outman** 服務端*/ class Test_11_server {/*** @desc 服務端啟動* @param port*/public static void start(int port) {NioEventLoopGroup bossgroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossgroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服務端處理邏輯ch.pipeline().addLast(new Test_11_serverHandler());}});bind(serverBootstrap, port);}/*** @desc 自動綁定遞增并啟動服務端* @param serverBootstrap* @param port*/private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {System.out.println("服務端:" + new Date() + "綁定端口【" + port + "】成功");} else {System.out.println("服務端:" + new Date() + "綁定端口【" + port + "】失敗,執行遞增綁定");bind(serverBootstrap, port + 1);}});}}/*** 2019年1月3日* * @author outman** 客戶端*/ class Test_11_client {/*** 客戶端啟動* * @param ip* 連接ip* @param port* 服務端端口* @param maxRetry* 最大重試次數*/public static void start(String ip, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客戶端處理邏輯ch.pipeline().addLast(new Test_11_clientHandler());}});// 連接服務端connect(bootstrap, ip, port, maxRetry);}/*** @desc 連接服務端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex* 重試計數*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重連計數if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判斷連接狀態if (future.isSuccess()) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】成功");// 啟動控制臺線程Channel channel = ((ChannelFuture) future).channel();startConsoleThread(channel);} else if (maxRetry <= 0) {System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗,達到重連最大次數放棄重連");} else {// 重連使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客戶端:" + new Date() + "連接【" + ip + ":" + port + "】失敗," + delay + "秒后執行重試");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}/*** @desc 啟動控制臺線程* @param channel*/private static void startConsoleThread(Channel channel) {System.out.println("客戶端:啟動控制臺線程");new Thread(() -> {while (!Thread.interrupted()) {if (Test_11_LoginUtil.hasLogin(channel)) {System.out.println("輸入消息發送至服務端");Scanner sc = new Scanner(System.in);String msg = sc.nextLine();Test_11_MessageRequestPacket messageRequestPacket = new Test_11_MessageRequestPacket();messageRequestPacket.setMessage(msg);ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(channel.alloc().buffer(),messageRequestPacket);channel.writeAndFlush(byteBuf);} else {System.out.println("您還未登錄,請登錄...");}}}).start();} }/*** 客戶端處理邏輯* * @author outman*/ class Test_11_clientHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客戶端:" + new Date() + "開始登陸");// 創建登陸對象Test_11_LoginRequestPacket loginRequestPacket = new Test_11_LoginRequestPacket();// 隨機取ID 1~999loginRequestPacket.setUserId((int) (Math.random() * 1000) + 1);loginRequestPacket.setUserName("outman");loginRequestPacket.setPassword("123456");// 編碼ByteBuf byteBuf = Test_11_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket);// 寫出數據ctx.channel().writeAndFlush(byteBuf);}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 數據包解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據不同的指令選擇對應的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_RESPONSE:Test_11_LoginResponsePacket loginResponsePacket = (Test_11_LoginResponsePacket) packet;System.out.println("客戶端:" + new Date() + "收到服務端響應【" + loginResponsePacket.getMsg() + "】");// 給 連接綁定登錄成功標識if (Test_11_LoginUtil.isSuccess(loginResponsePacket)) {// 登錄成功Test_11_LoginUtil.markAsLogin(ctx.channel());System.out.println("客戶端:" + new Date() + "登錄成功");} else {// 登錄失敗System.out.println("客戶端:" + new Date() + "登錄失敗,原因-->" + loginResponsePacket.getMsg());}break;case Test_11_Packet.Command.MESSAGE_RESPONSE :Test_11_MessageResponsePacket messageResponsePacket = (Test_11_MessageResponsePacket)packet;System.out.println("客戶端:"+new Date()+ "收到服務端消息 --> "+ messageResponsePacket.getMessage());break;default:break;}}}/*** 服務端處理邏輯* * @author outman*/ class Test_11_serverHandler extends ChannelInboundHandlerAdapter {/*** 連接成功時觸發*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有數據可讀時觸發*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解碼Test_11_Packet packet = Test_11_PacketCodec.INSTANCE.deCode(byteBuf);// 根據指令執行對應的處理邏輯switch (packet.getCommand()) {case Test_11_Packet.Command.LOGIN_REQUEST:Test_11_LoginRequestPacket loginRequestPacket = (Test_11_LoginRequestPacket) packet;// 模擬校驗成功System.out.println("服務端:" + new Date() + "【" + loginRequestPacket.getUserName() + "】 登陸成功");// 給客戶端響應Test_11_LoginResponsePacket loginResponsePacket = new Test_11_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陸成功!");// 編碼byteBuf = Test_11_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);// 寫出數據ctx.channel().writeAndFlush(byteBuf);break;case Test_11_Packet.Command.MESSAGE_REQUEST :// 處理消息Test_11_MessageRequestPacket messageRequestPacket = (Test_11_MessageRequestPacket)packet;System.out.println("服務端:"+ new Date() + "收到客戶端消息 --> "+ messageRequestPacket.getMessage());Test_11_MessageResponsePacket messageResponsePacket = new Test_11_MessageResponsePacket();String msg = messageRequestPacket.getMessage();msg = msg.replace("?", "!");msg = msg.replace("?", "!");messageResponsePacket.setMessage("服務端回復:【"+msg+"】");Test_11_PacketCodec.INSTANCE.enCode(byteBuf, messageResponsePacket);ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服務端:" + new Date() + "收到未知的指令【" + packet.getCommand() + "】");break;}}}/*** 數據包抽象類* * @author outman*/ @Data abstract class Test_11_Packet {// 協議版本號private byte version = 1;// 獲取指定標識public abstract byte getCommand();// 指令集合public interface Command {// 登錄指令public static final byte LOGIN_REQUEST = 1;// 登陸響應指令public static final byte LOGIN_RESPONSE = 2;// 發送消息指令public static final byte MESSAGE_REQUEST = 3;// 回復消息指令public static final byte MESSAGE_RESPONSE = 4;} }/*** 序列化抽象接口* * @author outman*/ interface Test_11_Serializer {// 獲取序列化算法標識byte getSerializerAlgorithm();// 序列化算法標識集合interface SerializerAlgorithm {// JSON 序列化算法標識public static final byte JSONSerializerAlgrothm = 1;}// 默認的序列化算法public Test_11_Serializer DEFAULT = new Test_11_JSONSerializer();// 序列化byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet);// 反序列化<T> T deSerialize(byte[] bs, Class<T> clazz);}/*** 數據包編解碼類* * @author outman*/ class Test_11_PacketCodec {// 魔數private static final int MAGIC_NUMBER = 0x12345678;// 單例public static Test_11_PacketCodec INSTANCE = new Test_11_PacketCodec();// 注冊 序列化類private Class[] serializerArray = new Class[] { Test_11_JSONSerializer.class };// 注冊抽象數據包類private Class[] packetArray = new Class[] { Test_11_LoginRequestPacket.class, Test_11_LoginResponsePacket.class ,Test_11_MessageRequestPacket.class ,Test_11_MessageResponsePacket.class};// 序列化算法標識 和對應的序列化類映射private static Map<Byte, Class<? super Test_11_Serializer>> serializerMap;// 指令標識和對應的數據包抽象類映射private static Map<Byte, Class<? super Test_11_Packet>> packetMap;// 初始化 兩個映射private Test_11_PacketCodec() {serializerMap = new HashMap<>();Arrays.asList(serializerArray).forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");byte serializerAlgorthm = (byte) method.invoke((Test_11_Serializer) clazz.newInstance());serializerMap.put(serializerAlgorthm, clazz);} catch (Exception e) {e.printStackTrace();}});packetMap = new HashMap<>();Arrays.asList(packetArray).forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");method.setAccessible(true);byte command = (byte) method.invoke((Test_11_Packet) clazz.newInstance());packetMap.put(command, clazz);} catch (Exception e) {e.printStackTrace();}});}// 編碼public ByteBuf enCode(ByteBuf byteBuf, Test_11_Packet packet) {// 序列化數據包byte[] bs = Test_11_Serializer.DEFAULT.enSerialize(byteBuf, packet);// 寫入魔數byteBuf.writeInt(MAGIC_NUMBER);// 寫入協議版本號byteBuf.writeByte(packet.getVersion());// 寫入指令標識byteBuf.writeByte(packet.getCommand());// 寫入序列化算法標識byteBuf.writeByte(Test_11_Serializer.DEFAULT.getSerializerAlgorithm());// 寫入數據長度byteBuf.writeInt(bs.length);// 寫入數據byteBuf.writeBytes(bs);return byteBuf;}// 解碼public Test_11_Packet deCode(ByteBuf byteBuf) throws Exception {// 跳過魔數校驗byteBuf.skipBytes(4);// 跳過版本號校驗byteBuf.skipBytes(1);// 獲取指令標識byte command = byteBuf.readByte();// 獲取序列化算法標識byte serializerAlgorthm = byteBuf.readByte();// 獲取數據長度int len = byteBuf.readInt();// 獲取數據byte[] bs = new byte[len];byteBuf.readBytes(bs);// 獲取對應的序列化算法類Test_11_Serializer serializer = getSerializer(serializerAlgorthm);// 獲取對應的數據包類Test_11_Packet packet = getPacket(command);if (serializer != null && packet != null) {// 反序列化數據包return serializer.deSerialize(bs, packet.getClass());} else {throw new RuntimeException("沒有找到對應的序列化實現或數據包實現");}}private static Test_11_Packet getPacket(byte command) throws Exception {if(packetMap.get(command) == null) {throw new RuntimeException("未注冊的數據包類型-->"+ command);}return (Test_11_Packet) packetMap.get(command).newInstance();}private static Test_11_Serializer getSerializer(byte serializerAlgorthm) throws Exception {return (Test_11_Serializer) serializerMap.get(serializerAlgorthm).newInstance();}}/*** 登錄請求數據包實體類* * @author outman*/ @Data class Test_11_LoginRequestPacket extends Test_11_Packet {private int userId;private String userName;private String password;@Overridepublic byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 登錄響應數據包實體類* * @author outman*/ @Data class Test_11_LoginResponsePacket extends Test_11_Packet {private int code;private String msg;@Overridepublic byte getCommand() {return Command.LOGIN_RESPONSE;}/*** 響應碼集合*/interface Code {// 成功的響應碼public static final int SUCCESS = 10000;// 失敗的響應碼public static final int FAIL = 10001;} }/*** Json序列化實現類* * @author outman*/ class Test_11_JSONSerializer implements Test_11_Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSONSerializerAlgrothm;}@Overridepublic byte[] enSerialize(ByteBuf byteBuf, Test_11_Packet packet) {return JSONObject.toJSONBytes(packet);}@Overridepublic <T> T deSerialize(byte[] bs, Class<T> clazz) {return JSONObject.parseObject(bs, clazz);}}/*** 2019年1月3日* * @author outman** 發送消息對象*/ @Data class Test_11_MessageRequestPacket extends Test_11_Packet {private String message;@Overridepublic byte getCommand() {return Command.MESSAGE_REQUEST;}}/*** 2019年1月3日* * @author outman 回復消息對象*/ @Data class Test_11_MessageResponsePacket extends Test_11_Packet {private String message;@Overridepublic byte getCommand() {return Command.MESSAGE_RESPONSE;}}/*** 2019年1月21日* * @author outman* * 連接 屬性**/ interface Test_11_ChannelAttributes {// 連接登錄標識屬性AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login"); }/*** 2019年1月21日* * @author outman* * 登錄相關工具類**/ class Test_11_LoginUtil {/*** @desc 判斷登錄成功* @param loginResponsePacket* @return 是否登錄成功*/public static boolean isSuccess(Test_11_LoginResponsePacket loginResponsePacket) {boolean flag = false;if (loginResponsePacket.getCode() == Test_11_LoginResponsePacket.Code.SUCCESS) {flag = true;}return flag;}/*** @desc 標識連接登錄成功* @param channel* @return*/public static void markAsLogin(Channel channel) {channel.attr(Test_11_ChannelAttributes.LOGIN).set(true);}/*** @desc 判斷是否登錄* @param channel* @return*/public static boolean hasLogin(Channel channel) {boolean flag = false;Boolean attr = channel.attr(Test_11_ChannelAttributes.LOGIN).get();if (attr != null)flag = attr;return flag;} }十、 總結
十一 、 思考
總結
以上是生活随笔為你收集整理的Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 玩转LogBack
- 下一篇: Netty实战 IM即时通讯系统(十一)