从NIO到Netty开发
轉載自??從NIO到Netty開發
1. 從傳統BIO到NIO的升級
2. NIO新特性
?
3. NIO服務端實現
根據上圖,發現服務端的事件有2個,一是接受連接事件,二是讀取數據:
public class NIOServer {private ByteBuffer readBuffer; ? ?private Selector selector; ? ?private ServerSocket serverSocket; ? ?public static void main(String[] args) {NIOServer server = new NIOServer();server.init();System.out.println("server started:8383");server.listener();} ? ?public void init() { ? ? ? ?//1. 創建臨時緩沖區readBuffer = ByteBuffer.allocate(1024); ? ? ? ?//2. 創建服務端Socket非阻塞通道ServerSocketChannel serverSocketChannel; ? ? ? ?try {serverSocketChannel = ServerSocketChannel.open();serverSocketChannel.configureBlocking(false); ? ? ? ? ? ?//3. 指定內部Socket綁定的服務端地址 并支持重用端口,因為有可能多個客戶端同時訪問同一端口serverSocket=serverSocketChannel.socket();serverSocket.setReuseAddress(true);serverSocket.bind(new InetSocketAddress(8383)); ? ? ? ? ? ?//4. 創建輪詢器 并綁定到管道上,開始監聽客戶端請求selector = Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);} catch (Exception e) {e.printStackTrace();}} ? ?private void listener() { ? ? ? ?while (true) { ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?//5. 開始監聽事件,不斷取出事件的key,假如存在事件,則直接處理。selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); ? ? ? ? ? ? ? ?while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove();handleKey(key);}} catch (Exception e) {e.printStackTrace();}}} ? ?private void handleKey(SelectionKey key) throws IOException {SocketChannel channel = null; ? ? ? ?try { ? ? ? ? ? ?//6. 如果有客戶端要連接 這里則處理是否接受連接事件if (key.isAcceptable()) {ServerSocketChannel severChannel = (ServerSocketChannel) key.channel();channel = severChannel.accept();channel.configureBlocking(false); ? ? ? ? ? ? ? ?// 告訴輪詢器 接下來關心的是讀取客戶端數據這件事channel.register(selector, SelectionKey.OP_READ);} else if (key.isReadable()) { //7. 如果客戶端發送數據,則這里讀取數據。channel = (SocketChannel) key.channel(); ? ? ? ? ? ? ? ?// 清空緩沖區readBuffer.clear(); ? ? ? ? ? ? ? ?// 當客戶端關閉channel后,會不斷收到read事件,此刻read方法返回-1 所以對應的服務器端也需要關閉channelint readCount = channel.read(readBuffer); ? ? ? ? ? ? ? ?if (readCount > 0) {readBuffer.flip();String question = CharsetHelper.decode(readBuffer).toString();System.out.println("server get the question:" + question);String answer = getAnswer(question);channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));} else {channel.close();}} } catch (Exception e) {e.printStackTrace();}finally { ? ? ? ? ? ?//8. 斷開連接通道if (channel!=null) {channel.close();}}} ? ?public static String getAnswer(String question) {String answer = null; ? ? ? ?switch (question) { ? ? ? ?case "who":answer = "我是小娜\n"; ? ? ? ? ? ?break; ? ? ? ?case "what":answer = "我是來幫你解悶的\n"; ? ? ? ? ? ?break; ? ? ? ?case "where":answer = "我來自外太空\n"; ? ? ? ? ? ?break; ? ? ? ?case "hi":answer = "hello\n"; ? ? ? ? ? ?break; ? ? ? ?case "bye":answer = "88\n"; ? ? ? ? ? ?break; ? ? ? ?default:answer = "請輸入 who, 或者what, 或者where";} ? ? ? ?return answer;}}4. NIO客戶端實現:
客戶端的實現有3個步驟:1.請求連接。2.當連接成功,寫數據。3.讀取服務端結果。
public class NIOClient implements Runnable {private BlockingQueue<String> words; ? ?private Random random; ? ?public static void main(String[] args) { ? ? ? ?// 多個線程發起Socket客戶端連接請求for (int i = 0; i < 5; i++) {NIOClient c = new NIOClient();c.init(); ? ? ? ? ? ?new Thread(c).start();}} ? ?//1. 初始化要發送的數據private void init() {words = new ArrayBlockingQueue<String>(5);random = new Random(); ? ? ? ?try {words.put("hi");words.put("who");words.put("what");words.put("where");words.put("bye");} catch (Exception e) { ? ? ? ? ? ?// TODO: handle exception}} ? ?//2. 啟動子線程代碼@Overridepublic void run() {SocketChannel channel = null;Selector selector = null; ? ? ? ?try { ? ? ? ? ? ?//3. 創建連接服務端的通道 并設置為阻塞方法,這里需要指定服務端的ip和端口號channel = SocketChannel.open();channel.configureBlocking(false);channel.connect(new InetSocketAddress("localhost", 8383));selector = Selector.open(); ? ? ? ? ? ?//4. 請求關心連接事件channel.register(selector, SelectionKey.OP_CONNECT); ? ? ? ? ? ?boolean isOver = false; ? ? ? ? ? ?while (!isOver) {selector.select();Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); ? ? ? ? ? ? ? ?while (keys.hasNext()) {SelectionKey key = keys.next();keys.remove(); ? ? ? ? ? ? ? ? ? ?if (key.isConnectable()) { //5. 當通道連接準備完畢,發送請求并指定接收允許獲取服務端返回信息if (channel.isConnectionPending()) { ? ? ? ? ? ? ? ? ? ? ? ? ? ?if (channel.finishConnect()) {key.interestOps(SelectionKey.OP_READ);channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));sleep();} else {key.cancel();}}} else if (key.isReadable()) {//6. 開始讀取服務端返回數據ByteBuffer byteBuffer = ByteBuffer.allocate(512);channel.read(byteBuffer);byteBuffer.flip();String answer = CharsetHelper.decode(byteBuffer).toString();System.out.println("client get the answer:" + answer);String word = getWord(); ? ? ? ? ? ? ? ? ? ? ? ?if (word != null) {channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));} else {isOver = true;}sleep();}}}} catch (Exception e) {e.printStackTrace();} finally { ? ? ? ? ? ?//7. 關閉通道if (channel != null) { ? ? ? ? ? ? ? ?try {channel.close();} catch (IOException e) {e.printStackTrace();}}}} ? ?public String getWord() { ? ? ? ?return words.poll();} ? ?private void sleep() { ? ? ? ?try {TimeUnit.SECONDS.sleep(random.nextInt(3));} catch (InterruptedException e) {e.printStackTrace();}}}-
?
5.Netty開發簡介
上面提到,NIO可以實現同步非阻塞的數據交互,但是對于NIO來說,一個普通的請求數據需要太多的開發步驟,不利于推廣,這里主要介紹NIO的實現框架Netty.
Netty是由JBOSS提供的一個java開源框架。Netty提供異步的、事件驅動的網絡應用程序框架和工具,用以快速開發高性能、高可靠性的網絡服務器和客戶端程序。
也就是說,Netty 是一個基于NIO的客戶、服務器端編程框架,使用Netty 可以確保你快速和簡單的開發出一個網絡應用,例如實現了某種協議的客戶,服務端應用。Netty相當簡化和流線化了網絡應用的編程開發過程,例如,TCP和UDP的socket服務開發。
?
6. Netty服務端實現:
public class EchoServer {private final int port; ? ?public EchoServer(int port) { ? ? ? ?this.port = port;} ? ?public void start() throws Exception { ? ? ? ?//1.創建線程組EventLoopGroup group=new NioEventLoopGroup(); ? ? ? ?try { ? ? ? ? ? ?//2.創建服務端啟動對象 裝配線程組&交互通道&服務器端口&網絡請求處理器鏈ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(group).channel(NioServerSocketChannel.class) .localAddress("localhost", port) ? ?.childHandler(new ChannelInitializer<Channel>() { @Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new EchoOutHandler1());ch.pipeline().addLast(new EchoOutHandler2());ch.pipeline().addLast(new EchoInHandler1());ch.pipeline().addLast(new EchoInHandler2());}}); ? ? ? ? ? ?// 3.開始監聽客戶端請求ChannelFuture channelFuture = serverBootstrap.bind().sync();System.out.println("開始監聽,端口號為:"+channelFuture.channel().localAddress()); ? ? ? ? ? ?// 4.等待所有請求執行完畢后,關閉通道;如請求還沒執行完,這里為阻塞狀態。channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();}finally { ? ? ? ? ? ?//5.停止所有線程組內部代碼的執行group.shutdownGracefully().sync();}} ? ?public static void main(String[] args) throws Exception { ? ? ? ?new EchoServer(2000).start();}}7.Netty客戶端實現:
public class EchoClient {public static void main(String[] args) throws InterruptedException { ? ? ? ?new EchoClient("localhost", 2000).start();} ? ?private final String host; ? ?private final int port; ? ?public EchoClient(String host, int port) { ? ? ? ?this.host = host; ? ? ? ?this.port = port;} ? ?private void start() throws InterruptedException { ? ? ? ?//1.創建線程組EventLoopGroup group = new NioEventLoopGroup(); ? ? ? ?try { ? ? ? ? ? ?//2. 創建客戶端啟動對象,同樣需要裝配線程組,通道,綁定遠程地址,請求處理器鏈。Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).remoteAddress(host, port).handler(new ChannelInitializer<Channel>() { ? ? ? ? ? ? ? ? ? ? ? ?@Overrideprotected void initChannel(Channel ch) throws Exception {ch.pipeline().addLast(new EchoClientHandler());}}); ? ? ? ? ? ?//3.開始請求連接ChannelFuture future = bootstrap.connect().sync(); ? ? ? ? ? ?//4.當請求操作結后,關閉通道。future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally { ? ? ? ? ? ?if (group != null) {group.shutdownGracefully().sync();}}}}8.Netty處理器鏈
對于向服務端發送一個請求,并得到一個響應來說。如果使用Netty來說,需要實現兩種不同的處理器,一個是讀的一個是寫的。他們共同組成一個鏈式調用,如下圖:
?
對于服務端來說,上面我們創建了4個處理器,他們組成一條鏈,分別是:EchoInHandler1 -> EchoInHandler2 -> EchoOutHandler2 -> EchoOutHandler1.
public class EchoInHandler1 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("EchoInHandler1 ?channelRead..."); ? ? ? ?//將消息傳遞到新的鏈。。。ctx.fireChannelRead(msg);} ? ?@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();} }public class EchoInHandler2 extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println("EchoInHandler2 ?channelRead..."); ? ? ? ?// Object msg 為Netty的一種緩存對象ByteBuf buffer = (ByteBuf) msg; ? ? ? ?byte[] req = new byte[buffer.readableBytes()];buffer.readBytes(req);String reqBody = new String(req, "UTF-8");System.out.println("獲取到的客戶端請求:" + reqBody); ? ? ? ?// 往客戶端寫數據String date = new Date().toString();ByteBuf returnBuf = Unpooled.copiedBuffer(date.getBytes());ctx.write(returnBuf);} ? ?@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}}public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoOutHandler2 write...");ctx.write(msg);}}public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("EchoOutHandler1 write...");System.out.println("write msg:" + msg);ctx.write(msg);ctx.flush();// 最后將數據刷新到客戶端}}客戶端的處理器主要是當連接成功后,請求獲取當前時間,并讀取返回結果:
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf>{ ? ?//客戶端連接服務器的時候調用@Override ? ?public void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客戶端連接服務器。。。。"); ? ? ? ?byte[] req = "QUERY TIME ORDER".getBytes();ByteBuf copiedBuffer = Unpooled.buffer(req.length);copiedBuffer.writeBytes(req);ctx.writeAndFlush(copiedBuffer);} ? ?//讀取服務端數據@Override ? ?protected void channelRead0(ChannelHandlerContext ctx, ByteBuf bytbuf) throws Exception {System.out.println("client get the server's data"); ? ? ? ?byte[] resp=new byte[bytbuf.readableBytes()];bytbuf.readBytes(resp);String respContent = new String(resp,"UTF-8");System.out.println("返回的數據:"+respContent);} ? ?//強制關閉服務器的連接也會造成異常:遠程主機強迫關閉了一個現有的連接。@Override ? ?public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println(cause.getLocalizedMessage());ctx.close();}}總結
以上是生活随笔為你收集整理的从NIO到Netty开发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电视剧装台大结局介绍 装台简介
- 下一篇: 歌手何静简历 何静的简介