Netty源码分析--NIO(一)
? ? ? 好久沒寫博客了,最近打算花些時間把Netty的源碼好好讀一讀,下面是本人在學習的過程中的一些筆記,不能確保自己思考的完全是正確的,如果有錯誤,歡迎大家指正。
? ? ? 由于本人的語文功底爛的很,通篇使用大白話來講解0.0,有一些概念上的東西,博主可能不會明確的給出定義,建議使用過Netty的同學一起來研究。
? ? ? 好了,我們一起來看下吧。
? ? ? Netty 是一款用于快速開發的高性能的網絡應用程序的Java框架。說到Netty, 我們先對幾種I/O模型進行一下比對:
? ? ???
? ? ? ?那么偽異步IO是啥呢?
? ? ? ?其實就是加入了線程池(ThreadPoolExecutor),對接入的客戶端的Socket封裝成task,實現了Runnable接口,然后投遞到線程池中處理,這樣就避免了BIO那種一個客戶端連接一個IO線程的情況,防止資源耗盡和宕機。但是這種方式底層的通信依然采用了同步阻塞模型,無法從根本上解決問題。
? ? ? ?那么AIO又是啥呢?
? ? ? ?NIO2.0 引入了新的一步通道的概念,并提供了異步文件通道和異步套接字的實現。它不需要通過多路復用器對注冊的通道進行輪詢操作即可實現異步讀寫,屬于真正意義上的異步非阻塞IO。
? ? ? ?1、通過java.util.concurrent.Future 類來異步獲取操作的結果。
? ? ? ?2、在執行異步操作的時候傳入一個CompletionHandler接口的實現類,作為操作完成的回調。
? ? ? ? ? ? 接口有以下兩個方法。? ? ?
1 /** 2 * Invoked when an operation has completed. 3 * 4 * @param result 5 * The result of the I/O operation. 6 * @param attachment 7 * The object attached to the I/O operation when it was initiated. 8 */ 9 void completed(V result, A attachment); 10 11 /** 12 * Invoked when an operation fails. 13 * 14 * @param exc 15 * The exception to indicate why the I/O operation failed 16 * @param attachment 17 * The object attached to the I/O operation when it was initiated. 18 */ 19 void failed(Throwable exc, A attachment);?
? ? ? ?好的,下面也稍微回顧一下NIO,以及NIO涉及的幾個關鍵組件:
- ?緩沖區 Buffer?
- ?通道 Channel
- ?多路復用器 Selector
? ? ? ? ??
? ? ? ? ?里面講述了,buffer抽象類 是一個數據容器,除了內容,還有一些屬性,capacity、limit、position。
? ? ? ? ?capacity 是容器的容量,這個值一旦被創建,就無法修改。 limit 是 不應該被讀或寫的第一個元素的位置。 position 是指下一個將會被讀或寫的位置,這個值一定小于等于limit。
? ? ? ? ?另外javadoc中還提到了mark和reset, 其中mark其實就是打一個標記,把當前的position賦給mark。? 那么 reset 的 描述是這樣的 把當前的position 改成之前mark的位置。
? ? ? ? ?
? ? ? ? ?ok,由上面的文檔可以得出下面的順序??0 <= mark <= position <= limit <= capacity
? ? ? ? ?其實Buffer中還有一個非常重要的方法必須要說一下,那就是 flip() ,看下javadoc
? ? ? ? ?
? ? ? ? ?這個其實就是把 當前的limit = position, position = 0, 當然如果之前有mark也會失效,設置成-1, 當你往buffer中寫了數據的時候,只有執行flip()方法, 才可以正確的讀取數據,? doc中還指出這個方法經常和compact()方法連著用。同樣,貼出javadoc:
? ? ? ??
? ? ? ?相當于什么呢,就相當于是清理掉已經讀取過得數據,比如 position = 5 , limit = 10,前5個數據經讀取過了,那么將新建一個buffer,將當前position到limit的數據拷貝到一個新的Buffer中,那么新的buffer的postion = limit-postion, limit = capacity, 好了,看源碼是這樣的,接下來就是驗證一下了:
1 ByteBuffer buffer = ByteBuffer.allocate(10); 2 buffer.put("helloworld".getBytes()); 3 System.out.println(buffer.position() + ":" + buffer.limit()); 4 buffer.flip(); 5 System.out.println(buffer.position() + ":" + buffer.limit()); 6 byte[] bytes = new byte[buffer.limit() + 1]; 7 for(int i=0; i<6; i++) { 8 bytes[i] = buffer.get(); 9 } 10 System.out.println(new String(bytes)); 11 System.out.println(buffer.position() + ":" + buffer.limit()); 12 System.out.println(buffer); 13 buffer.compact(); 14 System.out.println(buffer.position() + ":" + buffer.limit()); 15 System.out.println(buffer);測試結果如下:
10:10 0:10 hellow 6:10 java.nio.HeapByteBuffer[pos=6 lim=10 cap=10] 4:10 java.nio.HeapByteBuffer[pos=4 lim=10 cap=10]?好了,Buffer的源碼看到這里也算是差不多了。
2、Channel
? ? ? Channel是一個通道, 它就像自來水管一樣,網絡數據通過Channel讀取與寫入,通道與流的不同之處在于通道是雙向的,流只是在一個方向上移動(一個流必須是InputStream或者OutStream的子類),而通道可以用于讀、寫或者二者同時進行, 屬于全雙工。
? ? ?這里我們也來看下源碼吧,就看ServerSocketChannel
? ? ?提供了幾個比較重要的api:??
? ? ?public static ServerSocketChannel open() throws IOException; // 通過該方法創建一個Channel
? ? ?看下javadoc , 明確說明了 新創建的channel是沒有任何綁定的,在進行accepted之前需要綁定一個地址。
? ??
? ? ?public final ServerSocketChannel bind(SocketAddress local);// 綁定一個端口號
? ? ?public abstract SocketChannel accept() throws IOException; // 接收新的客戶端
? ? ?
?3、Selector 多路復用器 ,簡單來說呢,Selector 會不斷的輪訓注冊在其上的Channel, 如果某個Channel上面發生了讀寫等事件,這個Channel就會處理就緒狀態, 會被Selector輪訓出來,然后拿到SelectionKey Set集合,從而獲取到每一個就緒狀態的Channel,進行后續的I/O操作。
? ? ? ?由于JDK使用了epoll() 代替傳統的select實現,所以沒有最大句柄的1024/2048的限制, 只需要一個線程負責Selector的輪訓,就可以接入成千上萬的客戶端。NB
? ? ? ?
? ? ? ? channel將會通過一個SelectionKey注冊到一個selector上,一個selector 通過 open方法去創建。
? ? ? ?
? ? ? ? 這一段著重指出,selectionKey集合只能通過 set 集合的 remove() 方法 或者 一個迭代器的 remove() 方法來移除。其余的方法都不可以修改 selected-key 。
? ? ? ? 好了,看到這里,有些朋友可能似懂非懂,但是看下下面的單元測試一下子就懂了。
? ? ? ? 這段代碼實現了Nio的服務器端,接收到客戶端消息后,然后通知所有的客戶端。
1 private static final Map<String, SocketChannel> clientMap = new ConcurrentHashMap(); 2 3 public static void main(String[] args) { 4 5 try { 6 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 創建一個Channel 7 serverSocketChannel.configureBlocking(false); // 設置為非阻塞 8 serverSocketChannel.bind(new InetSocketAddress(8899)); // 綁定端口 9 10 Selector selector = Selector.open(); // 創建一個Selector 11 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 將Channel注冊到Selector上,設置selectionKey 為 accept, 準備接收新的客戶端連接 12 13 while (true) { // 死循環不斷輪訓,查看 是否有準備就緒的channel 14 selector.select(); // 阻塞等到就緒的channel 15 Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 獲取到就緒的selectionKeys集合 16 selectionKeys.forEach(value -> { 17 try { 18 if(value.isAcceptable()) { // 接收新的客戶端事件 19 ServerSocketChannel channel = (ServerSocketChannel)value.channel(); // 獲取channel 20 SocketChannel clientChannel = channel.accept(); // 獲取客戶端的 socketChannel 21 clientChannel.configureBlocking(false); // 設置為非阻塞 22 String clientId = UUID.randomUUID().toString(); 23 System.out.println("客戶端接入" + clientId); 24 clientMap.put(clientId, clientChannel); 25 clientChannel.register(selector, SelectionKey.OP_READ); // 這里重點說下, 當接收到新的客戶端后,接下來就是準備接收數據,所以這里就是注冊的是Read事件// 并且這里注冊到selector上的是客戶端對應的SocketChannel, 而不是ServerSocketChannel,
// 因為ServerScoketChannel只負責接收新的客戶端 26 } else if(value.isReadable()) { // 接收到read事件 27 SocketChannel clientChannel = (SocketChannel)value.channel(); // 所以這里是SocketChannel 28 ByteBuffer buffer = ByteBuffer.allocate(1024); // 分配內存 29 int count = clientChannel.read(buffer); // 寫channel中的數據到Buffer中 30 if (count > 0) { 31 buffer.flip(); // 寫完之后,一定要執行flip。轉化成讀 32 Charset charset = Charset.forName("utf-8"); 33 String receiveMsg = String.valueOf(charset.decode(buffer).array()); 34 System.out.println("receiveMsg = " +receiveMsg); 35 Iterator<Map.Entry<String, SocketChannel>> it = clientMap.entrySet().iterator(); 36 String sendClient = null; 37 while (it.hasNext()) { 38 Map.Entry<String, SocketChannel> next = it.next(); 39 if(next.getValue() == clientChannel) { 40 sendClient = next.getKey(); 41 break; 42 } 43 } 44 it = clientMap.entrySet().iterator(); 45 ByteBuffer writeBuffer = ByteBuffer.allocate(1024); 46 while (it.hasNext()) { 47 SocketChannel socketChannel = it.next().getValue(); 48 writeBuffer.clear(); 49 writeBuffer.put(("sendClient:" + sendClient + "發送了消息").getBytes()); 50 writeBuffer.flip(); 51 socketChannel.write(writeBuffer); 52 } 53 } 54 } 55 } catch (Exception e) { 56 e.printStackTrace(); 57 } 58 }); 59 selectionKeys.clear(); // 每次處理完這一批selectionKeys,一定要清空掉集合。 60 } 61 62 } catch (IOException e) { 63 e.printStackTrace(); 64 } finally { 65 } 66 }
? ? ok, 上面是我自己的一些理解,如果有問題歡迎大家指正。下一篇,我們將開始學習Netty的源碼。
?
轉載于:https://www.cnblogs.com/huxipeng/p/10714404.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Netty源码分析--NIO(一)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink 异步IO访问外部数据(mys
- 下一篇: Linux acl权限