Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式
本文介紹了Java中的四種I/O模型,同步阻塞,同步非阻塞,多路復(fù)用,異步阻塞。同時(shí)將NIO和BIO進(jìn)行了對(duì)比,并詳細(xì)分析了基于NIO的Reactor模式,包括經(jīng)典單線程模型以及多線程模式和多Reactor模式。
原創(chuàng)文章,轉(zhuǎn)載請(qǐng)務(wù)必將下面這段話置于文章開頭處(保留超鏈接)。
本文轉(zhuǎn)發(fā)自技術(shù)世界,原文鏈接 http://www.jasongj.com/java/nio_reactor/
Java I/O模型
同步 vs. 異步
同步I/O 每個(gè)請(qǐng)求必須逐個(gè)地被處理,一個(gè)請(qǐng)求的處理會(huì)導(dǎo)致整個(gè)流程的暫時(shí)等待,這些事件無法并發(fā)地執(zhí)行。用戶線程發(fā)起I/O請(qǐng)求后需要等待或者輪詢內(nèi)核I/O操作完成后才能繼續(xù)執(zhí)行。
異步I/O 多個(gè)請(qǐng)求可以并發(fā)地執(zhí)行,一個(gè)請(qǐng)求或者任務(wù)的執(zhí)行不會(huì)導(dǎo)致整個(gè)流程的暫時(shí)等待。用戶線程發(fā)起I/O請(qǐng)求后仍然繼續(xù)執(zhí)行,當(dāng)內(nèi)核I/O操作完成后會(huì)通知用戶線程,或者調(diào)用用戶線程注冊(cè)的回調(diào)函數(shù)。
阻塞 vs. 非阻塞
阻塞 某個(gè)請(qǐng)求發(fā)出后,由于該請(qǐng)求操作需要的條件不滿足,請(qǐng)求操作一直阻塞,不會(huì)返回,直到條件滿足。
非阻塞 請(qǐng)求發(fā)出后,若該請(qǐng)求需要的條件不滿足,則立即返回一個(gè)標(biāo)志信息告知條件不滿足,而不會(huì)一直等待。一般需要通過循環(huán)判斷請(qǐng)求條件是否滿足來獲取請(qǐng)求結(jié)果。
需要注意的是,阻塞并不等價(jià)于同步,而非阻塞并非等價(jià)于異步。事實(shí)上這兩組概念描述的是I/O模型中的兩個(gè)不同維度。
同步和異步著重點(diǎn)在于多個(gè)任務(wù)執(zhí)行過程中,后發(fā)起的任務(wù)是否必須等先發(fā)起的任務(wù)完成之后再進(jìn)行。而不管先發(fā)起的任務(wù)請(qǐng)求是阻塞等待完成,還是立即返回通過循環(huán)等待請(qǐng)求成功。
而阻塞和非阻塞重點(diǎn)在于請(qǐng)求的方法是否立即返回(或者說是否在條件不滿足時(shí)被阻塞)。
Unix下五種I/O模型
Unix 下共有五種 I/O 模型:
- 阻塞 I/O
- 非阻塞 I/O
- I/O 多路復(fù)用(select和poll)
- 信號(hào)驅(qū)動(dòng) I/O(SIGIO)
- 異步 I/O(Posix.1的aio_系列函數(shù))
阻塞I/O
如上文所述,阻塞I/O下請(qǐng)求無法立即完成則保持阻塞。阻塞I/O分為如下兩個(gè)階段。
- 階段1:等待數(shù)據(jù)就緒。網(wǎng)絡(luò) I/O 的情況就是等待遠(yuǎn)端數(shù)據(jù)陸續(xù)抵達(dá);磁盤I/O的情況就是等待磁盤數(shù)據(jù)從磁盤上讀取到內(nèi)核態(tài)內(nèi)存中。
- 階段2:數(shù)據(jù)拷貝。出于系統(tǒng)安全,用戶態(tài)的程序沒有權(quán)限直接讀取內(nèi)核態(tài)內(nèi)存,因此內(nèi)核負(fù)責(zé)把內(nèi)核態(tài)內(nèi)存中的數(shù)據(jù)拷貝一份到用戶態(tài)內(nèi)存中。
非阻塞I/O
非阻塞I/O請(qǐng)求包含如下三個(gè)階段
- socket設(shè)置為 NONBLOCK(非阻塞)就是告訴內(nèi)核,當(dāng)所請(qǐng)求的I/O操作無法完成時(shí),不要將線程睡眠,而是返回一個(gè)錯(cuò)誤碼(EWOULDBLOCK) ,這樣請(qǐng)求就不會(huì)阻塞。
- I/O操作函數(shù)將不斷的測(cè)試數(shù)據(jù)是否已經(jīng)準(zhǔn)備好,如果沒有準(zhǔn)備好,繼續(xù)測(cè)試,直到數(shù)據(jù)準(zhǔn)備好為止。整個(gè)I/O 請(qǐng)求的過程中,雖然用戶線程每次發(fā)起I/O請(qǐng)求后可以立即返回,但是為了等到數(shù)據(jù),仍需要不斷地輪詢、重復(fù)請(qǐng)求,消耗了大量的 CPU 的資源。
- 數(shù)據(jù)準(zhǔn)備好了,從內(nèi)核拷貝到用戶空間。
一般很少直接使用這種模型,而是在其他I/O模型中使用非阻塞I/O 這一特性。這種方式對(duì)單個(gè)I/O 請(qǐng)求意義不大,但給I/O多路復(fù)用提供了條件。
I/O多路復(fù)用(異步阻塞 I/O)
I/O多路復(fù)用會(huì)用到select或者poll函數(shù),這兩個(gè)函數(shù)也會(huì)使線程阻塞,但是和阻塞I/O所不同的是,這兩個(gè)函數(shù)可以同時(shí)阻塞多個(gè)I/O操作。而且可以同時(shí)對(duì)多個(gè)讀操作,多個(gè)寫操作的I/O函數(shù)進(jìn)行檢測(cè),直到有數(shù)據(jù)可讀或可寫時(shí),才真正調(diào)用I/O操作函數(shù)。
從流程上來看,使用select函數(shù)進(jìn)行I/O請(qǐng)求和同步阻塞模型沒有太大的區(qū)別,甚至還多了添加監(jiān)視Channel,以及調(diào)用select函數(shù)的額外操作,增加了額外工作。但是,使用 select以后最大的優(yōu)勢(shì)是用戶可以在一個(gè)線程內(nèi)同時(shí)處理多個(gè)Channel的I/O請(qǐng)求。用戶可以注冊(cè)多個(gè)Channel,然后不斷地調(diào)用select讀取被激活的Channel,即可達(dá)到在同一個(gè)線程內(nèi)同時(shí)處理多個(gè)I/O請(qǐng)求的目的。而在同步阻塞模型中,必須通過多線程的方式才能達(dá)到這個(gè)目的。
調(diào)用select/poll該方法由一個(gè)用戶態(tài)線程負(fù)責(zé)輪詢多個(gè)Channel,直到某個(gè)階段1的數(shù)據(jù)就緒,再通知實(shí)際的用戶線程執(zhí)行階段2的拷貝。 通過一個(gè)專職的用戶態(tài)線程執(zhí)行非阻塞I/O輪詢,模擬實(shí)現(xiàn)了階段一的異步化。
信號(hào)驅(qū)動(dòng)I/O(SIGIO)
首先我們?cè)试Ssocket進(jìn)行信號(hào)驅(qū)動(dòng)I/O,并安裝一個(gè)信號(hào)處理函數(shù),線程繼續(xù)運(yùn)行并不阻塞。當(dāng)數(shù)據(jù)準(zhǔn)備好時(shí),線程會(huì)收到一個(gè)SIGIO 信號(hào),可以在信號(hào)處理函數(shù)中調(diào)用I/O操作函數(shù)處理數(shù)據(jù)。
異步I/O
調(diào)用aio_read 函數(shù),告訴內(nèi)核描述字,緩沖區(qū)指針,緩沖區(qū)大小,文件偏移以及通知的方式,然后立即返回。當(dāng)內(nèi)核將數(shù)據(jù)拷貝到緩沖區(qū)后,再通知應(yīng)用程序。所以異步I/O模式下,階段1和階段2全部由內(nèi)核完成,完成不需要用戶線程的參與。
幾種I/O模型對(duì)比
除異步I/O外,其它四種模型的階段2基本相同,都是從內(nèi)核態(tài)拷貝數(shù)據(jù)到用戶態(tài)。區(qū)別在于階段1不同。前四種都屬于同步I/O。
Java中四種I/O模型
上一章所述Unix中的五種I/O模型,除信號(hào)驅(qū)動(dòng)I/O外,Java對(duì)其它四種I/O模型都有所支持。其中Java最早提供的blocking I/O即是阻塞I/O,而NIO即是非阻塞I/O,同時(shí)通過NIO實(shí)現(xiàn)的Reactor模式即是I/O復(fù)用模型的實(shí)現(xiàn),通過AIO實(shí)現(xiàn)的Proactor模式即是異步I/O模型的實(shí)現(xiàn)。
從IO到NIO
面向流 vs. 面向緩沖
Java IO是面向流的,每次從流(InputStream/OutputStream)中讀一個(gè)或多個(gè)字節(jié),直到讀取完所有字節(jié),它們沒有被緩存在任何地方。另外,它不能前后移動(dòng)流中的數(shù)據(jù),如需前后移動(dòng)處理,需要先將其緩存至一個(gè)緩沖區(qū)。
Java NIO面向緩沖,數(shù)據(jù)會(huì)被讀取到一個(gè)緩沖區(qū),需要時(shí)可以在緩沖區(qū)中前后移動(dòng)處理,這增加了處理過程的靈活性。但與此同時(shí)在處理緩沖區(qū)前需要檢查該緩沖區(qū)中是否包含有所需要處理的數(shù)據(jù),并需要確保更多數(shù)據(jù)讀入緩沖區(qū)時(shí),不會(huì)覆蓋緩沖區(qū)內(nèi)尚未處理的數(shù)據(jù)。
阻塞 vs. 非阻塞
Java IO的各種流是阻塞的。當(dāng)某個(gè)線程調(diào)用read()或write()方法時(shí),該線程被阻塞,直到有數(shù)據(jù)被讀取到或者數(shù)據(jù)完全寫入。阻塞期間該線程無法處理任何其它事情。
Java NIO為非阻塞模式。讀寫請(qǐng)求并不會(huì)阻塞當(dāng)前線程,在數(shù)據(jù)可讀/寫前當(dāng)前線程可以繼續(xù)做其它事情,所以一個(gè)單獨(dú)的線程可以管理多個(gè)輸入和輸出通道。
選擇器(Selector)
Java NIO的選擇器允許一個(gè)單獨(dú)的線程同時(shí)監(jiān)視多個(gè)通道,可以注冊(cè)多個(gè)通道到同一個(gè)選擇器上,然后使用一個(gè)單獨(dú)的線程來“選擇”已經(jīng)就緒的通道。這種“選擇”機(jī)制為一個(gè)單獨(dú)線程管理多個(gè)通道提供了可能。
零拷貝
Java NIO中提供的FileChannel擁有transferTo和transferFrom兩個(gè)方法,可直接把FileChannel中的數(shù)據(jù)拷貝到另外一個(gè)Channel,或者直接把另外一個(gè)Channel中的數(shù)據(jù)拷貝到FileChannel。該接口常被用于高效的網(wǎng)絡(luò)/文件的數(shù)據(jù)傳輸和大文件拷貝。在操作系統(tǒng)支持的情況下,通過該方法傳輸數(shù)據(jù)并不需要將源數(shù)據(jù)從內(nèi)核態(tài)拷貝到用戶態(tài),再從用戶態(tài)拷貝到目標(biāo)通道的內(nèi)核態(tài),同時(shí)也避免了兩次用戶態(tài)和內(nèi)核態(tài)間的上下文切換,也即使用了“零拷貝”,所以其性能一般高于Java IO中提供的方法。
使用FileChannel的零拷貝將本地文件內(nèi)容傳輸?shù)骄W(wǎng)絡(luò)的示例代碼如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
? public class NIOClient { public static void main(String[] args) throws IOException, InterruptedException { SocketChannel socketChannel = SocketChannel.open(); InetSocketAddress address = new InetSocketAddress(1234); socketChannel.connect(address); RandomAccessFile file = new RandomAccessFile( NIOClient.class.getClassLoader().getResource("test.txt").getFile(), "rw"); FileChannel channel = file.getChannel(); channel.transferTo(0, channel.size(), socketChannel); channel.close(); file.close(); socketChannel.close(); } } |
?
阻塞I/O下的服務(wù)器實(shí)現(xiàn)
單線程逐個(gè)處理所有請(qǐng)求
使用阻塞I/O的服務(wù)器,一般使用循環(huán),逐個(gè)接受連接請(qǐng)求并讀取數(shù)據(jù),然后處理下一個(gè)請(qǐng)求。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
? public class IOServer { private static final Logger LOGGER = LoggerFactory.getLogger(IOServer.class); public static void main(String[] args) { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(2345)); } catch (IOException ex) { LOGGER.error("Listen failed", ex); return; } try{ while(true) { Socket socket = serverSocket.accept(); InputStream inputstream = socket.getInputStream(); LOGGER.info("Received message {}", IOUtils.toString(inputstream)); IOUtils.closeQuietly(inputstream); } } catch(IOException ex) { IOUtils.closeQuietly(serverSocket); LOGGER.error("Read message failed", ex); } } } |
?
為每個(gè)請(qǐng)求創(chuàng)建一個(gè)線程
上例使用單線程逐個(gè)處理所有請(qǐng)求,同一時(shí)間只能處理一個(gè)請(qǐng)求,等待I/O的過程浪費(fèi)大量CPU資源,同時(shí)無法充分使用多CPU的優(yōu)勢(shì)。下面是使用多線程對(duì)阻塞I/O模型的改進(jìn)。一個(gè)連接建立成功后,創(chuàng)建一個(gè)單獨(dú)的線程處理其I/O操作。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
? public class IOServerMultiThread { private static final Logger LOGGER = LoggerFactory.getLogger(IOServerMultiThread.class); public static void main(String[] args) { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(2345)); } catch (IOException ex) { LOGGER.error("Listen failed", ex); return; } try{ while(true) { Socket socket = serverSocket.accept(); new Thread( () -> { try{ InputStream inputstream = socket.getInputStream(); LOGGER.info("Received message {}", IOUtils.toString(inputstream)); IOUtils.closeQuietly(inputstream); } catch (IOException ex) { LOGGER.error("Read message failed", ex); } }).start(); } } catch(IOException ex) { IOUtils.closeQuietly(serverSocket); LOGGER.error("Accept connection failed", ex); } } } |
?
使用線程池處理請(qǐng)求
為了防止連接請(qǐng)求過多,導(dǎo)致服務(wù)器創(chuàng)建的線程數(shù)過多,造成過多線程上下文切換的開銷。可以通過線程池來限制創(chuàng)建的線程數(shù),如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
? public class IOServerThreadPool { private static final Logger LOGGER = LoggerFactory.getLogger(IOServerThreadPool.class); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(2345)); } catch (IOException ex) { LOGGER.error("Listen failed", ex); return; } try{ while(true) { Socket socket = serverSocket.accept(); executorService.submit(() -> { try{ InputStream inputstream = socket.getInputStream(); LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream))); } catch (IOException ex) { LOGGER.error("Read message failed", ex); } }); } } catch(IOException ex) { try { serverSocket.close(); } catch (IOException e) { } LOGGER.error("Accept connection failed", ex); } } } |
?
Reactor模式
精典Reactor模式
精典的Reactor模式示意圖如下所示。
在Reactor模式中,包含如下角色
- Reactor?將I/O事件發(fā)派給對(duì)應(yīng)的Handler
- Acceptor?處理客戶端連接請(qǐng)求
- Handlers?執(zhí)行非阻塞讀/寫
最簡(jiǎn)單的Reactor模式實(shí)現(xiàn)代碼如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
? public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress()); socketChannel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int count = socketChannel.read(buffer); if (count <= 0) { socketChannel.close(); key.cancel(); LOGGER.info("Received invalide data, close the connection"); continue; } LOGGER.info("Received message {}", new String(buffer.array())); } keys.remove(key); } } } } |
?
為了方便閱讀,上示代碼將Reactor模式中的所有角色放在了一個(gè)類中。
從上示代碼中可以看到,多個(gè)Channel可以注冊(cè)到同一個(gè)Selector對(duì)象上,實(shí)現(xiàn)了一個(gè)線程同時(shí)監(jiān)控多個(gè)請(qǐng)求狀態(tài)(Channel)。同時(shí)注冊(cè)時(shí)需要指定它所關(guān)注的事件,例如上示代碼中socketServerChannel對(duì)象只注冊(cè)了OP_ACCEPT事件,而socketChannel對(duì)象只注冊(cè)了OP_READ事件。
selector.select()是阻塞的,當(dāng)有至少一個(gè)通道可用時(shí)該方法返回可用通道個(gè)數(shù)。同時(shí)該方法只捕獲Channel注冊(cè)時(shí)指定的所關(guān)注的事件。
多工作線程Reactor模式
經(jīng)典Reactor模式中,盡管一個(gè)線程可同時(shí)監(jiān)控多個(gè)請(qǐng)求(Channel),但是所有讀/寫請(qǐng)求以及對(duì)新連接請(qǐng)求的處理都在同一個(gè)線程中處理,無法充分利用多CPU的優(yōu)勢(shì),同時(shí)讀/寫操作也會(huì)阻塞對(duì)新連接請(qǐng)求的處理。因此可以引入多線程,并行處理多個(gè)讀/寫操作,如下圖所示。
多線程Reactor模式示例代碼如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
? public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { if(selector.selectNow() < 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while(iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress()); SelectionKey readKey = socketChannel.register(selector, SelectionKey.OP_READ); readKey.attach(new Processor()); } else if (key.isReadable()) { Processor processor = (Processor) key.attachment(); processor.process(key); } } } } } |
?
從上示代碼中可以看到,注冊(cè)完SocketChannel的OP_READ事件后,可以對(duì)相應(yīng)的SelectionKey attach一個(gè)對(duì)象(本例中attach了一個(gè)Processor對(duì)象,該對(duì)象處理讀請(qǐng)求),并且在獲取到可讀事件后,可以取出該對(duì)象。
注:attach對(duì)象及取出該對(duì)象是NIO提供的一種操作,但該操作并非Reactor模式的必要操作,本文使用它,只是為了方便演示NIO的接口。
具體的讀請(qǐng)求處理在如下所示的Processor類中。該類中設(shè)置了一個(gè)靜態(tài)的線程池處理所有請(qǐng)求。而process方法并不直接處理I/O請(qǐng)求,而是把該I/O操作提交給上述線程池去處理,這樣就充分利用了多線程的優(yōu)勢(shì),同時(shí)將對(duì)新連接的處理和讀/寫操作的處理放在了不同的線程中,讀/寫操作不再阻塞對(duì)新連接請(qǐng)求的處理。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
? public class Processor { private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class); private static final ExecutorService service = Executors.newFixedThreadPool(16); public void process(SelectionKey selectionKey) { service.submit(() -> { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); int count = socketChannel.read(buffer); if (count < 0) { socketChannel.close(); selectionKey.cancel(); LOGGER.info("{}\t Read ended", socketChannel); return null; } else if(count == 0) { return null; } LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array())); return null; }); } } |
多Reactor
Netty中使用的Reactor模式,引入了多Reactor,也即一個(gè)主Reactor負(fù)責(zé)監(jiān)控所有的連接請(qǐng)求,多個(gè)子Reactor負(fù)責(zé)監(jiān)控并處理讀/寫請(qǐng)求,減輕了主Reactor的壓力,降低了主Reactor壓力太大而造成的延遲。
并且每個(gè)子Reactor分別屬于一個(gè)獨(dú)立的線程,每個(gè)成功連接后的Channel的所有操作由同一個(gè)線程處理。這樣保證了同一請(qǐng)求的所有狀態(tài)和上下文在同一個(gè)線程中,避免了不必要的上下文切換,同時(shí)也方便了監(jiān)控請(qǐng)求響應(yīng)狀態(tài)。
多Reactor模式示意圖如下所示。
多Reactor示例代碼如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
? public class NIOServer { private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class); public static void main(String[] args) throws IOException { Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(1234)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); int coreNum = Runtime.getRuntime().availableProcessors(); Processor[] processors = new Processor[coreNum]; for (int i = 0; i < processors.length; i++) { processors[i] = new Processor(); } int index = 0; while (selector.select() > 0) { Set<SelectionKey> keys = selector.selectedKeys(); for (SelectionKey key : keys) { keys.remove(key); if (key.isAcceptable()) { ServerSocketChannel acceptServerSocketChannel = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = acceptServerSocketChannel.accept(); socketChannel.configureBlocking(false); LOGGER.info("Accept request from {}", socketChannel.getRemoteAddress()); Processor processor = processors[(int) ((index++) % coreNum)]; processor.addChannel(socketChannel); processor.wakeup(); } } } } } |
?
如上代碼所示,本文設(shè)置的子Reactor個(gè)數(shù)是當(dāng)前機(jī)器可用核數(shù)的兩倍(與Netty默認(rèn)的子Reactor個(gè)數(shù)一致)。對(duì)于每個(gè)成功連接的SocketChannel,通過round robin的方式交給不同的子Reactor。
子Reactor對(duì)SocketChannel的處理如下所示。
|
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
? public class Processor { private static final Logger LOGGER = LoggerFactory.getLogger(Processor.class); private static final ExecutorService service = Executors.newFixedThreadPool(2 * Runtime.getRuntime().availableProcessors()); private Selector selector; public Processor() throws IOException { this.selector = SelectorProvider.provider().openSelector(); start(); } public void addChannel(SocketChannel socketChannel) throws ClosedChannelException { socketChannel.register(this.selector, SelectionKey.OP_READ); } public void wakeup() { this.selector.wakeup(); } public void start() { service.submit(() -> { while (true) { if (selector.select(500) <= 0) { continue; } Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); if (key.isReadable()) { ByteBuffer buffer = ByteBuffer.allocate(1024); SocketChannel socketChannel = (SocketChannel) key.channel(); int count = socketChannel.read(buffer); if (count < 0) { socketChannel.close(); key.cancel(); LOGGER.info("{}\t Read ended", socketChannel); continue; } else if (count == 0) { LOGGER.info("{}\t Message size is 0", socketChannel); continue; } else { LOGGER.info("{}\t Read message {}", socketChannel, new String(buffer.array())); } } } } }); } } |
?
在Processor中,同樣創(chuàng)建了一個(gè)靜態(tài)的線程池,且線程池的大小為機(jī)器核數(shù)的兩倍。每個(gè)Processor實(shí)例均包含一個(gè)Selector實(shí)例。同時(shí)每次獲取Processor實(shí)例時(shí)均提交一個(gè)任務(wù)到該線程池,并且該任務(wù)正常情況下一直循環(huán)處理,不會(huì)停止。而提交給該P(yáng)rocessor的SocketChannel通過在其Selector注冊(cè)事件,加入到相應(yīng)的任務(wù)中。由此實(shí)現(xiàn)了每個(gè)子Reactor包含一個(gè)Selector對(duì)象,并由一個(gè)獨(dú)立的線程處理。
Java進(jìn)階系列
- Java進(jìn)階(一)Annotation(注解)
- Java進(jìn)階(二)當(dāng)我們說線程安全時(shí),到底在說什么
- Java進(jìn)階(三)多線程開發(fā)關(guān)鍵技術(shù)
- Java進(jìn)階(四)線程間通信方式對(duì)比
- Java進(jìn)階(五)NIO和Reactor模式進(jìn)階
- Java進(jìn)階(六)從ConcurrentHashMap的演進(jìn)看Java多線程核心技術(shù)
from:?http://www.jasongj.com/java/nio_reactor/
總結(jié)
以上是生活随笔為你收集整理的Java进阶(五)Java I/O模型从BIO到NIO和Reactor模式的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我所理解的Java NIO
- 下一篇: 桌面程序开发入门(WinForm wit