经典|深入理解 Go高性能网络编程模型
導言
用戶空間與內核空間
I/O 模型
Non-blocking I/O
I/O 多路復用
select & poll
epoll
Go netpoller 核心
數據結構
netFD
pollDesc
實現原理
net.Listen
Listener.Accept()
Conn.Read/Conn.Write
pollDesc.waitRead/pollDesc.waitWrite
netpoll
Go netpoller 的價值
Goroutine 的調度
Go netpoller 的問題
Reactor 網絡模型
gnet
🚀 功能
參考&延伸閱讀
導言
Go 基于 I/O multiplexing 和 goroutine scheduler 構建了一個簡潔而高性能的原生網絡模型(基于 Go 的 I/O 多路復用 netpoller ),提供了 goroutine-per-connection 這樣簡單的網絡編程模式。在這種模式下,開發者使用的是同步的模式去編寫異步的邏輯,極大地降低了開發者編寫網絡應用時的心智負擔,且借助于 Go runtime scheduler 對 goroutines 的高效調度,這個原生網絡模型不論從適用性還是性能上都足以滿足絕大部分的應用場景。
然而,在工程性上能做到如此高的普適性和兼容性,最終暴露給開發者提供接口/模式如此簡潔,其底層必然是基于非常復雜的封裝,做了很多取舍,也有可能放棄了一些追求極致性能的設計和理念。事實上 Go netpoller 底層就是基于 epoll/kqueue/iocp 這些 I/O 多路復用技術來做封裝的,最終暴露出 goroutine-per-connection 這樣的極簡的開發模式給使用者。
Go netpoller 在不同的操作系統,其底層使用的 I/O 多路復用技術也不一樣,可以從 Go 源碼目錄結構和對應代碼文件了解 Go 在不同平臺下的網絡 I/O 模式的實現。比如,在 Linux 系統下基于 epoll,freeBSD 系統下基于 kqueue,以及 Windows 系統下基于 iocp。
本文將基于 Linux 平臺來解析 Go netpoller 之 I/O 多路復用的底層是如何基于 epoll 封裝實現的,從源碼層層推進,全面而深度地解析 Go netpoller 的設計理念和實現原理,以及 Go 是如何利用 netpoller 來構建它的原生網絡模型的。主要涉及到的一些概念:I/O 模型、用戶/內核空間、epoll、Linux 源碼、goroutine scheduler 等等,我會盡量簡單地講解,如果有對相關概念不熟悉的同學,還是希望能提前熟悉一下。
用戶空間與內核空間
現代操作系統都是采用虛擬存儲器,那么對 32 位操作系統而言,它的尋址空間(虛擬存儲空間)為 4G(2 的 32 次方)。操作系統的核心是內核,獨立于普通的應用程序,可以訪問受保護的內存空間,也有訪問底層硬件設備的所有權限。為了保證用戶進程不能直接操作內核(kernel),保證內核的安全,操心系統將虛擬空間劃分為兩部分,一部分為內核空間,一部分為用戶空間。針對 Linux 操作系統而言,將最高的 1G 字節(從虛擬地址 0xC0000000 到 0xFFFFFFFF),供內核使用,稱為內核空間,而將較低的 3G 字節(從虛擬地址 0x00000000 到 0xBFFFFFFF),供各個進程使用,稱為用戶空間。
現代的網絡服務的主流已經完成從 CPU 密集型到 IO 密集型的轉變,所以服務端程序對 I/O 的處理必不可少,而一旦操作 I/O 則必定要在用戶態和內核態之間來回切換。
I/O 模型
在神作《UNIX 網絡編程》里,總結歸納了 5 種 I/O 模型,包括同步和異步 I/O:
阻塞 I/O (Blocking I/O)
非阻塞 I/O (Nonblocking I/O)
I/O 多路復用 (I/O multiplexing)
信號驅動 I/O (Signal driven I/O)
異步 I/O (Asynchronous I/O)
操作系統上的 I/O 是用戶空間和內核空間的數據交互,因此 I/O 操作通常包含以下兩個步驟:
等待網絡數據到達網卡(讀就緒)/等待網卡可寫(寫就緒) –> 讀取/寫入到內核緩沖區
從內核緩沖區復制數據 –> 用戶空間(讀)/從用戶空間復制數據 -> 內核緩沖區(寫)
而判定一個 I/O 模型是同步還是異步,主要看第二步:數據在用戶和內核空間之間復制的時候是不是會阻塞當前進程,如果會,則是同步 I/O,否則,就是異步 I/O。基于這個原則,這 5 種 I/O 模型中只有一種異步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。
這 5 種 I/O 模型的對比如下:
Non-blocking I/O
什么叫非阻塞 I/O,顧名思義就是:所有 I/O 操作都是立刻返回而不會阻塞當前用戶進程。I/O 多路復用通常情況下需要和非阻塞 I/O 搭配使用,否則可能會產生意想不到的問題。比如,epoll 的 ET(邊緣觸發) 模式下,如果不使用非阻塞 I/O,有極大的概率會導致阻塞 event-loop 線程,從而降低吞吐量,甚至導致 bug。
Linux 下,我們可以通過 fcntl 系統調用來設置 O_NONBLOCK 標志位,從而把 socket 設置成 Non-blocking。當對一個 Non-blocking socket 執行讀操作時,流程是這個樣子:
當用戶進程發出 read 操作時,如果 kernel 中的數據還沒有準備好,那么它并不會 block 用戶進程,而是立刻返回一個 EAGAIN error。從用戶進程角度講 ,它發起一個 read 操作后,并不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個 error 時,它就知道數據還沒有準備好,于是它可以再次發送 read 操作。一旦 kernel 中的數據準備好了,并且又再次收到了用戶進程的 system call,那么它馬上就將數據拷貝到了用戶內存,然后返回。
「所以,Non-blocking I/O 的特點是用戶進程需要不斷的主動詢問 kernel 數據好了沒有。下一節我們要講的 I/O 多路復用需要和 Non-blocking I/O 配合才能發揮出最大的威力!」
I/O 多路復用
「所謂 I/O 多路復用指的就是 select/poll/epoll 這一系列的多路選擇器:支持單一線程同時監聽多個文件描述符(I/O 事件),阻塞等待,并在其中某個文件描述符可讀寫時收到通知。I/O 復用其實復用的不是 I/O 連接,而是復用線程,讓一個 thread of control 能夠處理多個連接(I/O 事件)?!?/strong>
select & poll
#include?<sys/select.h>/*?According?to?earlier?standards?*/ #include?<sys/time.h> #include?<sys/types.h> #include?<unistd.h>int?select(int?nfds,?fd_set?*readfds,?fd_set?*writefds,?fd_set?*exceptfds,?struct?timeval?*timeout);//?和 select 緊密結合的四個宏: void?FD_CLR(int?fd,?fd_set?*set); int?FD_ISSET(int?fd,?fd_set?*set); void?FD_SET(int?fd,?fd_set?*set); void?FD_ZERO(fd_set?*set);select 是 epoll 之前 Linux 使用的 I/O 事件驅動技術。
理解 select 的關鍵在于理解 fd_set,為說明方便,取 fd_set 長度為 1 字節,fd_set 中的每一 bit 可以對應一個文件描述符 fd,則 1 字節長的 fd_set 最大可以對應 8 個 fd。select 的調用過程如下:
執行 FD_ZERO(&set), 則 set 用位表示是 0000,0000
若 fd=5, 執行 FD_SET(fd, &set); 后 set 變為 0001,0000(第 5 位置為 1)
再加入 fd=2, fd=1,則 set 變為 0001,0011
執行 select(6, &set, 0, 0, 0) 阻塞等待
若 fd=1, fd=2 上都發生可讀事件,則 select 返回,此時 set 變為 0000,0011 (注意:沒有事件發生的 fd=5 被清空)
基于上面的調用過程,可以得出 select 的特點:
可監控的文件描述符個數取決于 sizeof(fd_set) 的值。假設服務器上 sizeof(fd_set)=512,每 bit 表示一個文件描述符,則服務器上支持的最大文件描述符是 512*8=4096。fd_set 的大小調整可參考 【原創】技術系列之 網絡模型(二) 中的模型 2,可以有效突破 select 可監控的文件描述符上限
將 fd 加入 select 監控集的同時,還要再使用一個數據結構 array 保存放到 select 監控集中的 fd,一是用于在 select 返回后,array 作為源數據和 fd_set 進行 FD_ISSET 判斷。二是 select 返回后會把以前加入的但并無事件發生的 fd 清空,則每次開始 select 前都要重新從 array 取得 fd 逐一加入(FD_ZERO 最先),掃描 array 的同時取得 fd 最大值 maxfd,用于 select 的第一個參數
可見 select 模型必須在 select 前循環 array(加 fd,取 maxfd),select 返回后循環 array(FD_ISSET 判斷是否有事件發生)
所以,select 有如下的缺點:
最大并發數限制:使用 32 個整數的 32 位,即 32*32=1024 來標識 fd,雖然可修改,但是有以下第 2, 3 點的瓶頸
每次調用 select,都需要把 fd 集合從用戶態拷貝到內核態,這個開銷在 fd 很多時會很大
性能衰減嚴重:每次 kernel 都需要線性掃描整個 fd_set,所以隨著監控的描述符 fd 數量增長,其 I/O 性能會線性下降
poll 的實現和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 結構而不是 select 的 fd_set 結構,poll 解決了最大文件描述符數量限制的問題,但是同樣需要從用戶態拷貝所有的 fd 到內核態,也需要線性遍歷所有的 fd 集合,所以它和 select 只是實現細節上的區分,并沒有本質上的區別。
epoll
epoll 是 Linux kernel 2.6 之后引入的新 I/O 事件驅動技術,I/O 多路復用的核心設計是 1 個線程處理所有連接的 等待消息準備好 I/O 事件,這一點上 epoll 和 select&poll 是大同小異的。但 select&poll 錯誤預估了一件事,當數十萬并發連接存在時,可能每一毫秒只有數百個活躍的連接,同時其余數十萬連接在這一毫秒是非活躍的。select&poll 的使用方法是這樣的:返回的活躍連接 == select(全部待監控的連接) 。
什么時候會調用 select&poll 呢?在你認為需要找出有報文到達的活躍連接時,就應該調用。所以,select&poll 在高并發時是會被頻繁調用的。這樣,這個頻繁調用的方法就很有必要看看它是否有效率,因為,它的輕微效率損失都會被 高頻 二字所放大。它有效率損失嗎?顯而易見,全部待監控連接是數以十萬計的,返回的只是數百個活躍連接,這本身就是無效率的表現。被放大后就會發現,處理并發上萬個連接時,select&poll 就完全力不從心了。這個時候就該 epoll 上場了,epoll 通過一些新的設計和優化,基本上解決了 select&poll 的問題。
epoll 的 API 非常簡潔,涉及到的只有 3 個系統調用:
#include?<sys/epoll.h>?? int?epoll_create(int?size);?//?int?epoll_create1(int?flags); int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event); int?epoll_wait(int?epfd,?struct?epoll_event?*events,?int?maxevents,?int?timeout);其中,epoll_create 創建一個 epoll 實例并返回 epollfd;epoll_ctl 注冊 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 實例上;epoll_wait 則是阻塞監聽 epoll 實例上所有的 file descriptor 的 I/O 事件,它接收一個用戶空間上的一塊內存地址 (events 數組),kernel 會在有 I/O 事件發生的時候把文件描述符列表復制到這塊內存地址上,然后 epoll_wait 解除阻塞并返回,最后用戶空間上的程序就可以對相應的 fd 進行讀寫了:
#include?<unistd.h> ssize_t?read(int?fd,?void?*buf,?size_t?count); ssize_t?write(int?fd,?const?void?*buf,?size_t?count);epoll 的工作原理如下:
與 select&poll 相比,epoll 分清了高頻調用和低頻調用。例如,epoll_ctl 相對來說就是非頻繁調用的,而 epoll_wait 則是會被高頻調用的。所以 epoll 利用 epoll_ctl 來插入或者刪除一個 fd,實現用戶態到內核態的數據拷貝,這確保了每一個 fd 在其生命周期只需要被拷貝一次,而不是每次調用 epoll_wait 的時候都拷貝一次。epoll_wait 則被設計成幾乎沒有入參的調用,相比 select&poll 需要把全部監聽的 fd 集合從用戶態拷貝至內核態的做法,epoll 的效率就高出了一大截。
在實現上 epoll 采用紅黑樹來存儲所有監聽的 fd,而紅黑樹本身插入和刪除性能比較穩定,時間復雜度 O(logN)。通過 epoll_ctl 函數添加進來的 fd 都會被放在紅黑樹的某個節點內,所以,重復添加是沒有用的。當把 fd 添加進來的時候時候會完成關鍵的一步:該 fd 會與相應的設備(網卡)驅動程序建立回調關系,也就是在內核中斷處理程序為它注冊一個回調函數,在 fd 相應的事件觸發(中斷)之后(設備就緒了),內核就會調用這個回調函數,該回調函數在內核中被稱為:ep_poll_callback ,「這個回調函數其實就是把這個 fd 添加到 rdllist 這個雙向鏈表(就緒鏈表)中」。epoll_wait 實際上就是去檢查 rdllist 雙向鏈表中是否有就緒的 fd,當 rdllist 為空(無就緒 fd)時掛起當前進程,直到 rdllist 非空時進程才被喚醒并返回。
相比于 select&poll 調用時會將全部監聽的 fd 從用戶態空間拷貝至內核態空間并線性掃描一遍找出就緒的 fd 再返回到用戶態,epoll_wait 則是直接返回已就緒 fd,因此 epoll 的 I/O 性能不會像 select&poll 那樣隨著監聽的 fd 數量增加而出現線性衰減,是一個非常高效的 I/O 事件驅動技術。
「由于使用 epoll 的 I/O 多路復用需要用戶進程自己負責 I/O 讀寫,從用戶進程的角度看,讀寫過程是阻塞的,所以 select&poll&epoll 本質上都是同步 I/O 模型,而像 Windows 的 IOCP 這一類的異步 I/O,只需要在調用 WSARecv 或 WSASend 方法讀寫數據的時候把用戶空間的內存 buffer 提交給 kernel,kernel 負責數據在用戶空間和內核空間拷貝,完成之后就會通知用戶進程,整個過程不需要用戶進程參與,所以是真正的異步 I/O?!?/strong>
延伸
另外,我看到有些文章說 epoll 之所以性能高是因為利用了 Linux 的 mmap 內存映射讓內核和用戶進程共享了一片物理內存,用來存放就緒 fd 列表和它們的數據 buffer,所以用戶進程在 epoll_wait 返回之后用戶進程就可以直接從共享內存那里讀取/寫入數據了,這讓我很疑惑,因為首先看 epoll_wait 的函數聲明:
int?epoll_wait(int?epfd,?struct?epoll_event?*events,?int?maxevents,?int?timeout);第二個參數:就緒事件列表,是需要在用戶空間分配內存然后再傳給 epoll_wait 的,如果內核會用 mmap 設置共享內存,直接傳遞一個指針進去就行了,根本不需要在用戶態分配內存,多此一舉。其次,內核和用戶進程通過 mmap 共享內存是一件極度危險的事情,內核無法確定這塊共享內存什么時候會被回收,而且這樣也會賦予用戶進程直接操作內核數據的權限和入口,非常容易出現大的系統漏洞,因此一般極少會這么做。所以我很懷疑 epoll 是不是真的在 Linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 Linux kernel 源碼:
/**?Implement?the?event?wait?interface?for?the?eventpoll?file.?It?is?the?kernel*?part?of?the?user?space?epoll_wait(2).*/ static?int?do_epoll_wait(int?epfd,?struct?epoll_event?__user?*events,int?maxevents,?int?timeout) {.../*?Time?to?fish?for?events?...?*/error?=?ep_poll(ep,?events,?maxevents,?timeout); }//?如果?epoll_wait?入參時設定?timeout?==?0,?那么直接通過?ep_events_available?判斷當前是否有用戶感興趣的事件發生,如果有則通過?ep_send_events?進行處理 //?如果設置 timeout >?0,并且當前沒有用戶關注的事件發生,則進行休眠,并添加到 ep->wq 等待隊列的頭部;對等待事件描述符設置 WQ_FLAG_EXCLUSIVE 標志 //?ep_poll?被事件喚醒后會重新檢查是否有關注事件,如果對應的事件已經被搶走,那么?ep_poll?會繼續休眠等待 static?int?ep_poll(struct?eventpoll?*ep,?struct?epoll_event?__user?*events,?int?maxevents,?long?timeout) {...send_events:/**?Try?to?transfer?events?to?user?space.?In?case?we?get?0?events?and*?there's?still?timeout?left?over,?we?go?trying?again?in?search?of*?more?luck.*///?如果一切正常,?有?event?發生,?就開始準備數據?copy?給用戶空間了//?如果有就緒的事件發生,那么就調用?ep_send_events?將就緒的事件?copy?到用戶態內存中,//?然后返回到用戶態,否則判斷是否超時,如果沒有超時就繼續等待就緒事件發生,如果超時就返回用戶態。//?從?ep_poll?函數的實現可以看到,如果有就緒事件發生,則調用?ep_send_events?函數做進一步處理if?(!res?&&?eavail?&&!(res?=?ep_send_events(ep,?events,?maxevents))?&&?!timed_out)goto?fetch_events;... }//?ep_send_events?函數是用來向用戶空間拷貝就緒?fd?列表的,它將用戶傳入的就緒?fd?列表內存簡單封裝到 // ep_send_events_data 結構中,然后調用 ep_scan_ready_list 將就緒隊列中的事件寫入用戶空間的內存; //?用戶進程就可以訪問到這些數據進行處理 static?int?ep_send_events(struct?eventpoll?*ep,struct?epoll_event?__user?*events,?int?maxevents) {struct?ep_send_events_data?esed;esed.maxevents?=?maxevents;esed.events?=?events;//?調用?ep_scan_ready_list?函數檢查?epoll?實例?eventpoll?中的?rdllist?就緒鏈表,//?并注冊一個回調函數?ep_send_events_proc,如果有就緒?fd,則調用?ep_send_events_proc?進行處理ep_scan_ready_list(ep,?ep_send_events_proc,?&esed,?0,?false);return?esed.res; }//?調用?ep_scan_ready_list?的時候會傳遞指向?ep_send_events_proc?函數的函數指針作為回調函數, //?一旦有就緒?fd,就會調用?ep_send_events_proc?函數 static?__poll_t?ep_send_events_proc(struct?eventpoll?*ep,?struct?list_head?*head,?void?*priv) {.../**?If?the?event?mask?intersect?the?caller-requested?one,*?deliver?the?event?to?userspace.?Again,?ep_scan_ready_list()*?is?holding?ep->mtx,?so?no?operations?coming?from?userspace*?can?change?the?item.*/revents?=?ep_item_poll(epi,?&pt,?1);//?如果?revents?為?0,說明沒有就緒的事件,跳過,否則就將就緒事件拷貝到用戶態內存中if?(!revents)continue;//?將當前就緒的事件和用戶進程傳入的數據都通過?__put_user?拷貝回用戶空間,//?也就是調用?epoll_wait?之時用戶進程傳入的?fd?列表的內存if?(__put_user(revents,?&uevent->events)?||?__put_user(epi->event.data,?&uevent->data))?{list_add(&epi->rdllink,?head);ep_pm_stay_awake(epi);if?(!esed->res)esed->res?=?-EFAULT;return?0;}... }從 do_epoll_wait 開始層層跳轉,我們可以很清楚地看到最后內核是通過 __put_user 函數把就緒 fd 列表和事件返回到用戶空間,而 __put_user 正是內核用來拷貝數據到用戶空間的標準函數。此外,我并沒有在 Linux kernel 的源碼中和 epoll 相關的代碼里找到 mmap 系統調用做內存映射的邏輯,所以基本可以得出結論:epoll 在 Linux kernel 里并沒有使用 mmap 來做用戶空間和內核空間的內存共享,所以那些說 epoll 使用了 mmap 的文章都是誤解。
Go netpoller 核心
「Go netpoller 基本原理」
?Go netpoller 通過在底層對 epoll/kqueue/iocp 的封裝,從而實現了使用同步編程模式達到異步執行的效果??偨Y來說,所有的網絡操作都以網絡描述符 netFD 為中心實現。netFD 與底層 PollDesc 結構綁定,當在一個 netFD 上讀寫遇到 EAGAIN 錯誤時,就將當前 goroutine 存儲到這個 netFD 對應的 PollDesc 中,同時調用 gopark 把當前 goroutine 給 park 住,直到這個 netFD 上再次發生讀寫事件,才將此 goroutine 給 ready 激活重新運行。顯然,在底層通知 goroutine 再次發生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅動機制。
?總所周知,Go 是一門跨平臺的編程語言,而不同平臺針對特定的功能有不用的實現,這當然也包括了 I/O 多路復用技術,比如 Linux 里的 I/O 多路復用有 select、poll 和 epoll,而 freeBSD 或者 MacOS 里則是 kqueue,而 Windows 里則是基于異步 I/O 實現的 iocp,等等;因此,Go 為了實現底層 I/O 多路復用的跨平臺,分別基于上述的這些不同平臺的系統調用實現了多版本的 netpollers,具體的源碼路徑如下:
src/runtime/netpoll_epoll.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_solaris.go
src/runtime/netpoll_windows.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_fake.go
本文的解析基于 epoll 版本,如果讀者對其他平臺的 netpoller 底層實現感興趣,可以在閱讀完本文后自行翻閱其他 netpoller 源碼,所有實現版本的機制和原理基本類似,所以了解了 epoll 版本的實現后再去學習其他版本實現應該沒什么障礙。
接下來讓我們通過分析最新的 Go 源碼(v1.15.3),全面剖析一下整個 Go netpoller 的運行機制和流程。
數據結構
netFD
net.Listen("tcp", ":8888") 方法返回了一個 *TCPListener,它是一個實現了 net.Listener 接口的 struct,而通過 listener.Accept() 接收的新連接 *TCPConn 則是一個實現了 net.Conn 接口的 struct,它內嵌了 net.conn struct。仔細閱讀上面的源碼可以發現,不管是 Listener 的 Accept 還是 Conn 的 Read/Write 方法,都是基于一個 netFD 的數據結構的操作, netFD 是一個網絡描述符,類似于 Linux 的文件描述符的概念,netFD 中包含一個 poll.FD 數據結構,而 poll.FD 中包含兩個重要的數據結構 Sysfd 和 pollDesc,前者是真正的系統文件描述符,后者對是底層事件驅動的封裝,所有的讀寫超時等操作都是通過調用后者的對應方法實現的。
netFD 和 poll.FD 的源碼:
//?Network?file?descriptor. type?netFD?struct?{pfd?poll.FD//?immutable?until?Closefamily??????intsotype??????intisConnected?bool?//?handshake?completed?or?use?of?association?with?peernet?????????stringladdr???????Addrraddr???????Addr }//?FD?is?a?file?descriptor.?The?net?and?os?packages?use?this?type?as?a //?field?of?a?larger?type?representing?a?network?connection?or?OS?file. type?FD?struct?{//?Lock?sysfd?and?serialize?access?to?Read?and?Write?methods.fdmu?fdMutex//?System?file?descriptor.?Immutable?until?Close.Sysfd?int//?I/O?poller.pd?pollDesc//?Writev?cache.iovecs?*[]syscall.Iovec//?Semaphore?signaled?when?file?is?closed.csema?uint32//?Non-zero?if?this?file?has?been?set?to?blocking?mode.isBlocking?uint32//?Whether?this?is?a?streaming?descriptor,?as?opposed?to?a//?packet-based?descriptor?like?a?UDP?socket.?Immutable.IsStream?bool//?Whether?a?zero?byte?read?indicates?EOF.?This?is?false?for?a//?message?based?socket?connection.ZeroReadIsEOF?bool//?Whether?this?is?a?file?rather?than?a?network?socket.isFile?bool }pollDesc
前面提到了 pollDesc 是底層事件驅動的封裝,netFD 通過它來完成各種 I/O 相關的操作,它的定義如下:
type?pollDesc?struct?{runtimeCtx?uintptr }這里的 struct 只包含了一個指針,而通過 pollDesc 的 init 方法,我們可以找到它具體的定義是在 runtime.pollDesc 這里:
func?(pd?*pollDesc)?init(fd?*FD)?error?{serverInit.Do(runtime_pollServerInit)ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}pd.runtimeCtx?=?ctxreturn?nil }//?Network?poller?descriptor. // //?No?heap?pointers. // //go:notinheap type?pollDesc?struct?{link?*pollDesc?//?in?pollcache,?protected?by?pollcache.lock//?The?lock?protects?pollOpen,?pollSetDeadline,?pollUnblock?and?deadlineimpl?operations.//?This?fully?covers?seq,?rt?and?wt?variables.?fd?is?constant?throughout?the?PollDesc?lifetime.//?pollReset,?pollWait,?pollWaitCanceled?and?runtime·netpollready?(IO?readiness?notification)//?proceed?w/o?taking?the?lock.?So?closing,?everr,?rg,?rd,?wg?and?wd?are?manipulated//?in?a?lock-free?way?by?all?operations.//?NOTE(dvyukov):?the?following?code?uses?uintptr?to?store?*g?(rg/wg),//?that?will?blow?up?when?GC?starts?moving?objects.lock????mutex?//?protects?the?following?fieldsfd??????uintptrclosing?booleverr???bool????//?marks?event?scanning?error?happeneduser????uint32??//?user?settable?cookierseq????uintptr?//?protects?from?stale?read?timersrg??????uintptr?//?pdReady,?pdWait,?G?waiting?for?read?or?nilrt??????timer???//?read?deadline?timer?(set?if?rt.f?!=?nil)rd??????int64???//?read?deadlinewseq????uintptr?//?protects?from?stale?write?timerswg??????uintptr?//?pdReady,?pdWait,?G?waiting?for?write?or?nilwt??????timer???//?write?deadline?timerwd??????int64???//?write?deadline }這里重點關注里面的 rg 和 wg,這里兩個 uintptr "萬能指針"類型,取值分別可能是 pdReady、pdWait、等待 file descriptor 就緒的 goroutine 也就是 g 數據結構以及 nil,它們是實現喚醒 goroutine 的關鍵。
runtime.pollDesc 包含自身類型的一個指針,用來保存下一個 runtime.pollDesc 的地址,以此來實現鏈表,可以減少數據結構的大小,所有的 runtime.pollDesc 保存在 runtime.pollCache 結構中,定義如下:
type?pollCache?struct?{lock??mutexfirst?*pollDesc//?PollDesc?objects?must?be?type-stable,//?because?we?can?get?ready?notification?from?epoll/kqueue//?after?the?descriptor?is?closed/reused.//?Stale?notifications?are?detected?using?seq?variable,//?seq?is?incremented?when?deadlines?are?changed?or?descriptor?is?reused. }因為 runtime.pollCache 是一個在 runtime 包里的全局變量,因此需要用一個互斥鎖來避免 data race 問題,從它的名字也能看出這是一個用于緩存的數據結構,也就是用來提高性能的,具體如何實現呢?
const?pollBlockSize?=?4?*?1024func?(c?*pollCache)?alloc()?*pollDesc?{lock(&c.lock)if?c.first?==?nil?{const?pdSize?=?unsafe.Sizeof(pollDesc{})n?:=?pollBlockSize?/?pdSizeif?n?==?0?{n?=?1}//?Must?be?in?non-GC?memory?because?can?be?referenced//?only?from?epoll/kqueue?internals.mem?:=?persistentalloc(n*pdSize,?0,?&memstats.other_sys)for?i?:=?uintptr(0);?i?<?n;?i++?{pd?:=?(*pollDesc)(add(mem,?i*pdSize))pd.link?=?c.firstc.first?=?pd}}pd?:=?c.firstc.first?=?pd.linklockInit(&pd.lock,?lockRankPollDesc)unlock(&c.lock)return?pd }Go runtime 會在調用 poll_runtime_pollOpen 往 epoll 實例注冊 fd 之時首次調用 runtime.pollCache.alloc方法時批量初始化大小 4KB 的 runtime.pollDesc 結構體的鏈表,初始化過程中會調用 runtime.persistentalloc 來為這些數據結構分配不會被 GC 回收的內存,確保這些數據結構只能被 epoll和kqueue 在內核空間去引用。
再往后每次調用這個方法則會先判斷鏈表頭是否已經分配過值了,若是,則直接返回表頭這個 pollDesc,這種批量初始化數據進行緩存而后每次都直接從緩存取數據的方式是一種很常見的性能優化手段,在這里這種方式可以有效地提升 netpoller 的吞吐量。
Go runtime 會在關閉 pollDesc 之時調用 runtime.pollCache.free 釋放內存:
func?(c?*pollCache)?free(pd?*pollDesc)?{lock(&c.lock)pd.link?=?c.firstc.first?=?pdunlock(&c.lock) }實現原理
使用 Go 編寫一個典型的 TCP echo server:
package?mainimport?("log""net" )func?main()?{listen,?err?:=?net.Listen("tcp",?":8888")if?err?!=?nil?{log.Println("listen?error:?",?err)return}for?{conn,?err?:=?listen.Accept()if?err?!=?nil?{log.Println("accept?error:?",?err)break}//?start?a?new?goroutine?to?handle?the?new?connection.go?HandleConn(conn)} }func?HandleConn(conn?net.Conn)?{defer?conn.Close()packet?:=?make([]byte,?1024)for?{//?block?here?if?socket?is?not?available?for?reading?data.n,?err?:=?conn.Read(packet)if?err?!=?nil?{log.Println("read?socket?error:?",?err)return}//?same?as?above,?block?here?if?socket?is?not?available?for?writing._,?_?=?conn.Write(packet[:n])} }上面是一個基于 Go 原生網絡模型(基于 netpoller)編寫的一個 TCP server,模式是 goroutine-per-connection ,在這種模式下,開發者使用的是同步的模式去編寫異步的邏輯而且對于開發者來說 I/O 是否阻塞是無感知的,也就是說開發者無需考慮 goroutines 甚至更底層的線程、進程的調度和上下文切換。而 Go netpoller 最底層的事件驅動技術肯定是基于 epoll/kqueue/iocp 這一類的 I/O 事件驅動技術,只不過是把這些調度和上下文切換的工作轉移到了 runtime 的 Go scheduler,讓它來負責調度 goroutines,從而極大地降低了程序員的心智負擔!
Go 的這種同步模式的網絡服務器的基本架構通常如下:
上面的示例代碼中相關的在源碼里的幾個數據結構和方法:
//?TCPListener?is?a?TCP?network?listener.?Clients?should?typically //?use?variables?of?type?Listener?instead?of?assuming?TCP. type?TCPListener?struct?{fd?*netFDlc?ListenConfig }//?Accept?implements?the?Accept?method?in?the?Listener?interface;?it //?waits?for?the?next?call?and?returns?a?generic?Conn. func?(l?*TCPListener)?Accept()?(Conn,?error)?{if?!l.ok()?{return?nil,?syscall.EINVAL}c,?err?:=?l.accept()if?err?!=?nil?{return?nil,?&OpError{Op:?"accept",?Net:?l.fd.net,?Source:?nil,?Addr:?l.fd.laddr,?Err:?err}}return?c,?nil }func?(ln?*TCPListener)?accept()?(*TCPConn,?error)?{fd,?err?:=?ln.fd.accept()if?err?!=?nil?{return?nil,?err}tc?:=?newTCPConn(fd)if?ln.lc.KeepAlive?>=?0?{setKeepAlive(fd,?true)ka?:=?ln.lc.KeepAliveif?ln.lc.KeepAlive?==?0?{ka?=?defaultTCPKeepAlive}setKeepAlivePeriod(fd,?ka)}return?tc,?nil }//?TCPConn?is?an?implementation?of?the?Conn?interface?for?TCP?network //?connections. type?TCPConn?struct?{conn }//?Conn type?conn?struct?{fd?*netFD }type?conn?struct?{fd?*netFD }func?(c?*conn)?ok()?bool?{?return?c?!=?nil?&&?c.fd?!=?nil?}//?Implementation?of?the?Conn?interface.//?Read?implements?the?Conn?Read?method. func?(c?*conn)?Read(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Read(b)if?err?!=?nil?&&?err?!=?io.EOF?{err?=?&OpError{Op:?"read",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }//?Write?implements?the?Conn?Write?method. func?(c?*conn)?Write(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Write(b)if?err?!=?nil?{err?=?&OpError{Op:?"write",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }net.Listen
調用 net.Listen 之后,底層會通過 Linux 的系統調用 socket 方法創建一個 fd 分配給 listener,并用以來初始化 listener 的 netFD ,接著調用 netFD 的 listenStream 方法完成對 socket 的 bind&listen 操作以及對 netFD 的初始化(主要是對 netFD 里的 pollDesc 的初始化),調用鏈是 runtime.runtime_pollServerInit --> runtime.poll_runtime_pollServerInit --> runtime.netpollGenericInit,主要做的事情是:
調用 epollcreate1 創建一個 epoll 實例 epfd,作為整個 runtime 的唯一 event-loop 使用;
調用 runtime.nonblockingPipe 創建一個用于和 epoll 實例通信的管道,這里為什么不用更新且更輕量的 eventfd 呢?我個人猜測是為了兼容更多以及更老的系統版本;
將 netpollBreakRd 通知信號量封裝成 epollevent 事件結構體注冊進 epoll 實例。
相關源碼如下:
//?調用?linux?系統調用?socket?創建?listener?fd?并設置為為阻塞?I/O s,?err?:=?socketFunc(family,?sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC,?proto) //?On?Linux?the?SOCK_NONBLOCK?and?SOCK_CLOEXEC?flags?were //?introduced?in?2.6.27?kernel?and?on?FreeBSD?both?flags?were //?introduced?in?10?kernel.?If?we?get?an?EINVAL?error?on?Linux //?or?EPROTONOSUPPORT?error?on?FreeBSD,?fall?back?to?using //?socket?without?them.socketFunc????????func(int,?int,?int)?(int,?error)??=?syscall.Socket//?用上面創建的?listener?fd?初始化?listener?netFD if?fd,?err?=?newFD(s,?family,?sotype,?net);?err?!=?nil?{poll.CloseFunc(s)return?nil,?err }//?對?listener?fd?進行?bind&listen?操作,并且調用?init?方法完成初始化 func?(fd?*netFD)?listenStream(laddr?sockaddr,?backlog?int,?ctrlFn?func(string,?string,?syscall.RawConn)?error)?error?{...//?完成綁定操作if?err?=?syscall.Bind(fd.pfd.Sysfd,?lsa);?err?!=?nil?{return?os.NewSyscallError("bind",?err)}//?完成監聽操作if?err?=?listenFunc(fd.pfd.Sysfd,?backlog);?err?!=?nil?{return?os.NewSyscallError("listen",?err)}//?調用?init,內部會調用?poll.FD.Init,最后調用?pollDesc.initif?err?=?fd.init();?err?!=?nil?{return?err}lsa,?_?=?syscall.Getsockname(fd.pfd.Sysfd)fd.setAddr(fd.addrFunc()(lsa),?nil)return?nil }//?使用?sync.Once?來確保一個?listener?只持有一個?epoll?實例 var?serverInit?sync.Once//?netFD.init?會調用?poll.FD.Init?并最終調用到?pollDesc.init, //?它會創建?epoll?實例并把?listener?fd?加入監聽隊列 func?(pd?*pollDesc)?init(fd?*FD)?error?{//?runtime_pollServerInit?通過?`go:linkname`?鏈接到具體的實現函數?poll_runtime_pollServerInit,//?接著再調用?netpollGenericInit,然后會根據不同的系統平臺去調用特定的?netpollinit?來創建?epoll?實例serverInit.Do(runtime_pollServerInit)//?runtime_pollOpen?內部調用了?netpollopen?來將?listener?fd?注冊到?//?epoll?實例中,另外,它會初始化一個?pollDesc?并返回ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}//?把真正初始化完成的?pollDesc?實例賦值給當前的?pollDesc?代表自身的指針,//?后續使用直接通過該指針操作pd.runtimeCtx?=?ctxreturn?nil }var?(//?全局唯一的?epoll?fd,只在?listener?fd?初始化之時被指定一次epfd?int32?=?-1?//?epoll?descriptor )//?netpollinit?會創建一個?epoll?實例,然后把?epoll?fd?賦值給?epfd, //?后續?listener?以及它?accept?的所有?sockets?有關?epoll?的操作都是基于這個全局的?epfd func?netpollinit()?{epfd?=?epollcreate1(_EPOLL_CLOEXEC)if?epfd?<?0?{epfd?=?epollcreate(1024)if?epfd?<?0?{println("runtime:?epollcreate?failed?with",?-epfd)throw("runtime:?netpollinit?failed")}closeonexec(epfd)}r,?w,?errno?:=?nonblockingPipe()if?errno?!=?0?{println("runtime:?pipe?failed?with",?-errno)throw("runtime:?pipe?failed")}ev?:=?epollevent{events:?_EPOLLIN,}*(**uintptr)(unsafe.Pointer(&ev.data))?=?&netpollBreakRderrno?=?epollctl(epfd,?_EPOLL_CTL_ADD,?r,?&ev)if?errno?!=?0?{println("runtime:?epollctl?failed?with",?-errno)throw("runtime:?epollctl?failed")}netpollBreakRd?=?uintptr(r)netpollBreakWr?=?uintptr(w) }//?netpollopen?會被?runtime_pollOpen?調用,注冊?fd?到?epoll?實例, //?注意這里使用的是?epoll?的?ET?模式,同時會利用萬能指針把?pollDesc?保存到?epollevent?的一個?8?位的字節數組?data?里 func?netpollopen(fd?uintptr,?pd?*pollDesc)?int32?{var?ev?epolleventev.events?=?_EPOLLIN?|?_EPOLLOUT?|?_EPOLLRDHUP?|?_EPOLLET*(**pollDesc)(unsafe.Pointer(&ev.data))?=?pdreturn?-epollctl(epfd,?_EPOLL_CTL_ADD,?int32(fd),?&ev) }我們前面提到的 epoll 的三個基本調用,Go 在源碼里實現了對那三個調用的封裝:
#include?<sys/epoll.h>?? int?epoll_create(int?size);?? int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event);?? int?epoll_wait(int?epfd,?struct?epoll_event?*?events,?int?maxevents,?int?timeout);//?Go?對上面三個調用的封裝 func?netpollinit() func?netpollopen(fd?uintptr,?pd?*pollDesc)?int32 func?netpoll(block?bool)?gListnetFD 就是通過這三個封裝來對 epoll 進行創建實例、注冊 fd 和等待事件操作的。
Listener.Accept()
netpoll accept socket 的工作流程如下:
服務端的 netFD 在 listen 時會創建 epoll 的實例,并將 listenerFD 加入 epoll 的事件隊列
netFD 在 accept 時將返回的 connFD 也加入 epoll 的事件隊列
netFD 在讀寫時出現 syscall.EAGAIN 錯誤,通過 pollDesc 的 waitRead 方法將當前的 goroutine park 住,直到 ready,從 pollDesc 的 waitRead 中返回
Listener.Accept() 接收來自客戶端的新連接,具體還是調用 netFD.accept 方法來完成這個功能:
//?Accept?implements?the?Accept?method?in?the?Listener?interface;?it //?waits?for?the?next?call?and?returns?a?generic?Conn. func?(l?*TCPListener)?Accept()?(Conn,?error)?{if?!l.ok()?{return?nil,?syscall.EINVAL}c,?err?:=?l.accept()if?err?!=?nil?{return?nil,?&OpError{Op:?"accept",?Net:?l.fd.net,?Source:?nil,?Addr:?l.fd.laddr,?Err:?err}}return?c,?nil }func?(ln?*TCPListener)?accept()?(*TCPConn,?error)?{fd,?err?:=?ln.fd.accept()if?err?!=?nil?{return?nil,?err}tc?:=?newTCPConn(fd)if?ln.lc.KeepAlive?>=?0?{setKeepAlive(fd,?true)ka?:=?ln.lc.KeepAliveif?ln.lc.KeepAlive?==?0?{ka?=?defaultTCPKeepAlive}setKeepAlivePeriod(fd,?ka)}return?tc,?nil }func?(fd?*netFD)?accept()?(netfd?*netFD,?err?error)?{//?調用?poll.FD?的?Accept?方法接受新的?socket?連接,返回?socket?的?fdd,?rsa,?errcall,?err?:=?fd.pfd.Accept()if?err?!=?nil?{if?errcall?!=?""?{err?=?wrapSyscallError(errcall,?err)}return?nil,?err}//?以?socket?fd?構造一個新的?netFD,代表這個新的?socketif?netfd,?err?=?newFD(d,?fd.family,?fd.sotype,?fd.net);?err?!=?nil?{poll.CloseFunc(d)return?nil,?err}//?調用?netFD?的?init?方法完成初始化if?err?=?netfd.init();?err?!=?nil?{fd.Close()return?nil,?err}lsa,?_?:=?syscall.Getsockname(netfd.pfd.Sysfd)netfd.setAddr(netfd.addrFunc()(lsa),?netfd.addrFunc()(rsa))return?netfd,?nil }netFD.accept 方法里會再調用 poll.FD.Accept ,最后會使用 Linux 的系統調用 accept 來完成新連接的接收,并且會把 accept 的 socket 設置成非阻塞 I/O 模式:
//?Accept?wraps?the?accept?network?call. func?(fd?*FD)?Accept()?(int,?syscall.Sockaddr,?string,?error)?{if?err?:=?fd.readLock();?err?!=?nil?{return?-1,?nil,?"",?err}defer?fd.readUnlock()if?err?:=?fd.pd.prepareRead(fd.isFile);?err?!=?nil?{return?-1,?nil,?"",?err}for?{//?使用?linux?系統調用?accept?接收新連接,創建對應的?sockets,?rsa,?errcall,?err?:=?accept(fd.Sysfd)//?因為?listener?fd?在創建的時候已經設置成非阻塞的了,//?所以 accept 方法會直接返回,不管有沒有新連接到來;如果 err == nil 則表示正常建立新連接,直接返回if?err?==?nil?{return?s,?rsa,?"",?err}//?如果?err?!=?nil,則判斷?err?==?syscall.EAGAIN,符合條件則進入?pollDesc.waitRead?方法switch?err?{case?syscall.EAGAIN:if?fd.pd.pollable()?{//?如果當前沒有發生期待的?I/O?事件,那么?waitRead?會通過?park?goroutine?讓邏輯?block?在這里if?err?=?fd.pd.waitRead(fd.isFile);?err?==?nil?{continue}}case?syscall.ECONNABORTED://?This?means?that?a?socket?on?the?listen//?queue?was?closed?before?we?Accept()ed?it;//?it's?a?silly?error,?so?try?again.continue}return?-1,?nil,?errcall,?err} }//?使用?linux?的?accept?系統調用接收新連接并把這個?socket?fd?設置成非阻塞?I/O ns,?sa,?err?:=?Accept4Func(s,?syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) //?On?Linux?the?accept4?system?call?was?introduced?in?2.6.28 //?kernel?and?on?FreeBSD?it?was?introduced?in?10?kernel.?If?we //?get?an?ENOSYS?error?on?both?Linux?and?FreeBSD,?or?EINVAL //?error?on?Linux,?fall?back?to?using?accept.//?Accept4Func?is?used?to?hook?the?accept4?call. var?Accept4Func?func(int,?int)?(int,?syscall.Sockaddr,?error)?=?syscall.Accept4pollDesc.waitRead 方法主要負責檢測當前這個 pollDesc 的上層 netFD 對應的 fd 是否有『期待的』I/O 事件發生,如果有就直接返回,否則就 park 住當前的 goroutine 并持續等待直至對應的 fd 上發生可讀/可寫或者其他『期待的』I/O 事件為止,然后它就會返回到外層的 for 循環,讓 goroutine 繼續執行邏輯。
poll.FD.Accept() 返回之后,會構造一個對應這個新 socket 的 netFD,然后調用 init() 方法完成初始化,這個 init 過程和前面 net.Listen() 是一樣的,調用鏈:netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),最終又會走到這里:
var?serverInit?sync.Oncefunc?(pd?*pollDesc)?init(fd?*FD)?error?{serverInit.Do(runtime_pollServerInit)ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}pd.runtimeCtx?=?ctxreturn?nil }然后把這個 socket fd 注冊到 listener 的 epoll 實例的事件隊列中去,等待 I/O 事件。
Conn.Read/Conn.Write
我們先來看看 Conn.Read 方法是如何實現的,原理其實和 Listener.Accept 是一樣的,具體調用鏈還是首先調用 conn 的 netFD.Read ,然后內部再調用 poll.FD.Read ,最后使用 Linux 的系統調用 read: syscall.Read 完成數據讀取:
//?Implementation?of?the?Conn?interface.//?Read?implements?the?Conn?Read?method. func?(c?*conn)?Read(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Read(b)if?err?!=?nil?&&?err?!=?io.EOF?{err?=?&OpError{Op:?"read",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }func?(fd?*netFD)?Read(p?[]byte)?(n?int,?err?error)?{n,?err?=?fd.pfd.Read(p)runtime.KeepAlive(fd)return?n,?wrapSyscallError("read",?err) }//?Read?implements?io.Reader. func?(fd?*FD)?Read(p?[]byte)?(int,?error)?{if?err?:=?fd.readLock();?err?!=?nil?{return?0,?err}defer?fd.readUnlock()if?len(p)?==?0?{//?If?the?caller?wanted?a?zero?byte?read,?return?immediately//?without?trying?(but?after?acquiring?the?readLock).//?Otherwise?syscall.Read?returns?0,?nil?which?looks?like//?io.EOF.//?TODO(bradfitz):?make?it?wait?for?readability??(Issue?15735)return?0,?nil}if?err?:=?fd.pd.prepareRead(fd.isFile);?err?!=?nil?{return?0,?err}if?fd.IsStream?&&?len(p)?>?maxRW?{p?=?p[:maxRW]}for?{//?嘗試從該?socket?讀取數據,因為?socket?在被?listener?accept?的時候設置成//?了非阻塞?I/O,所以這里同樣也是直接返回,不管有沒有可讀的數據n,?err?:=?syscall.Read(fd.Sysfd,?p)if?err?!=?nil?{n?=?0//?err?==?syscall.EAGAIN?表示當前沒有期待的?I/O?事件發生,也就是?socket?不可讀if?err?==?syscall.EAGAIN?&&?fd.pd.pollable()?{//?如果當前沒有發生期待的?I/O?事件,那么?waitRead?//?會通過?park?goroutine?讓邏輯?block?在這里if?err?=?fd.pd.waitRead(fd.isFile);?err?==?nil?{continue}}//?On?MacOS?we?can?see?EINTR?here?if?the?user//?pressed?^Z.??See?issue?#22838.if?runtime.GOOS?==?"darwin"?&&?err?==?syscall.EINTR?{continue}}err?=?fd.eofError(n,?err)return?n,?err} }conn.Write 和 conn.Read 的原理是一致的,它也是通過類似 pollDesc.waitRead 的 pollDesc.waitWrite 來 park 住 goroutine 直至期待的 I/O 事件發生才返回恢復執行。
pollDesc.waitRead/pollDesc.waitWrite
pollDesc.waitRead 內部調用了 poll.runtime_pollWait --> runtime.poll_runtime_pollWait 來達成無 I/O 事件時 park 住 goroutine 的目的:
//go:linkname?poll_runtime_pollWait?internal/poll.runtime_pollWait func?poll_runtime_pollWait(pd?*pollDesc,?mode?int)?int?{err?:=?netpollcheckerr(pd,?int32(mode))if?err?!=?pollNoError?{return?err}//?As?for?now?only?Solaris,?illumos,?and?AIX?use?level-triggered?IO.if?GOOS?==?"solaris"?||?GOOS?==?"illumos"?||?GOOS?==?"aix"?{netpollarm(pd,?mode)}//?進入?netpollblock?并且判斷是否有期待的?I/O?事件發生,//?這里的?for?循環是為了一直等到?io?readyfor?!netpollblock(pd,?int32(mode),?false)?{err?=?netpollcheckerr(pd,?int32(mode))if?err?!=?0?{return?err}//?Can?happen?if?timeout?has?fired?and?unblocked?us,//?but?before?we?had?a?chance?to?run,?timeout?has?been?reset.//?Pretend?it?has?not?happened?and?retry.}return?0 }//?returns?true?if?IO?is?ready,?or?false?if?timedout?or?closed //?waitio?-?wait?only?for?completed?IO,?ignore?errors func?netpollblock(pd?*pollDesc,?mode?int32,?waitio?bool)?bool?{//?gpp?保存的是?goroutine?的數據結構?g,這里會根據?mode?的值決定是?rg?還是?wg,//?前面提到過,rg?和?wg?是用來保存等待?I/O?就緒的?gorouine?的,后面調用?gopark?之后,//?會把當前的?goroutine?的抽象數據結構?g?存入?gpp?這個指針,也就是?rg?或者?wggpp?:=?&pd.rgif?mode?==?'w'?{gpp?=?&pd.wg}//?set?the?gpp?semaphore?to?WAIT//?這個?for?循環是為了等待?io?ready?或者?io?waitfor?{old?:=?*gpp//?gpp?==?pdReady?表示此時已有期待的?I/O?事件發生,//?可以直接返回?unblock?當前?goroutine?并執行響應的?I/O?操作if?old?==?pdReady?{*gpp?=?0return?true}if?old?!=?0?{throw("runtime:?double?wait")}//?如果沒有期待的?I/O?事件發生,則通過原子操作把?gpp?的值置為?pdWait?并退出?for?循環if?atomic.Casuintptr(gpp,?0,?pdWait)?{break}}//?need?to?recheck?error?states?after?setting?gpp?to?WAIT//?this?is?necessary?because?runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl//?do?the?opposite:?store?to?closing/rd/wd,?membarrier,?load?of?rg/wg//?waitio?此時是?false,netpollcheckerr?方法會檢查當前?pollDesc?對應的?fd?是否是正常的,//?通常來說??netpollcheckerr(pd,?mode)?==?0?是成立的,所以這里會執行?gopark?//?把當前?goroutine?給?park?住,直至對應的?fd?上發生可讀/可寫或者其他『期待的』I/O?事件為止,//?然后?unpark?返回,在?gopark?內部會把當前?goroutine?的抽象數據結構?g?存入//?gpp(pollDesc.rg/pollDesc.wg)?指針里,以便在后面的?netpoll?函數取出?pollDesc?之后,//?把?g?添加到鏈表里返回,接著重新調度?goroutineif?waitio?||?netpollcheckerr(pd,?mode)?==?0?{//?注冊?netpollblockcommit?回調給?gopark,在?gopark?內部會執行它,保存當前?goroutine?到?gppgopark(netpollblockcommit,?unsafe.Pointer(gpp),?waitReasonIOWait,?traceEvGoBlockNet,?5)}//?be?careful?to?not?lose?concurrent?READY?notificationold?:=?atomic.Xchguintptr(gpp,?0)if?old?>?pdWait?{throw("runtime:?corrupted?polldesc")}return?old?==?pdReady }//?gopark?會停住當前的?goroutine?并且調用傳遞進來的回調函數?unlockf,從上面的源碼我們可以知道這個函數是 //?netpollblockcommit func?gopark(unlockf?func(*g,?unsafe.Pointer)?bool,?lock?unsafe.Pointer,?reason?waitReason,?traceEv?byte,?traceskip?int)?{if?reason?!=?waitReasonSleep?{checkTimeouts()?//?timeouts?may?expire?while?two?goroutines?keep?the?scheduler?busy}mp?:=?acquirem()gp?:=?mp.curgstatus?:=?readgstatus(gp)if?status?!=?_Grunning?&&?status?!=?_Gscanrunning?{throw("gopark:?bad?g?status")}mp.waitlock?=?lockmp.waitunlockf?=?unlockfgp.waitreason?=?reasonmp.waittraceev?=?traceEvmp.waittraceskip?=?traceskipreleasem(mp)//?can't?do?anything?that?might?move?the?G?between?Ms?here.//?gopark?最終會調用?park_m,在這個函數內部會調用?unlockf,也就是?netpollblockcommit,//?然后會把當前的?goroutine,也就是?g?數據結構保存到?pollDesc?的?rg?或者?wg?指針里mcall(park_m) }//?park?continuation?on?g0. func?park_m(gp?*g)?{_g_?:=?getg()if?trace.enabled?{traceGoPark(_g_.m.waittraceev,?_g_.m.waittraceskip)}casgstatus(gp,?_Grunning,?_Gwaiting)dropg()if?fn?:=?_g_.m.waitunlockf;?fn?!=?nil?{//?調用?netpollblockcommit,把當前的?goroutine,//?也就是?g?數據結構保存到?pollDesc?的?rg?或者?wg?指針里ok?:=?fn(gp,?_g_.m.waitlock)_g_.m.waitunlockf?=?nil_g_.m.waitlock?=?nilif?!ok?{if?trace.enabled?{traceGoUnpark(gp,?2)}casgstatus(gp,?_Gwaiting,?_Grunnable)execute(gp,?true)?//?Schedule?it?back,?never?returns.}}schedule() }//?netpollblockcommit?在?gopark?函數里被調用 func?netpollblockcommit(gp?*g,?gpp?unsafe.Pointer)?bool?{//?通過原子操作把當前?goroutine?抽象的數據結構?g,也就是這里的參數?gp?存入?gpp?指針,//?此時?gpp?的值是?pollDesc?的?rg?或者?wg?指針r?:=?atomic.Casuintptr((*uintptr)(gpp),?pdWait,?uintptr(unsafe.Pointer(gp)))if?r?{//?Bump?the?count?of?goroutines?waiting?for?the?poller.//?The?scheduler?uses?this?to?decide?whether?to?block//?waiting?for?the?poller?if?there?is?nothing?else?to?do.atomic.Xadd(&netpollWaiters,?1)}return?r }pollDesc.waitWrite 的內部實現原理和 pollDesc.waitRead 是一樣的,都是基于 poll.runtime_pollWait --> runtime.poll_runtime_pollWait,這里就不再贅述。
netpoll
前面已經從源碼的層面分析完了 netpoll 是如何通過 park goroutine 從而達到阻塞 Accept/Read/Write 的效果,而通過調用 gopark,goroutine 會被放置在某個等待隊列中,這里是放到了 epoll 的 "interest list" 里,底層數據結構是由紅黑樹實現的 ?eventpoll.rbr,此時 G 的狀態由 _Grunning為_Gwaitting ,因此 G 必須被手動喚醒(通過 goready ),否則會丟失任務,應用層阻塞通常使用這種方式。
所以我們現在可以來從整體的層面來概括 Go 的網絡業務 goroutine 是如何被規劃調度的了:
?首先,client 連接 server 的時候,listener 通過 accept 調用接收新 connection,每一個新 connection 都啟動一個 goroutine 處理,accept 調用會把該 connection 的 fd 連帶所在的 goroutine 上下文信息封裝注冊到 epoll 的監聽列表里去,當 goroutine 調用 conn.Read 或者 conn.Write 等需要阻塞等待的函數時,會被 gopark 給封存起來并使之休眠,讓 P 去執行本地調度隊列里的下一個可執行的 goroutine,往后 Go scheduler 會在循環調度的 runtime.schedule() 函數以及 sysmon 監控線程中調用 runtime.nepoll 以獲取可運行的 goroutine 列表并通過調用 injectglist 把剩下的 g 放入全局調度隊列或者當前 P 本地調度隊列去重新執行。
那么當 I/O 事件發生之后,netpoller 是通過什么方式喚醒那些在 I/O wait 的 goroutine 的?答案是通過 runtime.netpoll。
?runtime.netpoll 的核心邏輯是:
根據調用方的入參 delay,設置對應的調用 epollwait 的 timeout 值;
調用 epollwait 等待發生了可讀/可寫事件的 fd;
循環 epollwait 返回的事件列表,處理對應的事件類型, 組裝可運行的 goroutine 鏈表并返回。
Go 在多種場景下都可能會調用 netpoll 檢查文件描述符狀態,netpoll 里會調用 epoll_wait 從 epoll 的 eventpoll.rdllist 就緒雙向鏈表返回,從而得到 I/O 就緒的 socket fd 列表,并根據取出最初調用 epoll_ctl 時保存的上下文信息,恢復 g。所以執行完netpoll 之后,會返回一個就緒 fd 列表對應的 goroutine 鏈表,接下來將就緒的 goroutine 通過調用 injectglist 加入到全局調度隊列或者 P 的本地調度隊列中,啟動 M 綁定 P 去執行。
具體調用 netpoll 的地方,首先在 Go runtime scheduler 循環調度 goroutines 之時就有可能會調用 netpoll 獲取到已就緒的 fd 對應的 goroutine 來調度執行。
首先 Go scheduler 的核心方法 runtime.schedule() 里會調用一個叫 runtime.findrunable() 的方法獲取可運行的 goroutine 來執行,而在 runtime.findrunable() 方法里就調用了 runtime.netpoll 獲取已就緒的 fd 列表對應的 goroutine 列表:
//?One?round?of?scheduler:?find?a?runnable?goroutine?and?execute?it. //?Never?returns. func?schedule()?{...if?gp?==?nil?{gp,?inheritTime?=?findrunnable()?//?blocks?until?work?is?available}... }//?Finds?a?runnable?goroutine?to?execute. //?Tries?to?steal?from?other?P's,?get?g?from?global?queue,?poll?network. func?findrunnable()?(gp?*g,?inheritTime?bool)?{...//?Poll?network.if?netpollinited()?&&?(atomic.Load(&netpollWaiters)?>?0?||?pollUntil?!=?0)?&&?atomic.Xchg64(&sched.lastpoll,?0)?!=?0?{atomic.Store64(&sched.pollUntil,?uint64(pollUntil))if?_g_.m.p?!=?0?{throw("findrunnable:?netpoll?with?p")}if?_g_.m.spinning?{throw("findrunnable:?netpoll?with?spinning")}if?faketime?!=?0?{//?When?using?fake?time,?just?poll.delta?=?0}list?:=?netpoll(delta)?//?同步阻塞調用?netpoll,直至有可用的?goroutineatomic.Store64(&sched.pollUntil,?0)atomic.Store64(&sched.lastpoll,?uint64(nanotime()))if?faketime?!=?0?&&?list.empty()?{//?Using?fake?time?and?nothing?is?ready;?stop?M.//?When?all?M's?stop,?checkdead?will?call?timejump.stopm()goto?top}lock(&sched.lock)_p_?=?pidleget()?//?查找是否有空閑的?P?可以來就緒的?goroutineunlock(&sched.lock)if?_p_?==?nil?{injectglist(&list)?//?如果當前沒有空閑的?P,則把就緒的?goroutine?放入全局調度隊列等待被執行}?else?{//?如果當前有空閑的?P,則?pop?出一個?g,返回給調度器去執行,//?并通過調用?injectglist?把剩下的?g?放入全局調度隊列或者當前?P?本地調度隊列acquirep(_p_)if?!list.empty()?{gp?:=?list.pop()injectglist(&list)casgstatus(gp,?_Gwaiting,?_Grunnable)if?trace.enabled?{traceGoUnpark(gp,?0)}return?gp,?false}if?wasSpinning?{_g_.m.spinning?=?trueatomic.Xadd(&sched.nmspinning,?1)}goto?top}}?else?if?pollUntil?!=?0?&&?netpollinited()?{pollerPollUntil?:=?int64(atomic.Load64(&sched.pollUntil))if?pollerPollUntil?==?0?||?pollerPollUntil?>?pollUntil?{netpollBreak()}}stopm()goto?top }另外, sysmon 監控線程會在循環過程中檢查距離上一次 runtime.netpoll 被調用是否超過了 10ms,若是則會去調用它拿到可運行的 goroutine 列表并通過調用 injectglist 把 g 列表放入全局調度隊列或者當前 P 本地調度隊列等待被執行:
//?Always?runs?without?a?P,?so?write?barriers?are?not?allowed. // //go:nowritebarrierrec func?sysmon()?{...//?poll?network?if?not?polled?for?more?than?10mslastpoll?:=?int64(atomic.Load64(&sched.lastpoll))if?netpollinited()?&&?lastpoll?!=?0?&&?lastpoll+10*1000*1000?<?now?{atomic.Cas64(&sched.lastpoll,?uint64(lastpoll),?uint64(now))list?:=?netpoll(0)?//?non-blocking?-?returns?list?of?goroutinesif?!list.empty()?{//?Need?to?decrement?number?of?idle?locked?M's//?(pretending?that?one?more?is?running)?before?injectglist.//?Otherwise?it?can?lead?to?the?following?situation://?injectglist?grabs?all?P's?but?before?it?starts?M's?to?run?the?P's,//?another?M?returns?from?syscall,?finishes?running?its?G,//?observes?that?there?is?no?work?to?do?and?no?other?running?M's//?and?reports?deadlock.incidlelocked(-1)injectglist(&list)incidlelocked(1)}}... }Go runtime 在程序啟動的時候會創建一個獨立的 M 作為監控線程,叫 sysmon ,這個線程為系統級的 daemon 線程,無需 P 即可運行, sysmon 每 20us~10ms 運行一次。sysmon 中以輪詢的方式執行以下操作(如上面的代碼所示):
以非阻塞的方式調用 runtime.netpoll ,從中找出能從網絡 I/O 中喚醒的 g 列表,并通過調用 injectglist 把 g 列表放入全局調度隊列或者當前 P 本地調度隊列等待被執行,調度觸發時,有可能從這個全局 runnable 調度隊列獲取 g。然后再循環調用 startm ,直到所有 P 都不處于 _Pidle 狀態。
調用 retake ,搶占長時間處于 _Psyscall 狀態的 P。
綜上,Go 借助于 epoll/kqueue/iocp 和 runtime scheduler 等的幫助,設計出了自己的 I/O 多路復用 netpoller,成功地讓 Listener.Accept / conn.Read / conn.Write 等方法從開發者的角度看來是同步模式。
Go netpoller 的價值
通過前面對源碼的分析,我們現在知道 Go netpoller 依托于 runtime scheduler,為開發者提供了一種強大的同步網絡編程模式;然而,Go netpoller 存在的意義卻遠不止于此,Go netpoller I/O 多路復用搭配 Non-blocking I/O 而打造出來的這個原生網絡模型,它最大的價值是把網絡 I/O 的控制權牢牢掌握在 Go 自己的 runtime 里,關于這一點我們需要從 Go 的 runtime scheduler 說起,Go 的 G-P-M 調度模型如下:
G 在運行過程中如果被阻塞在某個 system call 操作上,那么不光 G 會阻塞,執行該 G 的 M 也會解綁 P(實質是被 sysmon 搶走了),與 G 一起進入 sleep 狀態。如果此時有 idle 的 M,則 P 與其綁定繼續執行其他 G;如果沒有 idle M,但仍然有其他 G 要去執行,那么就會創建一個新的 M。當阻塞在 system call 上的 G 完成 syscall 調用后,G 會去嘗試獲取一個可用的 P,如果沒有可用的 P,那么 G 會被標記為 _Grunnable 并把它放入全局的 runqueue 中等待調度,之前的那個 sleep 的 M 將再次進入 sleep。
現在清楚為什么 netpoll 為什么一定要使用非阻塞 I/O 了吧?就是為了避免讓操作網絡 I/O 的 goroutine 陷入到系統調用從而進入內核態,因為一旦進入內核態,整個程序的控制權就會發生轉移(到內核),不再屬于用戶進程了,那么也就無法借助于 Go 強大的 runtime scheduler 來調度業務程序的并發了;而有了 netpoll 之后,借助于非阻塞 I/O ,G 就再也不會因為系統調用的讀寫而 (長時間) 陷入內核態,當 G 被阻塞在某個 network I/O 操作上時,實際上它不是因為陷入內核態被阻塞住了,而是被 Go runtime 調用 gopark 給 park 住了,此時 G 會被放置到某個 wait queue 中,而 M 會嘗試運行下一個 _Grunnable 的 G,如果此時沒有 _Grunnable 的 G 供 M 運行,那么 M 將解綁 P,并進入 sleep 狀態。當 I/O available,在 epoll 的 eventpoll.rdr 中等待的 G 會被放到 eventpoll.rdllist 鏈表里并通過 netpoll 中的 epoll_wait 系統調用返回放置到全局調度隊列或者 P 的本地調度隊列,標記為 _Grunnable ,等待 P 綁定 M 恢復執行。
Goroutine 的調度
這一小節主要是講處理網絡 I/O 的 goroutines 阻塞之后,Go scheduler 具體是如何像前面幾個章節所說的那樣,避免讓操作網絡 I/O 的 goroutine 陷入到系統調用從而進入內核態的,而是封存 goroutine 然后讓出 CPU 的使用權從而令 P 可以去調度本地調度隊列里的下一個 goroutine 的。
「溫馨提示」:這一小節屬于延伸閱讀,涉及到的知識點更偏系統底層,需要有一定的匯編語言基礎才能通讀,另外,這一節對 Go scheduler 的講解僅僅涉及核心的一部分,不會把整個調度器都講一遍(事實上如果真要解析 Go scheduler 的話恐怕重開一篇幾萬字的文章才能基本講清楚。。。),所以也要求讀者對 Go 的并發調度器有足夠的了解,因此這一節可能會稍顯深奧。當然這一節也可選擇不讀,因為通過前面的整個解析,我相信讀者應該已經能夠基本掌握 Go netpoller 處理網絡 I/O 的核心細節了,以及能從宏觀層面了解 netpoller 對業務 goroutines 的基本調度了。而這一節主要是通過對 goroutines 調度細節的剖析,能夠加深讀者對整個 Go netpoller 的徹底理解,接上前面幾個章節,形成一個完整的閉環。如果對調度的底層細節沒興趣的話這也可以直接跳過這一節,對理解 Go netpoller 的基本原理影響不大,不過還是建議有條件的讀者可以看看。
從源碼可知,Go scheduler 的調度 goroutine 過程中所調用的核心函數鏈如下:
runtime.schedule?-->?runtime.execute?-->?runtime.gogo?-->?goroutine?code?-->?runtime.goexit?-->?runtime.goexit1?-->?runtime.mcall?-->?runtime.goexit0?-->?runtime.schedule?Go scheduler 會不斷循環調用 runtime.schedule() 去調度 goroutines,而每個 goroutine 執行完成并退出之后,會再次調用 runtime.schedule(),使得調度器回到調度循環去執行其他的 goroutine,不斷循環,永不停歇。
當我們使用 go 關鍵字啟動一個新 goroutine 時,最終會調用 runtime.newproc --> runtime.newproc1,來得到 g,runtime.newproc1 會先從 P 的 gfree 緩存鏈表中查找可用的 g,若緩存未生效,則會新創建 g 給當前的業務函數,最后這個 g 會被傳給 runtime.gogo 去真正執行。
?這里首先需要了解一個 gobuf 的結構體,它用來保存 goroutine 的調度信息,是 runtime.gogo 的入參:
//?gobuf?存儲?goroutine?調度上下文信息的結構體 type?gobuf?struct?{//?The?offsets?of?sp,?pc,?and?g?are?known?to?(hard-coded?in)?libmach.////?ctxt?is?unusual?with?respect?to?GC:?it?may?be?a//?heap-allocated?funcval,?so?GC?needs?to?track?it,?but?it//?needs?to?be?set?and?cleared?from?assembly,?where?it's//?difficult?to?have?write?barriers.?However,?ctxt?is?really?a//?saved,?live?register,?and?we?only?ever?exchange?it?between//?the?real?register?and?the?gobuf.?Hence,?we?treat?it?as?a//?root?during?stack?scanning,?which?means?assembly?that?saves//?and?restores?it?doesn't?need?write?barriers.?It's?still//?typed?as?a?pointer?so?that?any?other?writes?from?Go?get//?write?barriers.sp???uintptr?//?Stack?Pointer?棧指針pc???uintptr?//?Program?Counter?程序計數器g????guintptr?//?持有當前?gobuf?的?goroutinectxt?unsafe.Pointerret??sys.Uintreglr???uintptrbp???uintptr?//?for?GOEXPERIMENT=framepointer }執行 runtime.execute(),進而調用 runtime.gogo:
func?execute(gp?*g,?inheritTime?bool)?{_g_?:=?getg()//?Assign?gp.m?before?entering?_Grunning?so?running?Gs?have?an//?M._g_.m.curg?=?gpgp.m?=?_g_.mcasgstatus(gp,?_Grunnable,?_Grunning)gp.waitsince?=?0gp.preempt?=?falsegp.stackguard0?=?gp.stack.lo?+?_StackGuardif?!inheritTime?{_g_.m.p.ptr().schedtick++}//?Check?whether?the?profiler?needs?to?be?turned?on?or?off.hz?:=?sched.profilehzif?_g_.m.profilehz?!=?hz?{setThreadCPUProfiler(hz)}if?trace.enabled?{//?GoSysExit?has?to?happen?when?we?have?a?P,?but?before?GoStart.//?So?we?emit?it?here.if?gp.syscallsp?!=?0?&&?gp.sysblocktraced?{traceGoSysExit(gp.sysexitticks)}traceGoStart()}//?gp.sched?就是?gobufgogo(&gp.sched) }這里還需要了解一個概念:g0,Go G-P-M 調度模型中,g 代表 goroutine,而實際上一共有三種 g:
執行用戶代碼的 g;
執行調度器代碼的 g,也即是 g0;
執行 runtime.main 初始化工作的 main goroutine;
第一種 g 就是使用 go 關鍵字啟動的 goroutine,也是我們接觸最多的一類 g;第三種 g 是調度器啟動之后用來執行的一系列初始化工作的,包括但不限于啟動 sysmon 監控線程、內存初始化和啟動 GC 等等工作;第二種 g 叫 g0,用來執行調度器代碼,g0 在底層和其他 g 是一樣的數據結構,但是性質上有很大的區別,首先 g0 的棧大小是固定的,比如在 Linux 或者其他 Unix-like 的系統上一般是固定 8MB,不能動態伸縮,而普通的 g 初始棧大小是 2KB,可按需擴展,g0 其實就是線程棧,我們知道每個線程被創建出來之時都需要操作系統為之分配一個初始固定的線程棧,就是前面說的 8MB 大小的棧,g0 棧就代表了這個線程棧,因此每一個 m 都需要綁定一個 g0 來執行調度器代碼,然后跳轉到執行用戶代碼的地方。
runtime.gogo 是真正去執行 goroutine 代碼的函數,這個函數由匯編實現,為什么需要用匯編?因為 gogo 的工作是完成線程 M 上的堆棧切換:從系統堆棧 g0 切換成 goroutine gp,也就是 CPU 使用權和堆棧的切換,這種切換本質上是對 CPU 的 PC、SP 等寄存器和堆棧指針的更新,而這一類精度的底層操作別說是 Go,就算是最貼近底層的 C 也無法做到,這種程度的操作已超出所有高級語言的范疇,因此只能借助于匯編來實現。
runtime.gogo 在不同的 CPU 架構平臺上的實現各不相同,但是核心原理殊途同歸,我們這里選用 amd64 架構的匯編實現來分析,我會在關鍵的地方加上解釋:
// func gogo(buf *gobuf) // restore state from Gobuf; longjmp TEXT runtime·gogo(SB), NOSPLIT, $16-8// 將第一個 FP 偽寄存器所指向的 gobuf 的第一個參數存入 BX 寄存器, // gobuf 的一個參數即是 SP 指針MOVQ buf+0(FP), BXMOVQ gobuf_g(BX), DX // 將 gp.sched.g 保存到 DX 寄存器MOVQ 0(DX), CX // make sure g != nil// 將 tls (thread local storage) 保存到 CX 寄存器,然后把 gp.sched.g 放到 tls[0],// 這樣以后調用 getg() 之時就可以通過 TLS 直接獲取到當前 goroutine 的 g 結構體實例,// 進而可以得到 g 所在的 m 和 p,TLS 里一開始存儲的是系統堆棧 g0 的地址get_tls(CX)MOVQ DX, g(CX)// 下面的指令則是對函數棧的 BP/SP 寄存器(指針)的存取,// 最后進入到指定的代碼區域,執行函數棧幀MOVQ gobuf_sp(BX), SP // restore SPMOVQ gobuf_ret(BX), AXMOVQ gobuf_ctxt(BX), DXMOVQ gobuf_bp(BX), BP// 這里是在清空 gp.sched,因為前面已經把 gobuf 里的字段值都存入了寄存器,// 所以 gp.sched 就可以提前清空了,不需要等到后面 GC 來回收,減輕 GC 的負擔MOVQ $0, gobuf_sp(BX) // clear to help garbage collectorMOVQ $0, gobuf_ret(BX)MOVQ $0, gobuf_ctxt(BX)MOVQ $0, gobuf_bp(BX)// 把 gp.sched.pc 值放入 BX 寄存器// PC 指針指向 gogo 退出時需要執行的函數地址MOVQ gobuf_pc(BX), BX// 用 BX 寄存器里的值去修改 CPU 的 IP 寄存器,// 這樣就可以根據 CS:IP 寄存器的段地址+偏移量跳轉到 BX 寄存器里的地址,也就是 gp.sched.pcJMP BXruntime.gogo 函數接收 gp.sched 這個 gobuf 結構體實例,其中保存了函數棧寄存器 SP/PC/BP,如果熟悉操作系統原理的話可以知道這些寄存器是 CPU 進行函數調用和返回時切換對應的函數棧幀所需的寄存器,而 goroutine 的執行和函數調用的原理是一致的,也是 CPU 寄存器的切換過程,所以這里的幾個寄存器當前存的就是 G 的函數執行棧,當 goroutine 在處理網絡 I/O 之時,如果恰好處于 I/O 就緒的狀態的話,則正常完成 runtime.gogo,并在最后跳轉到特定的地址,那么這個地址是哪里呢?
我們知道 CPU 執行函數的時候需要知道函數在內存里的代碼段地址和偏移量,然后才能去取來函數棧執行,而典型的提供代碼段地址和偏移量的寄存器就是 CS 和 IP 寄存器,而 JMP BX 指令則是用 BX 寄存器去更新 IP 寄存器,而 BX 寄存器里的值是 gp.sched.pc,那么這個 PC 指針究竟是指向哪里呢?讓我們來看另一處源碼。
眾所周知,啟動一個新的 goroutine 是通過 go 關鍵字來完成的,而 go compiler 會在編譯期間利用 cmd/compile/internal/gc.state.stmt 和 cmd/compile/internal/gc.state.call 這兩個函數將 go 關鍵字翻譯成 runtime.newproc 函數調用,而 runtime.newproc 接收了函數指針和其大小之后,會獲取 goroutine 和調用處的程序計數器,接著再調用 runtime.newproc1:
//?Create?a?new?g?in?state?_Grunnable,?starting?at?fn,?with?narg?bytes //?of?arguments?starting?at?argp.?callerpc?is?the?address?of?the?go //?statement?that?created?this.?The?caller?is?responsible?for?adding //?the?new?g?to?the?scheduler. // //?This?must?run?on?the?system?stack?because?it's?the?continuation?of //?newproc,?which?cannot?split?the?stack. // //go:systemstack func?newproc1(fn?*funcval,?argp?unsafe.Pointer,?narg?int32,?callergp?*g,?callerpc?uintptr)?*g?{...memclrNoHeapPointers(unsafe.Pointer(&newg.sched),?unsafe.Sizeof(newg.sched))newg.sched.sp?=?spnewg.stktopsp?=?sp//?把?goexit?函數地址存入?gobuf?的?PC?指針里newg.sched.pc?=?funcPC(goexit)?+?sys.PCQuantum?//?+PCQuantum?so?that?previous?instruction?is?in?same?functionnewg.sched.g?=?guintptr(unsafe.Pointer(newg))gostartcallfn(&newg.sched,?fn)newg.gopc?=?callerpcnewg.ancestors?=?saveAncestors(callergp)newg.startpc?=?fn.fnif?_g_.m.curg?!=?nil?{newg.labels?=?_g_.m.curg.labels}if?isSystemGoroutine(newg,?false)?{atomic.Xadd(&sched.ngsys,?+1)}casgstatus(newg,?_Gdead,?_Grunnable)... }這里可以看到,newg.sched.pc 被設置了 runtime.goexit 的函數地址,newg 就是后面 runtime.gogo 執行的 goroutine,因此 runtime.gogo 最后的匯編指令 JMP BX是跳轉到了 runtime.goexit,讓我們來繼續看看這個函數做了什么:
// The top-most function running on a goroutine // returns to goexit+PCQuantum. Defined as ABIInternal // so as to make it identifiable to traceback (this // function it used as a sentinel; traceback wants to // see the func PC, not a wrapper PC). TEXT runtime·goexit<ABIInternal>(SB),NOSPLIT,$0-0BYTE $0x90 // NOPCALL runtime·goexit1(SB) // does not return// traceback from goexit1 must hit code range of goexitBYTE $0x90 // NOP這個函數也是匯編實現的,但是非常簡單,就是直接調用 runtime·goexit1:
//?Finishes?execution?of?the?current?goroutine. func?goexit1()?{if?raceenabled?{racegoend()}if?trace.enabled?{traceGoEnd()}mcall(goexit0) }調用 runtime.mcall函數:
// func mcall(fn func(*g)) // Switch to m->g0's stack, call fn(g). // Fn must never return. It should gogo(&g->sched) // to keep running g.// 切換回 g0 的系統堆棧,執行 fn(g) TEXT runtime·mcall(SB), NOSPLIT, $0-8// 取入參 funcval 對象的指針存入 DI 寄存器,此時 fn.fn 是 goexit0 的地址MOVQ fn+0(FP), DIget_tls(CX)MOVQ g(CX), AX // save state in g->schedMOVQ 0(SP), BX // caller's PCMOVQ BX, (g_sched+gobuf_pc)(AX)LEAQ fn+0(FP), BX // caller's SPMOVQ BX, (g_sched+gobuf_sp)(AX)MOVQ AX, (g_sched+gobuf_g)(AX)MOVQ BP, (g_sched+gobuf_bp)(AX)// switch to m->g0 & its stack, call fnMOVQ g(CX), BXMOVQ g_m(BX), BX// 把 g0 的棧指針存入 SI 寄存器,后面需要用到MOVQ m_g0(BX), SICMPQ SI, AX // if g == m->g0 call badmcallJNE 3(PC)MOVQ $runtime·badmcall(SB), AXJMP AX// 這兩個指令是把 g0 地址存入到 TLS 里,// 然后從 SI 寄存器取出 g0 的棧指針,// 替換掉 SP 寄存器里存的當前 g 的棧指針MOVQ SI, g(CX) // g = m->g0MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.spPUSHQ AXMOVQ DI, DX// 入口處的第一個指令已經把 funcval 實例對象的指針存入了 DI 寄存器,// 0(DI) 表示取出 DI 的第一個成員,即 goexit0 函數地址,再存入 DIMOVQ 0(DI), DICALL DI // 調用 DI 寄存器里的地址,即 goexit0POPQ AXMOVQ $runtime·badmcall2(SB), AXJMP AXRET可以看到 runtime.mcall 函數的主要邏輯是從當前 goroutine 切換回 g0 的系統堆棧,然后調用 fn(g),此處的 g 即是當前運行的 goroutine,這個方法會保存當前運行的 G 的 PC/SP 到 g->sched 里,以便該 G 可以在以后被重新恢復執行,因為也涉及到寄存器和堆棧指針的操作,所以也需要使用匯編實現,該函數最后會在 g0 系統堆棧下執行 runtime.goexit0:
func?goexit0(gp?*g)?{_g_?:=?getg()casgstatus(gp,?_Grunning,?_Gdead)if?isSystemGoroutine(gp,?false)?{atomic.Xadd(&sched.ngsys,?-1)}gp.m?=?nillocked?:=?gp.lockedm?!=?0gp.lockedm?=?0_g_.m.lockedg?=?0gp.preemptStop?=?falsegp.paniconfault?=?falsegp._defer?=?nil?//?should?be?true?already?but?just?in?case.gp._panic?=?nil?//?non-nil?for?Goexit?during?panic.?points?at?stack-allocated?data.gp.writebuf?=?nilgp.waitreason?=?0gp.param?=?nilgp.labels?=?nilgp.timer?=?nilif?gcBlackenEnabled?!=?0?&&?gp.gcAssistBytes?>?0?{//?Flush?assist?credit?to?the?global?pool.?This?gives//?better?information?to?pacing?if?the?application?is//?rapidly?creating?an?exiting?goroutines.scanCredit?:=?int64(gcController.assistWorkPerByte?*?float64(gp.gcAssistBytes))atomic.Xaddint64(&gcController.bgScanCredit,?scanCredit)gp.gcAssistBytes?=?0}dropg()if?GOARCH?==?"wasm"?{?//?no?threads?yet?on?wasmgfput(_g_.m.p.ptr(),?gp)schedule()?//?never?returns}if?_g_.m.lockedInt?!=?0?{print("invalid?m->lockedInt?=?",?_g_.m.lockedInt,?"\n")throw("internal?lockOSThread?error")}gfput(_g_.m.p.ptr(),?gp)if?locked?{//?The?goroutine?may?have?locked?this?thread?because//?it?put?it?in?an?unusual?kernel?state.?Kill?it//?rather?than?returning?it?to?the?thread?pool.//?Return?to?mstart,?which?will?release?the?P?and?exit//?the?thread.if?GOOS?!=?"plan9"?{?//?See?golang.org/issue/22227.gogo(&_g_.m.g0.sched)}?else?{//?Clear?lockedExt?on?plan9?since?we?may?end?up?re-using//?this?thread._g_.m.lockedExt?=?0}}schedule() }runtime.goexit0 的主要工作是就是
利用 CAS 操作把 g 的狀態從 _Grunning 更新為 _Gdead;
對 g 做一些清理操作,把一些字段值置空;
調用 runtime.dropg 解綁 g 和 m;
把 g 放入 p 存儲 g 的 gfree 鏈表作為緩存,后續如果需要啟動新的 goroutine 則可以直接從鏈表里取而不用重新初始化分配內存。
最后,調用 runtime.schedule() 再次進入調度循環去調度新的 goroutines,永不停歇。
另一方面,如果 goroutine 處于 I/O 不可用狀態,我們前面已經分析過 netpoller 利用非阻塞 I/O + I/O 多路復用避免了陷入系統調用,所以此時會調用 runtime.gopark 并把 goroutine 暫時封存在用戶態空間,并休眠當前的 goroutine,因此不會阻塞 runtime.gogo 的匯編執行,而是通過 runtime.mcall 調用 runtime.park_m:
func?gopark(unlockf?func(*g,?unsafe.Pointer)?bool,?lock?unsafe.Pointer,?reason?waitReason,?traceEv?byte,?traceskip?int)?{if?reason?!=?waitReasonSleep?{checkTimeouts()?//?timeouts?may?expire?while?two?goroutines?keep?the?scheduler?busy}mp?:=?acquirem()gp?:=?mp.curgstatus?:=?readgstatus(gp)if?status?!=?_Grunning?&&?status?!=?_Gscanrunning?{throw("gopark:?bad?g?status")}mp.waitlock?=?lockmp.waitunlockf?=?unlockfgp.waitreason?=?reasonmp.waittraceev?=?traceEvmp.waittraceskip?=?traceskipreleasem(mp)//?can't?do?anything?that?might?move?the?G?between?Ms?here.mcall(park_m) }func?park_m(gp?*g)?{_g_?:=?getg()if?trace.enabled?{traceGoPark(_g_.m.waittraceev,?_g_.m.waittraceskip)}casgstatus(gp,?_Grunning,?_Gwaiting)dropg()if?fn?:=?_g_.m.waitunlockf;?fn?!=?nil?{ok?:=?fn(gp,?_g_.m.waitlock)_g_.m.waitunlockf?=?nil_g_.m.waitlock?=?nilif?!ok?{if?trace.enabled?{traceGoUnpark(gp,?2)}casgstatus(gp,?_Gwaiting,?_Grunnable)execute(gp,?true)?//?Schedule?it?back,?never?returns.}}schedule() }runtime.mcall 方法我們在前面已經介紹過,它主要的工作就是是從當前 goroutine 切換回 g0 的系統堆棧,然后調用 fn(g),而此時 runtime.mcall 調用執行的是 runtime.park_m,這個方法里會利用 CAS 把當前運行的 goroutine -- gp 的狀態 從 _Grunning 切換到 _Gwaiting,表明該 goroutine 已進入到等待喚醒狀態,此時封存和休眠 G 的操作就完成了,只需等待就緒之后被重新喚醒執行即可。最后調用 runtime.schedule() 再次進入調度循環,去執行下一個 goroutine,充分利用 CPU。
至此,我們完成了對 Go netpoller 原理剖析的整個閉環。
Go netpoller 的問題
Go netpoller 的設計不可謂不精巧、性能也不可謂不高,配合 goroutine 開發網絡應用的時候就一個字:爽。因此 Go 的網絡編程模式是及其簡潔高效的,然而,沒有任何一種設計和架構是完美的, goroutine-per-connection 這種模式雖然簡單高效,但是在某些極端的場景下也會暴露出問題:goroutine 雖然非常輕量,它的自定義棧內存初始值僅為 2KB,后面按需擴容;海量連接的業務場景下, goroutine-per-connection ,此時 goroutine 數量以及消耗的資源就會呈線性趨勢暴漲,雖然 Go scheduler 內部做了 g 的緩存鏈表,可以一定程度上緩解高頻創建銷毀 goroutine 的壓力,但是對于瞬時性暴漲的長連接場景就無能為力了,大量的 goroutines 會被不斷創建出來,從而對 Go runtime scheduler 造成極大的調度壓力和侵占系統資源,然后資源被侵占又反過來影響 Go scheduler 的調度,進而導致性能下降。
Reactor 網絡模型
目前 Linux 平臺上主流的高性能網絡庫/框架中,大都采用 Reactor 模式,比如 netty、libevent、libev、ACE,POE(Perl)、Twisted(Python)等。
Reactor 模式本質上指的是使用 I/O 多路復用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O) 的模式。
通常設置一個主線程負責做 event-loop 事件循環和 I/O 讀寫,通過 select/poll/epoll_wait 等系統調用監聽 I/O 事件,業務邏輯提交給其他工作線程去做。而所謂『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其他的 I/O 系統調用上,這樣可以最大限度的復用 event-loop 線程,讓一個線程能服務于多個 sockets。在 Reactor 模式中,I/O 線程只能阻塞在 I/O multiplexing 函數上(select/poll/epoll_wait)。
Reactor 模式的基本工作流程如下:
Server 端完成在 bind&listen 之后,將 listenfd 注冊到 epollfd 中,最后進入 event-loop 事件循環。循環過程中會調用 select/poll/epoll_wait 阻塞等待,若有在 listenfd 上的新連接事件則解除阻塞返回,并調用 socket.accept 接收新連接 connfd,并將 connfd 加入到 epollfd 的 I/O 復用(監聽)隊列。
當 connfd 上發生可讀/可寫事件也會解除 select/poll/epoll_wait 的阻塞等待,然后進行 I/O 讀寫操作,這里讀寫 I/O 都是非阻塞 I/O,這樣才不會阻塞 event-loop 的下一個循環。然而,這樣容易割裂業務邏輯,不易理解和維護。
調用 read 讀取數據之后進行解碼并放入隊列中,等待工作線程處理。
工作線程處理完數據之后,返回到 event-loop 線程,由這個線程負責調用 write 把數據寫回 client。
accept 連接以及 conn 上的讀寫操作若是在主線程完成,則要求是非阻塞 I/O,因為 Reactor 模式一條最重要的原則就是:I/O 操作不能阻塞 event-loop 事件循環。「實際上 event loop 可能也可以是多線程的,只是一個線程里只有一個 select/poll/epoll_wait」。
上面提到了 Go netpoller 在某些場景下可能因為創建太多的 goroutine 而過多地消耗系統資源,而在現實世界的網絡業務中,服務器持有的海量連接中在極短的時間窗口內只有極少數是 active 而大多數則是 idle,就像這樣(非真實數據,僅僅是為了比喻):
那么為每一個連接指派一個 goroutine 就顯得太過奢侈了,而 Reactor 模式這種利用 I/O 多路復用進而只需要使用少量線程即可管理海量連接的設計就可以在這樣網絡業務中大顯身手了:
在絕大部分應用場景下,我推薦大家還是遵循 Go 的 best practices,使用原生的 Go 網絡庫來構建自己的網絡應用。然而,在某些極度追求性能、壓榨系統資源以及技術棧必須是原生 Go (不考慮 C/C++ 寫中間層而 Go 寫業務層)的業務場景下,我們可以考慮自己構建 Reactor 網絡模型。
gnet
Github:?https://github.com/panjf2000/gnet
gnet?是一個基于事件驅動的高性能和輕量級網絡框架。它直接使用 epoll 和 kqueue 系統調用而非標準 Go 網絡包:net 來構建網絡應用,它的工作原理類似兩個開源的網絡庫:netty 和 libuv,這也使得gnet 達到了一個遠超 Go net 的性能表現。
gnet 設計開發的初衷不是為了取代 Go 的標準網絡庫:net,而是為了創造出一個類似于 Redis、Haproxy 能高效處理網絡包的 Go 語言網絡服務器框架。
gnet 的賣點在于它是一個高性能、輕量級、非阻塞的純 Go 實現的傳輸層(TCP/UDP/Unix Domain Socket)網絡框架,開發者可以使用 gnet 來實現自己的應用層網絡協議(HTTP、RPC、Redis、WebSocket 等等),從而構建出自己的應用層網絡應用:比如在 gnet 上實現 HTTP 協議就可以創建出一個 HTTP 服務器 或者 Web 開發框架,實現 Redis 協議就可以創建出自己的 Redis 服務器等等。
gnet,在某些極端的網絡業務場景,比如海量連接、高頻短連接、網絡小包等等場景,gnet 在性能和資源占用上都遠超 Go 原生的 net 包(基于 netpoller)。
gnet 已經實現了 Multi-Reactors 和 Multi-Reactors + Goroutine Pool 兩種網絡模型,也得益于這些網絡模型,使得 gnet 成為一個高性能和低損耗的 Go 網絡框架:
主從 Reactors 模型主從 Reactors + Goroutine Pool 模型
🚀 功能
[x] 高性能的基于多線程/Go程網絡模型的 event-loop 事件驅動
[x] 內置 goroutine 池,由開源庫 ants 提供支持
[x] 內置 bytes 內存池,由開源庫 bytebufferpool 提供支持
[x] 整個生命周期是無鎖的
[x] 簡單易用的 APIs
[x] 基于 Ring-Buffer 的高效且可重用的內存 buffer
[x] 支持多種網絡協議/IPC 機制:TCP、UDP 和 Unix Domain Socket
[x] 支持多種負載均衡算法:Round-Robin(輪詢)、Source-Addr-Hash(源地址哈希) 和 Least-Connections(最少連接數)
[x] 支持兩種事件驅動機制:「Linux」 里的 epoll 以及 「FreeBSD/DragonFly/Darwin」 里的 kqueue
[x] 支持異步寫操作
[x] 靈活的事件定時器
[x] SO_REUSEPORT 端口重用
[x] 內置多種編解碼器,支持對 TCP 數據流分包:LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec 和 LengthFieldBasedFrameCodec,參考自 netty codec,而且支持自定制編解碼器
[x] 支持 Windows 平臺,基于 IOCP 事件驅動機制 Go 標準網絡庫
[ ] 實現 gnet 客戶端
參考&延伸閱讀
The Go netpoller
Nonblocking I/O
epoll(7) — Linux manual page
I/O Multiplexing: The select and poll Functions
The method to epoll’s madness
Scalable Go Scheduler Design Doc
Scheduling In Go : Part I - OS Scheduler
Scheduling In Go : Part II - Go Scheduler
Scheduling In Go : Part III - Concurrency
Goroutines, Nonblocking I/O, And Memory Usage
IO多路復用與Go網絡庫的實現
關于select函數中timeval和fd_set重新設置的問題
A Million WebSockets and Go
Going Infinite, handling 1M websockets connections in Go
字節跳動在 Go 網絡庫上的實踐
- END -
看完一鍵三連在看,轉發,點贊
是對文章最大的贊賞,極客重生感謝你
推薦閱讀
五個半小時
深入理解Linux異步I/O框架 io_uring
深入理解高并發服務器性能優化
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 求點贊,在看,分享三連
總結
以上是生活随笔為你收集整理的经典|深入理解 Go高性能网络编程模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 五个半小时
- 下一篇: 云计算学习路线和经典资料推荐