基于Netty实现群聊功能
有你的日子你就是一切,沒你的日子一切都是你
又是一天新的開始,讓我來帶領你打開任督二脈。。。
前言:
在前面有了對NIO、BIO知識的學習,以及對netty結構組的基本了解,接下來將學習一下如何使用netty去實現一個群聊功能,讀者可自行去對比基于NIO、BIO、Netty實現群聊功能的不同方式,以更深刻的理解IO網絡編程。
學習內容:
整體思路:
1) 編寫一個Netty群聊系統,實現服務器端和客戶端之間的數據簡單通訊(非阻塞)
2) 實現多人聊天
3) 服務器端:可以監測用戶上線,離線,并實現消息轉發
4) 客戶端: 通過channel可以無阻塞發送消息給其他所有用戶,同時可接收到其他用戶發送的消息)
5)目的:進一步理解Netty非阻塞網絡編程機制
具體實現
服務端代碼
import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import java.util.Scanner;/** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 10:05 * @description: 群聊服務端 * @Version: 1.0 */ public class NettyChartServer {/*** 服務端口號*/private int port;public NettyChartServer(int port) {this.port = port;}/*** 服務執行:用于處理客戶端請求*/public void run() {//服務線程組EventLoopGroup bossgroup = new NioEventLoopGroup(1);//工作組線程池,默認為CPU核數*2EventLoopGroup workgroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();try {bootstrap.group(bossgroup, workgroup).channel(NioServerSocketChannel.class) //可以監聽新進來的TCP連接的通道.option(ChannelOption.SO_BACKLOG, 128) // 設置線程隊列得到連接個數.childOption(ChannelOption.SO_KEEPALIVE, true) // 設置保持活動連接狀態.handler(new LoggingHandler(LogLevel.INFO))//心跳檢測日志.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//獲取到piepline管道ChannelPipeline pipeline = socketChannel.pipeline();//向pipeline加入解碼器pipeline.addLast("decoder", new StringDecoder());//向pipeline加入編碼器pipeline.addLast("encoder", new StringEncoder());//心跳檢測handlepipeline.addLast("idcheck",new IdleStateHandler(3,5,8, TimeUnit.SECONDS));//加入自己的業務處理handlerpipeline.addLast("handle",new MyNettyServerHandler());}});ChannelFuture ch = bootstrap.bind(port).sync();System.out.println("服務器 is ready-------");//異步監聽端口ch.addListeners(e->{if(e.isSuccess()){System.out.println("監聽端口 "+port+" 成功");}else {System.out.println("監聽端口 "+port+" 失敗");}});//異步關閉通道ch.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//優雅退出線程池bossgroup.shutdownGracefully();workgroup.shutdownGracefully();}}public static void main(String[] args) {NettyChartServer server=new NettyChartServer(8888);server.run();} }服務端主要作用是對8888端口進行監聽,等待客戶端的請求。使用了主從Reactor模式,去實現接收處理多個客戶端請求。利用自定義業務處理handler 完成對信息的獲取,以及客戶端直接信息轉發。
服務端業務處理器
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.text.SimpleDateFormat; import java.util.HashMap;/** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 10:43 * @description: 業務自定義的邏輯處理類:它是入站 ChannelInboundHandler 類型的處理器,負責接收解碼后的 HTTP 請求數據,并將請求處理結果寫回客戶端。 * @Version: 1.0 */ public class MyNettyServerHandler extends SimpleChannelInboundHandler<String> {//定義一個channerl集合,類似于List<Channel> ,用于存儲不同的channel通道轉發信息private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);//設計一個集合可存儲通道通道對應的用戶,未開發改功能private static HashMap<String,Channel> channelHashMap=new HashMap<>();private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** 該channel建立連接后: 生命周期 handlerAdded》 channelRegistered-》channelActive,* 表示連接建立,一旦連接,第一個執行* @param ctx* @throws Exception*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("------------handlerAdded------");Channel channel = ctx.channel();channelGroup.add(channel);channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"加入聊天");}/*** 連接中斷* @param ctx* @throws Exception*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("------------handlerRemoved------");Channel channel=ctx.channel();//服務下線,移除該channelchannelGroup.remove(channel);//并通知,已離線channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"離開了~");}/*** 表示該channel處于活動狀態* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelActive------");System.out.println(ctx.channel().remoteAddress() + " 上線了~");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelInactive------");System.out.println(ctx.channel().remoteAddress() + " 下線了~");}/*** 讀取消息* @param channelHandlerContext* @param s* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println("------------channelRead0------");Channel channel = channelHandlerContext.channel();//將消息轉發給其他客戶端,并且排除自己channelGroup.forEach(e->{if (channel!=e){//e.writeAndFlush("【客戶端】"+channel.remoteAddress()+"說:"+msg);}else {e.writeAndFlush("【自己】"+"說:"+msg);}});}/*** 消息讀取后* @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelReadComplete------");}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelRegistered------");}/*** 離線生命:* @param ctx exceptionCaught-》channelInactive》channelUnregistered》handlerRemoved* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelUnregistered------");}/*** 處理心跳檢測* @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// System.out.println("------------userEventTriggered------");if (evt instanceof IdleStateEvent){//向下轉型IdleStateEvent event=(IdleStateEvent)evt;String eventType=null;switch (event.state()){case READER_IDLE:eventType="讀空閑";break;case WRITER_IDLE:eventType="寫空閑";break;case ALL_IDLE:eventType="讀寫空閑";break;default:break;}System.out.println(ctx.channel().remoteAddress()+"---超時時間---"+eventType);System.out.println("服務器做相應操作");}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("------------exceptionCaught------");} }業務自定義的邏輯處理類:它是入站 ChannelInboundHandler 類型的處理器,負責接收解碼后的 HTTP 請求數據,并將請求處理結果寫回客戶端。當channle通道建立以后,便開啟了生命周期。其中 channelRegistered 是用于channnel通道注冊,channelActive用于查看通道是否活躍 、channelRead0用于讀取客戶端信息、exceptionCaught用于異常處理
客戶端代碼
客戶端啟動類
import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil;import java.util.Scanner;/** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 10:06 * @description: 客戶端 * @Version: 1.0 */ public class NettyChartClient {private String ip;private int port;public NettyChartClient(String ip, int port) {this.ip = ip;this.port = port;}/*** 客戶端啟動*/public void run() {//創建工作線程池 CPU核數*2EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();try {bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE,true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//得到PipelineChannelPipeline pipeline = socketChannel.pipeline();//加入相關handlerpipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("handler", (ChannelHandler) new MyNettyClientHandler());}});System.out.println("---客戶端啟動了----");//連接服務端ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();Scanner can=new Scanner(System.in);while (can.hasNext()){String str=can.nextLine();channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(""+str, CharsetUtil.UTF_8));}//異步關閉通道channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}}public static void main(String[] args) {NettyChartClient chartClient=new NettyChartClient("127.0.0.1",8888);chartClient.run();}}客戶端處理器
import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil;/** * @PackageName: com.netty.demo.chart * @author: youjp * @create: 2021-01-22 11:16 * @description: 客戶端處理器 * @Version: 1.0 */ public class MyNettyClientHandler extends SimpleChannelInboundHandler<String> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println(""+msg);} }客戶端只需要獲取服務端傳來的消息,并且可以自己編寫消息發送即可。
運行測試
接下來,先將服務端啟動類啟動,然后再運行多個客戶端。查看控制臺,并發送消息,查看channel的生命周期
分別在各自的客戶端控制臺輸入信息,測試。基于Netty實現的群聊功能就這樣實現了
有興趣的老爺,可以關注我的公眾號【一起收破爛】,回復【006】獲取2021最新java面試資料以及簡歷模型120套哦~
總結
以上是生活随笔為你收集整理的基于Netty实现群聊功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jade选峰之后怎么去掉_教程丨用Jad
- 下一篇: 【修电脑】ctfmon.exe停止工作以