大数据 NIO
NIO
一、基礎回顧
a 、 進程與線程
b、 Socket
二、 NIO
Buffer子類 ByteBuffer
代碼1
import java.nio.ByteBuffer;public class BufferDemo {public static void main(String[] args) { // //創建緩沖區 , 并指定了大小為1024個字節//當創建好緩沖區的時候 , 就有了一下屬性//1. capacity 容量位 --- 表示緩沖區容量//2. position 操作位 --- 表示要操作的位置 ---- 當緩沖區剛剛創建的時候 , 操作位默認為0 , 每添加一個字節的數據 , position就會向后挪一位//3. limit 限制位 ---- 表示position 所能達到的最大位置 --- 當緩沖區剛剛創建的時候 , limit就是容量位 。//獲取數據時 , 默認是從操作位開始獲取的 // ByteBuffer buffer = ByteBuffer.allocate(1024);//最多能存放1k數據 // //向緩沖區添加數據 // buffer.put("hello".getBytes());//以上方法存在資源浪費//******************************************************* // //在已知具體數據的情況下 , 建議使用這種方法創建緩沖區//使用wrap方式創建緩沖區 , 參數實際上是一個字節數組 , 底層實際上就是將參數字節數組復制給底層的實際存儲數據的數組 , 此時操作位并沒有改變還是0 //為什么是數組使用復制 , 而不是直接使用賦值?//保持數據的不變和唯一 // ByteBuffer buffer = ByteBuffer.wrap("hello".getBytes());//創建與數據大小相對應的緩沖區 // // //獲取數據 , 每一次獲取 , 只能獲取一個字節byte b = buffer.get();System.out.println(b); // // //獲取緩沖區所有數據 // while(buffer.hasRemaining()) {//判斷是否還有剩余數據 // // byte b = buffer.get(); // System.out.println(b); // }//*******************************************************//但是使用固定緩沖區大小的情況下獲取數據會出現獲取到0的情況 , 需要將默認的操作位歸0 , 并且讀取到有效數據結束即可 ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put("hello".getBytes());//遍歷方法一 : 記錄操作位位置后循環遍歷 // int position = buffer.position(); // for(int i = 0 ;i < position ; i++) { // System.out.println(buffer.get(i)); // }//遍歷方法二: 設置限制位為操作位后 , 操作位歸0 遍歷 // buffer.limit(buffer.position()); // buffer.position(0); // while(buffer.hasRemaining()) { // System.out.println(buffer.get()); // }//遍歷方法三: 反轉緩沖區//先將限制位設置為當前的操作位 , 然后把操作位歸0buffer.flip(); // buffer.hasRemaining()該方法 本質上就是判斷操作位是否小于限制位while(buffer.hasRemaining()) {System.out.println(buffer.get());}//獲取緩沖區中的底層數組byte[] array = buffer.array();//底層也是使用的數組復制 , 返回的是整個底層數組 , 而不是有效數據System.out.println(new String(array , 0 , buffer.position()));//如果使用過反轉buffer.flip();System.out.println(new String(array , 0 , buffer.limit()));} }代碼2
import java.nio.ByteBuffer;public class BufferDemo2 {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);buffer.put("hello".getBytes());System.out.println("操作位:"+ buffer.position());System.out.println("限制位:"+ buffer.limit()); // buffer.flip(); // System.out.println("操作位:"+ buffer.position()); // System.out.println("限制位:"+ buffer.limit());//重繞緩沖區buffer.rewind(); //作用: 將操作位歸0 , 限制位不變 。 System.out.println("操作位:"+ buffer.position());System.out.println("限制位:"+ buffer.limit());} }Channel 通道
SocketChannel
代碼示例:
客戶端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel;public class SocketChannelDemo {public static void main(String[] args) throws IOException, InterruptedException {//打開通道//SocketChannel 默認為阻塞連接 此時和Socket基本一樣SocketChannel s = SocketChannel.open();//設置SoceketChannel為非阻塞的s.configureBlocking(false);//發起連接s.connect(new InetSocketAddress("localhost", 8090));//由于SoceketChannel為非阻塞的 , 所以不能保證連接的真正建立//在實際開發中往往會認為的設置阻塞 , 來保證連接的建立//判斷連接是否成功 , 如果沒有連接成功finishConnect()底層會試圖再次建立連接//如果多次試圖連接沒有成功 , 則報錯while(!s.finishConnect()) ;//寫出數據s.write(ByteBuffer.wrap("hello".getBytes()));//獲取服務器端的響應Thread.sleep(100);ByteBuffer b = ByteBuffer.allocate(100);s.read(b);b.flip();System.out.println(new String(b.array() , 0 , b.limit())); // 關閉通道 s.close();}服務端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel;public class ServerSocketChannelDemo {public static void main(String[] args) throws IOException, InterruptedException {//打開服務器通道ServerSocketChannel s= ServerSocketChannel.open();//綁定偵聽的端口s.bind(new InetSocketAddress( 8090));//設置非阻塞s.configureBlocking(false);//接收連接SocketChannel accept = s.accept();//由于ServerSocketChannel是非阻塞的 , 所以可能出現還沒有客戶端聯入 但是服務器已經結束的現象//所以需要人為的設置為阻塞的 。 while(accept == null) {accept = s.accept();}//將socketChannel設置為非阻塞accept.configureBlocking(false);//讀取數據ByteBuffer buffer = ByteBuffer.allocate(100);accept.read(buffer);buffer.flip();System.out.println(new String(buffer.array() , 0 , buffer.limit()));//向客戶端做出響應accept.write(ByteBuffer.wrap("服務器端接收成功!".getBytes()));Thread.sleep(1000);//如果不加延時 , 服務器端寫出數據立即結束 , 此時客戶端還沒有接收完數據會報錯} }通道特點 :
Selector 選擇器
代碼:
客戶端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;public class ClientDemo {public static void main(String[] args) throws IOException {//打開客戶端的通道SocketChannel sc = SocketChannel.open();//設置為非阻塞sc.configureBlocking(false);//獲取選擇器Selector selc = Selector.open();//將通道注冊到選擇器上sc.register(selc, SelectionKey.OP_CONNECT);//并給予連接權限//發起連接sc.connect(new InetSocketAddress("localhost", 8080));while(true) {//進行選擇 , 篩選出有用的連接selc.select();//獲取篩選之后有用的事件Set<SelectionKey> keys = selc.selectedKeys();Iterator<SelectionKey> iterator = keys.iterator();while(iterator.hasNext()) {//將遍歷到的事件讀取出來SelectionKey next = iterator.next();//可能向服務器發起連接//可能向服務器寫數據//可能接收服務器的數據if(next.isConnectable()) {//判斷是否是一個連接事件//從該事件中獲取到對應的通道SocketChannel scx = (SocketChannel) next.channel();//判斷之前的連接是否成功while(!scx.finishConnect());//連接成功之后 進行讀寫操作scx.register(selc, SelectionKey.OP_READ | SelectionKey.OP_WRITE);}if(next.isWritable()) {//從該事件中獲取到對應的通道SocketChannel scx = (SocketChannel) next.channel();//寫數據scx.write(ByteBuffer.wrap("讀取數據成功!".getBytes()));//執行完寫操作之后 , 需要將這個通道的寫權限注銷掉 ,防止不停地向服務器寫數據scx.register(selc, next.interestOps() ^ SelectionKey.OP_WRITE);//可用^ 或& ~}if(next.isReadable()) {//從該事件中獲取到對應的通道SocketChannel scx = (SocketChannel) next.channel();//讀數據ByteBuffer buffer = ByteBuffer.allocate(100);scx.read(buffer);buffer.flip();System.out.println(new String(buffer.array() , 0 , buffer.limit()));//移除可讀事件scx.register(selc, next.interestOps() & ~SelectionKey.OP_READ);}//為了防止事件移除失敗 , 處理完成后將事件移除iterator.remove();}}} }服務器端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;import javax.swing.plaf.SliderUI;public class ServerDemo {public static void main(String[] args) throws IOException {//打開服務器端通道ServerSocketChannel ssc = ServerSocketChannel.open();//綁定偵聽的端口號ssc.bind(new InetSocketAddress(8080));//接收任何IP客戶端8080端口傳來的數據//將通道設置為非阻塞ssc.configureBlocking(false);//將服務器注冊到選擇器上Selector selc = Selector.open();//為服務器注冊一個接受請求的權限ssc.register(selc, SelectionKey.OP_ACCEPT);while(true) {//進行選擇selc.select();//將選擇后的事件獲取出來Set<SelectionKey> keys = selc.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while(it.hasNext()) {//獲取這個事件SelectionKey key = it.next();//可能是接受連接事件//可能是可讀事件//可能是可寫事件if(key.isAcceptable()) {//獲取事件的通道ServerSocketChannel sscx = (ServerSocketChannel) key.channel();//接受連接SocketChannel sc = sscx.accept();while(sc == null) {sscx.accept();}//設置為非阻塞sc.configureBlocking(false);//注冊一個可讀事件sc.register(selc, SelectionKey.OP_READ | SelectionKey.OP_WRITE);}if(key.isReadable()) {//獲取事件的通道SocketChannel scx = (SocketChannel) key.channel();//讀取數據ByteBuffer buffer = ByteBuffer.allocate(100); scx.read(buffer);buffer.flip();System.out.println(new String (buffer.array() , 0 , buffer.limit()));//消除可讀事件scx.register(selc, key.interestOps() ^ SelectionKey.OP_READ);}if(key.isWritable()) {//獲取事件的通道SocketChannel scx = (SocketChannel) key.channel();//寫出數據scx.write(ByteBuffer.wrap("hello".getBytes()));//消除可以寫事件scx.register(selc, key.interestOps() & ~SelectionKey.OP_WRITE);}it.remove();}}} }三 、 考慮 : 數據粘包怎么處理?
總結
- 上一篇: Java 利用InetAddress类确
- 下一篇: Linux_09 Linux软件包管理