【经典】5种IO模型 | IO多路复用
上篇回顧:靜態服務器+壓測
3.2.概念篇
1.同步與異步
同步是指一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成后,依賴的任務才能算完成。
異步是指不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什么工作。然后繼續執行下面代碼邏輯,只要自己完成了整個任務就算完成了(異步一般使用狀態、通知和回調)
PS:項目里面一般是這樣的:(個人經驗)
- 讀一般都是實時返回,代碼一般都是await xxx()
- 異步:現在用戶寫了篇文章,可以異步操作,就算沒真正寫到數據庫也可以返回:發表成功(大不了失敗提示一下)
- 同步:用戶獲取訂單信息,你如果異步就會這樣了:提示下獲取成功,然后一片空白...用戶不卸載就怪了...
2.阻塞與非阻塞
阻塞是指調用結果返回之前,當前線程會被掛起,一直處于等待消息通知,不能夠執行其他業務(大部分代碼都是這樣的)
非阻塞是指在不能立刻得到結果之前,該函數不會阻塞當前線程,而會立刻返回(繼續執行下面代碼,或者重試機制走起)
PS:項目里面重試機制為啥一般都是3次?
3.五種IO模型
對于一次IO訪問,數據會先被拷貝到內核的緩沖區中,然后才會從內核的緩沖區拷貝到應用程序的地址空間。需要經歷兩個階段:
由于存在這兩個階段,Linux產生了下面五種IO模型(以socket為例)
- 當用戶進程調用了recvfrom等阻塞方法時,內核進入IO的第1個階段:準備數據(內核需要等待足夠的數據再拷貝)這個過程需要等待,用戶進程會被阻塞,等內核將數據準備好,然后拷貝到用戶地址空間,內核返回結果,用戶進程才從阻塞態進入就緒態
- Linux中默認情況下所有的socket都是阻塞的
- 當用戶進程發出read操作時,如果kernel中的數據還沒有準備好,那么它并不會block用戶進程,而是立刻返回一個error。
- 用戶進程判斷結果是一個error時,它就知道數據還沒有準備好,于是它可以再次發送read操作
- 一旦kernel中的數據準備好了,并且又再次收到了用戶進程的system call,那么它馬上就將數據拷貝到了用戶內存,然后返回
- 非阻塞IO模式下用戶進程需要不斷地詢問內核的數據準備好了沒有
- 通過一種機制,一個進程可以監視多個文件描述符(套接字描述符)一旦某個文件描述符就緒(一般是讀就緒或者寫就緒),能夠通知程序進行相應的讀寫操作(這樣就不需要每個用戶進程不斷的詢問內核數據準備好了沒)
- 常用的IO多路復用方式有select、poll和epoll
- 內核文件描述符就緒后,通過信號通知用戶進程,用戶進程再通過系統調用讀取數據。
- 此方式屬于同步IO(實際讀取數據到用戶進程緩存的工作仍然是由用戶進程自己負責的)
- 用戶進程發起read操作之后,立刻就可以開始去做其它的事。內核收到一個異步IO read之后,會立刻返回,不會阻塞用戶進程。
- 內核會等待數據準備完成,然后將數據拷貝到用戶內存,當這一切都完成之后,內核會給用戶進程發送一個signal告訴它read操作完成了
4.Unix圖示
貼一下Unix編程里面的圖:
**非阻塞IO** **IO復用** **信號IO** **異步AIO**3.3.IO多路復用
開始之前咱們通過非阻塞IO引入一下:(來個簡單例子socket.setblocking(False))
import time import socketdef select(socket_addr_list):for client_socket, client_addr in socket_addr_list:try:data = client_socket.recv(2048)if data:print(f"[來自{client_addr}的消息:]\n")print(data.decode("utf-8"))client_socket.send(b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>")else:# 沒有消息是觸發異常,空消息是斷開連接client_socket.close() # 關閉客戶端連接socket_addr_list.remove((client_socket, client_addr))print(f"[客戶端{client_addr}已斷開連接,當前連接數:{len(socket_addr_list)}]")except Exception:passdef main():# 存放客戶端集合socket_addr_list = list()with socket.socket() as tcp_server:# 防止端口綁定的設置tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)tcp_server.bind(('', 8080))tcp_server.listen()tcp_server.setblocking(False) # 服務端非阻塞while True:try:client_socket, client_addr = tcp_server.accept()client_socket.setblocking(False) # 客戶端非阻塞socket_addr_list.append((client_socket, client_addr))except Exception:passelse:print(f"[來自{client_addr}的連接,當前連接數:{len(socket_addr_list)}]")# 防止客戶端斷開后出錯if socket_addr_list:# 輪詢查看客戶端有沒有消息select(socket_addr_list) # 引用傳參time.sleep(0.01)if __name__ == "__main__":main()輸出:
可以思考下:
- PS:一個線程里面只能有一個死循環,現在程序需要兩個死循環,so ==> 放一起咯
- PS:沒有消息是觸發異常,空消息是斷開連接
- PS:dict在循環的過程中,del會引發異常
1.Select
select和上面的有點類似,就是輪詢的過程交給了操作系統:
kernel會“監視”所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程
來個和上面等同的案例:
import select import socketdef main():with socket.socket() as tcp_server:tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)tcp_server.bind(('', 8080))tcp_server.listen()socket_info_dict = dict()socket_list = [tcp_server] # 監測列表while True:# 劣勢:select列表數量有限制read_list, write_list, error_list = select.select(socket_list, [], [])for item in read_list:# 服務端迎接新的連接if item == tcp_server:client_socket, client_address = item.accept()socket_list.append(client_socket)socket_info_dict[client_socket] = client_addressprint(f"[{client_address}已連接,當前連接數:{len(socket_list)-1}]")# 客戶端發來else:data = item.recv(2048)if data:print(data.decode("utf-8"))item.send(b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>")else:item.close()socket_list.remove(item)info = socket_info_dict[item]print(f"[{info}已斷開,當前連接數:{len(socket_list)-1}]")if __name__ == "__main__":main()輸出和上面一樣
擴展說明:
select 函數監視的文件描述符分3類,分別是writefds、readfds、和exceptfds。調用后select函數會阻塞,直到有描述符就緒函數返回(有數據可讀、可寫、或者有except)或者超時(timeout指定等待時間,如果立即返回設為null即可)
select的一個缺點在于單個進程能夠監視的文件描述符的數量存在最大限制,在Linux上一般為1024(64位=>2048)
然后Poll就出現了,就是把上限給去掉了,本質并沒變,還是使用的輪詢
2.EPoll
epoll在內核2.6中提出(Linux獨有),使用一個文件描述符管理多個描述符,將用戶關心的文件描述符的事件存放到內核的一個事件表中,采用監聽回調的機制,這樣在用戶空間和內核空間的copy只需一次,避免再次遍歷就緒的文件描述符列表
先來看個案例吧:(輸出和上面一樣)
import socket import selectdef main():with socket.socket() as tcp_server:tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)tcp_server.bind(('', 8080))tcp_server.listen()# epoll是linux獨有的epoll = select.epoll()# tcp_server注冊到epoll中epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)# key-valuefd_socket_dict = dict()# 回調需要自己處理while True:# 返回可讀寫的socket fd 集合poll_list = epoll.poll()for fd, event in poll_list:# 服務器的socketif fd == tcp_server.fileno():client_socket, client_addr = tcp_server.accept()fd = client_socket.fileno()fd_socket_dict[fd] = (client_socket, client_addr)# 把客戶端注冊進epoll中epoll.register(fd, select.EPOLLIN | select.EPOLLET)else: # 客戶端client_socket, client_addr = fd_socket_dict[fd]data = client_socket.recv(2048)print(f"[來自{client_addr}的消息,當前連接數:{len(fd_socket_dict)}]\n")if data:print(data.decode("utf-8"))client_socket.send(b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>")else:del fd_socket_dict[fd]print(f"[{client_addr}已離線,當前連接數:{len(fd_socket_dict)}]\n")# 從epoll中注銷epoll.unregister(fd)client_socket.close()if __name__ == "__main__":main()擴展:epoll的兩種工作模式
LT(level trigger,水平觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序可以不立即處理該事件。下次調用epoll_wait時,會再次響應應用程序并通知此事件。LT模式是默認的工作模式。
LT模式同時支持阻塞和非阻塞socket。
ET(edge trigger,邊緣觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程序,應用程序必須立即處理該事件。如果不處理,下次調用epoll_wait時,不會再次響應應用程序并通知此事件。
ET是高速工作方式,只支持非阻塞socket(ET模式減少了epoll事件被重復觸發的次數,因此效率要比LT模式高)
Code提煉一下:
PS:epoll不一定比Select性能高,一般都是分場景的:
其實IO多路復用還有一個kqueue,和epoll類似,下面的通用寫法中有包含
3.通用寫法(Selector)
一般來說:Linux下使用epoll,Win下使用select(IO多路復用會這個通用的即可)
先看看Python源代碼:
# 選擇級別:epoll|kqueue|devpoll > poll > select if 'KqueueSelector' in globals():DefaultSelector = KqueueSelector elif 'EpollSelector' in globals():DefaultSelector = EpollSelector elif 'DevpollSelector' in globals():DefaultSelector = DevpollSelector elif 'PollSelector' in globals():DefaultSelector = PollSelector else:DefaultSelector = SelectSelector實戰案例:(可讀和可寫可以不分開)
import socket import selectors# Linux下使用epoll,Win下使用select Selector = selectors.DefaultSelector()class Task(object):def __init__(self):# 存放客戶端fd和socket鍵值對self.fd_socket_dict = dict()def run(self):self.server = socket.socket()self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)self.server.bind(('', 8080))self.server.listen()# 把Server注冊到epollSelector.register(self.server.fileno(), selectors.EVENT_READ,self.connected)def connected(self, key):"""客戶端連接時處理"""client_socket, client_address = self.server.accept()fd = client_socket.fileno()self.fd_socket_dict[fd] = (client_socket, client_address)# 注冊一個客戶端讀的事件(服務端去讀消息)Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)print(f"{client_address}已連接,當前連接數:{len(self.fd_socket_dict)}")def call_back_reads(self, key):"""客戶端可讀時處理"""# 一個fd只能注冊一次,監測可寫的時候需要把可讀給注銷Selector.unregister(key.fd)client_socket, client_address = self.fd_socket_dict[key.fd]print(f"[來自{client_address}的消息:]\n")data = client_socket.recv(2048)if data:print(data.decode("utf-8"))# 注冊一個客戶端寫的事件(服務端去發消息)Selector.register(key.fd, selectors.EVENT_WRITE,self.call_back_writes)else:client_socket.close()del self.fd_socket_dict[key.fd]print(f"{client_address}已斷開,當前連接數:{len(self.fd_socket_dict)}")def call_back_writes(self, key):"""客戶端可寫時處理"""Selector.unregister(key.fd)client_socket, client_address = self.fd_socket_dict[key.fd]client_socket.send(b"ok")Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)def main():t = Task()t.run()while True:ready = Selector.select()for key, obj in ready:# 需要自己回調call_back = key.datacall_back(key)if __name__ == "__main__":main()Code提煉一下:
- Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
- Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
業余拓展:
select, iocp, epoll,kqueue及各種I/O復用機制 https://blog.csdn.net/shallwake/article/details/5265287kqueue用法簡介 http://www.cnblogs.com/luminocean/p/5631336.html下級預估:協程篇 or 網絡深入篇
轉載于:https://www.cnblogs.com/dunitian/p/10099343.html
總結
以上是生活随笔為你收集整理的【经典】5种IO模型 | IO多路复用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: jQuery函数attr()和prop(
- 下一篇: 前端基础!!!前端三层?