中间件系列「三」netty之NIO基础
Java NIO系統(tǒng)的核心在于:通道(Channel)和緩沖區(qū)(Buffer)。通道表示打開到 IO 設(shè)備(例如:文件、套接字)的連接。若需要使用 NIO 系統(tǒng),需要獲取用于連接 IO 設(shè)備的通道以及用于容納數(shù)據(jù)的緩沖區(qū)。然后操作緩沖區(qū),對(duì)數(shù)據(jù)進(jìn)行處理
簡(jiǎn)而言之,通道負(fù)責(zé)傳輸,緩沖區(qū)負(fù)責(zé)存儲(chǔ)
常見的Channel有以下四種,其中FileChannel主要用于文件傳輸,其余三種用于網(wǎng)絡(luò)通信
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
Buffer有以下幾種,其中使用較多的是ByteBuffer
- ByteBuffer
- MappedByteBuffer
- DirectByteBuffer
- HeapByteBuffer
- ShortBuffer
- IntBuffer
- LongBuffer
- FloatBuffer
- DoubleBuffer
- CharBuffer
1、Selector
在使用Selector之前,處理socket連接還有以下兩種方法
使用多線程技術(shù)
為每個(gè)連接分別開辟一個(gè)線程,分別去處理對(duì)應(yīng)的socke連接
這種方法存在以下幾個(gè)問題
- 內(nèi)存占用高
- 每個(gè)線程都需要占用一定的內(nèi)存,當(dāng)連接較多時(shí),會(huì)開辟大量線程,導(dǎo)致占用大量?jī)?nèi)存
- 線程上下文切換成本高
- 只適合連接數(shù)少的場(chǎng)景
- 連接數(shù)過多,會(huì)導(dǎo)致創(chuàng)建很多線程,從而出現(xiàn)問題
使用線程池技術(shù)
使用線程池,讓線程池中的線程去處理連接
這種方法存在以下幾個(gè)問題
- 阻塞模式下,線程僅能處理一個(gè)連接
- 線程池中的線程獲取任務(wù)(task)后,只有當(dāng)其執(zhí)行完任務(wù)之后(斷開連接后),才會(huì)去獲取并執(zhí)行下一個(gè)任務(wù)
- 若socke連接一直未斷開,則其對(duì)應(yīng)的線程無法處理其他socke連接
- 僅適合短連接場(chǎng)景
- 短連接即建立連接發(fā)送請(qǐng)求并響應(yīng)后就立即斷開,使得線程池中的線程可以快速處理其他連接
使用選擇器(這也叫響應(yīng)式編程)
selector 的作用就是配合一個(gè)線程來管理多個(gè) channel(fileChannel因?yàn)槭亲枞降?#xff0c;所以無法使用selector),獲取這些 channel 上發(fā)生的事件,這些 channel 工作在非阻塞模式下,當(dāng)一個(gè)channel中沒有執(zhí)行任務(wù)時(shí),可以去執(zhí)行其他channel中的任務(wù)。適合連接數(shù)多,但流量較少的場(chǎng)景
若事件未就緒,調(diào)用 selector 的 select() 方法會(huì)阻塞線程,直到 channel 發(fā)生了就緒事件。這些事件就緒后,select 方法就會(huì)返回這些事件交給 thread 來處理
2、ByteBuffer
使用方式
- 向 buffer 寫入數(shù)據(jù),例如調(diào)用 channel.read(buffer)
- 調(diào)用 flip() 切換至讀模式
- flip會(huì)使得buffer中的limit變?yōu)閜osition,position變?yōu)?
- 從 buffer 讀取數(shù)據(jù),例如調(diào)用 buffer.get()
- 調(diào)用 clear() 或者compact()切換至寫模式
- 調(diào)用clear()方法時(shí)position=0,limit變?yōu)閏apacity
- 調(diào)用compact()方法時(shí),會(huì)將緩沖區(qū)中的未讀數(shù)據(jù)壓縮到緩沖區(qū)前面
- 重復(fù)以上步驟
使用ByteBuffer讀取文件中的內(nèi)容
public class TestByteBuffer {public static void main(String[] args) {// 獲得FileChanneltry (FileChannel channel = new FileInputStream("stu.txt").getChannel()) {// 獲得緩沖區(qū)ByteBuffer buffer = ByteBuffer.allocate(10);int hasNext = 0;StringBuilder builder = new StringBuilder();while((hasNext = channel.read(buffer)) > 0) {// 切換模式 limit=position, position=0buffer.flip();// 當(dāng)buffer中還有數(shù)據(jù)時(shí),獲取其中的數(shù)據(jù)while(buffer.hasRemaining()) {builder.append((char)buffer.get());}// 切換模式 position=0, limit=capacitybuffer.clear();}System.out.println(builder.toString());} catch (IOException e) {}} }核心屬性
字節(jié)緩沖區(qū)的父類Buffer中有幾個(gè)核心屬性,如下
// Invariants: mark <= position <= limit <= capacity private int mark = -1; private int position = 0; private int limit; private int capacity;- capacity:緩沖區(qū)的容量。通過構(gòu)造函數(shù)賦予,一旦設(shè)置,無法更改
- limit:緩沖區(qū)的界限。位于limit 后的數(shù)據(jù)不可讀寫。緩沖區(qū)的限制不能為負(fù),并且不能大于其容量
- position:下一個(gè)讀寫位置的索引(類似PC)。緩沖區(qū)的位置不能為負(fù),并且不能大于limit
- mark:記錄當(dāng)前position的值。position被改變后,可以通過調(diào)用reset() 方法恢復(fù)到mark的位置。
以上四個(gè)屬性必須滿足以下要求
mark <= position <= limit <= capacity
核心方法
put()方法
- put()方法可以將一個(gè)數(shù)據(jù)放入到緩沖區(qū)中。
- 進(jìn)行該操作后,postition的值會(huì)+1,指向下一個(gè)可以放入的位置。capacity = limit ,為緩沖區(qū)容量的值。
flip()方法
- flip()方法會(huì)切換對(duì)緩沖區(qū)的操作模式,由寫->讀 / 讀->寫
- 進(jìn)行該操作后
- 如果是寫模式->讀模式,position = 0 , limit 指向最后一個(gè)元素的下一個(gè)位置,capacity不變
- 如果是讀->寫,則恢復(fù)為put()方法中的值
get()方法
- get()方法會(huì)讀取緩沖區(qū)中的一個(gè)值
- 進(jìn)行該操作后,position會(huì)+1,如果超過了limit則會(huì)拋出異常
- 注意:get(i)方法不會(huì)改變position的值
rewind()方法
- 該方法只能在讀模式下使用
- rewind()方法后,會(huì)恢復(fù)position、limit和capacity的值,變?yōu)檫M(jìn)行g(shù)et()前的值
clean()方法
- clean()方法會(huì)將緩沖區(qū)中的各個(gè)屬性恢復(fù)為最初的狀態(tài),position = 0, capacity = limit
- 此時(shí)緩沖區(qū)的數(shù)據(jù)依然存在,處于“被遺忘”狀態(tài),下次進(jìn)行寫操作時(shí)會(huì)覆蓋這些數(shù)據(jù)
mark()和reset()方法
- mark()方法會(huì)將postion的值保存到mark屬性中
- reset()方法會(huì)將position的值改為mark中保存的值
compact()方法
此方法為ByteBuffer的方法,而不是Buffer的方法
- compact會(huì)把未讀完的數(shù)據(jù)向前壓縮,然后切換到寫模式
- 數(shù)據(jù)前移后,原位置的值并未清零,寫時(shí)會(huì)覆蓋之前的值
clear() VS compact()
clear只是對(duì)position、limit、mark進(jìn)行重置,而compact在對(duì)position進(jìn)行設(shè)置,以及l(fā)imit、mark進(jìn)行重置的同時(shí),還涉及到數(shù)據(jù)在內(nèi)存中拷貝(會(huì)調(diào)用arraycopy)。所以compact比clear更耗性能。但compact能保存你未讀取的數(shù)據(jù),將新數(shù)據(jù)追加到為讀取的數(shù)據(jù)之后;而clear則不行,若你調(diào)用了clear,則未讀取的數(shù)據(jù)就無法再讀取到了
所以需要根據(jù)情況來判斷使用哪種方法進(jìn)行模式切換
方法調(diào)用及演示
ByteBuffer調(diào)試工具類
需要先導(dǎo)入netty依賴
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.51.Final</version> </dependency> import java.nio.ByteBuffer;import io.netty.util.internal.MathUtil; import io.netty.util.internal.StringUtil; import io.netty.util.internal.MathUtil.*;/*** @author Panwen Chen* @date 2021/4/12 15:59*/ public class ByteBufferUtil {private static final char[] BYTE2CHAR = new char[256];private static final char[] HEXDUMP_TABLE = new char[256 * 4];private static final String[] HEXPADDING = new String[16];private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];private static final String[] BYTE2HEX = new String[256];private static final String[] BYTEPADDING = new String[16];static {final char[] DIGITS = "0123456789abcdef".toCharArray();for (int i = 0; i < 256; i++) {HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];}int i;// Generate the lookup table for hex dump paddingsfor (i = 0; i < HEXPADDING.length; i++) {int padding = HEXPADDING.length - i;StringBuilder buf = new StringBuilder(padding * 3);for (int j = 0; j < padding; j++) {buf.append(" ");}HEXPADDING[i] = buf.toString();}// Generate the lookup table for the start-offset header in each row (up to 64KiB).for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {StringBuilder buf = new StringBuilder(12);buf.append(StringUtil.NEWLINE);buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));buf.setCharAt(buf.length() - 9, '|');buf.append('|');HEXDUMP_ROWPREFIXES[i] = buf.toString();}// Generate the lookup table for byte-to-hex-dump conversionfor (i = 0; i < BYTE2HEX.length; i++) {BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);}// Generate the lookup table for byte dump paddingsfor (i = 0; i < BYTEPADDING.length; i++) {int padding = BYTEPADDING.length - i;StringBuilder buf = new StringBuilder(padding);for (int j = 0; j < padding; j++) {buf.append(' ');}BYTEPADDING[i] = buf.toString();}// Generate the lookup table for byte-to-char conversionfor (i = 0; i < BYTE2CHAR.length; i++) {if (i <= 0x1f || i >= 0x7f) {BYTE2CHAR[i] = '.';} else {BYTE2CHAR[i] = (char) i;}}}/*** 打印所有內(nèi)容* @param buffer*/public static void debugAll(ByteBuffer buffer) {int oldlimit = buffer.limit();buffer.limit(buffer.capacity());StringBuilder origin = new StringBuilder(256);appendPrettyHexDump(origin, buffer, 0, buffer.capacity());System.out.println("+--------+-------------------- all ------------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);System.out.println(origin);buffer.limit(oldlimit);}/*** 打印可讀取內(nèi)容* @param buffer*/public static void debugRead(ByteBuffer buffer) {StringBuilder builder = new StringBuilder(256);appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());System.out.println("+--------+-------------------- read -----------------------+----------------+");System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());System.out.println(builder);}private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {if (MathUtil.isOutOfBounds(offset, length, buf.capacity())) {throw new IndexOutOfBoundsException("expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length+ ") <= " + "buf.capacity(" + buf.capacity() + ')');}if (length == 0) {return;}dump.append(" +-------------------------------------------------+" +StringUtil.NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +StringUtil.NEWLINE + "+--------+-------------------------------------------------+----------------+");final int startIndex = offset;final int fullRows = length >>> 4;final int remainder = length & 0xF;// Dump the rows which have 16 bytes.for (int row = 0; row < fullRows; row++) {int rowStartIndex = (row << 4) + startIndex;// Per-row prefix.appendHexDumpRowPrefix(dump, row, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + 16;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(" |");// ASCII dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append('|');}// Dump the last row which has less than 16 bytes.if (remainder != 0) {int rowStartIndex = (fullRows << 4) + startIndex;appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);// Hex dumpint rowEndIndex = rowStartIndex + remainder;for (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);}dump.append(HEXPADDING[remainder]);dump.append(" |");// Ascii dumpfor (int j = rowStartIndex; j < rowEndIndex; j++) {dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);}dump.append(BYTEPADDING[remainder]);dump.append('|');}dump.append(StringUtil.NEWLINE +"+--------+-------------------------------------------------+----------------+");}private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {if (row < HEXDUMP_ROWPREFIXES.length) {dump.append(HEXDUMP_ROWPREFIXES[row]);} else {dump.append(StringUtil.NEWLINE);dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));dump.setCharAt(dump.length() - 9, '|');dump.append('|');}}public static short getUnsignedByte(ByteBuffer buffer, int index) {return (short) (buffer.get(index) & 0xFF);} } public class TestByteBuffer {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(10);// 向buffer中寫入1個(gè)字節(jié)的數(shù)據(jù)buffer.put((byte)97);// 使用工具類,查看buffer狀態(tài)ByteBufferUtil.debugAll(buffer);// 向buffer中寫入4個(gè)字節(jié)的數(shù)據(jù)buffer.put(new byte[]{98, 99, 100, 101});ByteBufferUtil.debugAll(buffer);// 獲取數(shù)據(jù)buffer.flip();ByteBufferUtil.debugAll(buffer);System.out.println(buffer.get());System.out.println(buffer.get());ByteBufferUtil.debugAll(buffer);// 使用compact切換模式buffer.compact();ByteBufferUtil.debugAll(buffer);// 再次寫入buffer.put((byte)102);buffer.put((byte)103);ByteBufferUtil.debugAll(buffer);} }運(yùn)行結(jié)果
// 向緩沖區(qū)寫入了一個(gè)字節(jié)的數(shù)據(jù),此時(shí)postition為1 +--------+-------------------- all ------------------------+----------------+ position: [1], limit: [10]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 00 00 00 00 00 00 00 00 00 |a......... | +--------+-------------------------------------------------+----------------+// 向緩沖區(qū)寫入四個(gè)字節(jié)的數(shù)據(jù),此時(shí)position為5 +--------+-------------------- all ------------------------+----------------+ position: [5], limit: [10]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 62 63 64 65 00 00 00 00 00 |abcde..... | +--------+-------------------------------------------------+----------------+// 調(diào)用flip切換模式,此時(shí)position為0,表示從第0個(gè)數(shù)據(jù)開始讀取 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 62 63 64 65 00 00 00 00 00 |abcde..... | +--------+-------------------------------------------------+----------------+ // 讀取兩個(gè)字節(jié)的數(shù)據(jù) 97 98// position變?yōu)? +--------+-------------------- all ------------------------+----------------+ position: [2], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 61 62 63 64 65 00 00 00 00 00 |abcde..... | +--------+-------------------------------------------------+----------------+// 調(diào)用compact切換模式,此時(shí)position及其后面的數(shù)據(jù)被壓縮到ByteBuffer前面去了 // 此時(shí)position為3,會(huì)覆蓋之前的數(shù)據(jù) +--------+-------------------- all ------------------------+----------------+ position: [3], limit: [10]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 63 64 65 64 65 00 00 00 00 00 |cdede..... | +--------+-------------------------------------------------+----------------+// 再次寫入兩個(gè)字節(jié)的數(shù)據(jù),之前的 0x64 0x65 被覆蓋 +--------+-------------------- all ------------------------+----------------+ position: [5], limit: [10]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 63 64 65 66 67 00 00 00 00 00 |cdefg..... | +--------+-------------------------------------------------+----------------+方法二
編碼:通過StandardCharsets的encode方法獲得ByteBuffer,此時(shí)獲得的ByteBuffer為讀模式,無需通過flip切換模式
解碼:通過StandardCharsets的decoder方法解碼
public class Translate {public static void main(String[] args) {// 準(zhǔn)備兩個(gè)字符串String str1 = "hello";String str2 = "";// 通過StandardCharsets的encode方法獲得ByteBuffer// 此時(shí)獲得的ByteBuffer為讀模式,無需通過flip切換模式ByteBuffer buffer1 = StandardCharsets.UTF_8.encode(str1);ByteBufferUtil.debugAll(buffer1);// 將緩沖區(qū)中的數(shù)據(jù)轉(zhuǎn)化為字符串// 通過StandardCharsets解碼,獲得CharBuffer,再通過toString獲得字符串str2 = StandardCharsets.UTF_8.decode(buffer1).toString();System.out.println(str2);ByteBufferUtil.debugAll(buffer1);} }運(yùn)行結(jié)果 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ hello +--------+-------------------- all ------------------------+----------------+ position: [5], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+方法三編碼:字符串調(diào)用getByte()方法獲得字節(jié)數(shù)組,將字節(jié)數(shù)組傳給ByteBuffer的wrap()方法,通過該方法獲得ByteBuffer。同樣無需調(diào)用flip方法切換為讀模式
解碼:通過StandardCharsets的decoder方法解碼
public class Translate {public static void main(String[] args) {// 準(zhǔn)備兩個(gè)字符串String str1 = "hello";String str2 = "";// 通過StandardCharsets的encode方法獲得ByteBuffer// 此時(shí)獲得的ByteBuffer為讀模式,無需通過flip切換模式ByteBuffer buffer1 = ByteBuffer.wrap(str1.getBytes());ByteBufferUtil.debugAll(buffer1);// 將緩沖區(qū)中的數(shù)據(jù)轉(zhuǎn)化為字符串// 通過StandardCharsets解碼,獲得CharBuffer,再通過toString獲得字符串str2 = StandardCharsets.UTF_8.decode(buffer1).toString();System.out.println(str2);ByteBufferUtil.debugAll(buffer1);} }運(yùn)行結(jié)果 +--------+-------------------- all ------------------------+----------------+ position: [0], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+ hello +--------+-------------------- all ------------------------+----------------+ position: [5], limit: [5]+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f | +--------+-------------------------------------------------+----------------+ |00000000| 68 65 6c 6c 6f |hello | +--------+-------------------------------------------------+----------------+
???????粘包與半包
現(xiàn)象
網(wǎng)絡(luò)上有多條數(shù)據(jù)發(fā)送給服務(wù)端,數(shù)據(jù)之間使用 \n 進(jìn)行分隔
但由于某種原因這些數(shù)據(jù)在接收時(shí),被進(jìn)行了重新組合,例如原始數(shù)據(jù)有3條為
- Hello,world\n
- I’m Nyima\n
- How are you?\n
變成了下面的兩個(gè) byteBuffer (粘包,半包)
- Hello,world\nI’m Nyima\nHo
- w are you?\n
出現(xiàn)原因
粘包
發(fā)送方在發(fā)送數(shù)據(jù)時(shí),并不是一條一條地發(fā)送數(shù)據(jù),而是將數(shù)據(jù)整合在一起,當(dāng)數(shù)據(jù)達(dá)到一定的數(shù)量后再一起發(fā)送。這就會(huì)導(dǎo)致多條信息被放在一個(gè)緩沖區(qū)中被一起發(fā)送出去
半包
接收方的緩沖區(qū)的大小是有限的,當(dāng)接收方的緩沖區(qū)滿了以后,就需要將信息截?cái)?/strong>,等緩沖區(qū)空了以后再繼續(xù)放入數(shù)據(jù)。這就會(huì)發(fā)生一段完整的數(shù)據(jù)最后被截?cái)嗟默F(xiàn)象
解決辦法
- 通過get(index)方法遍歷ByteBuffer,遇到分隔符時(shí)進(jìn)行處理。注意:get(index)不會(huì)改變position的值
- 記錄該段數(shù)據(jù)長(zhǎng)度,以便于申請(qǐng)對(duì)應(yīng)大小的緩沖區(qū)
- 將緩沖區(qū)的數(shù)據(jù)通過get()方法寫入到target中
- 調(diào)用compact方法切換模式,因?yàn)榫彌_區(qū)中可能還有未讀的數(shù)據(jù)
二、文件編程
1、FileChannel
工作模式
FileChannel只能在阻塞模式下工作,所以無法搭配Selector
獲取
不能直接打開 FileChannel,必須通過 FileInputStream、FileOutputStream 或者 RandomAccessFile 來獲取 FileChannel,它們都有 getChannel 方法
- 通過 FileInputStream 獲取的 channel?只能讀
- 通過 FileOutputStream 獲取的 channel?只能寫
- 通過 RandomAccessFile 是否能讀寫根據(jù)構(gòu)造 RandomAccessFile 時(shí)的讀寫模式?jīng)Q定
讀取
通過 FileInputStream 獲取channel,通過read方法將數(shù)據(jù)寫入到ByteBuffer中
read方法的返回值表示讀到了多少字節(jié),若讀到了文件末尾則返回-1
int readBytes = channel.read(buffer);可根據(jù)返回值判斷是否讀取完畢Copy while(channel.read(buffer) > 0) {// 進(jìn)行對(duì)應(yīng)操作... }Copy寫入
因?yàn)閏hannel也是有大小的,所以 write 方法并不能保證一次將 buffer 中的內(nèi)容全部寫入 channel。必須需要按照以下規(guī)則進(jìn)行寫入
// 通過hasRemaining()方法查看緩沖區(qū)中是否還有數(shù)據(jù)未寫入到通道中 while(buffer.hasRemaining()) {channel.write(buffer); }Copy關(guān)閉
通道需要close,一般情況通過try-with-resource進(jìn)行關(guān)閉,最好使用以下方法獲取strea以及channel,避免某些原因使得資源未被關(guān)閉
public class TestChannel {public static void main(String[] args) throws IOException {try (FileInputStream fis = new FileInputStream("stu.txt");FileOutputStream fos = new FileOutputStream("student.txt");FileChannel inputChannel = fis.getChannel();FileChannel outputChannel = fos.getChannel()) {// 執(zhí)行對(duì)應(yīng)操作...}} }position
channel也擁有一個(gè)保存讀取數(shù)據(jù)位置的屬性,即position
long pos = channel.position();可以通過position(int pos)設(shè)置channel中position的值 long newPos = ...; channel.position(newPos);設(shè)置當(dāng)前位置時(shí),如果設(shè)置為文件的末尾
- 這時(shí)讀取會(huì)返回 -1
- 這時(shí)寫入,會(huì)追加內(nèi)容,但要注意如果 position 超過了文件末尾,再寫入時(shí)在新內(nèi)容和原末尾之間會(huì)有空洞(00)
強(qiáng)制寫入
操作系統(tǒng)出于性能的考慮,會(huì)將數(shù)據(jù)緩存,不是立刻寫入磁盤,而是等到緩存滿了以后將所有數(shù)據(jù)一次性的寫入磁盤。可以調(diào)用?force(true)?方法將文件內(nèi)容和元數(shù)據(jù)(文件的權(quán)限等信息)立刻寫入磁盤
2、兩個(gè)Channel傳輸數(shù)據(jù)
transferTo方法
使用transferTo方法可以快速、高效地將一個(gè)channel中的數(shù)據(jù)傳輸?shù)搅硪粋€(gè)channel中,但一次只能傳輸2G的內(nèi)容
transferTo底層使用了零拷貝技術(shù)
public class TestChannel {public static void main(String[] args){try (FileInputStream fis = new FileInputStream("stu.txt");FileOutputStream fos = new FileOutputStream("student.txt");FileChannel inputChannel = fis.getChannel();FileChannel outputChannel = fos.getChannel()) {// 參數(shù):inputChannel的起始位置,傳輸數(shù)據(jù)的大小,目的channel// 返回值為傳輸?shù)臄?shù)據(jù)的字節(jié)數(shù)// transferTo一次只能傳輸2G的數(shù)據(jù)inputChannel.transferTo(0, inputChannel.size(), outputChannel);} catch (IOException e) {e.printStackTrace();}} }當(dāng)傳輸?shù)奈募?strong>大于2G時(shí),需要使用以下方法進(jìn)行多次傳輸
public class TestChannel {public static void main(String[] args){try (FileInputStream fis = new FileInputStream("stu.txt");FileOutputStream fos = new FileOutputStream("student.txt");FileChannel inputChannel = fis.getChannel();FileChannel outputChannel = fos.getChannel()) {long size = inputChannel.size();long capacity = inputChannel.size();// 分多次傳輸while (capacity > 0) {// transferTo返回值為傳輸了的字節(jié)數(shù)capacity -= inputChannel.transferTo(size-capacity, capacity, outputChannel);}} catch (IOException e) {e.printStackTrace();}} }3、Path與Paths
- Path 用來表示文件路徑
- Paths 是工具類,用來獲取 Path 實(shí)例
- .?代表了當(dāng)前路徑
- ..?代表了上一級(jí)路徑
例如目錄結(jié)構(gòu)如下
d:|- data|- projects|- a|- b代碼
Path path = Paths.get("d:\\data\\projects\\a\\..\\b"); System.out.println(path); System.out.println(path.normalize()); // 正常化路徑 會(huì)去除 . 以及 ..輸出結(jié)果為
d:\data\projects\a\..\b d:\data\projects\b4、Files
查找
檢查文件是否存在
Path path = Paths.get("helloword/data.txt"); System.out.println(Files.exists(path));創(chuàng)建
創(chuàng)建一級(jí)目錄
Path path = Paths.get("helloword/d1"); Files.createDirectory(path);- 如果目錄已存在,會(huì)拋異常 FileAlreadyExistsException
- 不能一次創(chuàng)建多級(jí)目錄,否則會(huì)拋異常 NoSuchFileException
創(chuàng)建多級(jí)目錄用
Path path = Paths.get("helloword/d1/d2"); Files.createDirectories(path);Copy拷貝及移動(dòng)拷貝文件
Path source = Paths.get("helloword/data.txt"); Path target = Paths.get("helloword/target.txt");Files.copy(source, target);Copy- 如果文件已存在,會(huì)拋異常 FileAlreadyExistsException
如果希望用 source?覆蓋掉 target,需要用 StandardCopyOption 來控制
Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);Copy移動(dòng)文件
Path source = Paths.get("helloword/data.txt"); Path target = Paths.get("helloword/data.txt");Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);StandardCopyOption.ATOMIC_MOVE 保證文件移動(dòng)的原子性刪除
刪除文件
Path target = Paths.get("helloword/target.txt");三、網(wǎng)絡(luò)編程
1、阻塞
- 阻塞模式下,相關(guān)方法都會(huì)導(dǎo)致線程暫停
- ServerSocketChannel.accept 會(huì)在沒有連接建立時(shí)讓線程暫停
- SocketChannel.read 會(huì)在通道中沒有數(shù)據(jù)可讀時(shí)讓線程暫停
- 阻塞的表現(xiàn)其實(shí)就是線程暫停了,暫停期間不會(huì)占用 cpu,但線程相當(dāng)于閑置
- 單線程下,阻塞方法之間相互影響,幾乎不能正常工作,需要多線程支持
- 但多線程下,有新的問題,體現(xiàn)在以下方面
- 32 位 jvm 一個(gè)線程 320k,64 位 jvm 一個(gè)線程 1024k,如果連接數(shù)過多,必然導(dǎo)致 OOM,并且線程太多,反而會(huì)因?yàn)轭l繁上下文切換導(dǎo)致性能降低
- 可以采用線程池技術(shù)來減少線程數(shù)和線程上下文切換,但治標(biāo)不治本,如果有很多連接建立,但長(zhǎng)時(shí)間 inactive,會(huì)阻塞線程池中所有線程,因此不適合長(zhǎng)連接,只適合短連接
服務(wù)端代碼
public class Server {public static void main(String[] args) {// 創(chuàng)建緩沖區(qū)ByteBuffer buffer = ByteBuffer.allocate(16);// 獲得服務(wù)器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {// 為服務(wù)器通道綁定端口server.bind(new InetSocketAddress(8080));// 用戶存放連接的集合ArrayList<SocketChannel> channels = new ArrayList<>();// 循環(huán)接收連接while (true) {System.out.println("before connecting...");// 沒有連接時(shí),會(huì)阻塞線程SocketChannel socketChannel = server.accept();System.out.println("after connecting...");channels.add(socketChannel);// 循環(huán)遍歷集合中的連接for(SocketChannel channel : channels) {System.out.println("before reading");// 處理通道中的數(shù)據(jù)// 當(dāng)通道中沒有數(shù)據(jù)可讀時(shí),會(huì)阻塞線程channel.read(buffer);buffer.flip();ByteBufferUtil.debugRead(buffer);buffer.clear();System.out.println("after reading");}}} catch (IOException e) {e.printStackTrace();}} }?客戶端代碼
public class Client {public static void main(String[] args) {try (SocketChannel socketChannel = SocketChannel.open()) {// 建立連接socketChannel.connect(new InetSocketAddress("localhost", 8080));System.out.println("waiting...");} catch (IOException e) {e.printStackTrace();}} }運(yùn)行結(jié)果
- 客戶端-服務(wù)器建立連接前:服務(wù)器端因accept阻塞
- 客戶端-服務(wù)器建立連接后,客戶端發(fā)送消息前:服務(wù)器端因通道為空被阻塞
- 客戶端發(fā)送數(shù)據(jù)后,服務(wù)器處理通道中的數(shù)據(jù)。再次進(jìn)入循環(huán)時(shí),再次被accept阻塞
- 之前的客戶端再次發(fā)送消息,服務(wù)器端因?yàn)楸籥ccept阻塞,無法處理之前客戶端發(fā)送到通道中的信息
2、非阻塞
-
可以通過ServerSocketChannel的configureBlocking(false)方法將獲得連接設(shè)置為非阻塞的。此時(shí)若沒有連接,accept會(huì)返回null
-
可以通過SocketChannel的configureBlocking(false)方法將從通道中讀取數(shù)據(jù)設(shè)置為非阻塞的。若此時(shí)通道中沒有數(shù)據(jù)可讀,read會(huì)返回-1
服務(wù)器代碼如下
public class Server {public static void main(String[] args) {// 創(chuàng)建緩沖區(qū)ByteBuffer buffer = ByteBuffer.allocate(16);// 獲得服務(wù)器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {// 為服務(wù)器通道綁定端口server.bind(new InetSocketAddress(8080));// 用戶存放連接的集合ArrayList<SocketChannel> channels = new ArrayList<>();// 循環(huán)接收連接while (true) {// 設(shè)置為非阻塞模式,沒有連接時(shí)返回null,不會(huì)阻塞線程server.configureBlocking(false);SocketChannel socketChannel = server.accept();// 通道不為空時(shí)才將連接放入到集合中if (socketChannel != null) {System.out.println("after connecting...");channels.add(socketChannel);}// 循環(huán)遍歷集合中的連接for(SocketChannel channel : channels) {// 處理通道中的數(shù)據(jù)// 設(shè)置為非阻塞模式,若通道中沒有數(shù)據(jù),會(huì)返回0,不會(huì)阻塞線程channel.configureBlocking(false);int read = channel.read(buffer);if(read > 0) {buffer.flip();ByteBufferUtil.debugRead(buffer);buffer.clear();System.out.println("after reading");}}}} catch (IOException e) {e.printStackTrace();}} }這樣寫存在一個(gè)問題,因?yàn)樵O(shè)置為了非阻塞,會(huì)一直執(zhí)行while(true)中的代碼,CPU一直處于忙碌狀態(tài),會(huì)使得性能變低,所以實(shí)際情況中不使用這種方法處理請(qǐng)求
3、Selector
多路復(fù)用
單線程可以配合 Selector 完成對(duì)多個(gè) Channel 可讀寫事件的監(jiān)控,這稱之為多路復(fù)用
- 多路復(fù)用僅針對(duì)網(wǎng)絡(luò) IO,普通文件 IO?無法利用多路復(fù)用
- 如果不用 Selector 的非阻塞模式,線程大部分時(shí)間都在做無用功,而 Selector 能夠保證
- 有可連接事件時(shí)才去連接
- 有可讀事件才去讀取
- 有可寫事件才去寫入
- 限于網(wǎng)絡(luò)傳輸能力,Channel 未必時(shí)時(shí)可寫,一旦 Channel 可寫,會(huì)觸發(fā) Selector 的可寫事件
4、使用及Accpet事件
要使用Selector實(shí)現(xiàn)多路復(fù)用,服務(wù)端代碼如下改進(jìn)
public class SelectServer {public static void main(String[] args) {ByteBuffer buffer = ByteBuffer.allocate(16);// 獲得服務(wù)器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));// 創(chuàng)建選擇器Selector selector = Selector.open();// 通道必須設(shè)置為非阻塞模式server.configureBlocking(false);// 將通道注冊(cè)到選擇器中,并設(shè)置感興趣的事件server.register(selector, SelectionKey.OP_ACCEPT);while (true) {// 若沒有事件就緒,線程會(huì)被阻塞,反之不會(huì)被阻塞。從而避免了CPU空轉(zhuǎn)// 返回值為就緒的事件個(gè)數(shù)int ready = selector.select();System.out.println("selector ready counts : " + ready);// 獲取所有事件Set<SelectionKey> selectionKeys = selector.selectedKeys();// 使用迭代器遍歷事件Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 判斷key的類型if(key.isAcceptable()) {// 獲得key對(duì)應(yīng)的channelServerSocketChannel channel = (ServerSocketChannel) key.channel();System.out.println("before accepting...");// 獲取連接并處理,而且是必須處理,否則需要取消SocketChannel socketChannel = channel.accept();System.out.println("after accepting...");// 處理完畢后移除iterator.remove();}}}} catch (IOException e) {e.printStackTrace();}} }步驟解析
- 獲得選擇器Selector
- 將通道設(shè)置為非阻塞模式,并注冊(cè)到選擇器中,并設(shè)置感興趣的事件
- channel 必須工作在非阻塞模式
- FileChannel 沒有非阻塞模式,因此不能配合 selector 一起使用
- 綁定的事件類型可以有
- connect - 客戶端連接成功時(shí)觸發(fā)
- accept - 服務(wù)器端成功接受連接時(shí)觸發(fā)
- read - 數(shù)據(jù)可讀入時(shí)觸發(fā),有因?yàn)榻邮漳芰θ?#xff0c;數(shù)據(jù)暫不能讀入的情況
- write - 數(shù)據(jù)可寫出時(shí)觸發(fā),有因?yàn)榘l(fā)送能力弱,數(shù)據(jù)暫不能寫出的情況
-
通過Selector監(jiān)聽事件,并獲得就緒的通道個(gè)數(shù),若沒有通道就緒,線程會(huì)被阻塞
-
阻塞直到綁定事件發(fā)生
int count = selector.select();阻塞直到綁定事件發(fā)生,或是超時(shí)(時(shí)間單位為 ms)???????
-
- int count = selector.select(long timeout);
-
不會(huì)阻塞,也就是不管有沒有事件,立刻返回,自己根據(jù)返回值檢查是否有事件
int count = selector.selectNow();
-
獲取就緒事件并得到對(duì)應(yīng)的通道,然后進(jìn)行處理
事件發(fā)生后,要么處理,要么取消(cancel),不能什么都不做,否則下次該事件仍會(huì)觸發(fā),這是因?yàn)?nio 底層使用的是水平觸發(fā)
Level_triggered(水平觸發(fā)):當(dāng)被監(jiān)控的文件描述符上有可讀寫事件發(fā)生時(shí),epoll_wait()會(huì)通知處理程序去讀寫。如果這次沒有把數(shù)據(jù)一次性全部讀寫完(如讀寫緩沖區(qū)太小),那么下次調(diào)用 epoll_wait()時(shí),它還會(huì)通知你在上沒讀寫完的文件描述符上繼續(xù)讀寫,當(dāng)然如果你一直不去讀寫,它會(huì)一直通知你!!!如果系統(tǒng)中有大量你不需要讀寫的就緒文件描述符,而它們每次都會(huì)返回,這樣會(huì)大大降低處理程序檢索自己關(guān)心的就緒文件描述符的效率!!!
Edge_triggered(邊緣觸發(fā)):當(dāng)被監(jiān)控的文件描述符上有可讀寫事件發(fā)生時(shí),epoll_wait()會(huì)通知處理程序去讀寫。如果這次沒有把數(shù)據(jù)全部讀寫完(如讀寫緩沖區(qū)太小),那么下次調(diào)用epoll_wait()時(shí),它不會(huì)通知你,也就是它只會(huì)通知你一次,直到該文件描述符上出現(xiàn)第二次可讀寫事件才會(huì)通知你!!!這種模式比水平觸發(fā)效率高,系統(tǒng)不會(huì)充斥大量你不關(guān)心的就緒文件描述符!!!
5、Read事件
- 在Accept事件中,若有客戶端與服務(wù)器端建立了連接,需要將其對(duì)應(yīng)的SocketChannel設(shè)置為非阻塞,并注冊(cè)到選擇其中
- 添加Read事件,觸發(fā)后進(jìn)行讀取操作
???????刪除事件
當(dāng)處理完一個(gè)事件后,一定要調(diào)用迭代器的remove方法移除對(duì)應(yīng)事件,否則會(huì)出現(xiàn)錯(cuò)誤。原因如下
以我們上面的?Read事件?的代碼為例
-
當(dāng)調(diào)用了 server.register(selector, SelectionKey.OP_ACCEPT)后,Selector中維護(hù)了一個(gè)集合,用于存放SelectionKey以及其對(duì)應(yīng)的通道
// WindowsSelectorImpl 中的 SelectionKeyImpl數(shù)組 private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[8]; public class SelectionKeyImpl extends AbstractSelectionKey {// Key對(duì)應(yīng)的通道final SelChImpl channel;... } -
當(dāng)選擇器中的通道對(duì)應(yīng)的事件發(fā)生后,selecionKey會(huì)被放到另一個(gè)集合中,但是selecionKey不會(huì)自動(dòng)移除,所以需要我們?cè)谔幚硗暌粋€(gè)事件后,通過迭代器手動(dòng)移除其中的selecionKey。否則會(huì)導(dǎo)致已被處理過的事件再次被處理,就會(huì)引發(fā)錯(cuò)誤
-
斷開處理
當(dāng)客戶端與服務(wù)器之間的連接斷開時(shí),會(huì)給服務(wù)器端發(fā)送一個(gè)讀事件,對(duì)異常斷開和正常斷開需要加以不同的方式進(jìn)行處理
-
正常斷開
-
正常斷開時(shí),服務(wù)器端的channel.read(buffer)方法的返回值為-1,所以當(dāng)結(jié)束到返回值為-1時(shí),需要調(diào)用key的cancel方法取消此事件,并在取消后移除該事件
int read = channel.read(buffer); // 斷開連接時(shí),客戶端會(huì)向服務(wù)器發(fā)送一個(gè)寫事件,此時(shí)read的返回值為-1 if(read == -1) {// 取消該事件的處理key.cancel();channel.close(); } else {... } // 取消或者處理,都需要移除key iterator.remove();異常斷開
-
-
- 異常斷開時(shí),會(huì)拋出IOException異常, 在try-catch的catch塊中捕獲異常并調(diào)用key的cancel方法即可
消息邊界
不處理消息邊界存在的問題
將緩沖區(qū)的大小設(shè)置為4個(gè)字節(jié),發(fā)送2個(gè)漢字(你好),通過decode解碼并打印時(shí),會(huì)出現(xiàn)亂碼
ByteBuffer buffer = ByteBuffer.allocate(4); // 解碼并打印 System.out.println(StandardCharsets.UTF_8.decode(buffer)); 你� ��這是因?yàn)閁TF-8字符集下,1個(gè)漢字占用3個(gè)字節(jié),此時(shí)緩沖區(qū)大小為4個(gè)字節(jié),一次讀時(shí)間無法處理完通道中的所有數(shù)據(jù),所以一共會(huì)觸發(fā)兩次讀事件。這就導(dǎo)致?你好?的?好?字被拆分為了前半部分和后半部分發(fā)送,解碼時(shí)就會(huì)出現(xiàn)問題
處理消息邊界
傳輸?shù)奈谋究赡苡幸韵氯N情況
- 文本大于緩沖區(qū)大小
- 此時(shí)需要將緩沖區(qū)進(jìn)行擴(kuò)容
- 發(fā)生半包現(xiàn)象
- 發(fā)生粘包現(xiàn)象
解決思路大致有以下三種
- 固定消息長(zhǎng)度,數(shù)據(jù)包大小一樣,服務(wù)器按預(yù)定長(zhǎng)度讀取,當(dāng)發(fā)送的數(shù)據(jù)較少時(shí),需要將數(shù)據(jù)進(jìn)行填充,直到長(zhǎng)度與消息規(guī)定長(zhǎng)度一致。缺點(diǎn)是浪費(fèi)帶寬
- 另一種思路是按分隔符拆分,缺點(diǎn)是效率低,需要一個(gè)一個(gè)字符地去匹配分隔符
- TLV 格式,即 Type 類型、Length 長(zhǎng)度、Value 數(shù)據(jù)(也就是在消息開頭用一些空間存放后面數(shù)據(jù)的長(zhǎng)度),如HTTP請(qǐng)求頭中的Content-Type與Content-Length。類型和長(zhǎng)度已知的情況下,就可以方便獲取消息大小,分配合適的 buffer,缺點(diǎn)是 buffer 需要提前分配,如果內(nèi)容過大,則影響 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
下文的消息邊界處理方式為第二種:按分隔符拆分
附件與擴(kuò)容
Channel的register方法還有第三個(gè)參數(shù):附件,可以向其中放入一個(gè)Object類型的對(duì)象,該對(duì)象會(huì)與登記的Channel以及其對(duì)應(yīng)的SelectionKey綁定,可以從SelectionKey獲取到對(duì)應(yīng)通道的附件
public final SelectionKey register(Selector sel, int ops, Object att)可通過SelectionKey的attachment()方法獲得附件 ByteBuffer buffer = (ByteBuffer) key.attachment();我們需要在Accept事件發(fā)生后,將通道注冊(cè)到Selector中時(shí),對(duì)每個(gè)通道添加一個(gè)ByteBuffer附件,讓每個(gè)通道發(fā)生讀事件時(shí)都使用自己的通道,避免與其他通道發(fā)生沖突而導(dǎo)致問題
// 設(shè)置為非阻塞模式,同時(shí)將連接的通道也注冊(cè)到選擇其中,同時(shí)設(shè)置附件 socketChannel.configureBlocking(false); ByteBuffer buffer = ByteBuffer.allocate(16); // 添加通道對(duì)應(yīng)的Buffer附件 socketChannel.register(selector, SelectionKey.OP_READ, buffer);當(dāng)Channel中的數(shù)據(jù)大于緩沖區(qū)時(shí),需要對(duì)緩沖區(qū)進(jìn)行擴(kuò)容操作。此代碼中的擴(kuò)容的判定方法:Channel調(diào)用compact方法后,的position與limit相等,說明緩沖區(qū)中的數(shù)據(jù)并未被讀取(容量太小),此時(shí)創(chuàng)建新的緩沖區(qū),其大小擴(kuò)大為兩倍。同時(shí)還要將舊緩沖區(qū)中的數(shù)據(jù)拷貝到新的緩沖區(qū)中,同時(shí)調(diào)用SelectionKey的attach方法將新的緩沖區(qū)作為新的附件放入SelectionKey中
// 如果緩沖區(qū)太小,就進(jìn)行擴(kuò)容 if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);// 將舊buffer中的內(nèi)容放入新的buffer中ewBuffer.put(buffer);// 將新buffer作為附件放到key中key.attach(newBuffer); }改造后的服務(wù)器代碼如下 public class SelectServer {public static void main(String[] args) {// 獲得服務(wù)器通道try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));// 創(chuàng)建選擇器Selector selector = Selector.open();// 通道必須設(shè)置為非阻塞模式server.configureBlocking(false);// 將通道注冊(cè)到選擇器中,并設(shè)置感興趣的事件server.register(selector, SelectionKey.OP_ACCEPT);// 為serverKey設(shè)置感興趣的事件while (true) {// 若沒有事件就緒,線程會(huì)被阻塞,反之不會(huì)被阻塞。從而避免了CPU空轉(zhuǎn)// 返回值為就緒的事件個(gè)數(shù)int ready = selector.select();System.out.println("selector ready counts : " + ready);// 獲取所有事件Set<SelectionKey> selectionKeys = selector.selectedKeys();// 使用迭代器遍歷事件Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 判斷key的類型if(key.isAcceptable()) {// 獲得key對(duì)應(yīng)的channelServerSocketChannel channel = (ServerSocketChannel) key.channel();System.out.println("before accepting...");// 獲取連接SocketChannel socketChannel = channel.accept();System.out.println("after accepting...");// 設(shè)置為非阻塞模式,同時(shí)將連接的通道也注冊(cè)到選擇其中,同時(shí)設(shè)置附件socketChannel.configureBlocking(false);ByteBuffer buffer = ByteBuffer.allocate(16);socketChannel.register(selector, SelectionKey.OP_READ, buffer);// 處理完畢后移除iterator.remove();} else if (key.isReadable()) {SocketChannel channel = (SocketChannel) key.channel();System.out.println("before reading...");// 通過key獲得附件(buffer)ByteBuffer buffer = (ByteBuffer) key.attachment();int read = channel.read(buffer);if(read == -1) {key.cancel();channel.close();} else {// 通過分隔符來分隔buffer中的數(shù)據(jù)split(buffer);// 如果緩沖區(qū)太小,就進(jìn)行擴(kuò)容if (buffer.position() == buffer.limit()) {ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);// 將舊buffer中的內(nèi)容放入新的buffer中buffer.flip();newBuffer.put(buffer);// 將新buffer放到key中作為附件key.attach(newBuffer);}}System.out.println("after reading...");// 處理完畢后移除iterator.remove();}}}} catch (IOException e) {e.printStackTrace();}}private static void split(ByteBuffer buffer) {buffer.flip();for(int i = 0; i < buffer.limit(); i++) {// 遍歷尋找分隔符// get(i)不會(huì)移動(dòng)positionif (buffer.get(i) == '\n') {// 緩沖區(qū)長(zhǎng)度int length = i+1-buffer.position();ByteBuffer target = ByteBuffer.allocate(length);// 將前面的內(nèi)容寫入target緩沖區(qū)for(int j = 0; j < length; j++) {// 將buffer中的數(shù)據(jù)寫入target中target.put(buffer.get());}// 打印結(jié)果ByteBufferUtil.debugAll(target);}}// 切換為寫模式,但是緩沖區(qū)可能未讀完,這里需要使用compactbuffer.compact();} }CopyByteBuffer的大小分配
- 每個(gè) channel 都需要記錄可能被切分的消息,因?yàn)?ByteBuffer 不能被多個(gè) channel 共同使用,因此需要為每個(gè) channel 維護(hù)一個(gè)獨(dú)立的 ByteBuffer
- ByteBuffer 不能太大,比如一個(gè) ByteBuffer 1Mb 的話,要支持百萬連接就要 1Tb 內(nèi)存,因此需要設(shè)計(jì)大小可變的 ByteBuffer
- 分配思路可以參考
- 一種思路是首先分配一個(gè)較小的 buffer,例如 4k,如果發(fā)現(xiàn)數(shù)據(jù)不夠,再分配 8k 的 buffer,將 4k buffer 內(nèi)容拷貝至 8k buffer,優(yōu)點(diǎn)是消息連續(xù)容易處理,缺點(diǎn)是數(shù)據(jù)拷貝耗費(fèi)性能
- 參考實(shí)現(xiàn)?Java Resizable Array
- 另一種思路是用多個(gè)數(shù)組組成 buffer,一個(gè)數(shù)組不夠,把多出來的內(nèi)容寫入新的數(shù)組,與前面的區(qū)別是消息存儲(chǔ)不連續(xù)解析復(fù)雜,優(yōu)點(diǎn)是避免了拷貝引起的性能損耗
- 一種思路是首先分配一個(gè)較小的 buffer,例如 4k,如果發(fā)現(xiàn)數(shù)據(jù)不夠,再分配 8k 的 buffer,將 4k buffer 內(nèi)容拷貝至 8k buffer,優(yōu)點(diǎn)是消息連續(xù)容易處理,缺點(diǎn)是數(shù)據(jù)拷貝耗費(fèi)性能
6、Write事件
服務(wù)器通過Buffer向通道中寫入數(shù)據(jù)時(shí),可能因?yàn)橥ǖ廊萘啃∮贐uffer中的數(shù)據(jù)大小,導(dǎo)致無法一次性將Buffer中的數(shù)據(jù)全部寫入到Channel中,這時(shí)便需要分多次寫入,具體步驟如下
-
執(zhí)行一次寫操作,向?qū)uffer中的內(nèi)容寫入到SocketChannel中,然后判斷Buffer中是否還有數(shù)據(jù)
-
若Buffer中還有數(shù)據(jù),則需要將SockerChannel注冊(cè)到Seletor中,并關(guān)注寫事件,同時(shí)將未寫完的Buffer作為附件一起放入到SelectionKey中
int write = socket.write(buffer); // 通道中可能無法放入緩沖區(qū)中的所有數(shù)據(jù) if (buffer.hasRemaining()) {// 注冊(cè)到Selector中,關(guān)注可寫事件,并將buffer添加到key的附件中socket.configureBlocking(false);socket.register(selector, SelectionKey.OP_WRITE, buffer); } -
添加寫事件的相關(guān)操作key.isWritable(),對(duì)Buffer再次進(jìn)行寫操作
- 每次寫后需要判斷Buffer中是否還有數(shù)據(jù)(是否寫完)。若寫完,需要移除SelecionKey中的Buffer附件,避免其占用過多內(nèi)存,同時(shí)還需移除對(duì)寫事件的關(guān)注
整體代碼如下
public class WriteServer {public static void main(String[] args) {try(ServerSocketChannel server = ServerSocketChannel.open()) {server.bind(new InetSocketAddress(8080));server.configureBlocking(false);Selector selector = Selector.open();server.register(selector, SelectionKey.OP_ACCEPT);while (true) {selector.select();Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();// 處理后就移除事件iterator.remove();if (key.isAcceptable()) {// 獲得客戶端的通道SocketChannel socket = server.accept();// 寫入數(shù)據(jù)StringBuilder builder = new StringBuilder();for(int i = 0; i < 500000000; i++) {builder.append("a");}ByteBuffer buffer = StandardCharsets.UTF_8.encode(builder.toString());// 先執(zhí)行一次Buffer->Channel的寫入,如果未寫完,就添加一個(gè)可寫事件int write = socket.write(buffer);System.out.println(write);// 通道中可能無法放入緩沖區(qū)中的所有數(shù)據(jù)if (buffer.hasRemaining()) {// 注冊(cè)到Selector中,關(guān)注可寫事件,并將buffer添加到key的附件中socket.configureBlocking(false);socket.register(selector, SelectionKey.OP_WRITE, buffer);}} else if (key.isWritable()) {SocketChannel socket = (SocketChannel) key.channel();// 獲得bufferByteBuffer buffer = (ByteBuffer) key.attachment();// 執(zhí)行寫操作int write = socket.write(buffer);System.out.println(write);// 如果已經(jīng)完成了寫操作,需要移除key中的附件,同時(shí)不再對(duì)寫事件感興趣if (!buffer.hasRemaining()) {key.attach(null);key.interestOps(0);}}}}} catch (IOException e) {e.printStackTrace();}} }7、優(yōu)化
多線程優(yōu)化
充分利用多核CPU,分兩組選擇器
- 單線程配一個(gè)選擇器(Boss),專門處理 accept 事件
- 創(chuàng)建 cpu 核心數(shù)的線程(Worker),每個(gè)線程配一個(gè)選擇器,輪流處理 read 事件
實(shí)現(xiàn)思路
-
創(chuàng)建一個(gè)負(fù)責(zé)處理Accept事件的Boss線程,與多個(gè)負(fù)責(zé)處理Read事件的Worker線程
-
Boss線程執(zhí)行的操作
-
接受并處理Accepet事件,當(dāng)Accept事件發(fā)生后,調(diào)用Worker的register(SocketChannel socket)方法,讓W(xué)orker去處理Read事件,其中需要根據(jù)標(biāo)識(shí)robin去判斷將任務(wù)分配給哪個(gè)Worker
// 創(chuàng)建固定數(shù)量的Worker Worker[] workers = new Worker[4]; // 用于負(fù)載均衡的原子整數(shù) AtomicInteger robin = new AtomicInteger(0); // 負(fù)載均衡,輪詢分配Worker workers[robin.getAndIncrement()% workers.length].register(socket);Copy -
register(SocketChannel socket)方法會(huì)通過同步隊(duì)列完成Boss線程與Worker線程之間的通信,讓SocketChannel的注冊(cè)任務(wù)被Worker線程執(zhí)行。添加任務(wù)后需要調(diào)用selector.wakeup()來喚醒被阻塞的Selector
public void register(final SocketChannel socket) throws IOException {// 只啟動(dòng)一次if (!started) {// 初始化操作}// 向同步隊(duì)列中添加SocketChannel的注冊(cè)事件// 在Worker線程中執(zhí)行注冊(cè)事件queue.add(new Runnable() {@Overridepublic void run() {try {socket.register(selector, SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}});// 喚醒被阻塞的Selector// select類似LockSupport中的park,wakeup的原理類似LockSupport中的unparkselector.wakeup(); }Copy
-
-
Worker線程執(zhí)行的操作
- 從同步隊(duì)列中獲取注冊(cè)任務(wù),并處理Read事件
實(shí)現(xiàn)代碼
public class ThreadsServer {public static void main(String[] args) {try (ServerSocketChannel server = ServerSocketChannel.open()) {// 當(dāng)前線程為Boss線程Thread.currentThread().setName("Boss");server.bind(new InetSocketAddress(8080));// 負(fù)責(zé)輪詢Accept事件的SelectorSelector boss = Selector.open();server.configureBlocking(false);server.register(boss, SelectionKey.OP_ACCEPT);// 創(chuàng)建固定數(shù)量的WorkerWorker[] workers = new Worker[4];// 用于負(fù)載均衡的原子整數(shù)AtomicInteger robin = new AtomicInteger(0);for(int i = 0; i < workers.length; i++) {workers[i] = new Worker("worker-"+i);}while (true) {boss.select();Set<SelectionKey> selectionKeys = boss.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while (iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();// BossSelector負(fù)責(zé)Accept事件if (key.isAcceptable()) {// 建立連接SocketChannel socket = server.accept();System.out.println("connected...");socket.configureBlocking(false);// socket注冊(cè)到Worker的Selector中System.out.println("before read...");// 負(fù)載均衡,輪詢分配Workerworkers[robin.getAndIncrement()% workers.length].register(socket);System.out.println("after read...");}}}} catch (IOException e) {e.printStackTrace();}}static class Worker implements Runnable {private Thread thread;private volatile Selector selector;private String name;private volatile boolean started = false;/*** 同步隊(duì)列,用于Boss線程與Worker線程之間的通信*/private ConcurrentLinkedQueue<Runnable> queue;public Worker(String name) {this.name = name;}public void register(final SocketChannel socket) throws IOException {// 只啟動(dòng)一次if (!started) {thread = new Thread(this, name);selector = Selector.open();queue = new ConcurrentLinkedQueue<>();thread.start();started = true;}// 向同步隊(duì)列中添加SocketChannel的注冊(cè)事件// 在Worker線程中執(zhí)行注冊(cè)事件queue.add(new Runnable() {@Overridepublic void run() {try {socket.register(selector, SelectionKey.OP_READ);} catch (IOException e) {e.printStackTrace();}}});// 喚醒被阻塞的Selector// select類似LockSupport中的park,wakeup的原理類似LockSupport中的unparkselector.wakeup();}@Overridepublic void run() {while (true) {try {selector.select();// 通過同步隊(duì)列獲得任務(wù)并運(yùn)行Runnable task = queue.poll();if (task != null) {// 獲得任務(wù),執(zhí)行注冊(cè)操作task.run();}Set<SelectionKey> selectionKeys = selector.selectedKeys();Iterator<SelectionKey> iterator = selectionKeys.iterator();while(iterator.hasNext()) {SelectionKey key = iterator.next();iterator.remove();// Worker只負(fù)責(zé)Read事件if (key.isReadable()) {// 簡(jiǎn)化處理,省略細(xì)節(jié)SocketChannel socket = (SocketChannel) key.channel();ByteBuffer buffer = ByteBuffer.allocate(16);socket.read(buffer);buffer.flip();ByteBufferUtil.debugAll(buffer);}}} catch (IOException e) {e.printStackTrace();}}}} }四、NIO與BIO
1、Stream與Channel
- stream 不會(huì)自動(dòng)緩沖數(shù)據(jù),channel 會(huì)利用系統(tǒng)提供的發(fā)送緩沖區(qū)、接收緩沖區(qū)(更為底層)
- stream 僅支持阻塞 API,channel 同時(shí)支持阻塞、非阻塞 API,網(wǎng)絡(luò) channel 可配合 selector 實(shí)現(xiàn)多路復(fù)用
- 二者均為全雙工,即讀寫可以同時(shí)進(jìn)行
- 雖然Stream是單向流動(dòng)的,但是它也是全雙工的
2、IO模型
- 同步:線程自己去獲取結(jié)果(一個(gè)線程)
- 例如:線程調(diào)用一個(gè)方法后,需要等待方法返回結(jié)果
- 異步:線程自己不去獲取結(jié)果,而是由其它線程返回結(jié)果(至少兩個(gè)線程)
- 例如:線程A調(diào)用一個(gè)方法后,繼續(xù)向下運(yùn)行,運(yùn)行結(jié)果由線程B返回
當(dāng)調(diào)用一次 channel.read?或 stream.read?后,會(huì)由用戶態(tài)切換至操作系統(tǒng)內(nèi)核態(tài)來完成真正數(shù)據(jù)讀取,而讀取又分為兩個(gè)階段,分別為:
-
等待數(shù)據(jù)階段
-
復(fù)制數(shù)據(jù)階段
根據(jù)UNIX 網(wǎng)絡(luò)編程 - 卷 I,IO模型主要有以下幾種
阻塞IO
- 用戶線程進(jìn)行read操作時(shí),需要等待操作系統(tǒng)執(zhí)行實(shí)際的read操作,此期間用戶線程是被阻塞的,無法執(zhí)行其他操作
非阻塞IO
- 用戶線程在一個(gè)循環(huán)中一直調(diào)用read方法,若內(nèi)核空間中還沒有數(shù)據(jù)可讀,立即返回
- 只是在等待階段非阻塞
- 用戶線程發(fā)現(xiàn)內(nèi)核空間中有數(shù)據(jù)后,等待內(nèi)核空間執(zhí)行復(fù)制數(shù)據(jù),待復(fù)制結(jié)束后返回結(jié)果
多路復(fù)用
Java中通過Selector實(shí)現(xiàn)多路復(fù)用
- 當(dāng)沒有事件是,調(diào)用select方法會(huì)被阻塞住
- 一旦有一個(gè)或多個(gè)事件發(fā)生后,就會(huì)處理對(duì)應(yīng)的事件,從而實(shí)現(xiàn)多路復(fù)用
多路復(fù)用與阻塞IO的區(qū)別
- 阻塞IO模式下,若線程因accept事件被阻塞,發(fā)生read事件后,仍需等待accept事件執(zhí)行完成后,才能去處理read事件
- 多路復(fù)用模式下,一個(gè)事件發(fā)生后,若另一個(gè)事件處于阻塞狀態(tài),不會(huì)影響該事件的執(zhí)行
異步IO
- 線程1調(diào)用方法后理解返回,不會(huì)被阻塞也不需要立即獲取結(jié)果
- 當(dāng)方法的運(yùn)行結(jié)果出來以后,由線程2將結(jié)果返回給線程1
3、零拷貝
零拷貝指的是數(shù)據(jù)無需拷貝到 JVM 內(nèi)存中,同時(shí)具有以下三個(gè)優(yōu)點(diǎn)
- 更少的用戶態(tài)與內(nèi)核態(tài)的切換
- 不利用 cpu 計(jì)算,減少 cpu 緩存?zhèn)喂蚕?/li>
- 零拷貝適合小文件傳輸
傳統(tǒng) IO 問題
傳統(tǒng)的 IO 將一個(gè)文件通過 socket 寫出
File f = new File("helloword/data.txt"); RandomAccessFile file = new RandomAccessFile(file, "r");byte[] buf = new byte[(int)f.length()]; file.read(buf);Socket socket = ...; socket.getOutputStream().write(buf);Copy內(nèi)部工作流如下
-
Java 本身并不具備 IO 讀寫能力,因此 read 方法調(diào)用后,要從 Java 程序的用戶態(tài)切換至內(nèi)核態(tài),去調(diào)用操作系統(tǒng)(Kernel)的讀能力,將數(shù)據(jù)讀入內(nèi)核緩沖區(qū)。這期間用戶線程阻塞,操作系統(tǒng)使用 DMA(Direct Memory Access)來實(shí)現(xiàn)文件讀,其間也不會(huì)使用 CPU
-
DMA 也可以理解為硬件單元,用來解放 cpu 完成文件 IO
-
從內(nèi)核態(tài)切換回用戶態(tài),將數(shù)據(jù)從內(nèi)核緩沖區(qū)讀入用戶緩沖區(qū)(即 byte[] buf),這期間?CPU 會(huì)參與拷貝,無法利用 DMA
-
調(diào)用 write 方法,這時(shí)將數(shù)據(jù)從用戶緩沖區(qū)(byte[] buf)寫入?socket 緩沖區(qū),CPU 會(huì)參與拷貝
-
接下來要向網(wǎng)卡寫數(shù)據(jù),這項(xiàng)能力 Java 又不具備,因此又得從用戶態(tài)切換至內(nèi)核態(tài),調(diào)用操作系統(tǒng)的寫能力,使用 DMA 將?socket 緩沖區(qū)的數(shù)據(jù)寫入網(wǎng)卡,不會(huì)使用 CPU
可以看到中間環(huán)節(jié)較多,java 的 IO 實(shí)際不是物理設(shè)備級(jí)別的讀寫,而是緩存的復(fù)制,底層的真正讀寫是操作系統(tǒng)來完成的
- 用戶態(tài)與內(nèi)核態(tài)的切換發(fā)生了 3 次,這個(gè)操作比較重量級(jí)
- 數(shù)據(jù)拷貝了共 4 次
NIO 優(yōu)化
通過?DirectByteBuf
- ByteBuffer.allocate(10)
- 底層對(duì)應(yīng) HeapByteBuffer,使用的還是 Java 內(nèi)存
- ByteBuffer.allocateDirect(10)
- 底層對(duì)應(yīng)DirectByteBuffer,使用的是操作系統(tǒng)內(nèi)存
大部分步驟與優(yōu)化前相同,唯有一點(diǎn):Java 可以使用 DirectByteBuffer 將堆外內(nèi)存映射到 JVM 內(nèi)存中來直接訪問使用
- 這塊內(nèi)存不受 JVM 垃圾回收的影響,因此內(nèi)存地址固定,有助于 IO 讀寫
- Java 中的 DirectByteBuf 對(duì)象僅維護(hù)了此內(nèi)存的虛引用,內(nèi)存回收分成兩步
- DirectByteBuffer 對(duì)象被垃圾回收,將虛引用加入引用隊(duì)列
- 當(dāng)引用的對(duì)象ByteBuffer被垃圾回收以后,虛引用對(duì)象Cleaner就會(huì)被放入引用隊(duì)列中,然后調(diào)用Cleaner的clean方法來釋放直接內(nèi)存
- DirectByteBuffer 的釋放底層調(diào)用的是 Unsafe 的 freeMemory 方法
- 通過專門線程訪問引用隊(duì)列,根據(jù)虛引用釋放堆外內(nèi)存
- DirectByteBuffer 對(duì)象被垃圾回收,將虛引用加入引用隊(duì)列
- 減少了一次數(shù)據(jù)拷貝,用戶態(tài)與內(nèi)核態(tài)的切換次數(shù)沒有減少
進(jìn)一步優(yōu)化1
以下兩種方式都是零拷貝,即無需將數(shù)據(jù)拷貝到用戶緩沖區(qū)中(JVM內(nèi)存中)
底層采用了?linux 2.1?后提供的?sendFile?方法,Java 中對(duì)應(yīng)著兩個(gè) channel 調(diào)用?transferTo/transferFrom?方法拷貝數(shù)據(jù)
-
Java 調(diào)用 transferTo 方法后,要從 Java 程序的用戶態(tài)切換至內(nèi)核態(tài),使用 DMA將數(shù)據(jù)讀入內(nèi)核緩沖區(qū),不會(huì)使用 CPU
-
數(shù)據(jù)從內(nèi)核緩沖區(qū)傳輸?shù)?socket 緩沖區(qū),CPU 會(huì)參與拷貝
-
最后使用 DMA 將?socket 緩沖區(qū)的數(shù)據(jù)寫入網(wǎng)卡,不會(huì)使用 CPU
這種方法下
- 只發(fā)生了1次用戶態(tài)與內(nèi)核態(tài)的切換
- 數(shù)據(jù)拷貝了 3 次
進(jìn)一步優(yōu)化2
linux 2.4?對(duì)上述方法再次進(jìn)行了優(yōu)化
-
Java 調(diào)用 transferTo 方法后,要從 Java 程序的用戶態(tài)切換至內(nèi)核態(tài),使用 DMA將數(shù)據(jù)讀入內(nèi)核緩沖區(qū),不會(huì)使用 CPU
-
只會(huì)將一些 offset 和 length 信息拷入?socket 緩沖區(qū),幾乎無消耗
-
使用 DMA 將?內(nèi)核緩沖區(qū)的數(shù)據(jù)寫入網(wǎng)卡,不會(huì)使用 CPU
整個(gè)過程僅只發(fā)生了1次用戶態(tài)與內(nèi)核態(tài)的切換,數(shù)據(jù)拷貝了 2 次
4、AIO
AIO 用來解決數(shù)據(jù)復(fù)制階段的阻塞問題
- 同步意味著,在進(jìn)行讀寫操作時(shí),線程需要等待結(jié)果,還是相當(dāng)于閑置
- 異步意味著,在進(jìn)行讀寫操作時(shí),線程不必等待結(jié)果,而是將來由操作系統(tǒng)來通過回調(diào)方式由另外的線程來獲得結(jié)果
異步模型需要底層操作系統(tǒng)(Kernel)提供支持
- Windows 系統(tǒng)通過 IOCP?實(shí)現(xiàn)了真正的異步 IO
- Linux 系統(tǒng)異步 IO 在 2.6 版本引入,但其底層實(shí)現(xiàn)還是用多路復(fù)用模擬了異步 IO,性能沒有優(yōu)勢(shì)
總結(jié)
以上是生活随笔為你收集整理的中间件系列「三」netty之NIO基础的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Java注解系列】内置注解与AOP实现
- 下一篇: 【计算机是如何通信 四】Web服务器/S