浅谈(Java)AIO-异步IO
👦博主介紹:程序員悟啦(烏拉~)
?個人倉庫:碼云
🔊座右銘:“懶”對一個人的毀滅性有多大,早起的重要性就多大。
📚免責聲明:文章由博主原創、部分文章整理于網絡,僅供學習和知識分享
💬相遇是緣,既然來了就拎著小板凳🪑坐下來一起嘮會兒👁?🗨,如果在文中有所收獲,請別忘了一鍵三連,動動你發財的小手👍,你的鼓勵,是我創作的動力🤤!
文章目錄
- Java AIO - 異步IO詳解
- 異步IO
- JAVA對AIO的支持
- JAVA AIO框架簡析
- 代碼實例
- 要點講解
- 還有改進可能
- 為什么還有Netty?
Java AIO - 異步IO詳解
主要對異步IO和Java中對AIO的支持詳解。
異步IO
阻塞式同步IO、非阻塞式同步IO、多路復用IO 這三種IO模型,以及JAVA對于這三種IO模型的支持。重點說明了IO模型是由操作系統提供支持,且這三種IO模型都是同步IO,都是采用的“應用程序不詢問我,我絕不會主動通知”的方式。
異步IO則是采用“訂閱-通知”模式: 即應用程序向操作系統注冊IO監聽,然后繼續做自己的事情。當操作系統發生IO事件,并且準備好數據后,在主動通知應用程序,觸發相應的函數:
和同步IO一樣,異步IO也是由操作系統進行支持的。微軟的windows系統提供了一種異步IO技術: IOCP(I/O Completion Port,I/O完成端口);
Linux下由于沒有這種異步IO技術,所以使用的是epoll(上文介紹過的一種多路復用IO技術的實現)對異步IO進行模擬。
JAVA對AIO的支持
JAVA AIO框架簡析
這里通過這個結構分析要告訴各位讀者JAVA AIO中類設計和操作系統的相關性
說明JAVA AIO框架在windows下使用windows IOCP技術,在Linux下使用epoll多路復用IO技術模擬異步IO,這個從JAVA AIO框架的部分類設計上就可以看出來。例如框架中,在Windows下負責實現套接字通道的具體類是“sun.nio.ch.WindowsAsynchronousSocketChannelImpl”,其引用的IOCP類型文檔注釋如是:
/** * Windows implementation of AsynchronousChannelGroup encapsulating an I/O * completion port. */全部完整代碼(建議從“java.nio.channels.spi.AsynchronousChannelProvider”這個類看起)。
特別說明一下,請注意圖中的“java.nio.channels.NetworkChannel”接口,這個接口同樣被JAVA NIO框架實現了,如下圖所示:
代碼實例
下面,通過一個代碼示例,來講解JAVA AIO框架的具體使用,先上代碼,在針對代碼編寫和運行中的要點進行講解:
package testASocket;import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator;/*** @author yinwenjie*/ public class SocketServer {static {BasicConfigurator.configure();}private static final Object waitObject = new Object();/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {/** 對于使用的線程池技術,我一定要多說幾句* 1、Executors是線程池生成工具,通過這個工具我們可以很輕松的生成“固定大小的線程池”、“調度池”、“可伸縮線程數量的池”。具體請看API Doc* 2、當然您也可以通過ThreadPoolExecutor直接生成池。* 3、這個線程池是用來得到操作系統的“IO事件通知”的,不是用來進行“得到IO數據后的業務處理的”。要進行后者的操作,您可以再使用一個池(最好不要混用)* 4、您也可以不使用線程池(不推薦),如果決定不使用線程池,直接AsynchronousServerSocketChannel.open()就行了。* */ExecutorService threadPool = Executors.newFixedThreadPool(20);AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool);final AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group);//設置要監聽的端口“0.0.0.0”代表本機所有IP設備serverSocket.bind(new InetSocketAddress("0.0.0.0", 83));//為AsynchronousServerSocketChannel注冊監聽,注意只是為AsynchronousServerSocketChannel通道注冊監聽//并不包括為 隨后客戶端和服務器 socketchannel通道注冊的監聽serverSocket.accept(null, new ServerSocketChannelHandle(serverSocket));//等待,以便觀察現象(這個和要講解的原理本身沒有任何關系,只是為了保證守護線程不會退出)synchronized(waitObject) {waitObject.wait();}} }/*** 這個處理器類,專門用來響應 ServerSocketChannel 的事件。* @author yinwenjie*/ class ServerSocketChannelHandle implements CompletionHandler<AsynchronousSocketChannel, Void> {/*** 日志*/private static final Log LOGGER = LogFactory.getLog(ServerSocketChannelHandle.class);private AsynchronousServerSocketChannel serverSocketChannel;/*** @param serverSocketChannel*/public ServerSocketChannelHandle(AsynchronousServerSocketChannel serverSocketChannel) {this.serverSocketChannel = serverSocketChannel;}/*** 注意,我們分別觀察 this、socketChannel、attachment三個對象的id。* 來觀察不同客戶端連接到達時,這三個對象的變化,以說明ServerSocketChannelHandle的監聽模式*/@Overridepublic void completed(AsynchronousSocketChannel socketChannel, Void attachment) {ServerSocketChannelHandle.LOGGER.info("completed(AsynchronousSocketChannel result, ByteBuffer attachment)");//每次都要重新注冊監聽(一次注冊,一次響應),但是由于“文件狀態標示符”是獨享的,所以不需要擔心有“漏掉的”事件this.serverSocketChannel.accept(attachment, this);//為這個新的socketChannel注冊“read”事件,以便操作系統在收到數據并準備好后,主動通知應用程序//在這里,由于我們要將這個客戶端多次傳輸的數據累加起來一起處理,所以我們將一個stringbuffer對象作為一個“附件”依附在這個channel上//ByteBuffer readBuffer = ByteBuffer.allocate(50);socketChannel.read(readBuffer, new StringBuffer(), new SocketChannelReadHandle(socketChannel , readBuffer));}/* (non-Javadoc)* @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object)*/@Overridepublic void failed(Throwable exc, Void attachment) {ServerSocketChannelHandle.LOGGER.info("failed(Throwable exc, ByteBuffer attachment)");} }/*** 負責對每一個socketChannel的數據獲取事件進行監聽。<p>* * 重要的說明: 一個socketchannel都會有一個獨立工作的SocketChannelReadHandle對象(CompletionHandler接口的實現),* 其中又都將獨享一個“文件狀態標示”對象FileDescriptor、* 一個獨立的由程序員定義的Buffer緩存(這里我們使用的是ByteBuffer)、* 所以不用擔心在服務器端會出現“竄對象”這種情況,因為JAVA AIO框架已經幫您組織好了。<p>* * 但是最重要的,用于生成channel的對象: AsynchronousChannelProvider是單例模式,無論在哪組socketchannel,* 對是一個對象引用(但這沒關系,因為您不會直接操作這個AsynchronousChannelProvider對象)。* @author yinwenjie*/ class SocketChannelReadHandle implements CompletionHandler<Integer, StringBuffer> {/*** 日志*/private static final Log LOGGER = LogFactory.getLog(SocketChannelReadHandle.class);private AsynchronousSocketChannel socketChannel;/*** 專門用于進行這個通道數據緩存操作的ByteBuffer<br>* 當然,您也可以作為CompletionHandler的attachment形式傳入。<br>* 這是,在這段示例代碼中,attachment被我們用來記錄所有傳送過來的Stringbuffer了。*/private ByteBuffer byteBuffer;public SocketChannelReadHandle(AsynchronousSocketChannel socketChannel , ByteBuffer byteBuffer) {this.socketChannel = socketChannel;this.byteBuffer = byteBuffer;}/* (non-Javadoc)* @see java.nio.channels.CompletionHandler#completed(java.lang.Object, java.lang.Object)*/@Overridepublic void completed(Integer result, StringBuffer historyContext) {//如果條件成立,說明客戶端主動終止了TCP套接字,這時服務端終止就可以了if(result == -1) {try {this.socketChannel.close();} catch (IOException e) {SocketChannelReadHandle.LOGGER.error(e);}return;}SocketChannelReadHandle.LOGGER.info("completed(Integer result, Void attachment) : 然后我們來取出通道中準備好的值");/** 實際上,由于我們從Integer result知道了本次channel從操作系統獲取數據總長度* 所以實際上,我們不需要切換成“讀模式”的,但是為了保證編碼的規范性,還是建議進行切換。* * 另外,無論是JAVA AIO框架還是JAVA NIO框架,都會出現“buffer的總容量”小于“當前從操作系統獲取到的總數據量”,* 但區別是,JAVA AIO框架中,我們不需要專門考慮處理這樣的情況,因為JAVA AIO框架已經幫我們做了處理(做成了多次通知)* */this.byteBuffer.flip();byte[] contexts = new byte[1024];this.byteBuffer.get(contexts, 0, result);this.byteBuffer.clear();try {String nowContent = new String(contexts , 0 , result , "UTF-8");historyContext.append(nowContent);SocketChannelReadHandle.LOGGER.info("================目前的傳輸結果: " + historyContext);} catch (UnsupportedEncodingException e) {SocketChannelReadHandle.LOGGER.error(e);}//如果條件成立,說明還沒有接收到“結束標記”if(historyContext.indexOf("over") == -1) {return;}//=========================================================================// 和上篇文章的代碼相同,我們以“over”符號作為客戶端完整信息的標記//=========================================================================SocketChannelReadHandle.LOGGER.info("=======收到完整信息,開始處理業務=========");historyContext = new StringBuffer();//還要繼續監聽(一次監聽一次通知)this.socketChannel.read(this.byteBuffer, historyContext, this);}/* (non-Javadoc)* @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object)*/@Overridepublic void failed(Throwable exc, StringBuffer historyContext) {SocketChannelReadHandle.LOGGER.info("=====發現客戶端異常關閉,服務器將關閉TCP通道");try {this.socketChannel.close();} catch (IOException e) {SocketChannelReadHandle.LOGGER.error(e);}} }要點講解
注意在JAVA NIO框架中,說到了一個重要概念“selector”(選擇器)。它負責代替應用查詢中所有已注冊的通道到操作系統中進行IO事件輪詢、管理當前注冊的通道集合,定位發生事件的通道等操操作;但是在JAVA AIO框架中,由于應用程序不是“輪詢”方式,而是訂閱-通知方式,所以不再需要“selector”(選擇器)了,改由channel通道直接到操作系統注冊監聽。
JAVA AIO框架中,只實現了兩種網絡IO通道“AsynchronousServerSocketChannel”(服務器監聽通道)、“AsynchronousSocketChannel”(socket套接字通道)。但是無論哪種通道他們都有獨立的fileDescriptor(文件標識符)、attachment(附件,附件可以使任意對象,類似“通道上下文”),并被獨立的SocketChannelReadHandle類實例引用。我們通過debug操作來看看它們的引用結構:
在測試過程中,啟動了兩個客戶端(客戶端用什么語言來寫都行,用阻塞或者非阻塞方式也都行,只要是支持 TCP Socket套接字的就行,然后我們觀察服務器端對這兩個客戶端通道的處理情況:
可以看到,在服務器端分別為客戶端1和客戶端2創建的兩個WindowsAsynchronousSocketChannelImpl對象為:
客戶端1: WindowsAsynchronousSocketChannelImpl: 760 | FileDescriptor: 762
客戶端2: WindowsAsynchronousSocketChannelImpl: 792 | FileDescriptor: 797
接下來,讓兩個客戶端發送信息到服務器端,并觀察服務器端的處理情況??蛻舳?發來的消息和客戶端2發來的消息,在服務器端的處理情況如下圖所示:
客戶端1: WindowsAsynchronousSocketChannelImpl: 760 | FileDescriptor: 762 | SocketChannelReadHandle: 803 | HeapByteBuffer: 808
客戶端2: WindowsAsynchronousSocketChannelImpl: 792 | FileDescriptor: 797 | SocketChannelReadHandle: 828 | HeapByteBuffer: 833
可以明顯看到,服務器端處理每一個客戶端通道所使用的SocketChannelReadHandle(處理器)對象都是獨立的,并且所引用的SocketChannel對象都是獨立的。
JAVA NIO和JAVA AIO框架,除了因為操作系統的實現不一樣而去掉了Selector外,其他的重要概念都是存在的,例如上文中提到的Channel的概念,還有演示代碼中使用的Buffer緩存方式。實際上JAVA NIO和JAVA AIO框架您可以看成是一套完整的“高并發IO處理”的實現。
還有改進可能
當然,以上代碼是示例代碼,目標是為了讓您了解JAVA AIO框架的基本使用。所以它還有很多改造的空間,例如:
在生產環境下,我們需要記錄這個通道上“用戶的登錄信息”。那么這個需求可以使用JAVA AIO中的“附件”功能進行實現。
記住JAVA AIO 和 JAVA NIO 框架都是要使用線程池的(當然您也可以不用),線程池的使用原則,一定是只有業務處理部分才使用,使用后馬上結束線程的執行(還回線程池或者消滅它)。JAVA AIO框架中還有一個線程池,是拿給“通知處理器”使用的,這是因為JAVA AIO框架是基于“訂閱-通知”模型的,“訂閱”操作可以由主線程完成,但是您總不能要求在應用程序中并發的“通知”操作也在主線程上完成吧_。
最好的改進方式,當然就是使用Netty或者Mina咯。
為什么還有Netty?
- 既然JAVA NIO / JAVA AIO已經實現了各主流操作系統的底層支持,那么為什么現在主流的JAVA NIO技術會是Netty和MINA呢? 答案很簡單: 因為更好用,這里舉幾個方面的例子:
- 雖然JAVA NIO 和 JAVA AIO框架提供了 多路復用IO/異步IO的支持,但是并沒有提供上層“信息格式”的良好封裝。例如前兩者并沒有提供針對 Protocol Buffer、JSON這些信息格式的封裝,但是Netty框架提供了這些數據格式封裝(基于責任鏈模式的編碼和解碼功能)
- 要編寫一個可靠的、易維護的、高性能的(注意它們的排序)NIO/AIO 服務器應用。除了框架本身要兼容實現各類操作系統的實現外。更重要的是它應該還要處理很多上層特有服務,例如: 客戶端的權限、還有上面提到的信息格式封裝、簡單的數據讀取。這些Netty框架都提供了響應的支持。
- JAVA NIO框架存在一個poll/epoll bug: Selector doesn’t block on Selector.select(timeout),不能block意味著CPU的使用率會變成100%(這是底層JNI的問題,上層要處理這個異常實際上也好辦)。當然這個bug只有在Linux內核上才能重現。
- 這個問題在JDK 1.7版本中還沒有被完全解決: http://bugs.java.com/bugdatabase/view_bug.do?bug_id=2147719。雖然Netty 4.0中也是基于JAVA NIO框架進行封裝的(上文中已經給出了Netty中NioServerSocketChannel類的介紹),但是Netty已經將這個bug進行了處理。
- 其他原因,用過Netty后,可以自己進行比較了。
Java系列文章摘自Java全棧知識體系,這是一個非常棒的Java網站,知識體系全面,由淺入深的講解知識點,官網地址:https://pdai.tech/
文章轉載已取得站長本人同意,僅用于學習和知識分享,切勿商用,違者必究。網站上站長的聯系方式,進群有福利喲~
如在文中有所收獲,請點贊👍+關注,傳統美德不能丟🙌
總結
以上是生活随笔為你收集整理的浅谈(Java)AIO-异步IO的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RS(2)--从文本数据到用户画像
- 下一篇: 网络技巧:分享几个路由器设置小技巧,总有