NIO介绍与Netty通信简单入门
NIO同步阻塞與同步非阻塞
BIO與NIO
IO(BIO)和NIO區別:其本質就是阻塞和非阻塞的區別
阻塞概念:應用程序在獲取網絡數據的時候,如果網絡傳輸數據很慢,就會一直等待,直到傳輸完畢為止。
非阻塞概念:應用程序直接可以獲取已經準備就緒好的數據,無需等待。
IO為同步阻塞形式,NIO為同步非阻塞形式,NIO并沒有實現異步,在JDK1.7后升級NIO庫包,支持異步非阻塞
同學模型NIO2.0(AIO)
BIO:同步阻塞式IO,服務器實現模式為一個連接一個線程,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷,當然可以通過線程池機制改善。?
NIO:同步非阻塞式IO,服務器實現模式為一個請求一個線程,即客戶端發送的連接請求都會注冊到多路復用器上,多路復用器輪詢到連接有I/O請求時才啟動一個線程進行處理。?
AIO(NIO.2):異步非阻塞式IO,服務器實現模式為一個有效請求一個線程,客戶端的I/O請求都是由OS先完成了再通知服務器應用去啟動線程進行處理。?
?
同步時,應用程序會直接參與IO讀寫操作,并且我們的應用程序會直接阻塞到某一個方法上,直到數據準備就緒:
或者采用輪訓的策略實時檢查數據的就緒狀態,如果就緒則獲取數據.
異步時,則所有的IO讀寫操作交給操作系統,與我們的應用程序沒有直接關系,我們程序不需要關系IO讀寫,當操作
系統完成了IO讀寫操作時,會給我們應用程序發送通知,我們的應用程序直接拿走數據極即可。
偽異步
由于BIO一個客戶端需要一個線程去處理,因此我們進行優化,后端使用線程池來處理多個客戶端的請求接入,形成客戶端個數M:線程池最大的線程數N的比例關系,其中M可以遠遠大于N,通過線程池可以靈活的調配線程資源,設置線程的最大值,防止由于海量并發接入導致線程耗盡。
原理:
當有新的客戶端接入時,將客戶端的Socket封裝成一個Task(該Task任務實現了java的Runnable接口)投遞到后端的線程池中進行處理,由于線程池可以設置消息隊列的大小以及線程池的最大值,因此,它的資源占用是可控的,無論多少個客戶端的并發訪問,都不會導致資源的耗盡或宕機。
??
使用多線程支持多個請求
服務器實現模式為一個連接一個線程,即客戶端有連接請求時服務器端就需要啟動一個線程進行處理,如果這個連接不做任何事情會造成不必要的線程開銷,當然可以通過線程池機制改善
//tcp服務器端...class?TcpServer {public?static?void?main(String[]?args)?throws?IOException {System.out.println("socket tcp服務器端啟動....");ServerSocket?serverSocket?=?new?ServerSocket(8080);//?等待客戶端請求try?{while?(true) {Socket?accept?=?serverSocket.accept();new?Thread(new?Runnable() {@Overridepublic?void?run() {try?{InputStream?inputStream?=?accept.getInputStream();//?轉換成string類型byte[]?buf?=?new?byte[1024];int?len?=?inputStream.read(buf);String?str?=?new?String(buf, 0,?len);System.out.println("服務器接受客戶端內容:"?+?str);}?catch?(Exception?e) {//?TODO: handle exception}}}).start();}}?catch?(Exception?e) {e.printStackTrace();}?finally?{serverSocket.close();}}}public?class?TcpClient {public?static?void?main(String[]?args)?throws?UnknownHostException, IOException {System.out.println("socket tcp?客戶端啟動....");Socket?socket?=?new?Socket("127.0.0.1", 8080);OutputStream?outputStream?=?socket.getOutputStream();outputStream.write("我是螞蟻課堂".getBytes());socket.close();}}?
使用線程池管理線程
//tcp服務器端...class?TcpServer {public?static?void?main(String[]?args)?throws?IOException {ExecutorService?newCachedThreadPool?= Executors.newCachedThreadPool();System.out.println("socket tcp服務器端啟動....");ServerSocket?serverSocket?=?new?ServerSocket(8080);//?等待客戶端請求try?{while?(true) {Socket?accept?=?serverSocket.accept();//使用線程newCachedThreadPool.execute(new?Runnable() {@Overridepublic?void?run() {try?{InputStream?inputStream?=?accept.getInputStream();//?轉換成string類型byte[]?buf?=?new?byte[1024];int?len?=?inputStream.read(buf);String?str?=?new?String(buf, 0,?len);System.out.println("服務器接受客戶端內容:"?+?str);}?catch?(Exception?e) {//?TODO: handle exception}}});}}?catch?(Exception?e) {e.printStackTrace();}?finally?{serverSocket.close();}}}public?class?TcpClient {public?static?void?main(String[]?args)?throws?UnknownHostException, IOException {System.out.println("socket tcp?客戶端啟動....");Socket?socket?=?new?Socket("127.0.0.1", 8080);OutputStream?outputStream?=?socket.getOutputStream();outputStream.write("我是螞蟻課堂".getBytes());socket.close();}}??
IO模型關系
什么是阻塞
阻塞概念:應用程序在獲取網絡數據的時候,如果網絡傳輸很慢,那么程序就一直等著,直接到傳輸完畢。
什么是非阻塞
應用程序直接可以獲取已經準備好的數據,無需等待.
IO為同步阻塞形式,NIO為同步非阻塞形式。NIO沒有實現異步,在JDK1.7之后,升級了NIO庫包
,支持異步費阻塞通訊模型NIO2.0(AIO)
?
Netty快速入門
什么是Netty
?Netty?是一個基于?JAVA NIO?類庫的異步通信框架,它的架構特點是:異步非阻塞、基于事件驅動、高性能、高可靠性和高可定制性。
Netty應用場景
1.分布式開源框架中dubbo、Zookeeper,RocketMQ底層rpc通訊使用就是netty。
2.游戲開發中,底層使用netty通訊。
為什么選擇netty
在本小節,我們總結下為什么不建議開發者直接使用JDK的NIO類庫進行開發的原因:
1)????? NIO的類庫和API繁雜,使用麻煩,你需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;
2)??????需要具備其它的額外技能做鋪墊,例如熟悉Java多線程編程,因為NIO編程涉及到Reactor模式,你必須對多線程和網路編程非常熟悉,才能編寫出高質量的NIO程序;
3)??????可靠性能力補齊,工作量和難度都非常大。例如客戶端面臨斷連重連、網絡閃斷、半包讀寫、失敗緩存、網絡擁塞和異常碼流的處理等等,NIO編程的特點是功能開發相對容易,但是可靠性能力補齊工作量和難度都非常大;
4)????? JDK NIO的BUG,例如臭名昭著的epoll bug,它會導致Selector空輪詢,最終導致CPU 100%。官方聲稱在JDK1.6版本的update18修復了該問題,但是直到JDK1.7版本該問題仍舊存在,只不過該bug發生概率降低了一些而已,它并沒有被根本解決。
?
Netty通信實踐
?
Netty服務器端?
package com.xiaofeng.netty.server;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel;/*** @author xiaofeng* @version V1.0* @title: NettyServer* @package: com.xiaofeng.netty.server* @description: netty server* @date 2019/12/17 11:24*/ public class NettyServer {private static class SingletionNettyServer {static final NettyServer instance = new NettyServer();}public static NettyServer getInstance() {return SingletionNettyServer.instance;}private EventLoopGroup mainGroup;private EventLoopGroup subGroup;private ServerBootstrap server;private ChannelFuture future;public NettyServer() {//主線程組,用于接受客戶端的連接,但是不做任何處理,跟老板一樣,不做事mainGroup = new NioEventLoopGroup();//從線程組, 老板線程組會把任務丟給他,讓手下線程組去做任務subGroup = new NioEventLoopGroup();//server啟動類server = new ServerBootstrap();//建立聯系server.group(mainGroup, subGroup).channel(NioServerSocketChannel.class).childHandler(new NettyServerInitialzer());}public void start() {this.future = server.bind(9999);System.err.println("netty server 啟動完畢...");}public static void main(String[] args) {NettyServer.getInstance().start();} } package com.xiaofeng.netty.server;import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;/*** @author xiaofeng* @version V1.0* @title: NettyServerInitialzer* @package: com.xiaofeng.netty.server* @description: 服務端初始化器* @date 2019/12/17 11:30*/ public class NettyServerInitialzer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("stringD", new StringDecoder());pipeline.addLast("stringC", new StringEncoder());pipeline.addLast("http", new HttpClientCodec());// 自定義的handlerpipeline.addLast(new ServerChatHandler());}} package com.xiaofeng.netty.server;import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor;/*** @author xiaofeng* @version V1.0* @title: ServerChatHandler* @package: com.xiaofeng.netty.server* @description: 處理消息的handler* @date 2019/12/17 11:38*/ public class ServerChatHandler extends SimpleChannelInboundHandler<String> {// 用于記錄和管理所有客戶端的channlepublic static ChannelGroup users =new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s)throws Exception {Channel channel = ctx.channel();for (Channel ch : users) {if (ch == channel) {System.out.println("收到來自客戶端[" + channel.id().asShortText() + "]的消息:" + s);ctx.writeAndFlush("[你說]:" + s + "\n");} else {ctx.writeAndFlush("[" + channel.remoteAddress() + "]" + s + "\n");}}}/*** 當客戶端連接服務端之后(打開連接)* 獲取客戶端的channle,并且放到ChannelGroup中去進行管理*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();System.out.println("客戶端加入,channelId為:" + channelId);users.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {String channelId = ctx.channel().id().asShortText();System.out.println("客戶端被移除,channelId為:" + channelId);// 當觸發handlerRemoved,ChannelGroup會自動移除對應客戶端的channelusers.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();// 發生異常之后關閉連接(關閉channel),隨后從ChannelGroup中移除ctx.channel().close();users.remove(ctx.channel());} }?
Netty客戶端
?
package com.xiaofeng.netty.client;import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader;/*** @author xiaofeng* @version V1.0* @title: NettyClient* @package: com.xiaofeng.netty.client* @description: netty client* @date 2019/12/17 11:14*/ public class NettyClient {private static class SingletionNettyClient {static final NettyClient instance = new NettyClient();}public static NettyClient getInstance() {return SingletionNettyClient.instance;}private EventLoopGroup eventGroup;private Bootstrap client;public NettyClient() {eventGroup = new NioEventLoopGroup();//創建客戶端啟動類client = new Bootstrap();client.group(eventGroup).channel(NioSocketChannel.class).handler(new NettyClientInitialzer());}public void start() throws InterruptedException, IOException {Channel channel = client.connect("127.0.0.1", 9999).sync().channel();while (true) {BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));String input = reader.readLine();if (input != null) {if ("quit".equals(input)) {System.exit(1);}channel.writeAndFlush(input);}}}public static void main(String[] args) throws InterruptedException, IOException {NettyClient.getInstance().start();} }?
package com.xiaofeng.netty.client;import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder;/*** @author xiaofeng* @version V1.0* @title: NettyClientInitialzer* @package: com.xiaofeng.netty.client* @description: 客戶端初始化器* @date 2019/12/17 11:29*/ public class NettyClientInitialzer extends ChannelInitializer<Channel> {@Overrideprotected void initChannel(Channel channel) throws Exception {//獲取管道ChannelPipeline pipeline = channel.pipeline();//添加編解碼器pipeline.addLast("stringD", new StringDecoder());pipeline.addLast("stringC", new StringEncoder());pipeline.addLast("http", new HttpClientCodec());// 自定義的handlerpipeline.addLast(new ClientChatHandler());} } package com.xiaofeng.netty.client;import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;/*** @author xiaofeng* @version V1.0* @title: ClientChatHandler* @package: com.xiaofeng.netty.client* @description: 處理消息的handler* @date 2019/12/17 11:38*/ public class ClientChatHandler extends SimpleChannelInboundHandler<String> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String s)throws Exception {System.out.println(s);}}TCP粘包、拆包問題解決方案
什么是粘包/拆包
???一個完整的業務可能會被TCP拆分成多個包進行發送,也有可能把多個小的包封裝成一個大的數據包發送,這個就是TCP的拆包和封包問題。
下面可以看一張圖,是客戶端向服務端發送包:
1.?第一種情況,Data1和Data2都分開發送到了Server端,沒有產生粘包和拆包的情況。
2.?第二種情況,Data1和Data2數據粘在了一起,打成了一個大的包發送到Server端,這個情況就是粘包。
3.?第三種情況,Data2被分離成Data2_1和Data2_2,并且Data2_1在Data1之前到達了服務端,這種情況就產生了拆包。
由于網絡的復雜性,可能數據會被分離成N多個復雜的拆包/粘包的情況,所以在做TCP服務器的時候就需要首先解決拆包/
解決辦法
? ? ?1.消息定長,報文大小固定長度,不夠空格補全,發送和接收方遵循相同的約定,這樣即使粘包了通過接收方編程實現獲取定長報文也能區分。
sc.pipeline().addLast(new FixedLengthFrameDecoder(10));2.包尾添加特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字符作為報文分隔符,接收方通過特殊分隔符切分報文區分。
ByteBuf?buf?= Unpooled.copiedBuffer("_mayi".getBytes());sc.pipeline().addLast(new?DelimiterBasedFrameDecoder(1024,?buf));將消息分為消息頭和消息體,消息頭中包含表示信息的總長度(或者消息體長度)的字段?
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的NIO介绍与Netty通信简单入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 消息称华为车 BU 王军被停职,余承东独
- 下一篇: ChatGPT 很牛?其实它只是个老版本