Python网络编程中的select 和 poll I/O复用的简单使用
From: http://www.cnblogs.com/coser/archive/2012/01/06/2315216.html
首先列一下,sellect、poll、epoll三者的區別?
select?
select最早于1983年出現在4.2BSD中,它通過一個select()系統調用來監視多個文件描述符的數組,當select()返回后,該數組中就緒的文件描述符便會被內核修改標志位,使得進程可以獲得這些文件描述符從而進行后續的讀寫操作。
select目前幾乎在所有的平臺上支持,其良好跨平臺支持也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。
select的一個缺點在于單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改宏定義甚至重新編譯內核的方式提升這一限制。
另外,select()所維護的存儲大量文件描述符的數據結構,隨著文件描述符數量的增大,其復制的開銷也線性增長。同時,由于網絡響應時間的延遲使得大量TCP連接處于非活躍狀態,但調用select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。
poll?
poll在1986年誕生于System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大文件描述符數量的限制。
poll和select同樣存在一個缺點就是,包含大量文件描述符的數組被整體復制于用戶態和內核的地址空間之間,而不論這些文件描述符是否就緒,它的開銷隨著文件描述符數量的增加而線性增大。
另外,select()和poll()將就緒的文件描述符告訴進程后,如果進程沒有對其進行IO操作,那么下次調用select()和poll()的時候將再次報告這些文件描述符,所以它們一般不會丟失就緒的消息,這種方式稱為水平觸發(Level Triggered)。
epoll?
直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。
epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那么它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。
epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這里也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。
另一個本質的改進在于epoll采用基于事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法后,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來注冊一個文件描述符,一旦基于某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。
使用 select :?
在python中,select函數是一個對底層操作系統的直接訪問的接口。它用來監控sockets、files和pipes,等待IO完成(Waiting for I/O completion)。當有可讀、可寫或是異常事件產生時,select可以很容易的監控到。?
select.select(rlist, wlist, xlist[, timeout]) 傳遞三個參數,一個為輸入而觀察的文件對象列表,一個為輸出而觀察的文件對象列表和一個觀察錯誤異常的文件列表。第四個是一個可選參數,表示超時秒數。其返回3個tuple,每個tuple都是一個準備好的對象列表,它和前邊的參數是一樣的順序。下面,主要結合代碼,簡單說說select的使用。?
Server端程序:?
1、該程序主要是利用socket進行通信,接收客戶端發送過來的數據,然后再發還給客戶端。?
2、首先建立一個TCP/IP socket,并將其設為非阻塞,然后進行bind和listen。?
3、通過select函數獲取到三種文件列表,分別對每個列表的每個元素進行輪詢,對不同socket進行不同的處理,最外層循環直到inputs列表為空為止?
4、當設置timeout參數時,如果發生了超時,select函數會返回三個空列表。?
代碼如下(代碼中已經有很詳細的注釋,這里就不過多解釋了):
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | ''' Created on 2012-1-6 The echo server example from the socket section can be extanded to watche for more than one connection at a time by using select() .The new version starts out by creating a nonblocking TCP/IP socket and configuring it to listen on an address @author: xiaojay ''' importselect importsocket importQueue #create a socket server =socket.socket(socket.AF_INET,socket.SOCK_STREAM) server.setblocking(False) #set option reused server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR? , 1) server_address=('192.168.1.102',10001) server.bind(server_address) server.listen(10) #sockets from which we except to read inputs =[server] #sockets from which we expect to write outputs =[] #Outgoing message queues (socket:Queue) message_queues ={} #A optional parameter for select is TIMEOUT timeout =20 whileinputs: ????print"waiting for next event" ????readable , writable , exceptional =select.select(inputs, outputs, inputs, timeout) ????# When timeout reached , select return three empty lists ????ifnot(readable orwritable orexceptional) : ????????print"Time out ! " ????????break;??? ????fors inreadable : ????????ifs isserver: ????????????# A "readable" socket is ready to accept a connection ????????????connection, client_address =s.accept() ????????????print"??? connection from ", client_address ????????????connection.setblocking(0) ????????????inputs.append(connection) ????????????message_queues[connection] =Queue.Queue() ????????else: ????????????data =s.recv(1024) ????????????ifdata : ????????????????print" received ", data , "from ",s.getpeername() ????????????????message_queues[s].put(data) ????????????????# Add output channel for response??? ????????????????ifs notinoutputs: ????????????????????outputs.append(s) ????????????else: ????????????????#Interpret empty result as closed connection ????????????????print"? closing", client_address ????????????????ifs inoutputs : ????????????????????outputs.remove(s) ????????????????inputs.remove(s) ????????????????s.close() ????????????????#remove message queue ????????????????delmessage_queues[s] ????fors inwritable: ????????try: ????????????next_msg =message_queues[s].get_nowait() ????????exceptQueue.Empty: ????????????print" ", s.getpeername() , 'queue empty' ????????????outputs.remove(s) ????????else: ????????????print" sending ", next_msg , " to ", s.getpeername() ????????????s.send(next_msg) ????? ????fors inexceptional: ????????print" exception condition on ", s.getpeername() ????????#stop listening for input on the connection ????????inputs.remove(s) ????????ifs inoutputs: ????????????outputs.remove(s) ????????s.close() ????????#Remove message queue ????????delmessage_queues[s] ???????????????????? |
Client端程序:?
Client端創建多個socket進行server鏈接,用于觀察使用select函數的server端如何進行處理。?
代碼如下(代碼中已經有很詳細的注釋,這里就不過多解釋了):
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | ''' Created on 2012-1-5 The example client program uses some sockets to demonstrate how the server with select() manages multiple connections at the same time . The client starts by connecting each TCP/IP socket to the server @author: peter ''' importsocket messages =["This is the message", ????????????"It will be sent", ????????????"in parts "] print"Connect to the server" server_address =("192.168.1.102",10001) #Create a TCP/IP sock socks =[] fori inrange(10): ????socks.append(socket.socket(socket.AF_INET,socket.SOCK_STREAM)) fors insocks: ????s.connect(server_address) counter =0 formessage inmessages : ????#Sending message from different sockets ????fors insocks: ????????counter+=1 ????????print"? %s sending %s"%(s.getpeername(),message+" version "+str(counter)) ????????s.send(message+" version "+str(counter)) ????#Read responses on both sockets ????fors insocks: ????????data =s.recv(1024) ????????print" %s received %s"%(s.getpeername(),data) ????????ifnotdata: ????????????print"closing socket ",s.getpeername() ????????????s.close() |
?
使用Poll:
Server端:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 | ''' Created on 2012-1-6 The poll function provides similar features to select() , but the underlying implementation is more efficient. But poll() is not supported under windows . @author: xiaojay ''' importsocket importselect importQueue # Create a TCP/IP socket, and then bind and listen server =socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address =("192.168.1.102", 10001) print?"Starting up on %s port %s"%server_address server.bind(server_address) server.listen(5) message_queues ={} #The timeout value is represented in milliseconds, instead of seconds. timeout =1000 # Create a limit for the event READ_ONLY =( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR) READ_WRITE =(READ_ONLY|select.POLLOUT) # Set up the poller poller =select.poll() poller.register(server,READ_ONLY) #Map file descriptors to socket objects fd_to_socket ={server.fileno():server,} whileTrue: ????print"Waiting for the next event" ????events =poller.poll(timeout) ????print"*"*20 ????printlen(events) ????printevents ????print"*"*20 ????forfd ,flag in?events: ????????s =fd_to_socket[fd] ????????ifflag & (select.POLLIN | select.POLLPRI) : ????????????ifs isserver : ????????????????# A readable socket is ready to accept a connection ????????????????connection , client_address =s.accept() ????????????????print" Connection ", client_address ????????????????connection.setblocking(False) ????????????????? ????????????????fd_to_socket[connection.fileno()] =connection ????????????????poller.register(connection,READ_ONLY) ????????????????? ????????????????#Give the connection a queue to send data ????????????????message_queues[connection]? =Queue.Queue() ????????????else: ????????????????data =s.recv(1024) ????????????????ifdata: ????????????????????# A readable client socket has data ????????????????????print"? received %s from %s "%(data, s.getpeername()) ????????????????????message_queues[s].put(data) ????????????????????poller.modify(s,READ_WRITE) ????????????????else: ????????????????????# Close the connection ????????????????????print"? closing", s.getpeername() ????????????????????# Stop listening for input on the connection ????????????????????poller.unregister(s) ????????????????????s.close() ????????????????????delmessage_queues[s] ????????elifflag & select.POLLHUP : ????????????#A client that "hang up" , to be closed. ????????????print" Closing ", s.getpeername() ,"(HUP)" ????????????poller.unregister(s) ????????????s.close() ????????elifflag & select.POLLOUT : ????????????#Socket is ready to send data , if there is any to send ????????????try: ????????????????next_msg =message_queues[s].get_nowait() ????????????exceptQueue.Empty: ????????????????# No messages waiting so stop checking ????????????????prints.getpeername() , " queue empty" ????????????????poller.modify(s,READ_ONLY) ????????????else: ????????????????print" sending %s to %s"%(next_msg , s.getpeername()) ????????????????s.send(next_msg) ????????elifflag & select.POLLERR: ????????????#Any events with POLLERR cause the server to close the socket ????????????print"? exception on", s.getpeername() ????????????poller.unregister(s) ????????????s.close() ????????????delmessage_queues[s] |
?
總結
以上是生活随笔為你收集整理的Python网络编程中的select 和 poll I/O复用的简单使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安装教程之maven下载及安装
- 下一篇: 图片以base64格式显示出来