Java NIO 介绍和基本demo
IO操作主要可分為兩階段
1)把磁盤或者網(wǎng)絡(luò)數(shù)據(jù)加載到內(nèi)核的內(nèi)存空間
2)把內(nèi)核的內(nèi)存空間數(shù)據(jù)復(fù)制到用戶進(jìn)程的內(nèi)存空間中
阻塞、非阻塞的區(qū)別是在于第一階段,即數(shù)據(jù)準(zhǔn)備階段。如果在數(shù)據(jù)準(zhǔn)備時(shí),主線程必須等待,就為阻塞;不需要一直等待可以執(zhí)行其他操作,就是非阻塞。
同步、異步的區(qū)別在于第二階段,如果是用戶進(jìn)程需要主動(dòng)復(fù)制數(shù)據(jù)到用戶內(nèi)存,則為同步;如果由內(nèi)核完成數(shù)據(jù)報(bào)復(fù)制之后主動(dòng)返回?cái)?shù)據(jù)則為異步
前面說到,java中I/O編程,大致可以分為三種,阻塞IO(BIO)、非阻塞IO(NIO)和異步IO(AIO)。
BIO
BIO就是同步阻塞IO。在傳統(tǒng)的同步阻塞模型開發(fā)中,ServerSocket負(fù)責(zé)綁定IP地址,啟動(dòng)監(jiān)聽端口;Socket負(fù)責(zé)發(fā)起連接操作。連接成功后,雙方通過輸入和輸出流進(jìn)行同步阻塞式通信。?傳統(tǒng)的socket通訊都是阻塞的,服務(wù)端接收到客戶端請求直到復(fù)制完數(shù)據(jù)都是阻塞的
AIO
異步非阻塞IO,式真正的異步IO,將數(shù)據(jù)報(bào)復(fù)制等操作交給內(nèi)核完成,用戶進(jìn)程可以處理其他事情而不需要干涉
底層過程同 NIO,區(qū)別在于,AIO 使用的命令是?epoll?,使用事件驅(qū)動(dòng)的方式來代替輪詢的方式,當(dāng)監(jiān)聽的 I/O 準(zhǔn)備好了,采用事件驅(qū)動(dòng)(事件回調(diào))的方式通知進(jìn)程去獲取數(shù)據(jù)
NIO
同步非阻塞的IO,底層是采用操作系統(tǒng)的IO多路復(fù)用模型,通過操作系統(tǒng)的select()/epoll()方法監(jiān)聽多個(gè)通道,一旦有一個(gè)channel數(shù)據(jù)報(bào)準(zhǔn)備好,就通知應(yīng)用程序去復(fù)制數(shù)據(jù)報(bào)。
非阻塞體現(xiàn):一個(gè) select 處理多個(gè)客戶應(yīng)用進(jìn)程的 I/O,如果第一個(gè) I/O 數(shù)據(jù)沒有準(zhǔn)備好,那么就去處理第二個(gè)客戶端的 I/O,依此類推,客戶端之間誰的數(shù)據(jù)先準(zhǔn)備好就先處理誰的,不存在第二個(gè)要等第一個(gè)處理完才能開始處理的情況;
IO多路復(fù)用是阻塞在select,epoll這樣的系統(tǒng)調(diào)用之上,而沒有阻塞在真正的I/O系統(tǒng)調(diào)用之上。
提供了與傳統(tǒng)BIO模型中的Socket和ServerSocket相對應(yīng)的SocketChannel和ServerSocketChannel兩種不同的套接字通道實(shí)現(xiàn)。兩種通道都支持阻塞和非阻塞兩種模式,默認(rèn)采用阻塞的實(shí)現(xiàn)方式
1.緩沖區(qū) Buffer
? ? 在 NIO 中,所有數(shù)據(jù)都是用緩沖區(qū)處理的。在讀取數(shù)據(jù)時(shí),它是直接讀到緩沖區(qū)中的; 在寫入數(shù)據(jù)時(shí),它也是寫入到緩沖區(qū)中的。所有的緩沖區(qū)類型都繼承于抽象類 Buffer,比方說:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer.都實(shí)現(xiàn)了相同的Buffer接口。
緩沖區(qū)本質(zhì)上是一個(gè)數(shù)組,并提供了跟蹤和記錄緩沖區(qū)的狀態(tài)變化的信息。
? 其中最重要的屬性是下面三個(gè),它們一起合作完成對緩沖區(qū)內(nèi)部狀態(tài)的變化跟蹤:
? position:當(dāng)前操作數(shù)據(jù)所在的位置,也可以理解做游標(biāo),當(dāng)調(diào)用?get()/put()方法讀取或者寫入緩沖區(qū)的時(shí)候,position會(huì)自 ? ?動(dòng)更新,在新創(chuàng)建一個(gè) Buffer 對象時(shí),position 初始值為0
? limit:指定還有多少數(shù)據(jù)需要取出(在從緩沖區(qū)寫入通道時(shí)),或者還有多少空間可以放入數(shù)據(jù)(在從通道讀入緩沖區(qū)時(shí))。
??讀取緩沖區(qū)的數(shù)據(jù)時(shí),如果limit > position 則認(rèn)為在緩沖區(qū)還有數(shù)據(jù)可以讀取
? capacity:指定可以存儲(chǔ)在緩沖區(qū)中的最大數(shù)據(jù)容量,實(shí)際上,它指定了底層數(shù)組的大小
? 這三個(gè)屬性之間滿足:0 <=position <= limit <=capacity 的關(guān)系
使用緩沖區(qū)讀取和寫入數(shù)據(jù)通常遵循以下四個(gè)步驟:
- ?寫數(shù)據(jù)到緩沖區(qū);
- 調(diào)用buffer.flip()方法;
- 從緩沖區(qū)中讀取數(shù)據(jù);
- 調(diào)用buffer.clear()
? 在向buffer中寫入數(shù)據(jù)時(shí),position會(huì)記錄下當(dāng)前數(shù)據(jù)寫入的位置,如果寫入完成需要讀取數(shù)據(jù),那么就需要通過flip()方法將Buffer從寫模式切換到讀模式,其實(shí)就是鎖定操作范圍,讓數(shù)據(jù)操作范圍索引只能在position - limit 之間,源碼如下。讀取完所有的數(shù)據(jù)后,就需要清空緩沖區(qū),使得buffer可以再次被寫入
//完成兩件事: //1. 把limit 設(shè)置為當(dāng)前的 position 值 //2. 把position 設(shè)置為0 public final Buffer flip() {limit = position;position = 0;mark = -1;return this;}2.通道 Channel
????我們對數(shù)據(jù)的讀取和寫入要通過Channel,它就像水管一樣,是一個(gè)通道。通道區(qū)別于流的地方在于通道是雙向的,可以用于讀、寫或者同時(shí)讀寫操作。 NIO中,任何時(shí)候讀取數(shù)據(jù),都不是直接從通道讀取,而是從通道讀取到緩沖區(qū)。然后再操作緩沖區(qū)中的數(shù)據(jù)
????操作系統(tǒng)底層的通道一般都是全雙工的,所以全雙工的Channel比Stream能更好的映射底層操作系統(tǒng)的API。
????Channel主要分兩大類:
????SelectableChannel:用戶網(wǎng)絡(luò)讀寫
????FileChannel:用于文件操作
????后面代碼會(huì)涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子類。
3.多路復(fù)用器 Selector
????Selector是Java ?NIO 編程的基礎(chǔ)。
????Selector--選擇器,顧名思義就是提供選擇已經(jīng)就緒的任務(wù)的能力:Selector會(huì)不斷輪詢注冊在其上的Channel,查看是否某個(gè)Channel讀或者寫事件就緒。我們可以通過SelectionKey獲取就緒Channel的集合,然后根據(jù)其狀態(tài)進(jìn)行后續(xù)的操作。
????一個(gè)Selector可以同時(shí)輪詢多個(gè)Channel,因?yàn)镴DK使用了epoll()代替?zhèn)鹘y(tǒng)的select實(shí)現(xiàn),所以沒有最大連接句柄1024/2048的限制。所以,只需要一個(gè)線程負(fù)責(zé)Selector的輪詢,就可以接入成千上萬的客戶端。
服務(wù)端
/*** 2020/3/8* created by chenpp*/ public class NioServer {private int port;private static Selector selector = null;/*** 指定端口號啟動(dòng)服務(wù)* */public boolean startServer(int port){try {this.port = port;selector = Selector.open();//打開監(jiān)聽通道ServerSocketChannel server = ServerSocketChannel.open();//綁定端口server.bind(new InetSocketAddress(this.port));//默認(rèn)configureBlocking為true,如果為 true,此通道將被置于阻塞模式;如果為 false.則此通道將被置于非阻塞模式server.configureBlocking(false);//創(chuàng)建選擇器selector = Selector.open();//監(jiān)聽客戶端連接請求server.register(selector, SelectionKey.OP_ACCEPT);System.out.println("服務(wù)端啟動(dòng)成功,監(jiān)聽端口:" + port);}catch (Exception e){System.out.println("服務(wù)器啟動(dòng)失敗");return false;}return true;}public void listen() throws IOException {while(true){//阻塞方法,輪詢注冊的channel,當(dāng)至少一個(gè)channel就緒的時(shí)候才會(huì)繼續(xù)往下執(zhí)行int keyCount = selector.select();System.out.println("當(dāng)前有:"+keyCount+"channel有事件就緒");//獲取就緒的SelectionKeySet<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;//迭代就緒的keywhile(it.hasNext()){key = it.next();it.remove();//SelectionKey相當(dāng)于是一個(gè)Channel的表示,標(biāo)記當(dāng)前channel處于什么狀態(tài)// 按照channel的不同狀態(tài)處理數(shù)據(jù)process(key);}}}private void process(SelectionKey key) throws IOException {//該channel已就緒,可接收消息if(key.isAcceptable()){System.out.println("accept事件就緒...");doAccept(key);}else if(key.isReadable()){System.out.println("read事件就緒...");doRead(key);}else if(key.isWritable()){System.out.println("write事件就緒...");doWrite(key);}}private void doWrite(SelectionKey key) throws IOException {//獲取對應(yīng)的socketSocketChannel socket = (SocketChannel)key.channel();//獲取key上的附件String content = (String)key.attachment();socket.write(ByteBuffer.wrap(content.getBytes()));socket.close();}private void doRead(SelectionKey key) throws IOException {//獲取對應(yīng)的socketSocketChannel socket = (SocketChannel)key.channel();//設(shè)置一個(gè)讀取數(shù)據(jù)的Buffer 大小為1024ByteBuffer buff = ByteBuffer.allocate(1024);StringBuilder content = new StringBuilder();while(socket.read(buff) > 0) {buff.flip();content.append(new String(buff.array(),"utf-8"));}//注冊selector,并設(shè)置為可寫模式key = socket.register(selector,SelectionKey.OP_WRITE);//在key上攜帶一個(gè)附件(附近就是之后要寫的內(nèi)容)key.attach("服務(wù)端已收到:"+content);System.out.println("讀取內(nèi)容:" + content);}private void doAccept(SelectionKey key) throws IOException {//獲取對應(yīng)的channelServerSocketChannel server = (ServerSocketChannel)key.channel();//從channel中獲取socket信息SocketChannel socket = server.accept();//設(shè)置為非阻塞模式socket.configureBlocking(false);//注冊selector,并設(shè)置為可讀模式socket.register(selector, SelectionKey.OP_READ);}} /*** 2020/3/8* created by chenpp*/ public class NioServerStarter {public static void main(String[] args) throws IOException {NioServer nioServer = new NioServer();nioServer.startServer(8080);nioServer.listen();} }客戶端
package com.chenpp.nio;import javax.sound.midi.SoundbankResource; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.UUID;/*** 2020/3/8* created by chenpp*/ public class NioClient {private static Selector selector = null;public void start(String ip, int port) throws IOException {//創(chuàng)建選擇器selector = Selector.open();//打開監(jiān)聽通道SocketChannel socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);//連接對應(yīng)的服務(wù)器 ip , portsocketChannel.connect(new InetSocketAddress(ip, port));//注冊select為連接狀態(tài)socketChannel.register(selector, SelectionKey.OP_CONNECT);System.out.println("客戶端,啟動(dòng)成功...");}public void listen() throws IOException {while (true) {//阻塞方法,輪詢注冊的channel,當(dāng)至少一個(gè)channel就緒的時(shí)候才會(huì)繼續(xù)往下執(zhí)行selector.select();//獲取就緒的SelectionKeySet<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key = null;//迭代就緒的keywhile (it.hasNext()) {key = it.next();it.remove();//SelectionKey相當(dāng)于是一個(gè)Channel的表示,標(biāo)記當(dāng)前channel處于什么狀態(tài)// 按照channel的不同狀態(tài)處理數(shù)據(jù)process(key);}}}private void process(SelectionKey key) throws IOException {//channel處于可連接狀態(tài),發(fā)送消息給服務(wù)端if (key.isConnectable()) {System.out.println("connect事件就緒 ....");SocketChannel clientChannel = (SocketChannel) key.channel();if (clientChannel.isConnectionPending()) {clientChannel.finishConnect();}clientChannel.configureBlocking(false);String name = UUID.randomUUID().toString();System.out.println("客戶端發(fā)送數(shù)據(jù):{}" + name);ByteBuffer buffer = ByteBuffer.wrap(name.getBytes());clientChannel.write(buffer);clientChannel.register(key.selector(), SelectionKey.OP_READ);} else if (key.isReadable()) {//獲取對應(yīng)的socketSystem.out.println("read事件就緒 ....");SocketChannel socket = (SocketChannel) key.channel();//設(shè)置一個(gè)讀取數(shù)據(jù)的Buffer 大小為1024ByteBuffer buff = ByteBuffer.allocate(1024);StringBuilder content = new StringBuilder();int len = socket.read(buff);if (len > 0) {buff.flip();content.append(new String(buff.array(), "utf-8"));//讓客戶端讀取下一次readSystem.out.println("客戶端收到反饋:" + content);key.interestOps(SelectionKey.OP_READ);}else if(len <= 0){key.cancel();socket.close();}}}} /*** 2020/3/8* created by chenpp*/ public class NioClientStarter {public static void main(String[] args) throws IOException {NioClient client = new NioClient();client.start("localhost",8080);client.listen();} }NIO工作原理:
- 由一個(gè)專門的線程來處理所有的 IO 事件,并負(fù)責(zé)分發(fā)。
- 事件驅(qū)動(dòng)機(jī)制:事件到達(dá)的時(shí)候觸發(fā),而不是同步的去監(jiān)視事件。
參考:
https://blog.csdn.net/anxpp/article/details/51512200
總結(jié)
以上是生活随笔為你收集整理的Java NIO 介绍和基本demo的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IO中的阻塞、非阻塞、同步、异步概念分析
- 下一篇: IntelliJ IDEA不好用?那是因