谈谈java的bio、nio、aio模型
目錄
socket
IO(BIO)和NIO的區(qū)別
同步和異步
bio:同步阻塞式IO
NIO:同步非阻塞IO(工作中用的很少)
Buffer使用
NIO代碼
AIO
socket
?? ?Socket又稱“套接字”,應用程序通常通過“套接字”向網(wǎng)絡發(fā)出請求或者應答請求。
? ? 套接字之間的鏈接過程可以分為四個步驟:服務器監(jiān)聽、客戶端請求服務器、服務器確認、客戶端確認、進行通信。
IO(BIO)和NIO的區(qū)別
其本質就是阻塞和非阻塞的區(qū)別。
? ? 阻塞概念:應用程序在獲取網(wǎng)絡數(shù)據(jù)的時候,如果網(wǎng)絡傳輸數(shù)據(jù)很慢,那么程序就一直等著,直到傳輸完畢為止。
? ? 非阻塞概念:應用程序直接可以獲取已經(jīng)準備就緒好的數(shù)據(jù),無需等待。
? ? IO為同步阻塞形式,NIO為同步非阻塞形式。NIO并沒有實現(xiàn)異步,在JDK1.7之后,升級了NIO庫包,支持異步非阻塞通信模型即NIO2.0(AIO)。
同步和異步
同步和異步一般是面向操作系統(tǒng)與應用程序對IO操作的層面上來區(qū)別的。
? ? 同步時,應用程序會直接參與IO讀寫操作,并且我們的應用程序會直接阻塞到某一個方法上,直到數(shù)據(jù)準備就緒:或者采用輪詢的策略實時監(jiān)察數(shù)據(jù)的就緒狀態(tài),如果就緒則獲取數(shù)據(jù)。
? ? 異步時,則所有的IO讀寫操作交給操作系統(tǒng)處理,與我們的應用程序沒有直接關系,我們程序不需要關心IO讀寫,當操作系統(tǒng)完成了IO讀寫操作時,會給我們應用程序發(fā)送通知,我們的應用程序直接拿走數(shù)據(jù)即可。
同步說的是你的server服務器端的執(zhí)行方式。
阻塞說的是具體的技術,接收數(shù)據(jù)的方式、狀態(tài)(IO、NIO)
bio:同步阻塞式IO
//客戶端 public class Client {final static String ADDRESS = "127.0.0.1";final static int PORT = 8765;public static void main(String[] args) {Socket socket = null;BufferedReader in = null;PrintWriter out = null;try {socket = new Socket(ADDRESS, PORT);in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new PrintWriter(socket.getOutputStream(), true);//向服務器端發(fā)送數(shù)據(jù)out.println("接收到客戶端的請求數(shù)據(jù)...");String response = in.readLine();System.out.println("Client: " + response);} catch (Exception e) {e.printStackTrace();} finally {if(in != null){try {in.close();} catch (IOException e) {e.printStackTrace();}}if(out != null){try {out.close();} catch (Exception e) {e.printStackTrace();}}if(socket != null){try {socket.close();} catch (IOException e) {e.printStackTrace();}}socket = null;}} }//服務器端 public class Server {final static int PROT = 8765;public static void main(String[] args) {ServerSocket server = null;try {server = new ServerSocket(PROT);System.out.println(" server start .. ");//進行阻塞Socket socket = server.accept();//新建一個線程執(zhí)行客戶端的任務new Thread(new ServerHandler(socket)).start();} catch (Exception e) {e.printStackTrace();} finally {if(server != null){try {server.close();} catch (IOException e) {e.printStackTrace();}}server = null;}} }public class ServerHandler implements Runnable{private Socket socket ;public ServerHandler(Socket socket){this.socket = socket;}@Overridepublic void run() {BufferedReader in = null;PrintWriter out = null;try {in = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));out = new PrintWriter(this.socket.getOutputStream(), true);String body = null;while(true){body = in.readLine();if(body == null) break;System.out.println("Server :" + body);out.println("服務器端回送響的應數(shù)據(jù).");}} catch (Exception e) {e.printStackTrace();} finally {if(in != null){try {in.close();} catch (IOException e) {e.printStackTrace();}}if(out != null){try {out.close();} catch (Exception e) {e.printStackTrace();}}if(socket != null){try {socket.close();} catch (IOException e) {e.printStackTrace();}}socket = null;}} } //(JDK1.5之前沒有出NIO的解決辦法):BIO+線程池,降低服務器端的壓力 {server = new ServerSocket(PORT);System.out.println("server start");Socket socket = null;HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000);//自定義線程池while(true){socket = server.accept();executorPool.execute(new ServerHandler(socket));} }//自定義線程池 public class HandlerExecutorPool {private ExecutorService executor;public HandlerExecutorPool(int maxPoolSize, int queueSize){this.executor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),maxPoolSize,120L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(queueSize));}public void execute(Runnable task){this.executor.execute(task);} }NIO:同步非阻塞IO(工作中用的很少)
幾個概念:Buffer(緩沖區(qū))、Channel(管道、通道)、Selector(選擇器、多路復用器)
NIO的本質就是避免原始的TCP建立連接使用3次握手的操作,減少連接的開銷。
Buffer使用
?? ?Buffer是一個對象,它包含一些要寫入或者要讀取的數(shù)據(jù)。在NIO類庫中加入Buffer對象,體現(xiàn)了新庫與原IO的一個重要的區(qū)別。在面向流的IO中,可以將數(shù)據(jù)直接寫入或讀取到Stream對象中。在NIO庫中,所有數(shù)據(jù)都是用緩沖區(qū)處理的(讀寫)。緩沖區(qū)實質上是一個數(shù)組,通常它是一個字節(jié)數(shù)組(ByteBuffer),也可以使用其他類型的數(shù)據(jù)。這個數(shù)組為緩沖區(qū)提供了數(shù)據(jù)的訪問讀寫等操作屬性,如位置、容量、上限等概念,參考API文檔。
? ? Buffer類型:我們最常用的就是ByteBuffer,實際上每一種java基本類型都對應了一種緩沖區(qū)(除了Boolean類型)
?? ?ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer
/** 各種類型的buffer用法是一樣的。 pos(位置)、lim(限制)、cap(容量) put(x)方法會將該元素放置到當前位置,并且位置+1;但是put(x, x)方法不會改變當前位置。 capacity()獲取容量;limit()獲取限制,而且是從當前位置開始獲取的限制。 get()獲取當前位置的數(shù)據(jù),并且位置+1。 flip()復位當前位置,并且修改限制為復位之前的位置。最好在put完畢之后flip()一下。 position(0)設置當前位置為0。 remaining()可讀數(shù)據(jù)長度,位置到限制(pos-lim)數(shù)量。 */ // 1 基本操作 //創(chuàng)建指定長度的緩沖區(qū) IntBuffer buf = IntBuffer.allocate(10); //put還有很多重載方法 buf.put(13);// position位置:0 - > 1 buf.put(21);// position位置:1 - > 2 buf.put(35);// position位置:2 - > 3 //把位置復位為0,也就是position位置:3 - > 0 buf.flip(); System.out.println("使用flip復位:" + buf);//[pos=3 lim=10 cap=10]位置、限制、容量 System.out.println("容量為: " + buf.capacity()); //容量一旦初始化后不允許改變(warp方法包裹數(shù)組除外) System.out.println("限制為: " + buf.limit()); //由于只裝載了三個元素,所以可讀取或者操作的元素為3 則limit=3 System.out.println("獲取下標為1的元素:" + buf.get(1)); System.out.println("get(index)方法,position位置不改變:" + buf); buf.put(1, 4);//把下標為1的位置賦值4 System.out.println("put(index, change)方法,position位置不變:" + buf);; for (int i = 0; i < buf.limit(); i++) {//調用get方法會使其緩沖區(qū)位置(position)向后遞增一位System.out.print(buf.get() + "\t"); } System.out.println("buf對象遍歷之后為: " + buf); // 2 wrap方法使用 // wrap方法會包裹一個數(shù)組: 一般這種用法不會先初始化緩存對象的長度,因為沒有意義,最后還會被wrap所包裹的數(shù)組覆蓋掉。 // 并且wrap方法修改緩沖區(qū)對象的時候,數(shù)組本身也會跟著發(fā)生變化。 int[] arr = new int[]{1,2,5}; IntBuffer buf1 = IntBuffer.wrap(arr);//[pos=0 lim=3 cap=3] System.out.println(buf1); IntBuffer buf2 = IntBuffer.wrap(arr, 0 , 2);//截取0-2[pos=0 lim=2 cap=3] //這樣使用表示容量為數(shù)組arr的長度,但是可操作的元素只有實際進入緩存區(qū)的元素長度 System.out.println(buf2); // 3 其他方法 IntBuffer buf1 = IntBuffer.allocate(10); int[] arr = new int[]{1,2,5}; buf1.put(arr); System.out.println(buf1); //一種復制方法,完全復制 IntBuffer buf3 = buf1.duplicate(); System.out.println(buf3); //設置buf1的位置屬性注意這倆的區(qū)別 buf1.position(0); buf1.flip(); System.out.println(buf1); System.out.println("可讀數(shù)據(jù)為:" + buf1.remaining()); int[] arr2 = new int[buf1.remaining()]; //將緩沖區(qū)數(shù)據(jù)放入arr2數(shù)組中去 buf1.get(arr2); for(int i : arr2){System.out.print(Integer.toString(i) + ","); }NIO代碼
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class Server implements Runnable{//1 多路復用器(管理所有的通道)private Selector seletor;//2 建立緩沖區(qū)private ByteBuffer readBuf = ByteBuffer.allocate(1024);//3private ByteBuffer writeBuf = ByteBuffer.allocate(1024);public Server(int port){try {//1 打開路復用器this.seletor = Selector.open();//2 打開服務器通道ServerSocketChannel ssc = ServerSocketChannel.open();//3 設置服務器通道為非阻塞模式ssc.configureBlocking(false);//4 綁定地址ssc.bind(new InetSocketAddress(port));//5 把服務器通道注冊到多路復用器上,并且監(jiān)聽阻塞事件ssc.register(this.seletor, SelectionKey.OP_ACCEPT);System.out.println("Server start, port :" + port);} catch (IOException e) {e.printStackTrace();}}@Overridepublic void run() {while(true){try {//1 必須要讓多路復用器開始監(jiān)聽this.seletor.select();//2 返回多路復用器已經(jīng)選擇的結果集Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();//3 進行遍歷while(keys.hasNext()){//4 獲取一個選擇的元素SelectionKey key = keys.next();//5 直接從容器中移除就可以了keys.remove();//6 如果是有效的if(key.isValid()){//7 如果為阻塞狀態(tài)if(key.isValid() && key.isAcceptable()){this.accept(key);}//8 如果為可讀狀態(tài)if(key.isValid() && key.isReadable()){this.read(key);}//9 寫數(shù)據(jù)if(key.isValid() && key.isWritable()){//this.write(key); //ssc}}}} catch (IOException e) {e.printStackTrace();}}}private void write(SelectionKey key){//ServerSocketChannel ssc = (ServerSocketChannel) key.channel();//ssc.register(this.seletor, SelectionKey.OP_WRITE);}private void read(SelectionKey key) {try {//1 清空緩沖區(qū)舊的數(shù)據(jù)this.readBuf.clear();//2 獲取之前注冊的socket通道對象SocketChannel sc = (SocketChannel) key.channel();//3 讀取數(shù)據(jù)int count = sc.read(this.readBuf);//4 如果沒有數(shù)據(jù)if(count == -1){key.channel().close();key.cancel();return;}//5 有數(shù)據(jù)則進行讀取 讀取之前需要進行復位方法(把position 和limit進行復位)this.readBuf.flip();//6 根據(jù)緩沖區(qū)的數(shù)據(jù)長度創(chuàng)建相應大小的byte數(shù)組,接收緩沖區(qū)的數(shù)據(jù)byte[] bytes = new byte[this.readBuf.remaining()];//7 接收緩沖區(qū)數(shù)據(jù)this.readBuf.get(bytes);//8 打印結果String body = new String(bytes).trim();System.out.println("Server : " + body);// 9..可以寫回給客戶端數(shù)據(jù)// 回發(fā)數(shù)據(jù),并關閉channelsc.configureBlocking(false);//注冊到多路復用器上,并設置讀取標識sc.register(this.seletor, SelectionKey.OP_READ);ByteBuffer sendBuffer = ByteBuffer.wrap(("我是" + body).getBytes());sc.write(sendBuffer);//sc.close();} catch (IOException e) {//e.printStackTrace();SocketChannel sc = (SocketChannel) key.channel();try {System.out.println("關閉連接");key.cancel(); sc.socket().close(); sc.close();} catch (IOException e1) {//e1.printStackTrace();}}}private void accept(SelectionKey key) {try {//1 獲取服務通道ServerSocketChannel ssc = (ServerSocketChannel) key.channel();//2 執(zhí)行阻塞方法SocketChannel sc = ssc.accept();//3 設置阻塞模式sc.configureBlocking(false);//4 注冊到多路復用器上,并設置讀取標識sc.register(this.seletor, SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}public static void main(String[] args) {new Thread(new Server(8765)).start();;} } import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; public class Client {//需要一個Selectorpublic static void main(String[] args) {//創(chuàng)建連接的地址InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8765);//聲明連接通道SocketChannel sc = null;//建立緩沖區(qū)ByteBuffer buf = ByteBuffer.allocate(1024);try {//打開通道sc = SocketChannel.open();//進行連接sc.connect(address);while(true){//定義一個字節(jié)數(shù)組,然后使用系統(tǒng)錄入功能:byte[] bytes = new byte[1024];System.in.read(bytes);//把數(shù)據(jù)放到緩沖區(qū)中buf.put(bytes);//對緩沖區(qū)進行復位buf.flip();//寫出數(shù)據(jù)sc.write(buf);//清空緩沖區(qū)數(shù)據(jù)buf.clear();//從服務端讀取消息int readLenth = sc.read(buf);//讀取模式buf.flip();byte[] bytesrtn = new byte[readLenth];buf.get(bytesrtn);System.out.println(new String(bytesrtn));buf.clear();}} catch (IOException e) {e.printStackTrace();} finally {if(sc != null){try {sc.close();} catch (IOException e) {e.printStackTrace();}}}} }AIO
AIO編程,在NIO基礎之上引入了異步通道的概念,并提供了異步文件和異步套接字通道的實現(xiàn),從而在真正意義上實現(xiàn)了異步非阻塞,NIO只是非阻塞而并非異步。而AIO它不需要通過多路復用器對注冊的通道進行輪詢操作即可實現(xiàn)異步讀寫,從而簡化了NIO編程模型。也可稱之為NIO2.0,這種模式才真正的屬于我們異步非阻塞的模型。
import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.ExecutionException; public class Client implements Runnable{private AsynchronousSocketChannel asc ;public Client() throws Exception {asc = AsynchronousSocketChannel.open();}public void connect(){asc.connect(new InetSocketAddress("127.0.0.1", 8765));}public void write(String request){try {asc.write(ByteBuffer.wrap(request.getBytes())).get();//.get()表示異步read();} catch (Exception e) {e.printStackTrace();}}private void read() {ByteBuffer buf = ByteBuffer.allocate(1024);try {asc.read(buf).get();//.get()表示異步buf.flip();byte[] respByte = new byte[buf.remaining()];buf.get(respByte);System.out.println(new String(respByte,"utf-8").trim());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} catch (UnsupportedEncodingException e) {e.printStackTrace();}}@Overridepublic void run() {while(true){//就是不讓它停止}}public static void main(String[] args) throws Exception {Client c1 = new Client();c1.connect();Client c2 = new Client();c2.connect();Client c3 = new Client();c3.connect();new Thread(c1, "c1").start();new Thread(c2, "c2").start();new Thread(c3, "c3").start();Thread.sleep(1000);c1.write("c1 aaa");c2.write("c2 bbbb");c3.write("c3 ccccc");}} import java.net.InetSocketAddress; import java.nio.channels.AsynchronousChannelGroup; import java.nio.channels.AsynchronousServerSocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Server {//線程池private ExecutorService executorService;//線程組private AsynchronousChannelGroup threadGroup;//服務器通道public AsynchronousServerSocketChannel assc;public Server(int port){try {//創(chuàng)建一個緩存池executorService = Executors.newCachedThreadPool();//創(chuàng)建線程組threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);//創(chuàng)建服務器通道assc = AsynchronousServerSocketChannel.open(threadGroup);//進行綁定assc.bind(new InetSocketAddress(port));System.out.println("server start , port : " + port);//進行阻塞(其實不是阻塞。)assc.accept(this, new ServerCompletionHandler());//一直阻塞 不讓服務器停止Thread.sleep(Integer.MAX_VALUE);} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {Server server = new Server(8765);} }import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> {@Overridepublic void completed(AsynchronousSocketChannel asc, Server attachment) {//當有下一個客戶端接入的時候 直接調用Server的accept方法,這樣反復執(zhí)行下去,保證多個客戶端都可以阻塞attachment.assc.accept(attachment, this);read(asc);}private void read(final AsynchronousSocketChannel asc) {//讀取數(shù)據(jù)ByteBuffer buf = ByteBuffer.allocate(1024);asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() {@Overridepublic void completed(Integer resultSize, ByteBuffer attachment) {//進行讀取之后,重置標識位attachment.flip();//獲得讀取的字節(jié)數(shù)System.out.println("Server -> " + "收到客戶端的數(shù)據(jù)長度為:" + resultSize);//獲取讀取的數(shù)據(jù)String resultData = new String(attachment.array()).trim();System.out.println("Server -> " + "收到客戶端的數(shù)據(jù)信息為:" + resultData);String response = "服務器響應, 收到了客戶端發(fā)來的數(shù)據(jù): " + resultData;write(asc, response);}@Overridepublic void failed(Throwable exc, ByteBuffer attachment) {exc.printStackTrace();}});}private void write(AsynchronousSocketChannel asc, String response) {try {ByteBuffer buf = ByteBuffer.allocate(1024);buf.put(response.getBytes());buf.flip();asc.write(buf).get();//.get()表示異步} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}@Overridepublic void failed(Throwable exc, Server attachment) {exc.printStackTrace();} }?
總結
以上是生活随笔為你收集整理的谈谈java的bio、nio、aio模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 谈谈java并发锁(重入锁、读写锁、公平
- 下一篇: java使用sigar监控服务器