漫谈Java IO之 Netty与NIO服务器
前面介紹了基本的網絡模型以及IO與NIO,那么有了NIO來開發非阻塞服務器,大家就滿足了嗎?有了技術支持,就回去追求效率,因此就產生了很多NIO的框架對NIO進行封裝——這就是大名鼎鼎的Netty。
前幾篇的內容,可以參考:
為什么要使用開源框架?
這個問題幾乎可以當做廢話,框架肯定要比一些原生的API封裝了更多地功能,重復造輪子在追求效率的情況并不是明智之舉。那么先來說說NIO有什么缺點吧:
那么有了這些問題,就急需一些大牛們開發通用框架來方便勞苦大眾了。最致命的NIO框架就是MINA和Netty了,這里不得不說個小插曲:
先來看看MINA的主要貢獻者:
再來看看NETYY的主要貢獻者:
總結起來,有這么幾點:
因此,如果讓你選擇你應該知道選擇誰了吧。另外,MINA對底層系統要求功底更深,且國內Netty的氛圍更好,有李林峰等人在大力宣傳(《Netty權威指南》的作者)。
講了一大堆的廢話之后,總結來說就是——Netty有前途,學它準沒錯。
Netty介紹
按照定義來說,Netty是一個異步、事件驅動的用來做高性能、高可靠性的網絡應用框架。主要的優點有:
主要支持的功能或者特性有:
總之提供了很多現成的功能可以直接供開發者使用。
Netty服務器小例子
基于Netty的服務器編程可以看做是Reactor模型:
即包含一個接收連接的線程池(也有可能是單個線程,boss線程池)以及一個處理連接的線程池(worker線程池)。boss負責接收連接,并進行IO監聽;worker負責后續的處理。為了便于理解Netty,直接看看代碼:
代碼非常少,而且想要換成阻塞IO,只需要替換Channel里面的工廠類即可:
public class NettyOioServer {public void serve(int port) throws InterruptedException {final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\b", Charset.forName("UTF-8")));EventLoopGroup bossGroup = new OioEventLoopGroup(1);EventLoopGroup workerGroup = new OioEventLoopGroup();try{ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)//配置boss和worker.channel(OioServerSocketChannel.class) // 使用阻塞的SocketChannel....概括來說,在Netty中包含下面幾個主要的組件:
- Bootstrap:netty的組件容器,用于把其他各個部分連接起來;如果是TCP的Server端,則為ServerBootstrap.
- Channel:代表一個Socket的連接
- EventLoopGroup:一個Group包含多個EventLoop,可以理解為線程池
- EventLoop:處理具體的Channel,一個EventLoop可以處理多個Channel
- ChannelPipeline:每個Channel綁定一個pipeline,在上面注冊處理邏輯handler
- Handler:具體的對消息或連接的處理,有兩種類型,Inbound和Outbound。分別代表消息接收的處理和消息發送的處理。
- ChannelFuture:注解回調方法
了解上面的基本組件后,就看一下幾個重要的內容。
Netty的Buffer和零拷貝
在Unix操作系統中,系統底層可以基于mmap實現內核空間和用戶空間的內存映射。但是在Netty中并不是這個意思,它主要來自于下面幾個功能:
另外,Netty自己封裝實現了ByteBuf,相比于Nio原生的ByteBuffer,API上更易用了;同時支持容量的動態擴容;另外還支持Buffer的池化,高效復用Buffer。
public class ByteBufTest {public static void main(String[] args) {//創建bytebufByteBuf buf = Unpooled.copiedBuffer("hello".getBytes());System.out.println(buf);// 讀取一個字節buf.readByte();System.out.println(buf);// 讀取一個字節buf.readByte();System.out.println(buf);// 丟棄無用數據buf.discardReadBytes();System.out.println(buf);// 清空buf.clear();System.out.println(buf);// 寫入buf.writeBytes("123".getBytes());System.out.println(buf);buf.markReaderIndex();System.out.println("mark:"+buf);buf.readByte();buf.readByte();System.out.println("read:"+buf);buf.resetReaderIndex();System.out.println("reset:"+buf);} }輸出為:
UnpooledHeapByteBuf(ridx: 0, widx: 5, cap: 5/5) UnpooledHeapByteBuf(ridx: 1, widx: 5, cap: 5/5) UnpooledHeapByteBuf(ridx: 2, widx: 5, cap: 5/5) UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5) UnpooledHeapByteBuf(ridx: 0, widx: 0, cap: 5/5) UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5) mark:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5) read:UnpooledHeapByteBuf(ridx: 2, widx: 3, cap: 5/5) reset:UnpooledHeapByteBuf(ridx: 0, widx: 3, cap: 5/5)有興趣的可以看一下上一篇分享的ByteBuffer,對比一下,就能發現在Netty中通過獨立的讀寫索引維護,避免讀寫模式的切換,更加方便了。
Handler的使用
前面介紹了Handler包含了Inbound和Outbound兩種,他們統一放在一個雙向鏈表中:
當接收消息的時候,會從鏈表的表頭開始遍歷,如果是inbound就調用對應的方法;如果發送消息則從鏈表的尾巴開始遍歷。那么上面途中的例子,接收消息就會輸出:
InboundA --> InboundB --> InboundC輸出消息,則會輸出:
OutboundC --> OutboundB --> OutboundA這里有段代碼,可以直接復制下來,試試看:
package cn.xingoo.book.netty.pipeline;import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.Charset;/*** 注意:** 1 ChannelOutboundHandler要在最后一個Inbound之前**/ public class NettyNioServerHandlerTest {final static ByteBuf buffer = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi\r\n", Charset.forName("UTF-8")));public void serve(int port) throws InterruptedException {EventLoopGroup bossGroup = new NioEventLoopGroup(1);EventLoopGroup workerGroup = new NioEventLoopGroup();try{ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast("1",new InboundA());pipeline.addLast("2",new OutboundA());pipeline.addLast("3",new InboundB());pipeline.addLast("4",new OutboundB());pipeline.addLast("5",new OutboundC());pipeline.addLast("6",new InboundC());}});ChannelFuture f = b.bind().sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully().sync();workerGroup.shutdownGracefully().sync();}}public static void main(String[] args) throws InterruptedException {NettyNioServerHandlerTest server = new NettyNioServerHandlerTest();server.serve(5555);}private static class InboundA extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf)msg;System.out.println("InboundA read"+buf.toString(Charset.forName("UTF-8")));super.channelRead(ctx, msg);}}private static class InboundB extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf)msg;System.out.println("InboundB read"+buf.toString(Charset.forName("UTF-8")));super.channelRead(ctx, msg);// 從pipeline的尾巴開始找outboundctx.channel().writeAndFlush(buffer);}}private static class InboundC extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf)msg;System.out.println("InboundC read"+buf.toString(Charset.forName("UTF-8")));super.channelRead(ctx, msg);// 這樣會從當前的handler向前找outbound//ctx.writeAndFlush(buffer);}}private static class OutboundA extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutboundA write");super.write(ctx, msg, promise);}}private static class OutboundB extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutboundB write");super.write(ctx, msg, promise);}}private static class OutboundC extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutboundC write");super.write(ctx, msg, promise);}} }最后有一個TCP粘包的例子,有興趣的也可以自己試一下,代碼就不貼上來了,可以參考最后面的Github連接。
參考
轉載于:https://www.cnblogs.com/xing901022/p/8678869.html
總結
以上是生活随笔為你收集整理的漫谈Java IO之 Netty与NIO服务器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kindEditor富文本编辑器的工具栏
- 下一篇: javascript笔记整理(对象基础)