初识NIO之Java小Demo
Java中的IO、NIO、AIO:
BIO:在Java1.4之前,我們建立網(wǎng)絡(luò)連接均使用BIO,屬于同步阻塞IO。默認(rèn)情況下,當(dāng)有一條請求接入就有一條線程專門接待。所以,在客戶端向服務(wù)端請求時,會詢問是否有空閑線程進(jìn)行接待,如若沒有則一直等待或拒接。當(dāng)并發(fā)量小時還可以接受,當(dāng)請求量一多起來則會有許多線程生成,在Java中,多線程的上下文切換會消耗計算機(jī)有限的資源和性能,造成資源浪費。
NIO:NIO的出現(xiàn)是為了解決再BIO下的大并發(fā)量問題。其特點是能用一條線程管理所有連接。如下圖所示:
NIO是同步非阻塞模型,通過一條線程控制選擇器(Selector)來管理多個Channel,減少創(chuàng)建線程和上下文切換的浪費。當(dāng)線程通過選擇器向某條Channel請求數(shù)據(jù)但其沒有數(shù)據(jù)時,是不會阻塞的,直接返回,繼續(xù)干其他事。而一旦某Channel就緒,線程就能去調(diào)用請求數(shù)據(jù)等操作。當(dāng)該線程對某條Channel進(jìn)行寫操作時同樣不會被阻塞,所以該線程能夠?qū)Χ鄠€Channel進(jìn)行管理。NIO是面向緩沖流的,即數(shù)據(jù)寫入寫出都是通過 Channel —— Buffer 這一途徑。(雙向流通)
AIO:與之前兩個IO模型不同的是,AIO屬于異步非阻塞模型。當(dāng)進(jìn)行讀寫操作時只須調(diào)用api的read方法和write方法,這兩種方法均是異步。對于讀方法來說,當(dāng)有流可讀取時,操作系統(tǒng)會將可讀的流傳入read方法的緩沖區(qū),并通知應(yīng)用程序;對于寫操作而言,當(dāng)操作系統(tǒng)將write方法傳遞的流寫入完畢時,操作系統(tǒng)主動通知應(yīng)用程序。換言之就是當(dāng)調(diào)用完api后,操作系統(tǒng)完成后會調(diào)用回調(diào)函數(shù)。
總結(jié):一般IO分為同步阻塞模型(BIO),同步非阻塞模型(NIO),異步阻塞模型,異步非阻塞模型(AIO)
同步阻塞模型指的是當(dāng)調(diào)用io操作時必須等到其io操作結(jié)束
同步非阻塞模型指當(dāng)調(diào)用io操作時不必等待可以繼續(xù)干其他事,但必須不斷詢問io操作是否完成。
異步阻塞模型指應(yīng)用調(diào)用io操作后,由操作系統(tǒng)完成io操作,但應(yīng)用必須等待或去詢問操作系統(tǒng)是否完成。
異步非阻塞指應(yīng)用調(diào)用io操作后,由操作系統(tǒng)完成io操作并調(diào)用回調(diào)函數(shù),應(yīng)用完成放手不管。
NIO的小Demo之服務(wù)端
首先,先看下服務(wù)端的大體代碼
public class ServerHandle implements Runnable{//帶參數(shù)構(gòu)造函數(shù)public ServerHandle(int port){}//停止方法public void shop(){}//寫方法private void write(SocketChannel socketChannel, String response)throws IOException{}//當(dāng)有連接進(jìn)來時的處理方法private void handleInput(SelectionKey key) throws IOException{} //服務(wù)端運(yùn)行主體方法@Overridepublic void run() {} } 復(fù)制代碼首先我們先看看該服務(wù)端的構(gòu)造函數(shù)的實現(xiàn):
public ServerHandle(int port){try {//創(chuàng)建選擇器selector = Selector.open();//打開監(jiān)聽通道serverSocketChannel = ServerSocketChannel.open();//設(shè)置為非阻塞模式serverSocketChannel.configureBlocking(false);//傳入端口,并設(shè)定連接隊列最大為1024serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);//監(jiān)聽客戶端請求serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//標(biāo)記啟動標(biāo)志started = true;System.out.println("服務(wù)器已啟動,端口號為:" + port);} catch (IOException e){e.printStackTrace();System.exit(1);}} 復(fù)制代碼在這里創(chuàng)建了選擇器和監(jiān)聽通道,并將該監(jiān)聽通道注冊到選擇器上并選擇其感興趣的事件(accept)。后續(xù)其他接入的連接都將通過該 監(jiān)聽通道 傳入。
然后就是寫方法的實現(xiàn):
private void doWrite(SocketChannel channel, String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);wirteBuffer.put(bytes);//將寫模式改為讀模式wirteBuffer.flip();//寫入管道channel.write(wirteBuffer);} 復(fù)制代碼其次是當(dāng)由事件傳入時,即對連接進(jìn)來的鏈接的處理方法
private void handleInput(SelectionKey key) throws IOException{//當(dāng)該鍵可用時if (key.isValid()){if (key.isAcceptable()){//返回該密鑰創(chuàng)建的通道。ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();通過該通道獲取鏈接進(jìn)來的通道SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);}if (key.isReadable()){//返回該密鑰創(chuàng)建的通道。SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(byteBuffer);if (readBytes > 0){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String expression = new String(bytes, "UTF-8");System.out.println("服務(wù)器收到的信息:" + expression);//此處是為了區(qū)別打印在工作臺上的數(shù)據(jù)是由客戶端產(chǎn)生還是服務(wù)端產(chǎn)生doWrite(socketChannel, "+++++" + expression + "+++++");} else if(readBytes == 0){//無數(shù)據(jù),忽略}else if (readBytes < 0){//資源關(guān)閉key.cancel();socketChannel.close();}}}} 復(fù)制代碼這里要說明的是,只要ServerSocketChannel及SocketChannel向Selector注冊了特定的事件,Selector就會監(jiān)控這些事件是否發(fā)生。 如在構(gòu)造方法中有一通道serverSocketChannel注冊了accept事件。當(dāng)其就緒時就可以通過調(diào)用selector的selectorKeys()方法,訪問”已選擇鍵集“中的就緒通道。
壓軸方法:
@Overridepublic void run() {//循環(huán)遍歷while (started) {try {//當(dāng)沒有就緒事件時阻塞selector.select();//返回就緒通道的鍵Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();SelectionKey key;while (iterator.hasNext()){key = iterator.next();//獲取后必須移除,否則會陷入死循環(huán)iterator.remove();try {//對就緒通道的處理方法,上述有描述handleInput(key);} catch (Exception e){if (key != null){key.cancel();if (key.channel() != null) {key.channel().close();}}}}}catch (Throwable throwable){throwable.printStackTrace();}}} 復(fù)制代碼此方法為服務(wù)端的主體方法。大致流程如下:
NIO的小Demo之客戶端
public class ClientHandle implements Runnable{//構(gòu)造函數(shù),構(gòu)造時順便綁定public ClientHandle(String ip, int port){}//處理就緒通道private void handleInput(SelectionKey key) throws IOException{}//寫方法(與服務(wù)端的寫方法一致)private void doWrite(SocketChannel channel,String request) throws IOException{}//連接到服務(wù)端private void doConnect() throws IOException{}//發(fā)送信息public void sendMsg(String msg) throws Exception{} } 復(fù)制代碼首先先看構(gòu)造函數(shù)的實現(xiàn):
public ClientHandle(String ip,int port) {this.host = ip;this.port = port;try{//創(chuàng)建選擇器selector = Selector.open();//打開監(jiān)聽通道socketChannel = SocketChannel.open();//如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式socketChannel.configureBlocking(false);started = true;}catch(IOException e){e.printStackTrace();System.exit(1);}} 復(fù)制代碼接下來看對就緒通道的處理辦法:
private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){SocketChannel sc = (SocketChannel) key.channel();if(key.isConnectable()){//這里的作用將在后面的代碼(doConnect方法)說明if(sc.finishConnect()){System.out.println("已連接事件");}else{System.exit(1);}}//讀消息if(key.isReadable()){//創(chuàng)建ByteBuffer,并開辟一個1k的緩沖區(qū)ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流,返回讀取到的字節(jié)數(shù)int readBytes = sc.read(buffer);//讀取到字節(jié),對字節(jié)進(jìn)行編解碼if(readBytes>0){buffer.flip();//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組byte[] bytes = new byte[buffer.remaining()];//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("客戶端收到消息:" + result);}lse if(readBytes==0){//忽略}else if(readBytes<0){//鏈路已經(jīng)關(guān)閉,釋放資源key.cancel();sc.close();}}}} 復(fù)制代碼在run方法之前需先看下此方法的實現(xiàn):
private void doConnect() throws IOException{if(socketChannel.connect(new InetSocketAddress(host,port))){System.out.println("connect");}else {socketChannel.register(selector, SelectionKey.OP_CONNECT);System.out.println("register");}} 復(fù)制代碼當(dāng)SocketChannel工作于非阻塞模式下時,調(diào)用connect()時會立即返回: 如果連接建立成功則返回的是true(比如連接localhost時,能立即建立起連接),否則返回false。
在非阻塞模式下,返回false后,必須要在隨后的某個地方調(diào)用finishConnect()方法完成連接。 當(dāng)SocketChannel處于阻塞模式下時,調(diào)用connect()時會進(jìn)入阻塞,直至連接建立成功或者發(fā)生IO錯誤時,才從阻塞狀態(tài)中退出。
所以該代碼在connect服務(wù)端后返回false(但還是有作用的),并在else語句將該通道注冊在選擇器上并選擇connect事件。
客戶端的run方法:
@Overridepublic void run() {try{doConnect();}catch(IOException e){e.printStackTrace();System.exit(1);}//循環(huán)遍歷selectorwhile(started){try{selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key ;while(it.hasNext()){key = it.next();it.remove();try{handleInput(key);}catch(Exception e){if(key != null){key.cancel();if(key.channel() != null){key.channel().close();}}}}}catch(Exception e){e.printStackTrace();System.exit(1);}}//selector關(guān)閉后會自動釋放里面管理的資源if(selector != null){try{selector.close();}catch (Exception e) {e.printStackTrace();}}} 復(fù)制代碼發(fā)送信息到服務(wù)端的方法:
public void sendMsg(String msg) throws Exception{//覆蓋其之前感興趣的事件(connect),將其更改為OP_READsocketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel, msg);} 復(fù)制代碼完整代碼:
服務(wù)端:
/*** Created by innoyiya on 2018/8/20.*/ public class Service {private static int DEFAULT_POST = 12345;private static ServerHandle serverHandle;public static void start(){start(DEFAULT_POST);}public static synchronized void start(int post) {if (serverHandle != null){serverHandle.shop();}serverHandle = new ServerHandle(post);new Thread(serverHandle,"server").start();} } 復(fù)制代碼服務(wù)端主體:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;/*** Created by innoyiya on 2018/8/20.*/ public class ServerHandle implements Runnable{private Selector selector;private ServerSocketChannel serverSocketChannel;private volatile boolean started;public ServerHandle(int port){try {//創(chuàng)建選擇器selector = Selector.open();//打開監(jiān)聽通道serverSocketChannel = ServerSocketChannel.open();//設(shè)置為非阻塞模式serverSocketChannel.configureBlocking(false);//判定端口,并設(shè)定連接隊列最大為1024serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);//監(jiān)聽客戶端請求serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//標(biāo)記啟動標(biāo)志started = true;System.out.println("服務(wù)器已啟動,端口號為:" + port);} catch (IOException e){e.printStackTrace();System.exit(1);}}public void shop(){started = false;}private void doWrite(SocketChannel channel, String response) throws IOException {byte[] bytes = response.getBytes();ByteBuffer wirteBuffer = ByteBuffer.allocate(bytes.length);wirteBuffer.put(bytes);wirteBuffer.flip();channel.write(wirteBuffer);}private void handleInput(SelectionKey key) throws IOException{if (key.isValid()){if (key.isAcceptable()){ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();SocketChannel socketChannel = serverSocketChannel.accept();socketChannel.configureBlocking(false);socketChannel.register(selector, SelectionKey.OP_READ);}if (key.isReadable()){SocketChannel socketChannel = (SocketChannel) key.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);int readBytes = socketChannel.read(byteBuffer);if (readBytes > 0){byteBuffer.flip();byte[] bytes = new byte[byteBuffer.remaining()];byteBuffer.get(bytes);String expression = new String(bytes, "UTF-8");System.out.println("服務(wù)器收到的信息:" + expression);doWrite(socketChannel, "+++++" + expression + "+++++");} else if (readBytes < 0){key.cancel();socketChannel.close();}}}}@Overridepublic void run() {//循環(huán)遍歷while (started) {try {selector.select();//System.out.println(selector.select());Set<SelectionKey> keys = selector.selectedKeys();//System.out.println(keys.size());Iterator<SelectionKey> iterator = keys.iterator();SelectionKey key;while (iterator.hasNext()){key = iterator.next();iterator.remove();try {handleInput(key);} catch (Exception e){if (key != null){key.cancel();if (key.channel() != null) {key.channel().close();}}}}}catch (Throwable throwable){throwable.printStackTrace();}}} } 復(fù)制代碼客戶端:
/*** Created by innoyiya on 2018/8/20.*/ public class Client {private static String DEFAULT_HOST = "localhost";private static int DEFAULT_PORT = 12345;private static ClientHandle clientHandle;private static final String EXIT = "exit";public static void start() {start(DEFAULT_HOST, DEFAULT_PORT);}public static synchronized void start(String ip, int port) {if (clientHandle != null){clientHandle.stop();}clientHandle = new ClientHandle(ip, port);new Thread(clientHandle, "Server").start();}//向服務(wù)器發(fā)送消息public static boolean sendMsg(String msg) throws Exception {if (msg.equals(EXIT)){return false;}clientHandle.sendMsg(msg);return true;}} 復(fù)制代碼客戶端主體代碼:
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;/*** Created by innoyiya on 2018/8/20.*/public class ClientHandle implements Runnable{private String host;private int port;private Selector selector;private SocketChannel socketChannel;private volatile boolean started;public ClientHandle(String ip,int port) {this.host = ip;this.port = port;try{//創(chuàng)建選擇器selector = Selector.open();//打開監(jiān)聽通道socketChannel = SocketChannel.open();//如果為 true,則此通道將被置于阻塞模式;如果為 false,則此通道將被置于非阻塞模式socketChannel.configureBlocking(false);started = true;}catch(IOException e){e.printStackTrace();System.exit(1);}}public void stop(){started = false;}private void handleInput(SelectionKey key) throws IOException{if(key.isValid()){SocketChannel sc = (SocketChannel) key.channel();if(key.isConnectable()){if(sc.finishConnect()){System.out.println("已連接事件");}else{System.exit(1);}}//讀消息if(key.isReadable()){//創(chuàng)建ByteBuffer,并開辟一個1M的緩沖區(qū)ByteBuffer buffer = ByteBuffer.allocate(1024);//讀取請求碼流,返回讀取到的字節(jié)數(shù)int readBytes = sc.read(buffer);//讀取到字節(jié),對字節(jié)進(jìn)行編解碼if(readBytes>0){//將緩沖區(qū)當(dāng)前的limit設(shè)置為position=0,用于后續(xù)對緩沖區(qū)的讀取操作buffer.flip();//根據(jù)緩沖區(qū)可讀字節(jié)數(shù)創(chuàng)建字節(jié)數(shù)組byte[] bytes = new byte[buffer.remaining()];//將緩沖區(qū)可讀字節(jié)數(shù)組復(fù)制到新建的數(shù)組中buffer.get(bytes);String result = new String(bytes,"UTF-8");System.out.println("客戶端收到消息:" + result);} else if(readBytes<0){key.cancel();sc.close();}}}}//異步發(fā)送消息private void doWrite(SocketChannel channel,String request) throws IOException{byte[] bytes = request.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);//flip操作writeBuffer.flip();//發(fā)送緩沖區(qū)的字節(jié)數(shù)組channel.write(writeBuffer);}private void doConnect() throws IOException{if(socketChannel.connect(new InetSocketAddress(host,port))){System.out.println("connect");}else {socketChannel.register(selector, SelectionKey.OP_CONNECT);System.out.println("register");}}public void sendMsg(String msg) throws Exception{//覆蓋其之前感興趣的事件,將其更改為OP_READsocketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel, msg);}@Overridepublic void run() {try{doConnect();}catch(IOException e){e.printStackTrace();System.exit(1);}//循環(huán)遍歷selectorwhile(started){try{selector.select();Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();SelectionKey key ;while(it.hasNext()){key = it.next();it.remove();try{handleInput(key);}catch(Exception e){if(key != null){key.cancel();if(key.channel() != null){key.channel().close();}}}}}catch(Exception e){e.printStackTrace();System.exit(1);}}//selector關(guān)閉后會自動釋放里面管理的資源if(selector != null){try{selector.close();}catch (Exception e) {e.printStackTrace();}}} } 復(fù)制代碼測試類:
import java.util.Scanner;/*** Created by innoyiya on 2018/8/20.*/ public class Test {public static void main(String[] args) throws Exception {Service.start();Thread.sleep(1000);Client.start();while(Client.sendMsg(new Scanner(System.in).nextLine()));} } 復(fù)制代碼控制臺打印:
服務(wù)器已啟動,端口號為:12345 register 已連接事件 1234 服務(wù)器收到的信息:1234 客戶端收到消息:+++++1234+++++ 5678 服務(wù)器收到的信息:5678 客戶端收到消息:+++++5678+++++ 復(fù)制代碼如有不妥之處,請告訴我。
總結(jié)
以上是生活随笔為你收集整理的初识NIO之Java小Demo的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: while循环里嵌套一个if_if-el
- 下一篇: dw怎么做竖线_我花了5分钟,做了一份极