Netty源码解析8-ChannelHandler实例之CodecHandler
請(qǐng)戳GitHub原文: github.com/wangzhiwubi…
更多文章關(guān)注:多線程/集合/分布式/Netty/NIO/RPC
- Java高級(jí)特性增強(qiáng)-集合
- Java高級(jí)特性增強(qiáng)-多線程
- Java高級(jí)特性增強(qiáng)-Synchronized
- Java高級(jí)特性增強(qiáng)-volatile
- Java高級(jí)特性增強(qiáng)-并發(fā)集合框架
- Java高級(jí)特性增強(qiáng)-分布式
- Java高級(jí)特性增強(qiáng)-Zookeeper
- Java高級(jí)特性增強(qiáng)-JVM
- Java高級(jí)特性增強(qiáng)-NIO
- RPC
- zookeeper
- JVM
- NIO
- 其他更多
編解碼處理器作為Netty編程時(shí)必備的ChannelHandler,每個(gè)應(yīng)用都必不可少。Netty作為網(wǎng)絡(luò)應(yīng)用框架,在網(wǎng)絡(luò)上的各個(gè)應(yīng)用之間不斷進(jìn)行數(shù)據(jù)交互。而網(wǎng)絡(luò)數(shù)據(jù)交換的基本單位是字節(jié),所以需要將本應(yīng)用的POJO對(duì)象編碼為字節(jié)數(shù)據(jù)發(fā)送到其他應(yīng)用,或者將收到的其他應(yīng)用的字節(jié)數(shù)據(jù)解碼為本應(yīng)用可使用的POJO對(duì)象。這一部分,又和JAVA中的序列化和反序列化對(duì)應(yīng)。幸運(yùn)的是,有很多其他的開源工具(protobuf,thrift,json,xml等等)可方便的處理POJO對(duì)象的序列化,可參見這個(gè)鏈接。 在互聯(lián)網(wǎng)中,Netty使用TCP/UDP協(xié)議傳輸數(shù)據(jù)。由于Netty基于異步事件處理以及TCP的一些特性,使得TCP數(shù)據(jù)包會(huì)發(fā)生粘包現(xiàn)象。想象這樣的情況,客戶端與服務(wù)端建立連接后,連接發(fā)送了兩條消息:
+------+ +------+ | MSG1 | | MSG2 | +------+ +------+ 復(fù)制代碼在互聯(lián)網(wǎng)上傳輸數(shù)據(jù)時(shí),連續(xù)發(fā)送的兩條消息,在服務(wù)端極有可能被合并為一條:
+------------+ | MSG1 MSG2 | +------------+ 復(fù)制代碼這還不是最壞的情況,由于路由器的拆包和重組,可能收到這樣的兩個(gè)數(shù)據(jù)包:
+----+ +---------+ +-------+ +-----+ | MS | | G1MSG2 | 或者 | MSG1M | | SG2 | +----+ +---------+ +-------+ +-----+ 復(fù)制代碼而服務(wù)端要正確的識(shí)別出這樣的兩條消息,就需要編碼器的正確工作。為了正確的識(shí)別出消息,業(yè)界有以下幾種做法:
使用定界符分割消息,一個(gè)特例是使用換行符分隔每條消息。 使用定長(zhǎng)的消息。 在消息的某些字段指明消息長(zhǎng)度。
明白了這些,進(jìn)入正題,分析Netty的編碼框架ByteToMessageDecoder。
ByteToMessageDecoder
在分析之前,需要說明一點(diǎn):ByteToMessage容易引起誤解,解碼結(jié)果Message會(huì)被認(rèn)為是JAVA對(duì)象POJO,但實(shí)際解碼結(jié)果是消息幀。也就是說該解碼器處理TCP的粘包現(xiàn)象,將網(wǎng)絡(luò)發(fā)送的字節(jié)流解碼為具有確定含義的消息幀,之后的解碼器再將消息幀解碼為實(shí)際的POJO對(duì)象。 明白了這點(diǎn),再次回顧兩條消息發(fā)送的最壞情況,可知要正確取得兩條消息,需要一個(gè)內(nèi)存區(qū)域存儲(chǔ)消息,當(dāng)收到MS時(shí)繼續(xù)等待第二個(gè)包G1MSG2到達(dá)再進(jìn)行解碼操作。在ByteToMessageDecoder中,這個(gè)內(nèi)存區(qū)域被抽象為Cumulator,直譯累積器,可自動(dòng)擴(kuò)容累積字節(jié)數(shù)據(jù),Netty將其定義為一個(gè)接口:
public interface Cumulator {ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);} 復(fù)制代碼其中,兩個(gè)ByteBuf參數(shù)cumulation指已經(jīng)累積的字節(jié)數(shù)據(jù),in表示該次channelRead()讀取到的新數(shù)據(jù)。返回ByteBuf為累積數(shù)據(jù)后的新累積區(qū)(必要時(shí)候自動(dòng)擴(kuò)容)。自動(dòng)擴(kuò)容的代碼如下:
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int newReadBytes) {ByteBuf oldCumulation = cumulation;// 擴(kuò)容后新的緩沖區(qū)cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);cumulation.writeBytes(oldCumulation);// 舊的緩沖區(qū)釋放oldCumulation.release();return cumulation;} 復(fù)制代碼自動(dòng)擴(kuò)容的方法簡(jiǎn)單粗暴,直接使用大容量的Bytebuf替換舊的ByteBuf。Netty定義了兩個(gè)累積器,一個(gè)為MERGE_CUMULATOR:
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {ByteBuf buffer;// 1.累積區(qū)容量不夠容納數(shù)據(jù)// 2.用戶使用了slice().retain()或duplicate().retain()使refCnt增加if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()|| cumulation.refCnt() > 1) {buffer = expandCumulation(alloc, cumulation, in.readableBytes());} else {buffer = cumulation;}buffer.writeBytes(in);in.release();return buffer;}}; 復(fù)制代碼可知,兩種情況下會(huì)擴(kuò)容:
另一個(gè)累積器為COMPOSITE_CUMULATOR:
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {@Overridepublic ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {ByteBuf buffer;if (cumulation.refCnt() > 1) {buffer = expandCumulation(alloc, cumulation, in.readableBytes());buffer.writeBytes(in);in.release();} else {CompositeByteBuf composite;if (cumulation instanceof CompositeByteBuf) {composite = (CompositeByteBuf) cumulation;} else {composite = alloc.compositeBuffer(Integer.MAX_VALUE);composite.addComponent(true, cumulation);}composite.addComponent(true, in);buffer = composite;}return buffer;}}; 復(fù)制代碼這個(gè)累積器只在第二種情況refCnt>1時(shí)擴(kuò)容,除此之外處理和MERGE_CUMULATOR一致,不同的是當(dāng)cumulation不是CompositeByteBuf時(shí)會(huì)創(chuàng)建新的同類CompositeByteBuf,這樣最后返回的ByteBuf必定是CompositeByteBuf。使用這個(gè)累積器后,當(dāng)容量不夠時(shí)并不會(huì)進(jìn)行內(nèi)存復(fù)制,只會(huì)講新讀入的in加到CompositeByteBuf中。需要注意的是:此種情況下雖然不需內(nèi)存復(fù)制,卻要求用戶維護(hù)復(fù)雜的索引,在某些使用中可能慢于MERGE_CUMULATOR。故Netty默認(rèn)使用MERGE_CUMULATOR累積器。 累積器分析完畢,步入正題ByteToMessageDecoder,首先看類簽名:
public abstract class ByteToMessageDecoder extendsChannelInboundHandlerAdapter 復(fù)制代碼該類是一個(gè)抽象類,其中的抽象方法只有一個(gè)decode():
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception; 復(fù)制代碼用戶使用了該解碼框架后,只需實(shí)現(xiàn)該方法就可定義自己的解碼器。參數(shù)in表示累積器已累積的數(shù)據(jù),out表示本次可從累積數(shù)據(jù)解碼出的結(jié)果列表,結(jié)果可為POJO對(duì)象或者ByteBuf等等Object。 關(guān)注一下成員變量,以便更好的分析:
ByteBuf cumulation; // 累積區(qū)private Cumulator cumulator = MERGE_CUMULATOR; // 累積器// 設(shè)置為true后每個(gè)channelRead事件只解碼出一個(gè)結(jié)果private boolean singleDecode; // 某些特殊協(xié)議使用private boolean decodeWasNull; // 解碼結(jié)果為空private boolean first; // 是否首個(gè)消息// 累積區(qū)不丟棄字節(jié)的最大次數(shù),16次后開始丟棄private int discardAfterReads = 16;private int numReads; // 累積區(qū)不丟棄字節(jié)的channelRead次數(shù) 復(fù)制代碼下面,直接進(jìn)入channelRead()事件處理:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 只對(duì)ByteBuf處理即只對(duì)字節(jié)數(shù)據(jù)進(jìn)行處理if (msg instanceof ByteBuf) {// 解碼結(jié)果列表CodecOutputList out = CodecOutputList.newInstance();try {ByteBuf data = (ByteBuf) msg;first = cumulation == null; // 累積區(qū)為空表示首次解碼if (first) {// 首次解碼直接使用讀入的ByteBuf作為累積區(qū)cumulation = data;} else {// 非首次需要進(jìn)行字節(jié)數(shù)據(jù)累積cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);}callDecode(ctx, cumulation, out); // 解碼操作} catch (DecoderException e) {throw e;} catch (Throwable t) {throw new DecoderException(t);} finally {if (cumulation != null && !cumulation.isReadable()) {// 此時(shí)累積區(qū)不再有字節(jié)數(shù)據(jù),已被處理完畢numReads = 0;cumulation.release();cumulation = null;} else if (++ numReads >= discardAfterReads) {// 連續(xù)discardAfterReads次后// 累積區(qū)還有字節(jié)數(shù)據(jù),此時(shí)丟棄一部分?jǐn)?shù)據(jù)numReads = 0;discardSomeReadBytes(); // 丟棄一些已讀字節(jié)}int size = out.size();// 本次沒有解碼出數(shù)據(jù),此時(shí)size=0decodeWasNull = !out.insertSinceRecycled();fireChannelRead(ctx, out, size); // 觸發(fā)事件out.recycle(); // 回收解碼結(jié)果}} else {ctx.fireChannelRead(msg);}} 復(fù)制代碼解碼結(jié)果列表CodecOutputList是Netty定制的一個(gè)特殊列表,該列表在線程中被緩存,可循環(huán)使用來(lái)存儲(chǔ)解碼結(jié)果,減少不必要的列表實(shí)例創(chuàng)建,從而提升性能。由于解碼結(jié)果需要頻繁存儲(chǔ),普通的ArrayList難以滿足該需求,故定制化了一個(gè)特殊列表,由此可見Netty對(duì)優(yōu)化的極致追求。 注意finally塊的第一個(gè)if情況滿足時(shí),即累積區(qū)的數(shù)據(jù)已被讀取完畢,請(qǐng)考慮釋放累積區(qū)的必要性。想象這樣的情況,當(dāng)一條消息被解碼完畢后,如果客戶端長(zhǎng)時(shí)間不發(fā)送消息,那么,服務(wù)端保存該條消息的累積區(qū)將一直占據(jù)服務(wù)端內(nèi)存浪費(fèi)資源,所以必須釋放該累積區(qū)。 第二個(gè)if情況滿足時(shí),即累積區(qū)的數(shù)據(jù)一直在channelRead讀取數(shù)據(jù)進(jìn)行累積和解碼,直到達(dá)到了discardAfterReads次(默認(rèn)16),此時(shí)累積區(qū)依然還有數(shù)據(jù)。在這樣的情況下,Netty主動(dòng)丟棄一些字節(jié),這是為了防止該累積區(qū)占用大量?jī)?nèi)存甚至耗盡內(nèi)存引發(fā)OOM。 處理完這些情況后,最后統(tǒng)一觸發(fā)ChannelRead事件,將解碼出的數(shù)據(jù)傳遞給下一個(gè)處理器。注意:當(dāng)out=0時(shí),統(tǒng)一到一起被處理了。 再看細(xì)節(jié)的discardSomeReadBytes()和fireChannelRead():
protected final void discardSomeReadBytes() {if (cumulation != null && !first && cumulation.refCnt() == 1) {cumulation.discardSomeReadBytes();}}static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {for (int i = 0; i < numElements; i ++) {ctx.fireChannelRead(msgs.getUnsafe(i));}} 復(fù)制代碼代碼比較簡(jiǎn)單,只需注意discardSomeReadBytes中,累積區(qū)的refCnt() == 1時(shí)才丟棄數(shù)據(jù)是因?yàn)?#xff1a;如果用戶使用了slice().retain()和duplicate().retain()使refCnt>1,表明該累積區(qū)還在被用戶使用,丟棄數(shù)據(jù)可能導(dǎo)致用戶的困惑,所以須確定用戶不再使用該累積區(qū)的已讀數(shù)據(jù),此時(shí)才丟棄。 下面分析解碼核心方法callDecode():
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {try {while (in.isReadable()) {int outSize = out.size();if (outSize > 0) {// 解碼出消息就立即處理,防止消息等待fireChannelRead(ctx, out, outSize);out.clear();// 用戶主動(dòng)刪除該Handler,繼續(xù)操作in是不安全的if (ctx.isRemoved()) {break;}outSize = 0;}int oldInputLength = in.readableBytes();decode(ctx, in, out); // 子類需要實(shí)現(xiàn)的具體解碼步驟// 用戶主動(dòng)刪除該Handler,繼續(xù)操作in是不安全的if (ctx.isRemoved()) {break; }// 此時(shí)outSize都==0(這的代碼容易產(chǎn)生誤解 應(yīng)該直接使用0)if (outSize == out.size()) {if (oldInputLength == in.readableBytes()) {// 沒有解碼出消息,且沒讀取任何in數(shù)據(jù)break;} else {// 讀取了一部份數(shù)據(jù)但沒有解碼出消息// 說明需要更多的數(shù)據(jù),故繼續(xù)continue;}}// 運(yùn)行到這里outSize>0 說明已經(jīng)解碼出消息if (oldInputLength == in.readableBytes()) {// 解碼出消息但是in的讀索引不變,用戶的decode方法有Bugthrow new DecoderException("did not read anything but decoded a message.");}// 用戶設(shè)定一個(gè)channelRead事件只解碼一次if (isSingleDecode()) {break; }}} catch (DecoderException e) {throw e;} catch (Throwable cause) {throw new DecoderException(cause);}} 復(fù)制代碼循環(huán)中的第一個(gè)if分支,檢查解碼結(jié)果,如果已經(jīng)解碼出消息則立即將消息傳播到下一個(gè)處理器進(jìn)行處理,這樣可使消息得到及時(shí)處理。在調(diào)用decode()方法的前后,都檢查該Handler是否被用戶從ChannelPipeline中刪除,如果刪除則跳出解碼步驟不對(duì)輸入緩沖區(qū)in進(jìn)行操作,因?yàn)槔^續(xù)操作in已經(jīng)不安全。解碼完成后,對(duì)in解碼前后的讀索引進(jìn)行了檢查,防止用戶的錯(cuò)誤使用,如果用戶錯(cuò)誤使用將拋出異常。 至此,核心的解碼框架已經(jīng)分析完畢,再看最后的一些邊角處理。首先是channelReadComplete()讀事件完成后的處理:
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {numReads = 0; // 連續(xù)讀次數(shù)置0discardSomeReadBytes(); // 丟棄已讀數(shù)據(jù),節(jié)約內(nèi)存if (decodeWasNull) {// 沒有解碼出結(jié)果,則期待更多數(shù)據(jù)讀入decodeWasNull = false;if (!ctx.channel().config().isAutoRead()) {ctx.read();}}ctx.fireChannelReadComplete();}復(fù)制代碼如果channelRead()中沒有解碼出消息,極有可能是數(shù)據(jù)不夠,由此調(diào)用ctx.read()期待讀入更多的數(shù)據(jù)。如果設(shè)置了自動(dòng)讀取,將會(huì)在HeadHandler中調(diào)用ctx.read();沒有設(shè)置自動(dòng)讀取,則需要此處顯式調(diào)用。 最后再看Handler從ChannelPipelien中移除的處理handlerRemoved():
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {ByteBuf buf = cumulation;if (buf != null) {cumulation = null; // 釋放累積區(qū),GC回收int readable = buf.readableBytes();if (readable > 0) {ByteBuf bytes = buf.readBytes(readable);buf.release();// 解碼器已被刪除故不再解碼,只將數(shù)據(jù)傳播到下一個(gè)Handlerctx.fireChannelRead(bytes);} else {buf.release();}numReads = 0; // 置0,有可能被再次添加ctx.fireChannelReadComplete();}handlerRemoved0(ctx); // 用戶可進(jìn)行的自定義處理} 復(fù)制代碼當(dāng)解碼器被刪除時(shí),如果還有沒被解碼的數(shù)據(jù),則將數(shù)據(jù)傳播到下一個(gè)處理器處理,防止丟失數(shù)據(jù)。此外,當(dāng)連接不再有效觸發(fā)channelInactive事件或者觸發(fā)ChannelInputShutdownEvent時(shí),則會(huì)調(diào)用callDecode()解碼,如果解碼出消息,傳播到下一個(gè)處理器。這部分的代碼不再列出。 至此,ByteToMessageDecoder解碼框架已分析完畢,下面,我們選用具體的實(shí)例進(jìn)行分析。
LineBasedFrameDecoder
基于行分隔的解碼器LineBasedFrameDecoder是一個(gè)特殊的分隔符解碼器,該解碼器使用的分隔符為:windows的\r\n和類linux的\n。 首先看該類定義的成員變量:
// 最大幀長(zhǎng)度,超過此長(zhǎng)度將拋出異常TooLongFrameExceptionprivate final int maxLength;// 是否快速失敗,true-檢測(cè)到幀長(zhǎng)度過長(zhǎng)立即拋出異常不在讀取整個(gè)幀// false-檢測(cè)到幀長(zhǎng)度過長(zhǎng)依然讀完整個(gè)幀再拋出異常private final boolean failFast;// 是否略過分隔符,true-解碼結(jié)果不含分隔符private final boolean stripDelimiter;// 超過最大幀長(zhǎng)度是否丟棄字節(jié)private boolean discarding;private int discardedBytes; // 丟棄的字節(jié)數(shù) 復(fù)制代碼其中,前三個(gè)變量可由用戶根據(jù)實(shí)際情況配置,后兩個(gè)變量解碼時(shí)使用。 該子類覆蓋的解碼方法如下:
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Object decoded = decode(ctx, in);if (decoded != null) {out.add(decoded);}} 復(fù)制代碼其中又定義了decode(ctx, in)解碼出單個(gè)消息幀,事實(shí)上這也是其他編碼子類使用的方法。decode(ctx, in)方法處理很繞彎,只給出偽代碼:
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {final int eol = findEndOfLine(buffer);if (!discarding) {if (eol >= 0) {// 此時(shí)已找到換行符if(!checkMaxLength()) {return getFrame().retain();} // 超過最大長(zhǎng)度拋出異常} else {if (checkMaxLength()) {// 設(shè)置true表示下一次解碼需要丟棄字節(jié)discarding = true; if (failFast) {// 拋出異常}} }} else {if (eol >= 0) {// 丟棄換行符以及之前的字節(jié)buffer.readerIndex(eol + delimLength);} else {// 丟棄收到的所有字節(jié)buffer.readerIndex(buffer.writerIndex());}}} 復(fù)制代碼該方法需要結(jié)合解碼框架的while循環(huán)反復(fù)理解,每個(gè)if情況都是一次while循環(huán),而變量discarding就成為控制每次解碼流程的狀態(tài)量,注意其中的狀態(tài)轉(zhuǎn)移。(想法:使用狀態(tài)機(jī)實(shí)現(xiàn),則流程更清晰)
DelimiterBasedFrameDecoder
該解碼器是更通用的分隔符解碼器,可支持多個(gè)分隔符,每個(gè)分隔符可為一個(gè)或多個(gè)字符。如果定義了多個(gè)分隔符,并且可解碼出多個(gè)消息幀,則選擇產(chǎn)生最小幀長(zhǎng)的結(jié)果。例如,使用行分隔符\r\n和\n分隔:
+--------------+| ABC\nDEF\r\n |+--------------+ 復(fù)制代碼可有兩種結(jié)果:
+-----+-----+ +----------+ | ABC | DEF | (√) 和 | ABC\nDEF | (×) +-----+-----+ +----------+ 復(fù)制代碼該編碼器可配置的變量與LineBasedFrameDecoder類似,只是多了一個(gè)ByteBuf[] delimiters用于配置具體的分隔符。 Netty在Delimiters類中定義了兩種默認(rèn)的分隔符,分別是NULL分隔符和行分隔符:
public static ByteBuf[] nulDelimiter() {return new ByteBuf[] {Unpooled.wrappedBuffer(new byte[] { 0 }) };}public static ByteBuf[] lineDelimiter() {return new ByteBuf[] {Unpooled.wrappedBuffer(new byte[] { '\r', '\n' }),Unpooled.wrappedBuffer(new byte[] { '\n' }),};} 復(fù)制代碼FixedLengthFrameDecoder
該解碼器十分簡(jiǎn)單,按照固定長(zhǎng)度f(wàn)rameLength解碼出消息幀。如下的數(shù)據(jù)幀解碼為固定長(zhǎng)度3的消息幀示例如下:
+---+----+------+----+ +-----+-----+-----+ | A | BC | DEFG | HI | -> | ABC | DEF | GHI | +---+----+------+----+ +-----+-----+-----+ 復(fù)制代碼其中的解碼方法也十分簡(jiǎn)單:
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {if (in.readableBytes() < frameLength) {return null;} else {return in.readSlice(frameLength).retain();}} 復(fù)制代碼LengthFieldBasedFrameDecoder
基于長(zhǎng)度字段的消息幀解碼器,該解碼器可根據(jù)數(shù)據(jù)包中的長(zhǎng)度字段動(dòng)態(tài)的解碼出消息幀。一個(gè)推薦的二進(jìn)制傳輸協(xié)議可設(shè)計(jì)為如下格式:
+----------+------+----------+------+ | 頭部長(zhǎng)度 | 頭部 | 數(shù)據(jù)長(zhǎng)度 | 數(shù)據(jù) | +----------+------+----------+------+ 復(fù)制代碼這樣的協(xié)議可滿足大多數(shù)場(chǎng)景使用,但不幸的是:很多情況下并不可以設(shè)計(jì)新的協(xié)議,往往要在老舊的協(xié)議上傳輸數(shù)據(jù)。由此,Netty將該解碼器設(shè)計(jì)的十分通用,只要有類似的長(zhǎng)度字段便能正確解碼出消息幀。當(dāng)然前提是:正確使用解碼器。 沒有什么是完美的,由于該解碼器十分通用,所以有大量的配置變量:
private final ByteOrder byteOrder;private final int maxFrameLength;private final boolean failFast;private final int lengthFieldOffset;private final int lengthFieldLength;private final int lengthAdjustment;private final int initialBytesToStrip; 復(fù)制代碼變量byteOrder表示長(zhǎng)度字段的字節(jié)序:大端或小端,默認(rèn)為大端。如果對(duì)字節(jié)序有疑問,請(qǐng)查閱其他資料,不再贅述。maxFrameLength和failFast與其他解碼器相同,控制最大幀長(zhǎng)度和快速失敗拋異常,注意:該解碼器failFast默認(rèn)為true。 接下來(lái)將重點(diǎn)介紹其它四個(gè)變量:
-
lengthFieldOffset表示長(zhǎng)度字段偏移量即在一個(gè)數(shù)據(jù)包中長(zhǎng)度字段的具體下標(biāo)位置。標(biāo)準(zhǔn)情況,該長(zhǎng)度字段為數(shù)據(jù)部分長(zhǎng)度。
-
lengthFieldLength表示長(zhǎng)度字段的具體字節(jié)數(shù),如一個(gè)int占4字節(jié)。該解碼器支持的字節(jié)數(shù)有:1,2,3,4和8,其他則會(huì)拋出異常。另外,還需要注意的是:長(zhǎng)度字段的結(jié)果為無(wú)符號(hào)數(shù)。
-
lengthAdjustment是一個(gè)長(zhǎng)度調(diào)節(jié)量,當(dāng)數(shù)據(jù)包的長(zhǎng)度字段不是數(shù)據(jù)部分長(zhǎng)度而是總長(zhǎng)度時(shí),可將此值設(shè)定為頭部長(zhǎng)度,便能正確解碼出包含整個(gè)數(shù)據(jù)包的結(jié)果消息幀。注意:某些情況下,該值可設(shè)定為負(fù)數(shù)。
-
initialBytesToStrip表示需要略過的字節(jié)數(shù),如果我們只關(guān)心數(shù)據(jù)部分而不關(guān)心頭部,可將此值設(shè)定為頭部長(zhǎng)度從而丟棄頭部。 下面我們使用具體的例子來(lái)說明:
-
需求1:如下待解碼數(shù)據(jù)包,正確解碼為消息幀,其中長(zhǎng)度字段在最前面的2字節(jié),數(shù)據(jù)部分為12字節(jié)的字符串"HELLO, WORLD",長(zhǎng)度字段0x000C=12 表示數(shù)據(jù)部分長(zhǎng)度,數(shù)據(jù)包總長(zhǎng)度則為14字節(jié)。
解碼前(14 bytes) 解碼后(14 bytes)+--------+----------------+ +--------+----------------+| Length | Actual Content |----->| Length | Actual Content || 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |+--------+----------------+ +--------+----------------+ 復(fù)制代碼
正確配置(只列出四個(gè)值中不為0的值):
lengthFieldLength = 2; 復(fù)制代碼-
需求2:需求1的數(shù)據(jù)包不變,消息幀中去除長(zhǎng)度字段。
解碼前(14 bytes) 解碼后(12 bytes)+--------+----------------+ +----------------+| Length | Actual Content |----->| Actual Content || 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |+--------+----------------+ +----------------+ 復(fù)制代碼
正確配置:
lengthFieldLength = 2;initialBytesToStrip = 2; 復(fù)制代碼需求3:需求1數(shù)據(jù)包中長(zhǎng)度字段表示數(shù)據(jù)包總長(zhǎng)度。
解碼前(14 bytes) 解碼后(14 bytes)+--------+----------------+ +--------+----------------+| Length | Actual Content |----->| Length | Actual Content || 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |+--------+----------------+ +--------+----------------+ 復(fù)制代碼正確配置:
lengthFieldLength = 2;lengthAdjustment = -2; // 調(diào)整長(zhǎng)度字段的2字節(jié) 復(fù)制代碼需求4:綜合難度,數(shù)據(jù)包有兩個(gè)頭部HDR1和HDR2,長(zhǎng)度字段以及數(shù)據(jù)部分組成,其中長(zhǎng)度字段值表示數(shù)據(jù)包總長(zhǎng)度。結(jié)果消息幀需要第二個(gè)頭部HDR2和數(shù)據(jù)部分。請(qǐng)先給出答案再與標(biāo)準(zhǔn)答案比較,結(jié)果正確說明你已完全掌握了該解碼器的使用。
解碼前 (16 bytes) 解碼后 (13 bytes) +------+--------+------+----------------+ +------+----------------+ | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | +------+--------+------+----------------+ +------+----------------+ 復(fù)制代碼正確配置:
lengthFieldOffset = 1;lengthFieldLength = 2;lengthAdjustment = -3;initialBytesToStrip = 3; 復(fù)制代碼本解碼器的解碼過程總體上較為復(fù)雜,由于解碼的代碼是在while循環(huán)里面,decode方法return或者拋出異常時(shí)可看做一次循環(huán)結(jié)束,直到in中數(shù)據(jù)被解析完或者in的readerIndex讀索引不再增加才會(huì)從while循環(huán)跳出。使用狀態(tài)的思路理解,每個(gè)return或者拋出異常看為一個(gè)狀態(tài):
狀態(tài)1:丟棄過長(zhǎng)幀狀態(tài),可能是用戶設(shè)置了錯(cuò)誤的幀長(zhǎng)度或者實(shí)際幀過長(zhǎng)。
if (discardingTooLongFrame) {long bytesToDiscard = this.bytesToDiscard;int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());in.skipBytes(localBytesToDiscard); // 丟棄實(shí)際的字節(jié)數(shù)bytesToDiscard -= localBytesToDiscard;this.bytesToDiscard = bytesToDiscard;failIfNecessary(false);} 復(fù)制代碼變量localBytesToDiscard取得實(shí)際需要丟棄的字節(jié)數(shù),由于過長(zhǎng)幀有兩種情況:a.用戶設(shè)置了錯(cuò)誤的長(zhǎng)度字段,此時(shí)in中并沒有如此多的字節(jié);b.in中確實(shí)有如此長(zhǎng)度的幀,這個(gè)幀確實(shí)超過了設(shè)定的最大長(zhǎng)度。bytesToDiscard的計(jì)算是為了failIfNecessary()確定異常的拋出,其值為0表示當(dāng)次丟棄狀態(tài)已經(jīng)丟棄了in中的所有數(shù)據(jù),可以對(duì)新讀入in的數(shù)據(jù)進(jìn)行處理;否則,還處于異常狀態(tài)。
private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {if (bytesToDiscard == 0) {long tooLongFrameLength = this.tooLongFrameLength;this.tooLongFrameLength = 0;// 由于已經(jīng)丟棄所有數(shù)據(jù),關(guān)閉丟棄模式discardingTooLongFrame = false;// 已經(jīng)丟棄了所有字節(jié),當(dāng)非快速失敗模式拋異常if (!failFast || firstDetectionOfTooLongFrame) {fail(tooLongFrameLength);}} else {if (failFast && firstDetectionOfTooLongFrame) {// 幀長(zhǎng)度異常,快速失敗模式檢測(cè)到即拋異常fail(tooLongFrameLength);}}}復(fù)制代碼可見,首次檢測(cè)到幀長(zhǎng)度是一種特殊情況,在之后的一個(gè)狀態(tài)進(jìn)行分析。請(qǐng)注意該狀態(tài)并不是都拋異常,還有可能進(jìn)入狀態(tài)2。
狀態(tài)2:in中數(shù)據(jù)不足夠組成消息幀,此時(shí)直接返回null等待更多數(shù)據(jù)到達(dá)。
if (in.readableBytes() < lengthFieldEndOffset) {return null;} 復(fù)制代碼狀態(tài)3:幀長(zhǎng)度錯(cuò)誤檢測(cè),檢測(cè)長(zhǎng)度字段為負(fù)值得幀以及加入調(diào)整長(zhǎng)度后總長(zhǎng)小于長(zhǎng)度字段的幀,均拋出異常。
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;// 該方法取出長(zhǎng)度字段的值,不再深入分析long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);if (frameLength < 0) {in.skipBytes(lengthFieldEndOffset);throw new CorruptedFrameException("...");}frameLength += lengthAdjustment + lengthFieldEndOffset;if (frameLength < lengthFieldEndOffset) {in.skipBytes(lengthFieldEndOffset);throw new CorruptedFrameException("..."); 復(fù)制代碼狀態(tài)4:幀過長(zhǎng),由前述可知:可能是用戶設(shè)置了錯(cuò)誤的幀長(zhǎng)度或者實(shí)際幀過長(zhǎng)
if (frameLength > maxFrameLength) {long discard = frameLength - in.readableBytes();tooLongFrameLength = frameLength;if (discard < 0) {in.skipBytes((int) frameLength);} else {discardingTooLongFrame = true;bytesToDiscard = discard;in.skipBytes(in.readableBytes());}failIfNecessary(true);return null;} 復(fù)制代碼變量discard<0表示當(dāng)前收到的數(shù)據(jù)足以確定是實(shí)際的幀過長(zhǎng),所以直接丟棄過長(zhǎng)的幀長(zhǎng)度;>0表示當(dāng)前in中的數(shù)據(jù)并不足以確定是用戶設(shè)置了錯(cuò)誤的幀長(zhǎng)度,還是正確幀的后續(xù)數(shù)據(jù)字節(jié)還沒有到達(dá),但無(wú)論何種情況,將丟棄狀態(tài)discardingTooLongFrame標(biāo)記設(shè)置為true,之后后續(xù)數(shù)據(jù)字節(jié)進(jìn)入狀態(tài)1處理。==0時(shí),在failIfNecessary(true)無(wú)論如何都將拋出異常,><0時(shí),只有設(shè)置快速失敗才會(huì)拋出異常。還需注意一點(diǎn):failIfNecessary()的參數(shù)firstDetectionOfTooLongFrame的首次是指正確解析數(shù)據(jù)后發(fā)生的第一次發(fā)生的幀過長(zhǎng),可知會(huì)有很多首次。
狀態(tài)5:正確解碼出消息幀。
int frameLengthInt = (int) frameLength;if (in.readableBytes() < frameLengthInt) {return null; // 到達(dá)的數(shù)據(jù)還達(dá)不到幀長(zhǎng)}if (initialBytesToStrip > frameLengthInt) {in.skipBytes(frameLengthInt); // 跳過字節(jié)數(shù)錯(cuò)誤throw new CorruptedFrameException("...");}in.skipBytes(initialBytesToStrip);// 正確解碼出數(shù)據(jù)幀int readerIndex = in.readerIndex();int actualFrameLength = frameLengthInt - initialBytesToStrip;ByteBuf frame = in.slice(readerIndex, actualFrameLength).retain();in.readerIndex(readerIndex + actualFrameLength);return frame; 復(fù)制代碼代碼中混合了兩個(gè)簡(jiǎn)單狀態(tài),到達(dá)的數(shù)據(jù)還達(dá)不到幀長(zhǎng)和用戶設(shè)置的忽略字節(jié)數(shù)錯(cuò)誤。由于較為簡(jiǎn)單,故合并到一起。 至此解碼框架分析完畢。可見,要正確的寫出基于長(zhǎng)度字段的解碼器還是較為復(fù)雜的,如果開發(fā)時(shí)確有需求,特別要注意狀態(tài)的轉(zhuǎn)移。下面介紹較為簡(jiǎn)單的編碼框架。
請(qǐng)戳GitHub原文: https://github.com/wangzhiwubigdata/God-Of-BigData關(guān)注公眾號(hào),內(nèi)推,面試,資源下載,關(guān)注更多大數(shù)據(jù)技術(shù)~大數(shù)據(jù)成神之路~預(yù)計(jì)更新500+篇文章,已經(jīng)更新60+篇~ 復(fù)制代碼總結(jié)
以上是生活随笔為你收集整理的Netty源码解析8-ChannelHandler实例之CodecHandler的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Scrapy基本用法
- 下一篇: Infinispan 10.0.0.Be