Netty详解(三):Netty 入门应用
生活随笔
收集整理的這篇文章主要介紹了
Netty详解(三):Netty 入门应用
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
1. Netty服務(wù)端開發(fā)
TimeServer.java
package com.basic.netty.bio;import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;public class TimeServer {public void bind(int port)throws Exception{//配置服務(wù)端的NIO線程組,包包含了一組NIO線程,專門用于網(wǎng)絡(luò)事件的處理,//實際上它們就是Reactor線程組。//這里創(chuàng)建了兩個,一個用于服務(wù)端接受客戶端的連接,//另一個用于進(jìn)行SocketChannel的網(wǎng)絡(luò)讀寫。EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup=new NioEventLoopGroup();try{//Netty用于啟動NIO服務(wù)端的輔助啟動類,目的是降低服務(wù)端的開發(fā)復(fù)雜度ServerBootstrap b = new ServerBootstrap();//將兩個NIO線程組當(dāng)作入?yún)鬟f到ServerBootstrap中b.group(bossGroup,workerGroup)//功能對應(yīng)于JDK NIO類庫中的ServerSocketChannel類.channel(NioServerSocketChannel.class)//配置TCP參數(shù),這里將backlog設(shè)置為1024.option(ChannelOption.SO_BACKLOG,1024)//綁定I/O事件的處理類ChildChannelHandler,它//的作用類似于Reactor模式中的Handler類,主要用于處理網(wǎng)絡(luò)I/O事件,例如記錄日志、對消息進(jìn)行編解碼等。.childHandler(new ChildChannelHandler());//綁定端口,同步等待成功ChannelFuture f=b.bind(port).sync();//調(diào)用同步阻塞方法sync 等待綁定操作完成,完成后Netty會返回一個ChannelFuture,//它的功能類似于JDK 的 java.util.concurrent.Future,主要用于異步操作的通知回調(diào)。//等待服務(wù)端鏈路關(guān)閉之后main函數(shù)才退出。f.channel().closeFuture().sync();}finally{//優(yōu)雅退出,翻放線程池資源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{@Overrideprotected void initChannel(SocketChannel arg0) throws Exception {arg0.pipeline().addLast(new TimeServerHandler());}}public static void main(String[] args) throws Exception{int port=8080;new TimeServer().bind(port);} }TimeServerHandler.java
package com.basic.netty.bio;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext;//從ChannelHandlerAdapter繼承,用于對網(wǎng)絡(luò)事件進(jìn)行讀寫操作,通常只需要關(guān)注channelRead和exceptionCaught方法 public class TimeServerHandler extends ChannelHandlerAdapter{//channelRead() 該方法在接受到數(shù)據(jù)的時候自動調(diào)用(會存在半包問題)@Overridepublic void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{//類型轉(zhuǎn)換,將msg轉(zhuǎn)換為Netty的ByteBuf,類似JDK的java.nio.ByteBuffer對象ByteBuf buf=(ByteBuf)msg;//獲取緩沖區(qū)可讀字節(jié)數(shù),創(chuàng)建byte數(shù)組byte[] req=new byte[buf.readableBytes()];//將緩沖區(qū)字節(jié)復(fù)制到新建的數(shù)組中buf.readBytes(req);//獲取請求消息String body=new String(req,"UTF-8");System.out.println("The time server receive order : "+body);String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";ByteBuf resp =Unpooled.copiedBuffer(currentTime.getBytes());ctx.write(resp);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx)throws Exception{/*將消息發(fā)送隊列中的消息寫入到SocketChannel中發(fā)送給對方。從性能角度考慮,為了防止頻繁地喚醒Selector進(jìn)行消息發(fā)送,Netty的write方法并不直接將消息寫入SocketChannel中,調(diào)用write方法只是把待發(fā)送的消息放到發(fā)送緩沖數(shù)組中,再通過調(diào)用flush方法,將發(fā)送的緩沖區(qū)的消息全部寫到SocketChannel中*/ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){//發(fā)生異常時,關(guān)閉ChannelHandlerContextctx.close();} }2. Netty客戶端開發(fā)
TimeClient.java
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; public class TimeClient {public void connect(int port,String host) throws Exception{EventLoopGroup group=new NioEventLoopGroup();try{Bootstrap b=new Bootstrap();//Channel需要設(shè)置為NioSocketChannel,然后為其添加Handlerb.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).handler(new ChannelInitializer<SocketChannel>(){//為了簡單直接創(chuàng)建匿名內(nèi)部類,實現(xiàn)initChannel方法//其作用是當(dāng)創(chuàng)建NioSocketChannel成功之后,在進(jìn)行初始化時,//將它的ChannelHandler設(shè)置到ChannelPipeline中,用于處理網(wǎng)絡(luò)I/O事件@Overridepublic void initChannel(SocketChannel ch) throws Exception{ch.pipeline().addLast(new TimeClientHandler());}});//發(fā)起異步連接,然后調(diào)用同步方法等待連接成功ChannelFuture f=b.connect(host,port).sync();//當(dāng)客戶端連接關(guān)閉之后,客戶端主函數(shù)退出,退出前釋放NIO線程組的資源f.channel().closeFuture().sync();}finally{}}public static void main(String[] args) throws Exception {int port=8080;new TimeClient().connect(port, "127.0.0.1");} }TimeClientHandler.java
import java.util.logging.Logger;import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext;public class TimeClientHandler extends ChannelHandlerAdapter{private static final Logger logger=Logger.getLogger(TimeClientHandler.class.getName());private final ByteBuf firstMessage;public TimeClientHandler(){byte[] req="QUERY TIME ORDER".getBytes();firstMessage=Unpooled.buffer(req.length);firstMessage.writeBytes(req);}/*** 當(dāng)客戶端和服務(wù)器TCP鏈路建立成功后,NIO線程會調(diào)用channelActive方法*/@Overridepublic void channelActive(ChannelHandlerContext ctx){//發(fā)送查詢時間的指令給服務(wù)端ctx.writeAndFlush(firstMessage);}/*** 當(dāng)服務(wù)端返回應(yīng)答消息時調(diào)用*/@Overridepublic void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{ByteBuf buf=(ByteBuf)msg;byte[] req=new byte[buf.readableBytes()];buf.readBytes(req);String body=new String(req,"UTF-8");System.out.println("Now is : " + body);}/*** 當(dāng)發(fā)生異常時*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){logger.warning("Unexpected exception from downstrea : " + cause.getMessage());ctx.close();} }上述例程仍沒有考慮讀半包的處理,但進(jìn)行性能或者壓力測試將不能正確工作。
3. 總結(jié)
- 當(dāng)channel上面有數(shù)據(jù)到來時會觸發(fā)channelRead事件,當(dāng)數(shù)據(jù)到來時,eventLoop被喚醒繼而調(diào)用channelRead方法處理數(shù)據(jù)。
- 當(dāng)Channel上一旦沒有更多數(shù)據(jù)要從底層傳輸中讀取,就會觸發(fā)channelReadComplete()。可能是以下兩種情況,read到0個字節(jié)或者是read到的字節(jié)數(shù)小于buffer的容量,滿足以上條件就會調(diào)用channelReadComplete方法。
總結(jié)
以上是生活随笔為你收集整理的Netty详解(三):Netty 入门应用的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: NIO详解(十三):Java IO 和N
- 下一篇: Netty详解(五):Netty TCP