Python异步非阻塞IO多路复用Select/Poll/Epoll使用
生活随笔
收集整理的這篇文章主要介紹了
Python异步非阻塞IO多路复用Select/Poll/Epoll使用
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
來源:http://www.haiyun.me/archives/1056.html
有許多封裝好的異步非阻塞IO多路復用框架,底層在linux基于最新的epoll實現,為了更好的使用,了解其底層原理還是有必要的。
下面記錄下分別基于Select/Poll/Epoll的echo server實現。
Python Select Server,可監控事件數量有限制:
#!/usr/bin/python # -*- coding: utf-8 -*- import socket import select import Queueserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ("192.168.1.5", 8080) server.bind(server_address) server.listen(5) print "服務器啟動成功,監聽IP:" , server_address message_queues = {} #超時,毫秒 timeout = 5000 #監聽哪些事件 READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR) READ_WRITE = (READ_ONLY|select.POLLOUT) #新建輪詢事件對象 poller = select.poll() #注冊本機監聽socket到等待可讀事件事件集合 poller.register(server,READ_ONLY) #文件描述符到socket映射 fd_to_socket = {server.fileno():server,} while True:print "等待活動連接......"#輪詢注冊的事件集合events = poller.poll(timeout)if not events:print "poll超時,無活動連接,重新poll......"continueprint "有" , len(events), "個新事件,開始處理......"for fd ,flag in events:s = fd_to_socket[fd]#可讀事件if flag & (select.POLLIN | select.POLLPRI) :if s is server :#如果socket是監聽的server代表有新連接connection , client_address = s.accept()print "新連接:" , client_addressconnection.setblocking(False)fd_to_socket[connection.fileno()] = connection#加入到等待讀事件集合poller.register(connection,READ_ONLY)message_queues[connection] = Queue.Queue()else :#接收客戶端發送的數據data = s.recv(1024)if data:print "收到數據:" , data , "客戶端:" , s.getpeername()message_queues[s].put(data)#修改讀取到消息的連接到等待寫事件集合poller.modify(s,READ_WRITE)else :# Close the connectionprint " closing" , s.getpeername()# Stop listening for input on the connectionpoller.unregister(s)s.close()del message_queues[s]#連接關閉事件elif flag & select.POLLHUP :print " Closing ", s.getpeername() ,"(HUP)"poller.unregister(s)s.close()#可寫事件elif flag & select.POLLOUT :try:msg = message_queues[s].get_nowait()except Queue.Empty:print s.getpeername() , " queue empty"poller.modify(s,READ_ONLY)else :print "發送數據:" , data , "客戶端:" , s.getpeername()s.send(msg)#異常事件elif flag & select.POLLERR:print " exception on" , s.getpeername()poller.unregister(s)s.close()del message_queues[s]</span>Python Epoll Server,基于回調的事件通知模式,輕松管理大量連接:
#!/usr/bin/python # -*- coding: utf-8 -*- import socket, select import Queueserversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_address = ("192.168.1.5", 8080) serversocket.bind(server_address) serversocket.listen(1) print "服務器啟動成功,監聽IP:" , server_address serversocket.setblocking(0) timeout = 10 #新建epoll事件對象,后續要監控的事件添加到其中 epoll = select.epoll() #添加服務器監聽fd到等待讀事件集合 epoll.register(serversocket.fileno(), select.EPOLLIN) message_queues = {}fd_to_socket = {serversocket.fileno():serversocket,} while True:print "等待活動連接......"#輪詢注冊的事件集合events = epoll.poll(timeout)if not events:print "epoll超時無活動連接,重新輪詢......"continueprint "有" , len(events), "個新事件,開始處理......"for fd, event in events:socket = fd_to_socket[fd]#可讀事件if event & select.EPOLLIN:#如果活動socket為服務器所監聽,有新連接if socket == serversocket:connection, address = serversocket.accept()print "新連接:" , addressconnection.setblocking(0)#注冊新連接fd到待讀事件集合epoll.register(connection.fileno(), select.EPOLLIN)fd_to_socket[connection.fileno()] = connectionmessage_queues[connection] = Queue.Queue()#否則為客戶端發送的數據else:data = socket.recv(1024)if data:print "收到數據:" , data , "客戶端:" , socket.getpeername()message_queues[socket].put(data)#修改讀取到消息的連接到等待寫事件集合epoll.modify(fd, select.EPOLLOUT)#可寫事件elif event & select.EPOLLOUT:try:msg = message_queues[socket].get_nowait()except Queue.Empty:print socket.getpeername() , " queue empty"epoll.modify(fd, select.EPOLLIN)else :print "發送數據:" , data , "客戶端:" , socket.getpeername()socket.send(msg)#關閉事件elif event & select.EPOLLHUP:epoll.unregister(fd)fd_to_socket[fd].close()del fd_to_socket[fd] epoll.unregister(serversocket.fileno()) epoll.close() serversocket.close()
總結
以上是生活随笔為你收集整理的Python异步非阻塞IO多路复用Select/Poll/Epoll使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++ 基本数据类型 的 字节数
- 下一篇: 轻量级分布式任务调度平台 XXL-JOB