ByteBuf使用实例
之前我們有個netty5的拆包解決方案(參加netty5拆包問題解決實例),現(xiàn)在我們采用另一種思路,不需要新增LengthFieldBasedFrameDecoder,直接修改NettyMessageDecoder:
package com.wlf.netty.nettyapi.msgpack;
import com.wlf.netty.nettyapi.constant.Delimiter;
import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class NettyMessageDecoder extends ByteToMessageDecoder {
/**
* 消息體字節(jié)大小:分割符字段4字節(jié)+長度字段4字節(jié)+請求類型字典1字節(jié)+預(yù)留字段1字節(jié)=10字節(jié)
*/
private static final int HEAD_LENGTH = 10;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
while (true) {
// 標記字節(jié)流開始位置
byteBuf.markReaderIndex();
// 若讀取到分割標識,說明讀取當前字節(jié)流開始位置了
if (byteBuf.readInt() == Delimiter.DELIMITER) {
break;
}
// 重置讀索引為0
byteBuf.resetReaderIndex();
// 長度校驗,字節(jié)流長度至少10字節(jié),小于10字節(jié)則等待下一次字節(jié)流過來
if (byteBuf.readableBytes() < HEAD_LENGTH) {
byteBuf.resetReaderIndex();
return;
}
}
// 2、獲取data的字節(jié)流長度
int dataLength = byteBuf.readInt();
// 校驗數(shù)據(jù)包是否全部發(fā)送過來,總字節(jié)流長度(此處讀取的是除去delimiter和length之后的總長度)-
// type和reserved兩個字節(jié)=data的字節(jié)流長度
int totalLength = byteBuf.readableBytes();
if ((totalLength - 2) < dataLength) {
// 長度校驗,字節(jié)流長度少于數(shù)據(jù)包長度,說明數(shù)據(jù)包拆包了,等待下一次字節(jié)流過來
byteBuf.resetReaderIndex();
return;
}
// 3、請求類型
byte type = byteBuf.readByte();
// 4、預(yù)留字段
byte reserved = byteBuf.readByte();
// 5、數(shù)據(jù)包內(nèi)容
byte[] data = null;
if (dataLength > 0) {
data = new byte[dataLength];
byteBuf.readBytes(data);
}
NettyMessage nettyMessage = new NettyMessage();
Header header = new Header();
header.setDelimiter(Delimiter.DELIMITER);
header.setLength(dataLength);
header.setType(type);
header.setReserved(reserved);
nettyMessage.setHeader(header);
nettyMessage.setData(data);
list.add(nettyMessage);
// 回收已讀字節(jié)
byteBuf.discardReadBytes();
}
}
我們的改動很小,只不過將原來的讀索引改為標記索引,然后在拆包時退出方法前重置讀索引,這樣下次數(shù)據(jù)包過來,我們的讀索引依然從0開始,delimiter的標記就可以讀出來,而不會陷入死循環(huán)了。
ByteBuf是ByteBuffer的進化版,ByteBuffer(參見ByteBuffer使用實例)才一個索引,讀寫模式需要通過flip來轉(zhuǎn)換,而ByteBuf有兩個索引,readerIndex讀索引和writerIndex寫索引,讀寫轉(zhuǎn)換無縫連接,青出于藍而勝于藍:
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
| | (CONTENT) | |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
既然有兩個索引,那么標記mask、重置reset必然也是兩兩對應(yīng),上面的代碼中我們只需要用到讀標記和讀重置。
我們把客戶端handler也修改下,先把LengthFieldBasedFrameDecoder去掉:
// channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024 * 1024 * 1024, 4, 4, 2, 0));
再讓數(shù)據(jù)包更大一些:
/**
* 構(gòu)造PCM請求消息體
*
* @return
*/
private byte[] buildPcmData() throws Exception {
byte[] resultByte = longToBytes(System.currentTimeMillis());
// 讀取一個本地文件
String AUDIO_PATH = "D:\input\test_1.pcm";
try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) {
int len = -1;
byte[] content = new byte[1024];
while((len = raf.read(content)) != -1)
{
resultByte = addAll(resultByte, content);
}
}
return resultByte;
}
再debug下看看,第一次解析客戶端發(fā)送的數(shù)據(jù),讀取1024字節(jié),我們可以看到讀索引是8(delimiter+length=8),寫索引就是1024,我們的大包里有3939116個字節(jié),去掉10個字節(jié)的header,剩下小包是3939106::
第二次再讀1024,代碼已經(jīng)執(zhí)行reset重置讀索引了,所以讀索引由8改為0,寫索引累增到2048:
第三次再讀1024,寫索引繼續(xù)累增到3072:
最后一次發(fā)1024,寫索引已經(jīng)到達3939116,大包傳輸結(jié)束了:
從上面看出,我們對ByteBuf的capacity一直在翻倍,讀指針一直標記在大包的起始位置0,這樣做的目的是每次都能讀取小包的長度length(3939106),拿來跟整個ByteBuf的長度作比較,只要它取到的小包沒到達到length,我們就繼續(xù)接受新包,寫索引不停的累加,直到整個大包長度>=3939116(也就是小包>=3939106),這時我們開始移動讀索引,將字節(jié)流寫入對象,最后回收已讀取的字節(jié)(調(diào)用discardReaderBytes方法):
BEFORE discardReadBytes()
+-------------------+------------------+------------------+
| discardable bytes | readable bytes | writable bytes |
+-------------------+------------------+------------------+
| | | |
0 <= readerIndex <= writerIndex <= capacity
AFTER discardReadBytes()
+------------------+--------------------------------------+
| readable bytes | writable bytes (got more space) |
+------------------+--------------------------------------+
| | |
readerIndex (0) <= writerIndex (decreased) <= capacity
其他方法參見測試類:
package com.wlf.netty.nettyserver;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.junit.Assert;
import org.junit.Test;
public class ByteBufTest {
@Test
public void byteBufTest() {
ByteBuf byteBuf = Unpooled.buffer(10);
byteBuf.writeInt(0xabef0101);
byteBuf.writeInt(1024);
byteBuf.writeByte((byte) 1);
byteBuf.writeByte((byte) 0);
// 開始讀取
printDelimiter(byteBuf);
printLength(byteBuf);
// 派生一個ByteBuf,取剩下2個字節(jié),但讀索引不動
ByteBuf duplicatBuf = byteBuf.duplicate();
printByteBuf(byteBuf);
// 派生一個ByteBuf,取剩下2個字節(jié),讀索引動了
ByteBuf sliceBuf = byteBuf.readSlice(2);
printByteBuf(byteBuf);
// 兩個派生的對象其實是一樣的
Assert.assertEquals(duplicatBuf, sliceBuf);
}
private void printDelimiter(ByteBuf buf) {
int newDelimiter = buf.readInt();
System.out.printf("delimeter: %s
", Integer.toHexString(newDelimiter));
printByteBuf(buf);
}
private void printLength(ByteBuf buf) {
int length = buf.readInt();
System.out.printf("length: %d
", length);
printByteBuf(buf);
}
private void printByteBuf(ByteBuf buf) {
System.out.printf("reader Index: %d, writer Index: %d, capacity: %d
", buf.readerIndex(), buf.writerIndex(), buf.capacity());
}
}
輸出:
delimeter: abef0101 reader Index: 4, writer Index: 10, capacity: 10 length: 1024 reader Index: 8, writer Index: 10, capacity: 10 reader Index: 8, writer Index: 10, capacity: 10 reader Index: 10, writer Index: 10, capacity: 10
總結(jié)
以上是生活随笔為你收集整理的ByteBuf使用实例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 洛谷模板,树状数组二 差分
- 下一篇: excel 批量替换换行符