nio高并发编程
之前http://blog.csdn.net/sunmenggmail/article/details/8638480
已經整理過,這次是2.0版
參考:
http://daizuan.iteye.com/blog/1112909
http://daizuan.iteye.com/blog/1113471
http://www.cnblogs.com/pingh/archive/2013/07/30/3224990.html
http://www.cnblogs.com/ajian005/archive/2012/09/27/2753662.html(相當好,總結了開源框架)
陷阱1:處理事件忘記移除key
在select返回值大于0的情況下,循環處理
Selector.selectedKeys集合,每處理一個必須從Set中移除
?不移除的后果是本次的就緒的key集合下次會再次返回,導致無限循環,CPU消耗100%
?陷阱2:Selector返回的key集合非線程安全
Selector.selectedKeys/keys 返回的集合都是非線程安全的
Selector.selectedKeys返回的可移除
Selector.keys 不可變
對selected keys的處理必須單線程處理或者適當同步
陷阱3:正確注冊Channel和更新interest
直接注冊不可嗎?
channel.register(selector, ops, attachment);
不是不可以,效率問題
至少加兩次鎖,鎖競爭激烈
Channel本身的regLock,競爭幾乎沒有
Selector內部的key集合,競爭激烈
更好的方式:加入緩沖隊列,等待注冊,reactor單線程處理
同樣,SelectionKey.interest(ops)
在linux上會阻塞,需要獲取selector內部鎖做同步
在win32上不會阻塞
屏蔽平臺差異,避免鎖的激烈競爭,采用類似注冊channel的方式:
?
陷阱4:正確處理OP_WRITE
OP_WRITE處理不當很容易導致CPU 100%
OP_WRITE觸發條件:
? ?前提:interest了OP_WRITE
? ?觸發條件:
? ? ? ? socket發送緩沖區可寫
? ? ? ? 遠端關閉
? ? ? ? 有錯誤發生
正確的處理方式:
? ?僅在已經連接的channel上注冊
? ?僅在有數據可寫的時候才注冊
? ?觸發之后立即取消注冊,否則會繼續觸發導致循環
? ?處理完成后視情況決定是否繼續注冊
? ? ?沒有完全寫入,繼續注冊
? ? ?全部寫入,無需注冊
陷阱5:正確取消注冊channel
SelectableChannel一旦注冊將一直有效直到明確取消
怎么取消注冊?
? ?channel.close(),內部會調用key.cancel()
? ?key.cancel();
? ?中斷channel的讀寫所在線程引起的channel關閉
但是這樣還不夠!
? ?key.cancel()僅僅是將key加入cancelledKeys
? ?直到下一次select才真正處理
? ?并且channel的socketfd只有在真正取消注冊后才會close(fd)
后果是什么?
? 服務端,問題不大,select調用頻繁
? 客戶端,通常只有一個連接,關閉channel之后,沒有調用select就關閉了selector
? sockfd沒有關閉,停留在CLOSE_WAIT狀態
正確的處理方式,取消注冊也應當作為事件交給reactor處理,及時wakeup做select
適當的時候調用selector.selectNow()
? Netty在超過256連接關閉的時候主動調用一次selectNow
陷阱6:同時注冊OP_ACCPET和OP_READ,同時注冊OP_CONNECT和OP_WRITE
在底層來說,只有兩種事件:read和write
Java NIO還引入了OP_ACCEPT和OP_CONNECT
? OP_ACCEPT、OP_READ == Read
? OP_CONNECT、OP_WRITE == Write
同時注冊OP_ACCEPT和OP_READ ,或者同時注冊OP_CONNECT和OP_WRITE在不同平臺上產生錯誤的行為,避免這樣做!
陷阱7:正確處理connect
SocketChannel.connect方法在非阻塞模式下可能返回false,切記判斷返回值
? ? 如果是loopback連接,可能直接返回true,表示連接成功
? ? 返回false,后續處理
? ? ? ?注冊channel到selector,監聽OP_CONNECT事件
? ? ? ?在OP_CONNECT觸發后,調用SocketChannel.finishConnect成功后,連接才真正建立
陷阱:
? ? 沒有判斷connect返回值
? ? 沒有調用finishConnect
? ? 在OP_CONNECT觸發后,沒有移除OP_CONNECT,導致SelectionKey一直處于就緒狀態,空耗CPU
? ? ? ?OP_CONNECT只能在還沒有連接的channel上注冊
忠告
盡量不要嘗試實現自己的nio框架,除非有經驗豐富的工程師
盡量使用經過廣泛實踐的開源NIO框架Mina、Netty3、xSocket
盡量使用最新穩定版JDK
遇到問題的時候,也許你可以先看下java的bug database
elector自身是線程安全的,而他的key set卻不是。在一次選擇發生的過程中,對于key的關心事件的修改要等到下一次select的時候才會生效。 另外,key和其代表的channel有可能在任何時候被cancel和close。因此存在于key set中的key并不代表其key是有效的,也不代表其channel是open的。如果key有可能被其他的線程取消或關閉channel,程序必須小 心的同步檢查這些條件。?
阻塞了的select可以通過調用selector的wakeup方法來喚醒。
http://blog.csdn.net/cutesource/article/details/6192016
如何正確使用NIO來構架網絡服務器一直是最近思考的一個問題,于是乎分析了一下Jetty、Tomcat和Mina有關NIO的源碼,發現大伙都基于類似的方式,我感覺這應該算是NIO構架網絡服務器的經典模式,并基于這種模式寫了個小小網絡服務器,壓力測試了一下,效果還不錯。廢話不多說,先看看三者是如何使用NIO的。
Jetty Connector的實現
先看看有關類圖:
其中:
SelectChannelConnector負責組裝各組件
SelectSet負責偵聽客戶端請求
SelectChannelEndPoint負責IO的讀和寫
HttpConnection負責邏輯處理
在整個服務端處理請求的過程可以分為三個階段,時序圖如下所示:
階段一:監聽并建立連接
這一過程主要是啟動一個線程負責accept新連接,監聽到后分配給相應的SelectSet,分配的策略就是輪詢。
階段二:監聽客戶端的請求
這一過程主要是啟動多個線程(線程數一般為服務器CPU的個數),讓SelectSet監聽所管轄的channel隊列,每個SelectSet維護一個Selector,這個Selector監聽隊列里所有的channel,一旦有讀事件,從線程池里拿線程去做處理請求
階段三:處理請求
這一過程就是每次客戶端請求的數據處理過程,值得注意的是為了不讓后端的業務處理阻礙Selector監聽新的請求,就多線程來分隔開監聽請求和處理請求兩個階段。
由此可以大致總結出Jetty有關NIO使用的模式,如下圖所示:
最核心就是把三件不同的事情隔離開,并用不同規模的線程去處理,最大限度地利用NIO的異步和通知特性
下面再來看看Tomcat是如何使用NIO來構架Connector這塊
先看看Tomcat Connector這塊的類圖:
其中:
NioEndpoint負責組裝各部件
Acceptor負責監聽新連接,并把連接交給Poller
Poller負責監聽所管轄的channel隊列,并把請求交給SocketProcessor處理
SocketProcessor負責數據處理,并把請求傳遞給后端業務處理模塊
在整個服務端處理請求的過程可以分為三個階段,時序圖如下所示:
階段一:監聽并建立連接
這一階段主要是Acceptor監聽新連接,并輪詢取一個Poller ,把連接交付給Poller
階段二:監聽客戶端的請求
這一過程主要是讓每個Poller監聽所管轄的channel隊列,select到新請求后交付給SocketProcessor處理
階段三:處理請求
這一過程就是從多線程執行SocketProcessor,做數據和業務處理
于是乎我們發現拋開具體代碼細節,Tomcat和Jetty在NIO的使用方面是非常一致的,采用的模式依然是下圖:
Mina框架
最后我們再看看NIO方面最著名的框架Mina,拋開Mina有關session和處理鏈條等方面的設計,單單挑出前端網絡層處理來看,也采用的是與Jetty和Tomcat類似的模式,只不過它做了些簡化,它沒有隔開請求偵聽和請求處理兩個階段,因此,宏觀上看它只分為兩個階段。
先看看它的類圖:
其中:
SocketAcceptor起線程調用SocketAcceptor.Work負責新連接偵聽,并交給SocketIoProcessor處理
SocketIoProcessor起線程調用SocketIoProcessor.Work負責偵聽所管轄的channel隊列, select到新請求后交給IoFilterChain處理
IoFilterChain組裝了mina的處理鏈條
在整個服務端處理請求的過程可以分為兩個階段,時序圖如下所示:
階段一:監聽并建立連接
階段二:監聽并處理客戶端的請求
?
總結來看Jetty、tomcat和Mina,我們也大概清楚了該如何基于NIO來構架網絡服務器,通過這個提煉出來的模式,我寫了個很簡單的NIO Server,在保持連接的情況下,可以很輕松的保持6萬連接(由于有65535連接限制),并能在負載只有3左右的情況下(4核),承擔3到4萬的TPS請求(當然做的事情很簡單,僅僅是把buffer轉化為自定義協議的包,然后再把包轉為buffer寫到客戶端)。因此簡單地實踐一下可以證明這個模式的有效性,不妨再看看這個圖,希望對大伙以后寫server有用:
安裝這個架構,寫了個粗略的版本,以后有機會一定要看看jetty等是怎么優雅的實現的
//server
import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.*; import java.util.concurrent.*;public class Server {private ConcurrentLinkedQueue<SelectionKey> m_conn = new ConcurrentLinkedQueue<SelectionKey>(); private ConcurrentLinkedQueue<SelectionKey> m_req = new ConcurrentLinkedQueue<SelectionKey>();private final int m_processNum = 3;private final int m_worksNum = 3;private final int m_port = 3562;private ServerSocketChannel channel ;private boolean connQuEpt = true;private boolean reqQuEpt = true;private Selector selector;//for connectionprivate List<Selector> m_reqSelector = new ArrayList<Selector>();public void listen() throws IOException{selector = Selector.open();channel = ServerSocketChannel.open();channel.configureBlocking(false);channel.socket().bind(new InetSocketAddress(m_port));channel.register(selector, SelectionKey.OP_ACCEPT);new Thread(new ConnectionHander()).start();//new Thread(new RequestManager()).start();creatRequestHanders();new Thread(new ProcessManager()).start();}/*class RequestManager implements Runnable {private ExecutorService m_reqPool;public RequestManager() {m_reqPool = Executors.newFixedThreadPool(m_processNum, new RequestThreadFactor());}public void run() {while (true) {}}}*/void creatRequestHanders() {try {for (int i = 0; i < m_processNum; ++i) {Selector slt = Selector.open();m_reqSelector.add(slt);RequestHander req = new RequestHander();req.setSelector(slt);new Thread(req).start();}}catch(IOException e) {e.printStackTrace();}}class ProcessManager implements Runnable {private ExecutorService m_workPool;public ProcessManager() {m_workPool = Executors.newFixedThreadPool(m_worksNum);}public void run() {SelectionKey key;while(true) {//太消耗cpu//應該要加一個wait,但是這樣就有鎖了while((key = m_req.poll()) !=null) {ProcessRequest preq = new ProcessRequest();preq.setKey(key);m_workPool.execute(preq);}}}}/*class RequestThread extends Thread {private Selector selector;public RequestThread(Runnable r) {super(r);try {selector = Selector.open();}catch(IOException e) {e.printStackTrace();//todo}}}class RequestThreadFactor implements ThreadFactory {public Thread newThread(Runnable r) {return new RequestThread(r);}}*///監視請求連接class ConnectionHander implements Runnable {int idx = 0;@Overridepublic void run() {System.out.println("listenning to connection");while (true) {try {selector.select();Set<SelectionKey> selectKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectKeys.iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove();m_conn.add(key);int num = m_reqSelector.size();m_reqSelector.get(idx).wakeup();//防止監聽request的進程都在堵塞中idx =(idx + 1)%num;}}catch(IOException e) {e.printStackTrace();}}}}//監視讀操作class RequestHander implements Runnable {private Selector selector;public void setSelector(Selector slt) {selector = slt;}public void run() {try {SelectionKey key;System.out.println(Thread.currentThread() + "listenning to request");while (true) {selector.select();while((key = m_conn.poll()) != null) {ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel sc = ssc.accept();//接受一個連接sc.configureBlocking(false);sc.register(selector, SelectionKey.OP_READ);System.out.println(Thread.currentThread() + "a connected line");}Set<SelectionKey> keys = selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while(it.hasNext()) {SelectionKey keytmp = it.next();it.remove();if (keytmp.isReadable()) {m_req.add(keytmp);}}}}catch(IOException e) {e.printStackTrace();}}}//讀數據并進行處理和發送返回class ProcessRequest implements Runnable {SelectionKey key;public void setKey(SelectionKey key) {this.key = key;}public void run() {ByteBuffer buffer = ByteBuffer.allocate(1024);SocketChannel sc = (SocketChannel) key.channel();String msg = null;try{int readBytes = 0;int ret;try{while((ret = sc.read(buffer)) > 0) {}}catch(IOException e) {}finally {buffer.flip();}if (readBytes > 0) {msg = Charset.forName("utf-8").decode(buffer).toString();buffer = null;}}finally {if(buffer != null)buffer.clear();}try {System.out.println("server received [ " + msg +"] from client address : " + sc.getRemoteAddress());Thread.sleep(2000);sc.write(ByteBuffer.wrap((msg + " server response ").getBytes(Charset.forName("utf-8"))));}catch(Exception e) {}}}public static void main(String[] args) {// TODO Auto-generated method stubServer server = new Server();try {server.listen();}catch(IOException e) {}}}//client
package javatest;import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.*;public class Client implements Runnable {// 空閑計數器,如果空閑超過10次,將檢測server是否中斷連接.private static int idleCounter = 0;private Selector selector;private SocketChannel socketChannel;private ByteBuffer temp = ByteBuffer.allocate(1024);public static void main(String[] args) throws IOException {Client client= new Client();new Thread(client).start();//client.sendFirstMsg();}public Client() throws IOException {// 同樣的,注冊鬧鐘.this.selector = Selector.open();// 連接遠程serversocketChannel = SocketChannel.open();// 如果快速的建立了連接,返回true.如果沒有建立,則返回false,并在連接后出發Connect事件.Boolean isConnected = socketChannel.connect(new InetSocketAddress("localhost", 3562));socketChannel.configureBlocking(false);SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);if (isConnected) {this.sendFirstMsg();} else {// 如果連接還在嘗試中,則注冊connect事件的監聽. connect成功以后會出發connect事件.key.interestOps(SelectionKey.OP_CONNECT);}}public void sendFirstMsg() throws IOException {String msg = "Hello NIO.";socketChannel.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));}@Overridepublic void run() {while (true) {try {// 阻塞,等待事件發生,或者1秒超時. num為發生事件的數量.int num = this.selector.select(1000);if (num ==0) {idleCounter ++;if(idleCounter >10) {// 如果server斷開了連接,發送消息將失敗.try {this.sendFirstMsg();} catch(ClosedChannelException e) {e.printStackTrace();this.socketChannel.close();return;}}continue;} else {idleCounter = 0;}Set<SelectionKey> keys = this.selector.selectedKeys();Iterator<SelectionKey> it = keys.iterator();while (it.hasNext()) {SelectionKey key = it.next();it.remove();if (key.isConnectable()) {// socket connectedSocketChannel sc = (SocketChannel)key.channel();if (sc.isConnectionPending()) {sc.finishConnect();}// send first message;this.sendFirstMsg();}if (key.isReadable()) {// msg received.SocketChannel sc = (SocketChannel)key.channel();this.temp = ByteBuffer.allocate(1024);int count = sc.read(temp);if (count<0) {sc.close();continue;}// 切換buffer到讀狀態,內部指針歸位.temp.flip();String msg = Charset.forName("UTF-8").decode(temp).toString();System.out.println("Client received ["+msg+"] from server address:" + sc.getRemoteAddress());Thread.sleep(1000);// echo back.sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));// 清空buffertemp.clear();}}} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}}}總結
- 上一篇: 安装ubuntu 13.04
- 下一篇: Http和Socket连接区别