2019獨角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
java的網(wǎng)絡(luò)工具netty簡介
? ? ? ? ?Netty是一個NIO的客服端服務(wù)器框架,它可以簡單、快速的搭建器一個協(xié)議包客服端服務(wù)器的應(yīng)用程序。它極大的簡化了TCP和UDP這類的網(wǎng)絡(luò)編程。
???“快速”和“簡單”并不意味著會讓你的最終應(yīng)用產(chǎn)生維護(hù)性或性能上的問題。Netty 是一個吸收了多種協(xié)議的實現(xiàn)經(jīng)驗,這些協(xié)議包括FTP,SMTP,HTTP,各種二進(jìn)制,文本協(xié)議,并經(jīng)過相當(dāng)精心設(shè)計的項目,最終,Netty 成功的找到了一種方式,在保證易于開發(fā)的同時還保證了其應(yīng)用的性能,穩(wěn)定性和伸縮性。
????????這里簡單記錄下學(xué)習(xí)要點,詳細(xì)的講解。可以看官網(wǎng)(github:https://github.com/netty/netty?)或者查看李林鋒的的系列文章http://ifeve.com/author/linfeng/?。
????????體系結(jié)構(gòu)圖:
????????由李林鋒講解的易懂的架構(gòu)圖:
????1、兩個selector線程:mainReactor處理accpet事件、subReactor處理connection、read、send事件
????2、業(yè)務(wù)處理線程池:包括編碼、解碼、業(yè)務(wù)處理。
1、官網(wǎng)案例
? ? a、處理bytes的serverhandler
/***?@see?進(jìn)入的channel:用于處理接受時候的事件處理*/
public?class?TimeServerHandler?extends?ChannelInboundHandlerAdapter?{/***?@see?當(dāng)一個channel準(zhǔn)備好的時候,發(fā)送一個32位的數(shù)字*/public?void?channelActive(final?ChannelHandlerContext?ctx)?{//?ByteBuf:沒有了flip()。它只有2個功能:讀、寫//?讀://?寫:當(dāng)你寫的時候,如果讀取下標(biāo)沒有改變,則繼續(xù)增長final?ByteBuf?time?=?ctx.alloc().buffer(4);time.writeInt((int)?(System.currentTimeMillis()?/?1000L?+?2208988800L));final?ChannelFuture?f?=?ctx.writeAndFlush(time);//?當(dāng)寫如完成的時候,執(zhí)行f.addListener(new?ChannelFutureListener()?{public?void?operationComplete(ChannelFuture?future)?throws?Exception?{//?TODO?Auto-generated?method?stubassert?f?==?future;ctx.close();}});}public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();}
}
? ? sever的啟動部分
public?class?TimeServer?{private?int?port;public?TimeServer()?{this.port?=?port;}public?void?runn()?throws?Exception?{EventLoopGroup?bossGroup?=?new?NioEventLoopGroup();EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{ServerBootstrap?b?=?new?ServerBootstrap();b.group(bossGroup,?workerGroup);b.channel(NioServerSocketChannel.class);b.childHandler(new?ChannelInitializer<SocketChannel>()?{@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{//?TODO?Auto-generated?method?stubch.pipeline().addLast(new?TimeServerHandler());}});b.option(ChannelOption.SO_BACKLOG,?128);b.childOption(ChannelOption.SO_KEEPALIVE,?true);ChannelFuture?f?=?b.bind(port).sync();f.channel().closeFuture().sync();}?finally?{workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public?static?void?main(String[]?args)?{try?{new?TimeServer().runn();}?catch?(Exception?e)?{e.printStackTrace();}}
}
????b、client部分:處理字節(jié)
public?class?TimeDecoder?extends?ByteToMessageDecoder?{/***?@see?定義一個回調(diào)的數(shù)據(jù)累加buff*?@see?如果有out,則表示解析成功。*/@Overrideprotected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?in,?List<Object>?out)?throws?Exception?{if?(in.readableBytes()?<?4)?{return;}out.add(in.readBytes(4));}}
? ?client的 channel處理類
public?class?TimeClientHandler?extends?ChannelInboundHandlerAdapter?{public?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?{ByteBuf?buf?=?(ByteBuf)?msg;try?{long?currentTimeMillis?=?(buf.readUnsignedInt()?-?2208988800L)?*?1000L;System.out.println(new?Date(currentTimeMillis));ctx.close();}?catch?(Exception?e)?{e.printStackTrace();}?finally?{buf.release();}}public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();}
}
????client的啟動類:
public?class?TimeClient?{public?static?void?main(String[]?args)?throws?InterruptedException?{String?host?=?args[0];int?port?=?Integer.parseInt(args[1]);EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();try?{Bootstrap?b?=?new?Bootstrap();//?啟動客服端連接b.group(workerGroup);//?同時用于主線程和工作線程b.channel(NioSocketChannel.class);//?客服端需要的channelb.option(ChannelOption.SO_KEEPALIVE,?true);?//?socketChannel沒有父類b.handler(new?ChannelInitializer<SocketChannel>()?{@Overrideprotected?void?initChannel(SocketChannel?ch)?throws?Exception?{//?TODO?Auto-generated?method?stubch.pipeline().addLast(new?TimeClientHandler());}});ChannelFuture?f?=?b.connect(host,?port).sync();f.channel().closeFuture().sync();}?finally?{workerGroup.shutdownGracefully();}}
}
2、stream處理
????小型buffer的socket傳送流傳輸依據(jù)TCP/IP,接受的數(shù)據(jù)是儲存在一個接受的socket的buffer中。但是,傳送的buffer不是一個隊列包、而是一個隊列btyes。這就意味著,即使你使用兩個包去傳送兩端信息,系統(tǒng)不會將它們視為兩端信息,而是作為一串bytes。因此,這不能保證你讀去的數(shù)據(jù)是你遠(yuǎn)程寫入的數(shù)據(jù)。例如,我們需要使用系統(tǒng)的TCP/IP棧接受到3個數(shù)據(jù)包。
?????
因為根據(jù)流協(xié)議,你很有可能在你的應(yīng)用中讀取到你下面的部分
因次,在服務(wù)器和客服端的接受部分,對接受數(shù)據(jù)必須定義一個協(xié)議的框架(處理方式),這個框架能夠被應(yīng)用程序使用。接收到的部分必須是下面這種方式。
????a、第一種解決方式:
????????在TIME client的實例中。我們同樣是有一個相似的問題,一個非常小的32位bit的整數(shù)數(shù)據(jù),它不太可能分散。然而,隨著流量的增加,問題是它會碎片化。
????????簡單的解決方式,增加一個內(nèi)部的累加buffer,然后將接受的4bytes傳輸?shù)竭@個buffer中。在TimeClientHandler直接修改
public?class?TimeClientHandler2?extends?ChannelInboundHandlerAdapter?{private?ByteBuf?buf;@Overridepublic?void?handlerAdded(ChannelHandlerContext?ctx)?{buf?=?ctx.alloc().buffer(4);}@Overridepublic?void?handlerRemoved(ChannelHandlerContext?ctx)?{buf.release();buf?=?null;}@Overridepublic?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?{ByteBuf?m?=?(ByteBuf)?msg;buf.writeBytes(m);m.release();if?(buf.readableBytes()?>=?4)?{long?currentTimeMillis?=?(buf.readUnsignedInt()?-?2208988800L)?*?1000L;System.out.println(new?Date(currentTimeMillis));ctx.close();}}@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();}
}
????b、第二種就是前面的實例方式、將decode分離出來處理。看起來清晰、方便
3、編解object數(shù)據(jù)
? ??object
public?class?UnixTime?{private?final?long?value;public?UnixTime()?{this(System.currentTimeMillis()?/?1000L?+?2208988800L);}public?UnixTime(long?value)?{this.value?=?value;}public?long?value()?{return?this.value;}public?String?toString()?{return?new?Date((value()?-?2208988800L)?*?1000L).toString();}}
????object:decode
public?class?TimeDecoder2?extends?ByteToMessageDecoder?{/***?@see?定義一個回調(diào)的數(shù)據(jù)累加buff*?@see?如果有out,則表示解析成功。*/@Overrideprotected?void?decode(ChannelHandlerContext?ctx,?ByteBuf?in,?List<Object>?out)?throws?Exception?{if?(in.readableBytes()?<?4)?{return;}out.add(new?UnixTime(in.readUnsignedInt()));}
}
????objec:encode
public?class?TimeEncoder?extends?ChannelOutboundHandlerAdapter?{public?void?write(ChannelHandlerContext?ctx,?Object?msg,?ChannelPromise?promise)?{UnixTime?m?=?(UnixTime)?msg;ByteBuf?encoded?=?ctx.alloc().buffer(4);encoded.writeInt((int)?m.value());ctx.write(encoded,?promise);}
? ?
????server:handler
public?class?TimeServerHandler2?extends?ChannelInboundHandlerAdapter?{/***?@see?當(dāng)一個channel準(zhǔn)備好的時候,發(fā)送一個32位的數(shù)字*/public?void?channelActive(final?ChannelHandlerContext?ctx)?{//?ByteBuf:沒有了flip()。它只有2個功能:讀、寫//?讀://?寫:當(dāng)你寫的時候,如果讀取下標(biāo)沒有改變,則繼續(xù)增長final?ByteBuf?time?=?ctx.alloc().buffer(4);time.writeInt((int)?(System.currentTimeMillis()?/?1000L?+?2208988800L));final?ChannelFuture?f?=?ctx.writeAndFlush(new?UnixTime());//?當(dāng)寫如完成的時候,執(zhí)行f.addListener(ChannelFutureListener.CLOSE);}public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();}
}
?
client:handler
public?class?TimeClientHandler3?extends?ChannelInboundHandlerAdapter?{@Overridepublic?void?channelRead(ChannelHandlerContext?ctx,?Object?msg)?{UnixTime?m?=?(UnixTime)?msg;System.out.println(m);ctx.close();}@Overridepublic?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?{cause.printStackTrace();ctx.close();}
}
轉(zhuǎn)載于:https://my.oschina.net/u/2246410/blog/652313
總結(jié)
以上是生活随笔為你收集整理的java的网络工具netty简介的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。