NIO详解(四):NIO编程
1. NIO類庫簡介
1.1 緩沖區Buffer
Buffer是一個對象,它包含了一些要寫入或者要讀出的數據。在NIO類庫中加入Buffer對象,體現了新庫和原來I/O的一個重要區別。在NIO庫中,所有的數據都是用緩沖區處理的。在讀取數據時,它是直接讀取到緩沖區中的;在寫入到緩沖區時。任何時候訪問NIO中的數據,都是通過緩沖區進行操作。
緩沖區實質上是一個數組。通常它是一個字節數據(ByteBuffer),也可以使用其他種類的數組。但是一個緩沖區不僅僅是一個數組,緩沖區還提供了對數據的結構化訪問以及維護讀寫位置(limit)等信息。
- ByteBuffer:字節緩沖區
- CharBuffer:字符緩沖區
- ShortBuffer:短整形緩沖區
- IntBuffer:整形緩沖區
- LongBuffer:長整形緩沖區
- FloatBuffer:浮點型緩沖區
- DoubleBuffer:雙精度浮點型緩沖區
緩沖區的繼承關系如下:
1.2 通道Channel
Channel是一個通道,它就像自來水管道一樣,網絡數據通過Channel讀取和寫入。通道與流不同之處在于通道它是雙向的,流只是在一個方向上移動(一個流必須是InputStream或者OutputStream的子類),而通道可以用于讀、寫或者二者同時進行。因為Channel是全雙工的,所以它可以比流更加映射底層操作系統地API。從類圖中可以看出,實際上Channel可以分為兩大類:用于網絡讀寫的SelectabaleChannel和用于文件操作的FileChannel。ServerSocketChannel是一個可以監聽新進來的TCP連接的通道,就像標準IO中的ServerSocket一樣。
1.3 多路復用器Selector
Select會不斷地輪詢注冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處于就緒狀態,會被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel集合,進行后續的I/O操作。一個多路復用器Selector可以同時輪詢多個Channel,由于JDK使用了epoll()代替傳統的select實現,所以它并沒有最大連接句柄1024/2048的輪詢,就可以接入成千上萬的客戶端。
2. NIO服務端序列圖
一。 打開ServerSocketChannel,用于監聽客戶端的連接。
ServerSocketChannel servChannel=ServerSocketChannel.open();二。綁定監聽端口,設置連接為非阻塞狀態。
servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024);三。創建Reactor線程,創建多路復用器并啟動線程。
Selector selector = Selector.open();四。將ServerSocketChannel注冊到Reactor線程的多路復用器Selector上,監聽ACCEPT事件
servChannel.register(selector, SelectionKey.OP_ACCEPT);五。多路復用器在線程run方法的無線循環體內輪詢準備就緒的Key。
while (!stop) {try {selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Throwable t) {t.printStackTrace();}}六。多路復用器監聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手,建立物理鏈路。
if (key.isAcceptable()) {// Accept the new connectionServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel sc = ssc.accept(); }七。設置客戶端鏈路為非阻塞模式
sc.configureBlocking(false);八。將接入的客戶端連接注冊到Reactor線程的多路復用器上,監聽讀操作,讀取客戶端發送的網絡消息。
sc.register(selector, SelectionKey.OP_READ);九。異步讀取客戶端請求消息到緩沖區。
if (key.isReadable()) {// Read the dataSocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);.....}十。對ByteBuffer進行編碼解碼,如果有半包消息指針reset,繼續讀取后續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排。
Object message=null; while(buffer.hasRemain()){bytebuffer.mark();Object message=decode(byteBuffer);if(message==null){byteBufer.reset();break;}messageList.add(message); } if(!bytebuffer.hasRemain()){byteBuffer.clear(); }elsebyteBuffer.compact();if(messageList!=null & !messageList.isEmpty()){for(Obbject messageE:messagList){handlerTask(messageE)} }十一。將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端。
socketChannel.write(buffer).3. NIO客戶端序列圖
一。打開SocketChannel,綁定客戶端本地地址。
SocketChannel clientChannel = SocketChannel.open();二。設置SocketChannel為非阻塞模式,同時設置客戶端連接的TCP參數。
socketChannel.configureBlocking(false); socketChannel.socket().setReuseAddress(true);三。異步連接服務器。判斷是否連接成功,如果連接成功,則直接注冊讀取狀態到多路復用器中,如果當前沒有連接成功,則向Reactor的多路復用器注冊OP_CONNECT狀態為,監聽服務器端的TCP ACK應答。
// 如果直接連接成功,則注冊到多路復用器上,發送請求消息,讀應答if (socketChannel.connect(new InetSocketAddress(host, port))) {socketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel);} elsesocketChannel.register(selector, SelectionKey.OP_CONNECT);四。創建Reactor線程,創建多路復用器并啟動線程。
Selector selector = Selector.open();五。多路復用器在線程run方法的無線循環體內輪詢準備就緒的Key。
while (!stop) {try {selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Throwable t) {t.printStackTrace();}}六。接受connect事件處理。判斷連接結果,如果連接成功,注冊連接事件到多路復用器。注冊讀事件到多路復用器中。 ```java if (key.isConnectable()) {if (sc.finishConnect()) {sc.register(selector, SelectionKey.OP_READ);doWrite(sc);} elseSystem.exit(1);// 連接失敗,進程退出}七。異步讀取客戶端請求消息到緩沖區。
if (key.isReadable()) {// Read the dataSocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);.....}八。對ByteBuffer進行編碼解碼,如果有半包消息指針reset,繼續讀取后續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排。
Object message=null; while(buffer.hasRemain()){bytebuffer.mark();Object message=decode(byteBuffer);if(message==null){byteBufer.reset();break;}messageList.add(message); } if(!bytebuffer.hasRemain()){byteBuffer.clear(); }elsebyteBuffer.compact();if(messageList!=null & !messageList.isEmpty()){for(Obbject messageE:messagList){handlerTask(messageE)} }九。將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端。
socketChannel.write(buffer).4. 總結
通過源碼分析,我們發現NIO編程的難度確實比同步阻塞BIO的大很多,我們的NIO程序中還沒有考慮“半包讀”和“半包寫”,如果加上這些,代碼會更加復雜。使用NIO編程的優點如下:
- 客戶端發起連接的操作是異步的,可以通過多路復用器注冊OP_CONNECT等待后續結果,不需要像之前的客戶端那樣被同步阻塞。
- SocketChannel的讀寫操作是異步的,如果沒有可讀寫的數據它不會等待,直接返回,這樣I/O通信線程就可以處理其他鏈路,不需要同步等待這個鏈路可用。
- 線程模型的優化:由于JDK的Selector在Linux等主流操作系統上通過epoll實現,它沒有連接句柄的限制。
總結
以上是生活随笔為你收集整理的NIO详解(四):NIO编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Netty详解(二)Linux 网络IO
- 下一篇: NIO详解(五):Buffer详解