Java 非阻塞 IO 和异步 IO
轉(zhuǎn)載自?Java 非阻塞 IO 和異步 IO
上一篇文章介紹了 Java NIO 中 Buffer、Channel 和 Selector 的基本操作,主要是一些接口操作,比較簡(jiǎn)單。
本文將介紹非阻塞 IO?和異步 IO,也就是大家耳熟能詳?shù)?NIO 和 AIO。很多初學(xué)者可能分不清楚異步和非阻塞的區(qū)別,只是在各種場(chǎng)合能聽(tīng)到異步非阻塞這個(gè)詞。
本文會(huì)先介紹并演示阻塞模式,然后引入非阻塞模式來(lái)對(duì)阻塞模式進(jìn)行優(yōu)化,最后再介紹 JDK7 引入的異步 IO,由于網(wǎng)上關(guān)于異步 IO 的介紹相對(duì)較少,所以這部分內(nèi)容我會(huì)介紹得具體一些。
希望看完本文,讀者可以對(duì)非阻塞 IO 和異步 IO 的迷霧看得更清晰些,或者為初學(xué)者解開(kāi)一絲絲疑惑也是好的。
NIO,JDK1.4,New IO,Non-Blocking IO
NIO.2,JDK7,More New IO,Asynchronous IO,嚴(yán)格地說(shuō) NIO.2 不僅僅引入了 AIO
阻塞模式 IO
我們已經(jīng)介紹過(guò)使用 Java NIO 包組成一個(gè)簡(jiǎn)單的客戶(hù)端-服務(wù)端網(wǎng)絡(luò)通訊所需要的 ServerSocketChannel、SocketChannel 和 Buffer,我們這里整合一下它們,給出一個(gè)完整的可運(yùn)行的例子:
| 1234567891011121314151617181920 | public class Server {????public static void main(String[] args) throws IOException {????????ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();????????// 監(jiān)聽(tīng) 8080 端口進(jìn)來(lái)的 TCP 鏈接????????serverSocketChannel.socket().bind(new InetSocketAddress(8080));????????while (true) {????????????// 這里會(huì)阻塞,直到有一個(gè)請(qǐng)求的連接進(jìn)來(lái)????????????SocketChannel socketChannel = serverSocketChannel.accept();????????????// 開(kāi)啟一個(gè)新的線(xiàn)程來(lái)處理這個(gè)請(qǐng)求,然后在 while 循環(huán)中繼續(xù)監(jiān)聽(tīng) 8080 端口????????????SocketHandler handler = new SocketHandler(socketChannel);????????????new Thread(handler).start();????????}????}} |
這里看一下新的線(xiàn)程需要做什么,SocketHandler:
| 12345678910111213141516171819202122232425262728293031323334353637 | public class SocketHandler implements Runnable {????private SocketChannel socketChannel;????public SocketHandler(SocketChannel socketChannel) {????????this.socketChannel = socketChannel;????}????@Override????public void run() {????????ByteBuffer buffer = ByteBuffer.allocate(1024);????????try {????????????// 將請(qǐng)求數(shù)據(jù)讀入 Buffer 中????????????int num;????????????while ((num = socketChannel.read(buffer)) > 0) {????????????????// 讀取 Buffer 內(nèi)容之前先 flip 一下????????????????buffer.flip();????????????????// 提取 Buffer 中的數(shù)據(jù)????????????????byte[] bytes = new byte[num];????????????????buffer.get(bytes);????????????????String re = new String(bytes, "UTF-8");????????????????System.out.println("收到請(qǐng)求:" + re);????????????????// 回應(yīng)客戶(hù)端????????????????ByteBuffer writeBuffer = ByteBuffer.wrap(("我已經(jīng)收到你的請(qǐng)求,你的請(qǐng)求內(nèi)容是:" + re).getBytes());????????????????socketChannel.write(writeBuffer);????????????????buffer.flip();????????????}????????} catch (IOException e) {????????????IOUtils.closeQuietly(socketChannel);????????}????}} |
最后,貼一下客戶(hù)端 SocketChannel 的使用,客戶(hù)端比較簡(jiǎn)單:
| 1234567891011121314151617181920212223 | public class SocketChannelTest {????public static void main(String[] args) throws IOException {????????SocketChannel socketChannel = SocketChannel.open();????????socketChannel.connect(new InetSocketAddress("localhost", 8080));????????// 發(fā)送請(qǐng)求????????ByteBuffer buffer = ByteBuffer.wrap("1234567890".getBytes());????????socketChannel.write(buffer);????????// 讀取響應(yīng)????????ByteBuffer readBuffer = ByteBuffer.allocate(1024);????????int num;????????if ((num = socketChannel.read(readBuffer)) > 0) {????????????readBuffer.flip();????????????byte[] re = new byte[num];????????????readBuffer.get(re);????????????String result = new String(re, "UTF-8");????????????System.out.println("返回值: " + result);????????}????}} |
上面介紹的阻塞模式的代碼應(yīng)該很好理解:來(lái)一個(gè)新的連接,我們就新開(kāi)一個(gè)線(xiàn)程來(lái)處理這個(gè)連接,之后的操作全部由那個(gè)線(xiàn)程來(lái)完成。
那么,這個(gè)模式下的性能瓶頸在哪里呢?
非阻塞 IO
說(shuō)完了阻塞模式的使用及其缺點(diǎn)以后,我們這里就可以介紹非阻塞 IO 了。
非阻塞 IO 的核心在于使用一個(gè) Selector 來(lái)管理多個(gè)通道,可以是 SocketChannel,也可以是 ServerSocketChannel,將各個(gè)通道注冊(cè)到 Selector 上,指定監(jiān)聽(tīng)的事件。
之后可以只用一個(gè)線(xiàn)程來(lái)輪詢(xún)這個(gè) Selector,看看上面是否有通道是準(zhǔn)備好的,當(dāng)通道準(zhǔn)備好可讀或可寫(xiě),然后才去開(kāi)始真正的讀寫(xiě),這樣速度就很快了。我們就完全沒(méi)有必要給每個(gè)通道都起一個(gè)線(xiàn)程。
NIO 中 Selector 是對(duì)底層操作系統(tǒng)實(shí)現(xiàn)的一個(gè)抽象,管理通道狀態(tài)其實(shí)都是底層系統(tǒng)實(shí)現(xiàn)的,這里簡(jiǎn)單介紹下在不同系統(tǒng)下的實(shí)現(xiàn)。
select:上世紀(jì) 80 年代就實(shí)現(xiàn)了,它支持注冊(cè) FD_SETSIZE(1024) 個(gè) socket,在那個(gè)年代肯定是夠用的,不過(guò)現(xiàn)在嘛,肯定是不行了。
poll:1997 年,出現(xiàn)了 poll 作為 select 的替代者,最大的區(qū)別就是,poll 不再限制 socket 數(shù)量。
select 和 poll 都有一個(gè)共同的問(wèn)題,那就是它們都只會(huì)告訴你有幾個(gè)通道準(zhǔn)備好了,但是不會(huì)告訴你具體是哪幾個(gè)通道。所以,一旦知道有通道準(zhǔn)備好以后,自己還是需要進(jìn)行一次掃描,顯然這個(gè)不太好,通道少的時(shí)候還行,一旦通道的數(shù)量是幾十萬(wàn)個(gè)以上的時(shí)候,掃描一次的時(shí)間都很可觀(guān)了,時(shí)間復(fù)雜度 O(n)。所以,后來(lái)才催生了以下實(shí)現(xiàn)。
epoll:2002 年隨 Linux 內(nèi)核 2.5.44 發(fā)布,epoll 能直接返回具體的準(zhǔn)備好的通道,時(shí)間復(fù)雜度 O(1)。
除了 Linux 中的 epoll,2000 年 FreeBSD 出現(xiàn)了?Kqueue,還有就是,Solaris 中有?/dev/poll。
前面說(shuō)了那么多實(shí)現(xiàn),但是沒(méi)有出現(xiàn) Windows,Windows 平臺(tái)的非阻塞 IO 使用 select,我們也不必覺(jué)得 Windows 很落后,在 Windows 中 IOCP 提供的異步 IO 是比較強(qiáng)大的。
我們回到 Selector,畢竟 JVM 就是這么一個(gè)屏蔽底層實(shí)現(xiàn)的平臺(tái),我們面向 Selector 編程就可以了。
之前在介紹 Selector 的時(shí)候已經(jīng)了解過(guò)了它的基本用法,這邊來(lái)一個(gè)可運(yùn)行的實(shí)例代碼,大家不妨看看:
| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 | public class SelectorServer {????public static void main(String[] args) throws IOException {????????Selector selector = Selector.open();????????ServerSocketChannel server = ServerSocketChannel.open();????????server.socket().bind(new InetSocketAddress(8080));????????// 將其注冊(cè)到 Selector 中,監(jiān)聽(tīng) OP_ACCEPT 事件????????server.configureBlocking(false);????????server.register(selector, SelectionKey.OP_ACCEPT);????????while (true) {????????????// 需要不斷地去調(diào)用 select() 方法獲取最新的準(zhǔn)備好的通道????????????int readyChannels = selector.select();????????????if (readyChannels == 0) {????????????????continue;????????????}????????????Set<SelectionKey> readyKeys = selector.selectedKeys();????????????// 遍歷????????????Iterator<SelectionKey> iterator = readyKeys.iterator();????????????while (iterator.hasNext()) {????????????????SelectionKey key = iterator.next();????????????????iterator.remove();????????????????if (key.isAcceptable()) {????????????????????// 有已經(jīng)接受的新的到服務(wù)端的連接????????????????????SocketChannel socketChannel = server.accept();????????????????????// 有新的連接并不代表這個(gè)通道就有數(shù)據(jù),????????????????????// 這里將這個(gè)新的 SocketChannel 注冊(cè)到 Selector,監(jiān)聽(tīng) OP_READ 事件,等待數(shù)據(jù)????????????????????socketChannel.configureBlocking(false);????????????????????socketChannel.register(selector, SelectionKey.OP_READ);????????????????} else if (key.isReadable()) {????????????????????// 有數(shù)據(jù)可讀????????????????????// 上面一個(gè) if 分支中注冊(cè)了監(jiān)聽(tīng) OP_READ 事件的 SocketChannel????????????????????SocketChannel socketChannel = (SocketChannel) key.channel();????????????????????ByteBuffer readBuffer = ByteBuffer.allocate(1024);????????????????????int num = socketChannel.read(readBuffer);????????????????????if (num > 0) {????????????????????????// 處理進(jìn)來(lái)的數(shù)據(jù)...????????????????????????System.out.println("收到數(shù)據(jù):" + new String(readBuffer.array()).trim());????????????????????????socketChannel.register(selector, SelectionKey.OP_WRITE);????????????????????} else if (num == -1) {????????????????????????// -1 代表連接已經(jīng)關(guān)閉????????????????????????socketChannel.close();????????????????????}????????????????}????????????????else if (key.isWritable()) {????????????????????// 通道可寫(xiě)????????????????????// 給用戶(hù)返回?cái)?shù)據(jù)的通道可以進(jìn)行寫(xiě)操作了????????????????????SocketChannel socketChannel = (SocketChannel) key.channel();????????????????????ByteBuffer buffer = ByteBuffer.wrap("返回給客戶(hù)端的數(shù)據(jù)...".getBytes());????????????????????socketChannel.write(buffer);????????????????????// 重新注冊(cè)這個(gè)通道,監(jiān)聽(tīng) OP_READ 事件,客戶(hù)端還可以繼續(xù)發(fā)送內(nèi)容過(guò)來(lái)????????????????????socketChannel.register(selector, SelectionKey.OP_READ);????????????????}????????????}????????}????}} |
至于客戶(hù)端,大家可以繼續(xù)使用上一節(jié)介紹阻塞模式時(shí)的客戶(hù)端進(jìn)行測(cè)試。
NIO.2 異步 IO
More New IO,或稱(chēng) NIO.2,隨 JDK 1.7 發(fā)布,包括了引入異步 IO 接口和 Paths 等文件訪(fǎng)問(wèn)接口。
異步這個(gè)詞,我想對(duì)于絕大多數(shù)開(kāi)發(fā)者來(lái)說(shuō)都很熟悉,很多場(chǎng)景下我們都會(huì)使用異步。
通常,我們會(huì)有一個(gè)線(xiàn)程池用于執(zhí)行異步任務(wù),提交任務(wù)的線(xiàn)程將任務(wù)提交到線(xiàn)程池就可以立馬返回,不必等到任務(wù)真正完成。如果想要知道任務(wù)的執(zhí)行結(jié)果,通常是通過(guò)傳遞一個(gè)回調(diào)函數(shù)的方式,任務(wù)結(jié)束后去調(diào)用這個(gè)函數(shù)。
同樣的原理,Java 中的異步 IO 也是一樣的,都是由一個(gè)線(xiàn)程池來(lái)負(fù)責(zé)執(zhí)行任務(wù),然后使用回調(diào)或自己去查詢(xún)結(jié)果。
大部分開(kāi)發(fā)者都知道為什么要這么設(shè)計(jì)了,這里再啰嗦一下。異步 IO 主要是為了控制線(xiàn)程數(shù)量,減少過(guò)多的線(xiàn)程帶來(lái)的內(nèi)存消耗和 CPU 在線(xiàn)程調(diào)度上的開(kāi)銷(xiāo)。
在 Unix/Linux 等系統(tǒng)中,JDK 使用了并發(fā)包中的線(xiàn)程池來(lái)管理任務(wù),具體可以查看 AsynchronousChannelGroup 的源碼。
在 Windows 操作系統(tǒng)中,提供了一個(gè)叫做?I/O Completion Ports?的方案,通常簡(jiǎn)稱(chēng)為?IOCP,操作系統(tǒng)負(fù)責(zé)管理線(xiàn)程池,其性能非常優(yōu)異,所以在 Windows 中 JDK 直接采用了 IOCP 的支持,使用系統(tǒng)支持,把更多的操作信息暴露給操作系統(tǒng),也使得操作系統(tǒng)能夠?qū)ξ覀兊?IO 進(jìn)行一定程度的優(yōu)化。
在 Linux 中其實(shí)也是有異步 IO 系統(tǒng)實(shí)現(xiàn)的,但是限制比較多,性能也一般,所以 JDK 采用了自建線(xiàn)程池的方式。
本文還是以實(shí)用為主,想要了解更多信息請(qǐng)自行查找其他資料,下面對(duì) Java 異步 IO 進(jìn)行實(shí)踐性的介紹。
總共有三個(gè)類(lèi)需要我們關(guān)注,分別是?AsynchronousSocketChannel,AsynchronousServerSocketChannel?和?AsynchronousFileChannel,只不過(guò)是在之前介紹的 FileChannel、SocketChannel 和 ServerSocketChannel 的類(lèi)名上加了個(gè)前綴?Asynchronous。
Java 異步 IO 提供了兩種使用方式,分別是返回 Future 實(shí)例和使用回調(diào)函數(shù)。
1、返回 Future 實(shí)例
返回 java.util.concurrent.Future 實(shí)例的方式我們應(yīng)該很熟悉,JDK 線(xiàn)程池就是這么使用的。Future 接口的幾個(gè)方法語(yǔ)義在這里也是通用的,這里先做簡(jiǎn)單介紹。
- future.isDone();
判斷操作是否已經(jīng)完成,包括了正常完成、異常拋出、取消
- future.cancel(true);
取消操作,方式是中斷。參數(shù) true 說(shuō)的是,即使這個(gè)任務(wù)正在執(zhí)行,也會(huì)進(jìn)行中斷。
- future.isCancelled();
是否被取消,只有在任務(wù)正常結(jié)束之前被取消,這個(gè)方法才會(huì)返回 true
- future.get();
這是我們的老朋友,獲取執(zhí)行結(jié)果,阻塞。
- future.get(10, TimeUnit.SECONDS);
如果上面的 get() 方法的阻塞你不滿(mǎn)意,那就設(shè)置個(gè)超時(shí)時(shí)間。
2、提供 CompletionHandler 回調(diào)函數(shù)
java.nio.channels.CompletionHandler 接口定義:
| 123456 | public interface CompletionHandler<V,A> {????void completed(V result, A attachment);????void failed(Throwable exc, A attachment);} |
注意,參數(shù)上有個(gè) attachment,雖然不常用,我們可以在各個(gè)支持的方法中傳遞這個(gè)參數(shù)值
| 123456789101112 | AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null);// accept 方法的第一個(gè)參數(shù)可以傳遞 attachmentlistener.accept(attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {????public void completed(??????AsynchronousSocketChannel client, Object attachment) {??????????// ??????}????public void failed(Throwable exc, Object attachment) {??????????// ??????}}); |
AsynchronousFileChannel
網(wǎng)上關(guān)于 Non-Blocking IO 的介紹文章很多,但是 Asynchronous IO 的文章相對(duì)就少得多了,所以我這邊會(huì)多介紹一些相關(guān)內(nèi)容。
首先,我們就來(lái)關(guān)注異步的文件 IO,前面我們說(shuō)了,文件 IO 在所有的操作系統(tǒng)中都不支持非阻塞模式,但是我們可以對(duì)文件 IO 采用異步的方式來(lái)提高性能。
下面,我會(huì)介紹 AsynchronousFileChannel 里面的一些重要的接口,都很簡(jiǎn)單,讀者要是覺(jué)得無(wú)趣,直接滑到下一個(gè)標(biāo)題就可以了。
實(shí)例化:
| 1 | AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("/Users/hongjie/test.txt")); |
一旦實(shí)例化完成,我們就可以著手準(zhǔn)備將數(shù)據(jù)讀入到 Buffer 中:
| 12 | ByteBuffer buffer = ByteBuffer.allocate(1024);Future<Integer> result = channel.read(buffer, 0); |
異步文件通道的讀操作和寫(xiě)操作都需要提供一個(gè)文件的開(kāi)始位置,文件開(kāi)始位置為 0
除了使用返回 Future 實(shí)例的方式,也可以采用回調(diào)函數(shù)進(jìn)行操作,接口如下:
| 1234 | public abstract <A> void read(ByteBuffer dst,??????????????????????????????long position,??????????????????????????????A attachment,??????????????????????????????CompletionHandler<Integer,? super A> handler); |
順便也貼一下寫(xiě)操作的兩個(gè)版本的接口:
| 123456 | public abstract Future<Integer> write(ByteBuffer src, long position);public abstract <A> void write(ByteBuffer src,???????????????????????????????long position,???????????????????????????????A attachment,???????????????????????????????CompletionHandler<Integer,? super A> handler); |
我們可以看到,AIO 的讀寫(xiě)主要也還是與 Buffer 打交道,這個(gè)與 NIO 是一脈相承的。
另外,還提供了用于將內(nèi)存中的數(shù)據(jù)刷入到磁盤(pán)的方法:
| 1 | public abstract void force(boolean metaData) throws IOException; |
因?yàn)槲覀儗?duì)文件的寫(xiě)操作,操作系統(tǒng)并不會(huì)直接針對(duì)文件操作,系統(tǒng)會(huì)緩存,然后周期性地刷入到磁盤(pán)。如果希望將數(shù)據(jù)及時(shí)寫(xiě)入到磁盤(pán)中,以免斷電引發(fā)部分?jǐn)?shù)據(jù)丟失,可以調(diào)用此方法。參數(shù)如果設(shè)置為 true,意味著同時(shí)也將文件屬性信息更新到磁盤(pán)。
還有,還提供了對(duì)文件的鎖定功能,我們可以鎖定文件的部分?jǐn)?shù)據(jù),這樣可以進(jìn)行排他性的操作。
| 1 | public abstract Future<FileLock> lock(long position, long size, boolean shared); |
position 是要鎖定內(nèi)容的開(kāi)始位置,size 指示了要鎖定的區(qū)域大小,shared 指示需要的是共享鎖還是排他鎖
當(dāng)然,也可以使用回調(diào)函數(shù)的版本:
| 12345 | public abstract <A> void lock(long position,??????????????????????????????long size,??????????????????????????????boolean shared,??????????????????????????????A attachment,??????????????????????????????CompletionHandler<FileLock,? super A> handler); |
文件鎖定功能上還提供了 tryLock 方法,此方法會(huì)快速返回結(jié)果:
| 12 | public abstract FileLock tryLock(long position, long size, boolean shared)????throws IOException; |
這個(gè)方法很簡(jiǎn)單,就是嘗試去獲取鎖,如果該區(qū)域已被其他線(xiàn)程或其他應(yīng)用鎖住,那么立刻返回 null,否則返回 FileLock 對(duì)象。
AsynchronousFileChannel 操作大體上也就以上介紹的這些接口,還是比較簡(jiǎn)單的,這里就少一些廢話(huà)早點(diǎn)結(jié)束好了。
AsynchronousServerSocketChannel
這個(gè)類(lèi)對(duì)應(yīng)的是非阻塞 IO 的 ServerSocketChannel,大家可以類(lèi)比下使用方式。
我們就廢話(huà)少說(shuō),用代碼說(shuō)事吧:
| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657 | package com.javadoop.aio;import java.io.IOException;import java.net.InetSocketAddress;import java.net.SocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousServerSocketChannel;import java.nio.channels.AsynchronousSocketChannel;import java.nio.channels.CompletionHandler;public class Server {????public static void main(String[] args) throws IOException {??????????// 實(shí)例化,并監(jiān)聽(tīng)端口????????AsynchronousServerSocketChannel server =????????????????AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080));????????// 自己定義一個(gè) Attachment 類(lèi),用于傳遞一些信息????????Attachment att = new Attachment();????????att.setServer(server);????????server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() {????????????@Override????????????public void completed(AsynchronousSocketChannel client, Attachment att) {????????????????try {????????????????????SocketAddress clientAddr = client.getRemoteAddress();????????????????????System.out.println("收到新的連接:" + clientAddr);????????????????????// 收到新的連接后,server 應(yīng)該重新調(diào)用 accept 方法等待新的連接進(jìn)來(lái)????????????????????att.getServer().accept(att, this);????????????????????Attachment newAtt = new Attachment();????????????????????newAtt.setServer(server);????????????????????newAtt.setClient(client);????????????????????newAtt.setReadMode(true);????????????????????newAtt.setBuffer(ByteBuffer.allocate(2048));????????????????????// 這里也可以繼續(xù)使用匿名實(shí)現(xiàn)類(lèi),不過(guò)代碼不好看,所以這里專(zhuān)門(mén)定義一個(gè)類(lèi)????????????????????client.read(newAtt.getBuffer(), newAtt, new ChannelHandler());????????????????} catch (IOException ex) {????????????????????ex.printStackTrace();????????????????}????????????}????????????@Override????????????public void failed(Throwable t, Attachment att) {????????????????System.out.println("accept failed");????????????}????????});????????// 為了防止 main 線(xiàn)程退出????????try {????????????Thread.currentThread().join();????????} catch (InterruptedException e) {????????}????}} |
看一下 ChannelHandler 類(lèi):
| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546 | package com.javadoop.aio;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;public class ChannelHandler implements CompletionHandler<Integer, Attachment> {????@Override????public void completed(Integer result, Attachment att) {????????if (att.isReadMode()) {????????????// 讀取來(lái)自客戶(hù)端的數(shù)據(jù)????????????ByteBuffer buffer = att.getBuffer();????????????buffer.flip();????????????byte bytes[] = new byte[buffer.limit()];????????????buffer.get(bytes);????????????String msg = new String(buffer.array()).toString().trim();????????????System.out.println("收到來(lái)自客戶(hù)端的數(shù)據(jù): " + msg);????????????// 響應(yīng)客戶(hù)端請(qǐng)求,返回?cái)?shù)據(jù)????????????buffer.clear();????????????buffer.put("Response from server!".getBytes(Charset.forName("UTF-8")));????????????att.setReadMode(false);????????????buffer.flip();????????????// 寫(xiě)數(shù)據(jù)到客戶(hù)端也是異步????????????att.getClient().write(buffer, att, this);????????} else {????????????// 到這里,說(shuō)明往客戶(hù)端寫(xiě)數(shù)據(jù)也結(jié)束了,有以下兩種選擇:????????????// 1. 繼續(xù)等待客戶(hù)端發(fā)送新的數(shù)據(jù)過(guò)來(lái)//??????????? att.setReadMode(true);//??????????? att.getBuffer().clear();//??????????? att.getClient().read(att.getBuffer(), att, this);????????????// 2. 既然服務(wù)端已經(jīng)返回?cái)?shù)據(jù)給客戶(hù)端,斷開(kāi)這次的連接????????????try {????????????????att.getClient().close();????????????} catch (IOException e) {????????????}????????}????}????@Override????public void failed(Throwable t, Attachment att) {????????System.out.println("連接斷開(kāi)");????}} |
順便再貼一下自定義的 Attachment 類(lèi):
| 1234567 | public class Attachment {????private AsynchronousServerSocketChannel server;????private AsynchronousSocketChannel client;????private boolean isReadMode;????private ByteBuffer buffer;????// getter & setter} |
這樣,一個(gè)簡(jiǎn)單的服務(wù)端就寫(xiě)好了,接下來(lái)可以接收客戶(hù)端請(qǐng)求了。上面我們用的都是回調(diào)函數(shù)的方式,讀者要是感興趣,可以試試寫(xiě)個(gè)使用 Future 的。
AsynchronousSocketChannel
其實(shí),說(shuō)完上面的 AsynchronousServerSocketChannel,基本上讀者也就知道怎么使用 AsynchronousSocketChannel 了,和非阻塞 IO 基本類(lèi)似。
這邊做個(gè)簡(jiǎn)單演示,這樣讀者就可以配合之前介紹的 Server 進(jìn)行測(cè)試使用了。
| 12345678910111213141516171819202122232425262728293031323334 | package com.javadoop.aio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.AsynchronousSocketChannel;import java.nio.charset.Charset;import java.util.concurrent.ExecutionException;import java.util.concurrent.Future;public class Client {????public static void main(String[] args) throws Exception {????????AsynchronousSocketChannel client = AsynchronousSocketChannel.open();??????????// 來(lái)個(gè) Future 形式的????????Future<?> future = client.connect(new InetSocketAddress(8080));????????// 阻塞一下,等待連接成功????????future.get();????????Attachment att = new Attachment();????????att.setClient(client);????????att.setReadMode(false);????????att.setBuffer(ByteBuffer.allocate(2048));????????byte[] data = "I am obot!".getBytes();????????att.getBuffer().put(data);????????att.getBuffer().flip();????????// 異步發(fā)送數(shù)據(jù)到服務(wù)端????????client.write(att.getBuffer(), att, new ClientChannelHandler());????????// 這里休息一下再退出,給出足夠的時(shí)間處理數(shù)據(jù)????????Thread.sleep(2000);????}} |
往里面看下 ClientChannelHandler 類(lèi):
| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647 | package com.javadoop.aio;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.CompletionHandler;import java.nio.charset.Charset;public class ClientChannelHandler implements CompletionHandler<Integer, Attachment> {????@Override????public void completed(Integer result, Attachment att) {????????ByteBuffer buffer = att.getBuffer();????????if (att.isReadMode()) {????????????// 讀取來(lái)自服務(wù)端的數(shù)據(jù)????????????buffer.flip();????????????byte[] bytes = new byte[buffer.limit()];????????????buffer.get(bytes);????????????String msg = new String(bytes, Charset.forName("UTF-8"));????????????System.out.println("收到來(lái)自服務(wù)端的響應(yīng)數(shù)據(jù): " + msg);????????????// 接下來(lái),有以下兩種選擇:????????????// 1. 向服務(wù)端發(fā)送新的數(shù)據(jù)//??????????? att.setReadMode(false);//??????????? buffer.clear();//??????????? String newMsg = "new message from client";//??????????? byte[] data = newMsg.getBytes(Charset.forName("UTF-8"));//??????????? buffer.put(data);//??????????? buffer.flip();//??????????? att.getClient().write(buffer, att, this);????????????// 2. 關(guān)閉連接????????????try {????????????????att.getClient().close();????????????} catch (IOException e) {????????????}????????} else {????????????// 寫(xiě)操作完成后,會(huì)進(jìn)到這里????????????att.setReadMode(true);????????????buffer.clear();????????????att.getClient().read(buffer, att, this);????????}????}????@Override????public void failed(Throwable t, Attachment att) {????????System.out.println("服務(wù)器無(wú)響應(yīng)");????}} |
以上代碼都是可以運(yùn)行調(diào)試的,如果讀者碰到問(wèn)題,請(qǐng)?jiān)谠u(píng)論區(qū)留言。
Asynchronous Channel Groups
為了知識(shí)的完整性,有必要對(duì) group 進(jìn)行介紹,其實(shí)也就是介紹 AsynchronousChannelGroup 這個(gè)類(lèi)。之前我們說(shuō)過(guò),異步 IO 一定存在一個(gè)線(xiàn)程池,這個(gè)線(xiàn)程池負(fù)責(zé)接收任務(wù)、處理 IO 事件、回調(diào)等。這個(gè)線(xiàn)程池就在 group 內(nèi)部,group 一旦關(guān)閉,那么相應(yīng)的線(xiàn)程池就會(huì)關(guān)閉。
AsynchronousServerSocketChannels 和 AsynchronousSocketChannels 是屬于 group 的,當(dāng)我們調(diào)用 AsynchronousServerSocketChannel 或 AsynchronousSocketChannel 的 open() 方法的時(shí)候,相應(yīng)的 channel 就屬于默認(rèn)的 group,這個(gè) group 由 JVM 自動(dòng)構(gòu)造并管理。
如果我們想要配置這個(gè)默認(rèn)的 group,可以在 JVM 啟動(dòng)參數(shù)中指定以下系統(tǒng)變量:
- java.nio.channels.DefaultThreadPool.threadFactory
此系統(tǒng)變量用于設(shè)置 ThreadFactory,它應(yīng)該是 java.util.concurrent.ThreadFactory 實(shí)現(xiàn)類(lèi)的全限定類(lèi)名。一旦我們指定了這個(gè) ThreadFactory 以后,group 中的線(xiàn)程就會(huì)使用該類(lèi)產(chǎn)生。
- java.nio.channels.DefaultThreadPool.initialSize
此系統(tǒng)變量也很好理解,用于設(shè)置線(xiàn)程池的初始大小。
可能你會(huì)想要使用自己定義的 group,這樣可以對(duì)其中的線(xiàn)程進(jìn)行更多的控制,使用以下幾個(gè)方法即可:
- AsynchronousChannelGroup.withCachedThreadPool(ExecutorService executor, int initialSize)
- AsynchronousChannelGroup.withFixedThreadPool(int nThreads, ThreadFactory threadFactory)
- AsynchronousChannelGroup.withThreadPool(ExecutorService executor)
熟悉線(xiàn)程池的讀者對(duì)這些方法應(yīng)該很好理解,它們都是 AsynchronousChannelGroup 中的靜態(tài)方法。
至于 group 的使用就很簡(jiǎn)單了,代碼一看就懂:
| 1234 | AsynchronousChannelGroup group = AsynchronousChannelGroup????????.withFixedThreadPool(10, Executors.defaultThreadFactory());AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group);AsynchronousSocketChannel client = AsynchronousSocketChannel.open(group); |
AsynchronousFileChannels 不屬于 group。但是它們也是關(guān)聯(lián)到一個(gè)線(xiàn)程池的,如果不指定,會(huì)使用系統(tǒng)默認(rèn)的線(xiàn)程池,如果想要使用指定的線(xiàn)程池,可以在實(shí)例化的時(shí)候使用以下方法:
| 123456 | public static AsynchronousFileChannel open(Path file,???????????????????????????????????????????Set<? extends OpenOption> options,???????????????????????????????????????????ExecutorService executor,???????????????????????????????????????????FileAttribute<?>... attrs) {????...} |
到這里,異步 IO 就算介紹完成了。
小結(jié)
我想,本文應(yīng)該是說(shuō)清楚了非阻塞 IO 和異步 IO 了,對(duì)于異步 IO,由于網(wǎng)上的資料比較少,所以不免篇幅多了些。
我們也要知道,看懂了這些,確實(shí)可以學(xué)到一些東西,多了解一些知識(shí),但是我們還是很少在工作中將這些知識(shí)變成工程代碼。一般而言,我們需要在網(wǎng)絡(luò)應(yīng)用中使用 NIO 或 AIO 來(lái)提升性能,但是,在工程上,絕不是了解了一些概念,知道了一些接口就可以的,需要處理的細(xì)節(jié)還非常多。
這也是為什么 Netty/Mina 如此盛行的原因,因?yàn)樗鼈儙椭庋b好了很多細(xì)節(jié),提供給我們用戶(hù)友好的接口,后面有時(shí)間我也會(huì)對(duì) Netty 進(jìn)行介紹。
總結(jié)
以上是生活随笔為你收集整理的Java 非阻塞 IO 和异步 IO的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 关于 NIO 你不得不知道的一些“地雷”
- 下一篇: cad2014安装配置安装点不上?