netty编解码器与序列化框架分析
netty編解碼器分析
編碼(Encode)也稱為序列化(serialization),它將對象序列化為字節(jié)數(shù)組,用于網(wǎng)絡傳輸、數(shù)據(jù)持久化或者其它用途。
反之,解碼(Decode)也稱為反序列化(deserialization),用于把從網(wǎng)絡、磁盤等讀取的字節(jié)數(shù)組還原成原始對象(通常是原始對象的拷貝),以方便后續(xù)的業(yè)務邏輯操作。
進行遠程跨進程服務調(diào)用時(例如 RPC 調(diào)用),需要使用特定的編解碼技術,對需要進行網(wǎng)絡傳輸?shù)膶ο笞鼍幋a或者解碼,以便完成遠程調(diào)用。
Java序列化
Java默認提供的序列化機制,需要序列化的Java對象只需要實現(xiàn)java.io.Serializable接口并生成序列化ID,這個類就能夠通過java.io.ObjectInputStream和java.io.ObjectOutputStrem序列化和反序列化。
但是由于它自身存在很多缺點,因此大多數(shù)的RPC框架并沒有選擇它。Java序列化的主要缺點如下:
在netty中使用java的序列化,只需要添加編碼器ObjectEncoder和解碼器ObjectDecoder即可,netty都封裝好了。
服務器端代碼:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ObjectEncoder()); // 編碼ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); // 解碼ch.pipeline().addLast(new ServerHandler());}});服務器端業(yè)務代碼:
public class ServerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {System.out.println("receive from client: " + msg);UserResponse response = new UserResponse();response.setCode(200);response.setMessage("success");ctx.writeAndFlush(response);}}思考:不需要解決粘包半包問題?ObjectEncoder和ObjectDecoder中已經(jīng)對粘包半包問題進行了處理,使用的是自定義消息長度的方式,下面看源代碼。
io.netty.handler.codec.serialization.ObjectEncoder的源代碼:
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {private static final byte[] LENGTH_PLACEHOLDER = new byte[4];@Overrideprotected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {int startIdx = out.writerIndex();ByteBufOutputStream bout = new ByteBufOutputStream(out);ObjectOutputStream oout = null;try {bout.write(LENGTH_PLACEHOLDER); // 先占位4個字節(jié)oout = new CompactObjectOutputStream(bout);oout.writeObject(msg);oout.flush();} finally {if (oout != null) {oout.close();} else {bout.close();}}int endIdx = out.writerIndex();out.setInt(startIdx, endIdx - startIdx - 4); // 計算報文的長度,放在開始的4個字節(jié)里面} }io.netty.handler.codec.serialization.ObjectDecoder解碼器的源代碼:
public class ObjectDecoder extends LengthFieldBasedFrameDecoder {... ...public ObjectDecoder(int maxObjectSize, ClassResolver classResolver) {super(maxObjectSize, 0, 4, 0, 4);this.classResolver = classResolver;}... ... }發(fā)現(xiàn)ObjectDecoder繼承LengthFieldBasedFrameDecoder,構(gòu)造方法中調(diào)用了父類的構(gòu)造方法super(maxObjectSize, 0, 4, 0, 4);,指定了解碼時前4個字節(jié)為報文的長度,解碼后丟棄前面的4個字節(jié),傳遞給后面的Handler。
Google的Protobuf
Protobuf全稱Google Protocol Buffers,它由谷歌開源而來,在谷歌內(nèi)部久經(jīng)考驗。它將數(shù)據(jù)結(jié)構(gòu)以.proto 文件進行描述,通過代碼生成工具可以生成對應數(shù)據(jù)結(jié)構(gòu)的POJO對象和Protobuf相關的方法和屬性。
它的特點如下:
使用方法如下:
先下載對應操作系統(tǒng)的工具,下載地址:https://github.com/protocolbuffers/protobuf/releases,這里使用的是protoc-3.13.0-win64.zip,解壓縮后bin目錄下有個protoc.exe。
編寫UserResponse.proto文件:
syntax = "proto3"; option java_package = "com.morris.netty.serialize.protobuf"; option java_outer_classname = "UserResponseProto";message UserResponse {int32 code = 1;string message = 2; }然后使用protoc.exe生成對應的實體類:
> protoc.exe --java_out=. UserResponse.proto把生成的實體類,拷貝到工程對應的目錄。
項目中引入protobuf的maven依賴:
dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>3.6.1</version> </dependency>服務器端代碼:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); // 處理粘包半包的解碼器ch.pipeline().addLast(new ProtobufDecoder(UserRequestProto.UserRequest.getDefaultInstance())); // protobuf解碼器ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); // 處理粘包半包的編碼器ch.pipeline().addLast(new ProtobufEncoder()); // protobuf編碼器ch.pipeline().addLast(new ServerHandler()); // 具體業(yè)務處理}});服務器端業(yè)務代碼:
public class ServerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("receive from client: " + msg);UserResponseProto.UserResponse response = UserResponseProto.UserResponse.newBuilder().setCode(200).setMessage("success").buildPartial();ctx.writeAndFlush(response);}}JBoss Marshalling
JBoss Marshalling是一個Java對象的序列化API包,修正了JDK自帶的序列化包的很多問題,但又保持跟java.io.Serializable接口的兼容;同時增加了一些可調(diào)的參數(shù)和附加的特性,并且這些參數(shù)和特性可通過工廠類進行配置。
相比于傳統(tǒng)的Java序列化機制,它的優(yōu)點如下:
使用方法如下:
引入marshalling的maven依賴:
<dependency><groupId>org.jboss.marshalling</groupId><artifactId>jboss-marshalling-serial</artifactId><version>2.0.6.Final</version> </dependency>創(chuàng)建一個生產(chǎn)marshalling編碼器和marshalling解碼器的工廠類:
package com.morris.netty.serialize.marshalling;import io.netty.handler.codec.marshalling.*; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration;public final class MarshallingCodeCFactory {public static MarshallingDecoder buildMarshallingDecoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);return decoder;}public static MarshallingEncoder buildMarshallingEncoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);MarshallingEncoder encoder = new MarshallingEncoder(provider);return encoder;} }服務器端代碼:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); // 解碼器ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); // 編碼器ch.pipeline().addLast(new ServerHandler());}});這里不需要處理粘包和半包問題的原因與java序列化不需要處理的原因一致,具體可以參考MarshallingEncoder和MarshallingDecoder的源代碼。
服務器端業(yè)務代碼與java序列化的服務器端業(yè)務代碼一致。
相比于前面介紹的兩種編解碼框架,JBoss Marshalling更多是在JBoss內(nèi)部使用,應用范圍有限,netty是JBoss公司的,所以netty要支持一下自己家的產(chǎn)品。
自定義編解碼器MessagePack
編碼器相關基類:
- 將消息編碼為字節(jié):MessageToByteEncoder。
- 將消息編碼為消息:MessageToMessageEncoder,T代表源數(shù)據(jù)的類型。
解碼器相關基類:
- 將字節(jié)解碼為消息:ByteToMessageDecoder。
- 將一種消息類型解碼為另一種消息類型:MessageToMessageDecoder。
編解碼器類:
- ByteToMessageCodec。
- MessageToMessageCodec。
下面通過自定義編碼器和解碼器,將MessagePack作為序列化框架嵌入到項目中來:
引入MessagePack的maven依賴:
<dependency><groupId>org.msgpack</groupId><artifactId>msgpack</artifactId><version>0.6.12</version> </dependency>編碼器:
package com.morris.netty.serialize.messagepack;import com.morris.netty.serialize.pojo.UserRequest; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import org.msgpack.MessagePack;public class MessagePackEncoder extends MessageToByteEncoder<UserRequest> {@Overrideprotected void encode(ChannelHandlerContext ctx, UserRequest msg, ByteBuf out)throws Exception {MessagePack messagePack = new MessagePack();byte[] raw = messagePack.write(msg);out.writeBytes(raw);} }解碼器:
package com.morris.netty.serialize.messagepack;import com.morris.netty.serialize.pojo.UserRequest; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; import org.msgpack.MessagePack;import java.io.Serializable; import java.util.List;public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)throws Exception {byte[] bytes = new byte[in.readableBytes()];in.readBytes(bytes);MessagePack messagePack = new MessagePack();Serializable serializable = messagePack.read(bytes, UserRequest.class);out.add(serializable);} }實體類,注意加上@Message注解:
package com.morris.netty.serialize.pojo;import lombok.Data; import org.msgpack.annotation.Message;import java.io.Serializable;@Message @Data public class UserRequest implements Serializable {private int age;private String name; }服務器端啟動類關鍵代碼:
ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, 0, 2)); // 解決粘包半包問題ch.pipeline().addLast(new MessagePackDecoder()); // 解碼器ch.pipeline().addLast(new ServerHandler());}});服務器端業(yè)務處理類:
package com.morris.netty.serialize.messagepack;import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;public class ServerHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {System.out.println("receive from client: " + msg);ctx.writeAndFlush(Unpooled.copiedBuffer("success\n".getBytes()));ctx.fireChannelRead(msg);}}客戶端啟動類關鍵代碼:
Bootstrap b = new Bootstrap(); b.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LengthFieldPrepender(2)); // 解決粘包半包問題// 服務器端發(fā)送過來的是以換行符分割的文本,所以這里使用LineBasedFrameDecoder處理粘包半包問題ch.pipeline().addLast(new LineBasedFrameDecoder(1024));ch.pipeline().addLast(new MessagePackEncoder()); // 編碼器ch.pipeline().addLast(new ClientHandler());}});客戶端業(yè)務處理類:
package com.morris.netty.serialize.messagepack;import com.morris.netty.serialize.pojo.UserRequest; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler;public class ClientHandler extends SimpleChannelInboundHandler {@Overridepublic void channelActive(ChannelHandlerContext ctx) {for (int i = 0; i < 100; i++) {UserRequest request = new UserRequest();request.setAge(i);request.setName("morris");ctx.writeAndFlush(request);}}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) {ByteBuf receiveByteBuf = (ByteBuf) msg;byte[] bytes = new byte[receiveByteBuf.readableBytes()];receiveByteBuf.readBytes(bytes);System.out.println("receive from server: " + new String(bytes));} }當然,除了上述介紹的編解碼框架和技術之外,比較常用的還有kryo、hession和Json等。
總結(jié)
以上是生活随笔為你收集整理的netty编解码器与序列化框架分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用vue完成点击鼠标产生小心心特效
- 下一篇: web网页设计实例作业 ——中国茶文化(