选择器的并发性
4.3.4 并發(fā)性
選擇器對(duì)象是線程安全的,但它們包含的鍵集合不是。通過keys( )和selectKeys( )返回的鍵的集合是Selector對(duì)象內(nèi)部的私有的Set對(duì)象集合的直接引用。這些集合可能在任意時(shí)間被改變。已注冊(cè)的鍵的集合是只讀的。如果您試圖修改它,那么您得到的獎(jiǎng)品將是一個(gè)java.lang.UnsupportedOperationException,但是當(dāng)您在觀察它們的時(shí)候,它們可能發(fā)生了改變的話,您仍然會(huì)遇到麻煩。Iterator對(duì)象是快速失敗的(fail-fast):如果底層的Set被改變了,它們將會(huì)拋出java.util.ConcurrentModificationException,因此如果您期望在多個(gè)線程間共享選擇器和/或鍵,請(qǐng)對(duì)此做好準(zhǔn)備。您可以直接修改選擇鍵,但請(qǐng)注意您這么做時(shí)可能會(huì)徹底破壞另一個(gè)線程的Iterator。
如果在多個(gè)線程并發(fā)地訪問一個(gè)選擇器的鍵的集合的時(shí)候存在任何問題,您可以采取一些步驟來合理地同步訪問。在執(zhí)行選擇操作時(shí),選擇器在Selector對(duì)象上進(jìn)行同步,然后是已注冊(cè)的鍵的集合,最后是已選擇的鍵的集合,按照這樣的順序。已取消的鍵的集合也在選擇過程的的第1步和第3步之間保持同步(當(dāng)與已取消的鍵的集合相關(guān)的通道被注銷時(shí))
在多線程的場(chǎng)景中,如果您需要對(duì)任何一個(gè)鍵的集合進(jìn)行更改,不管是直接更改還是其他操作帶來的副作用,您都需要首先以相同的順序,在同一對(duì)象上進(jìn)行同步。鎖的過程是非常重要的。如果競爭的線程沒有以相同的順序請(qǐng)求鎖,就將會(huì)有死鎖的潛在隱患。如果您可以確保否其他線程不會(huì)同時(shí)訪問選擇器,那么就不必要進(jìn)行同步了。
Selector類的close( )方法與slect( )方法的同步方式是一樣的,因此也有一直阻塞的可能性。在選擇過程還在進(jìn)行的過程中,所有對(duì)close( )的調(diào)用都會(huì)被阻塞,直到選擇過程結(jié)束,或者執(zhí)行選擇的線程進(jìn)入睡眠。在后面的情況下,執(zhí)行選擇的線程將會(huì)在執(zhí)行關(guān)閉的線程獲得鎖是立即被喚醒,并關(guān)閉選擇器(參見4.3.2小節(jié))。
4.4 異步可關(guān)閉性
任何時(shí)候都有可能關(guān)閉一個(gè)通道或者取消一個(gè)選擇鍵。除非您采取步驟進(jìn)行同步,否則鍵的狀態(tài)及相關(guān)的通道將發(fā)生意料之外的改變。一個(gè)特定的鍵的集合中的一個(gè)鍵的存在并不保證鍵仍然是有效的,或者它相關(guān)的通道仍然是打開的。
關(guān)閉通道的過程不應(yīng)該是一個(gè)耗時(shí)的操作。NIO的設(shè)計(jì)者們特別想要阻止這樣的可能性:一個(gè)線程在關(guān)閉一個(gè)處于選擇操作中的通道時(shí),被阻塞于無限期的等待。當(dāng)一個(gè)通道關(guān)閉時(shí),它相關(guān)的鍵也就都被取消了。這并不會(huì)影響正在進(jìn)行的select( ),但這意味著在您調(diào)用select( )之前仍然是有效的鍵,在返回時(shí)可能會(huì)變?yōu)闊o效。您總是可以使用由選擇器的selectKeys( )方法返回的已選擇的鍵的集合:請(qǐng)不要自己維護(hù)鍵的集合。理解3.4.5小節(jié)描述的選擇過程,對(duì)于避免遇到問題而言是非常重要的。
您可以參考4.3.2小節(jié),以詳細(xì)了解一個(gè)在select( )中阻塞的線程是如何被喚醒的。如果您試圖使用一個(gè)已經(jīng)失效的鍵,大多數(shù)方法將拋出CancelledKeyException。但是,您可以安全地從從已取消的鍵中獲取通道的句柄。如果通道已經(jīng)關(guān)閉時(shí),仍然試圖使用它的話,在大多數(shù)情況下將引發(fā)ClosedChannelException
?
4.5 選擇過程的可擴(kuò)展性
我多次提到選擇器可以簡化用單線程同時(shí)管理多個(gè)可選擇通道的實(shí)現(xiàn)。使用一個(gè)線程來為多個(gè)通道提供服務(wù),通過消除管理各個(gè)線程的額外開銷,可能會(huì)降低復(fù)雜性并可能大幅提升性能。但只使用一個(gè)線程來服務(wù)所有可選擇的通道是否是一個(gè)好主意呢?這要看情況。
對(duì)單CPU的系統(tǒng)而言這可能是一個(gè)好主意,因?yàn)樵谌魏吻闆r下都只有一個(gè)線程能夠運(yùn)行。通過消除在線程之間進(jìn)行上下文切換帶來的額外開銷,總吞吐量可以得到提高。但對(duì)于一個(gè)多CPU的系統(tǒng)呢?在一個(gè)有n個(gè)CPU的系統(tǒng)上,當(dāng)一個(gè)單一的線程線性地輪流處理每一個(gè)線程時(shí),可能有n-1個(gè)cpu處于空閑狀態(tài)
那么讓不同道請(qǐng)求不同的服務(wù)類的辦法如何?想象一下,如果一個(gè)應(yīng)用程序?yàn)榇罅康姆植际降膫鞲衅饔涗浶畔?。每個(gè)傳感器在服務(wù)線程遍歷每個(gè)就緒的通道時(shí)需要等待數(shù)秒鐘。這在響應(yīng)時(shí)間不重要時(shí)是可以的。但對(duì)于高優(yōu)先級(jí)的連接(如操作命令),如果只用一個(gè)線程為所有通道提供服務(wù),將不得不在隊(duì)列中等待。不同的應(yīng)用程序的要求也是不同的。您采用的策略會(huì)受到您嘗試解決的問題的影響。
在第一個(gè)場(chǎng)景中,如果您想要將更多的線程來為通道提供服務(wù),請(qǐng)抵抗住使用多個(gè)選擇器的欲望。在大量通道上執(zhí)行就緒選擇并不會(huì)有很大的開銷,大多數(shù)工作是由底層操作系統(tǒng)完成的。管理多個(gè)選擇器并隨機(jī)地將通道分派給它們當(dāng)中的一個(gè)并不是這個(gè)問題的合理的解決方案。這只會(huì)形成這個(gè)場(chǎng)景的一個(gè)更小的版本。一個(gè)更好的策略是對(duì)所有的可選擇通道使用一個(gè)選擇器,并將對(duì)就緒通道的服務(wù)委托給其他線程。您只用一個(gè)線程監(jiān)控通道的就緒狀態(tài)并使用一個(gè)協(xié)調(diào)好的工作線程池來處理共接收到的數(shù)據(jù)。根據(jù)部署的條件,線程池的大小是可以調(diào)整的(或者它自己進(jìn)行動(dòng)態(tài)的調(diào)整)。對(duì)可選擇通道的管理仍然是簡單的,而簡單的就是好的。
第二個(gè)場(chǎng)景中,某些通道要求比其他通道更高的響應(yīng)速度,可以通過使用兩個(gè)選擇器來解決:一個(gè)為命令連接服務(wù),另一個(gè)為普通連接服務(wù)。但這種場(chǎng)景也可以使用與第一個(gè)場(chǎng)景十分相似的辦法來解決。與將所有準(zhǔn)備好的通道放到同一個(gè)線程池的做法不同,通道可以根據(jù)功能由不同的工作線程來處理。它們可能可以是日志線程池,命令/控制線程池,狀態(tài)請(qǐng)求線程池,等等。
例 4-2的代碼是例4-1的一般性的選擇循環(huán)的擴(kuò)展。它覆寫了readDataFromSocket( )方法,并使用線程池來為準(zhǔn)備好數(shù)據(jù)用于讀取的通道提供服務(wù)。與在主線程中同步地讀取數(shù)據(jù)不同,這個(gè)版本的實(shí)現(xiàn)將SelectionKey對(duì)象傳遞給為其服務(wù)的工作線程。
?
例 4-2. 使用線程池來為通道提供服務(wù)
?
/*** */ package test.noi.select;import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator;/*** * Simple echo-back server which listens for incoming stream connections and* * echoes back whatever it reads. A single Selector object is used to listen to* * the server socket (to accept new connections) and all the active socket* * channels.* * @author Ron Hitchens (ron@ronsoft.com)*/ public class SelectSockets {public static int PORT_NUMBER = 1234;public static void main(String[] argv) throws Exception {new SelectSockets().go(argv);}public void go(String[] argv) throws Exception {int port = PORT_NUMBER;if (argv.length > 0) {// Override default listen portport = Integer.parseInt(argv[0]);}System.out.println("Listening on port " + port);ServerSocketChannel serverChannel = ServerSocketChannel.open();// Get the associated ServerSocket to bind it withServerSocket serverSocket = serverChannel.socket();// Create a new Selector for use belowSelector selector = Selector.open();// Set the port the server channel will listen toserverSocket.bind(new InetSocketAddress(port));// Set nonblocking mode for the listening socketserverChannel.configureBlocking(false);// Register the ServerSocketChannel with the Selector serverChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {// This may block for a long time. Upon returning, the// selected set contains keys of the ready channels.int n = selector.select();if (n == 0) {continue;// nothing to do }// Get an iterator over the set of selected keysIterator it = selector.selectedKeys().iterator();// Look at each key in the selected setwhile (it.hasNext()) {SelectionKey key = (SelectionKey) it.next();// Is a new connection coming in?if (key.isAcceptable()) {ServerSocketChannel server = (ServerSocketChannel) key.channel();SocketChannel channel = server.accept();registerChannel(selector, channel, SelectionKey.OP_READ);sayHello(channel);}// Is there data to read on this channel?if (key.isReadable()) {readDataFromSocket(key);}// Remove key from selected set; it's been handled it.remove();}}}/*** * Register the given channel with the given selector for the given ** operations of interest*/protected void registerChannel(Selector selector,SelectableChannel channel, int ops) throws Exception {if (channel == null) {return; // could happen}// Set the new channel nonblockingchannel.configureBlocking(false);// Register it with the selector channel.register(selector, ops);}// ----------------------------------------------------------// Use the same byte buffer for all channels. A single thread is// servicing all the channels, so no danger of concurrent acccess.private ByteBuffer buffer = ByteBuffer.allocateDirect(1024);/*** * Sample data handler method for a channel with data ready to read. * * @param* key * A SelectionKey object associated with a channel determined by * the* selector to be ready for reading. If the channel returns* * * an EOF condition, it is closed here, which automatically * invalidates* the associated key. The selector will then * de-register the channel on* the next select call.*/protected void readDataFromSocket(SelectionKey key) throws Exception {SocketChannel socketChannel = (SocketChannel) key.channel();int count;buffer.clear();// Empty buffer// Loop while data is available;channel is nonblockingwhile ((count = socketChannel.read(buffer)) > 0) {buffer.flip();// Make buffer readable// Send the data; don't assume it goes all at oncewhile (buffer.hasRemaining()) {socketChannel.write(buffer);}// WARNING: the above loop is evil. Because// it's writing back to the same nonblocking// channel it read the data from, this code can// potentially spin in a busy loop. In real life// you'd do something more useful than this. buffer.clear();// Empty buffer }if (count < 0) {// Close channel on EOF, invalidates the key socketChannel.close();}}/*** * Spew a greeting to the incoming client connection. * * @param channel ** The newly connected SocketChannel to say hello to.*/private void sayHello(SocketChannel channel) throws Exception {buffer.clear();buffer.put("Hi there!\r\n".getBytes());buffer.flip();channel.write(buffer);} }?
?
?
由于執(zhí)行選擇過程的線程將重新循環(huán)并幾乎立即再次調(diào)用select( ),鍵的interest集合將被修改,并將interest(感興趣的操作)從讀取就緒(read-rreadiness)狀態(tài)中移除。這將防止選擇器重復(fù)地調(diào)用readDataFromSocket( )(因?yàn)橥ǖ廊匀粫?huì)準(zhǔn)備好讀取數(shù)據(jù),直到工作線程從它那里讀取數(shù)據(jù))。當(dāng)工作線程結(jié)束為通道提供的服務(wù)時(shí),它將再次更新鍵的ready集合,來將interest重新放到讀取就緒集合中。它也會(huì)在選擇器上顯式地調(diào)用wakeup( )。如果主線程在select( )中被阻塞,這將使它繼續(xù)執(zhí)行。這個(gè)選擇循環(huán)會(huì)再次執(zhí)行一個(gè)輪回(可能什么也沒做)并帶著被更新的鍵重新進(jìn)入select( )。
?
?
?
以上內(nèi)容出自 nio 一書
?
轉(zhuǎn)載于:https://www.cnblogs.com/mjorcen/p/4203867.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
- 上一篇: 关于redis实现单点登录的一点思路
- 下一篇: 胡适:一个最低限度的国学书目