Go netpoller 网络模型之源码全面解析
作者:allanpan,騰訊 IEG 后臺開發(fā)工程師
近兩萬字長文從 Linux 底層 Nonblocking I/O、 I/O multiplexing: select/epoll 以及 Go 源碼全方位剖析 Go 語言的網(wǎng)絡(luò)模型和底層實現(xiàn);最后介紹分析當(dāng)前主流的高性能開源網(wǎng)絡(luò)庫所使用的經(jīng)典 Reactors 模式,以及如何基于此實現(xiàn)一個?(在某些特定場景下)?比 Go 原生網(wǎng)絡(luò)庫性能更好的網(wǎng)絡(luò)庫。可能是全網(wǎng)最詳盡的 Go 網(wǎng)絡(luò)底層剖析文章,一文帶你完全吃透 Go 語言的網(wǎng)絡(luò)編程底層原理。
導(dǎo)言
Go 基于 I/O multiplexing 和 goroutine scheduler 構(gòu)建了一個簡潔而高性能的原生網(wǎng)絡(luò)模型(基于 Go 的 I/O 多路復(fù)用 netpoller ),提供了 goroutine-per-connection 這樣簡單的網(wǎng)絡(luò)編程模式。在這種模式下,開發(fā)者使用的是同步的模式去編寫異步的邏輯,極大地降低了開發(fā)者編寫網(wǎng)絡(luò)應(yīng)用時的心智負擔(dān),且借助于 Go runtime scheduler 對 goroutines 的高效調(diào)度,這個原生網(wǎng)絡(luò)模型不論從適用性還是性能上都足以滿足絕大部分的應(yīng)用場景。
然而,在工程性上能做到如此高的普適性和兼容性,最終暴露給開發(fā)者提供接口/模式如此簡潔,其底層必然是基于非常復(fù)雜的封裝,做了很多取舍,也有可能放棄了一些追求極致性能的設(shè)計和理念。事實上 Go netpoller 底層就是基于 epoll/kqueue/iocp 這些 I/O 多路復(fù)用技術(shù)來做封裝的,最終暴露出 goroutine-per-connection 這樣的極簡的開發(fā)模式給使用者。
Go netpoller 在不同的操作系統(tǒng),其底層使用的 I/O 多路復(fù)用技術(shù)也不一樣,可以從 Go 源碼目錄結(jié)構(gòu)和對應(yīng)代碼文件了解 Go 在不同平臺下的網(wǎng)絡(luò) I/O 模式的實現(xiàn)。比如,在 Linux 系統(tǒng)下基于 epoll,freeBSD 系統(tǒng)下基于 kqueue,以及 Windows 系統(tǒng)下基于 iocp。
本文將基于 Linux 平臺來解析 Go netpoller 之 I/O 多路復(fù)用的底層是如何基于 epoll 封裝實現(xiàn)的,從源碼層層推進,全面而深度地解析 Go netpoller 的設(shè)計理念和實現(xiàn)原理,以及 Go 是如何利用 netpoller 來構(gòu)建它的原生網(wǎng)絡(luò)模型的。主要涉及到的一些概念:I/O 模型、用戶/內(nèi)核空間、epoll、Linux 源碼、goroutine scheduler 等等,我會盡量簡單地講解,如果有對相關(guān)概念不熟悉的同學(xué),還是希望能提前熟悉一下。
用戶空間與內(nèi)核空間
現(xiàn)代操作系統(tǒng)都是采用虛擬存儲器,那么對 32 位操作系統(tǒng)而言,它的尋址空間(虛擬存儲空間)為 4G(2 的 32 次方)。操作系統(tǒng)的核心是內(nèi)核,獨立于普通的應(yīng)用程序,可以訪問受保護的內(nèi)存空間,也有訪問底層硬件設(shè)備的所有權(quán)限。為了保證用戶進程不能直接操作內(nèi)核(kernel),保證內(nèi)核的安全,操心系統(tǒng)將虛擬空間劃分為兩部分,一部分為內(nèi)核空間,一部分為用戶空間。針對 Linux 操作系統(tǒng)而言,將最高的 1G 字節(jié)(從虛擬地址 0xC0000000 到 0xFFFFFFFF),供內(nèi)核使用,稱為內(nèi)核空間,而將較低的 3G 字節(jié)(從虛擬地址 0x00000000 到 0xBFFFFFFF),供各個進程使用,稱為用戶空間。
現(xiàn)代的網(wǎng)絡(luò)服務(wù)的主流已經(jīng)完成從 CPU 密集型到 IO 密集型的轉(zhuǎn)變,所以服務(wù)端程序?qū)?I/O 的處理必不可少,而一旦操作 I/O 則必定要在用戶態(tài)和內(nèi)核態(tài)之間來回切換。I/O 模型
在神作《UNIX 網(wǎng)絡(luò)編程》里,總結(jié)歸納了 5 種 I/O 模型,包括同步和異步 I/O:
阻塞 I/O (Blocking I/O)
非阻塞 I/O (Nonblocking I/O)
I/O 多路復(fù)用 (I/O multiplexing)
信號驅(qū)動 I/O (Signal driven I/O)
異步 I/O (Asynchronous I/O)
操作系統(tǒng)上的 I/O 是用戶空間和內(nèi)核空間的數(shù)據(jù)交互,因此 I/O 操作通常包含以下兩個步驟:
等待網(wǎng)絡(luò)數(shù)據(jù)到達網(wǎng)卡(讀就緒)/等待網(wǎng)卡可寫(寫就緒) –> 讀取/寫入到內(nèi)核緩沖區(qū)
從內(nèi)核緩沖區(qū)復(fù)制數(shù)據(jù) –> 用戶空間(讀)/從用戶空間復(fù)制數(shù)據(jù) -> 內(nèi)核緩沖區(qū)(寫)
而判定一個 I/O 模型是同步還是異步,主要看第二步:數(shù)據(jù)在用戶和內(nèi)核空間之間復(fù)制的時候是不是會阻塞當(dāng)前進程,如果會,則是同步 I/O,否則,就是異步 I/O。基于這個原則,這 5 種 I/O 模型中只有一種異步 I/O 模型:Asynchronous I/O,其余都是同步 I/O 模型。
這 5 種 I/O 模型的對比如下:
Non-blocking I/O
什么叫非阻塞 I/O,顧名思義就是:所有 I/O 操作都是立刻返回而不會阻塞當(dāng)前用戶進程。I/O 多路復(fù)用通常情況下需要和非阻塞 I/O 搭配使用,否則可能會產(chǎn)生意想不到的問題。比如,epoll 的 ET(邊緣觸發(fā)) 模式下,如果不使用非阻塞 I/O,有極大的概率會導(dǎo)致阻塞 event-loop 線程,從而降低吞吐量,甚至導(dǎo)致 bug。
Linux 下,我們可以通過 fcntl 系統(tǒng)調(diào)用來設(shè)置 O_NONBLOCK 標(biāo)志位,從而把 socket 設(shè)置成 Non-blocking。當(dāng)對一個 Non-blocking socket 執(zhí)行讀操作時,流程是這個樣子:
當(dāng)用戶進程發(fā)出 read 操作時,如果 kernel 中的數(shù)據(jù)還沒有準(zhǔn)備好,那么它并不會 block 用戶進程,而是立刻返回一個 EAGAIN error。
從用戶進程角度講 ,它發(fā)起一個 read 操作后,并不需要等待,而是馬上就得到了一個結(jié)果。用戶進程判斷結(jié)果是一個 error 時,它就知道數(shù)據(jù)還沒有準(zhǔn)備好,于是它可以再次發(fā)送 read 操作。一旦 kernel 中的數(shù)據(jù)準(zhǔn)備好了,并且又再次收到了用戶進程的 system call,那么它馬上就將數(shù)據(jù)拷貝到了用戶內(nèi)存,然后返回。所以,Non-blocking I/O 的特點是用戶進程需要不斷的主動詢問 kernel 數(shù)據(jù)好了沒有。下一節(jié)我們要講的 I/O 多路復(fù)用需要和 Non-blocking I/O 配合才能發(fā)揮出最大的威力!
I/O 多路復(fù)用
所謂 I/O 多路復(fù)用指的就是 select/poll/epoll 這一系列的多路選擇器:支持單一線程同時監(jiān)聽多個文件描述符(I/O 事件),阻塞等待,并在其中某個文件描述符可讀寫時收到通知。I/O 復(fù)用其實復(fù)用的不是 I/O 連接,而是復(fù)用線程,讓一個 thread of control 能夠處理多個連接(I/O 事件)。
select & poll
#include?<sys/select.h>/*?According?to?earlier?standards?*/ #include?<sys/time.h> #include?<sys/types.h> #include?<unistd.h>int?select(int?nfds,?fd_set?*readfds,?fd_set?*writefds,?fd_set?*exceptfds,?struct?timeval?*timeout);//?和 select 緊密結(jié)合的四個宏: void?FD_CLR(int?fd,?fd_set?*set); int?FD_ISSET(int?fd,?fd_set?*set); void?FD_SET(int?fd,?fd_set?*set); void?FD_ZERO(fd_set?*set);select 是 epoll 之前 Linux 使用的 I/O 事件驅(qū)動技術(shù)。
理解 select 的關(guān)鍵在于理解 fd_set,為說明方便,取 fd_set 長度為 1 字節(jié),fd_set 中的每一 bit 可以對應(yīng)一個文件描述符 fd,則 1 字節(jié)長的 fd_set 最大可以對應(yīng) 8 個 fd。select 的調(diào)用過程如下:
執(zhí)行 FD_ZERO(&set), 則 set 用位表示是 0000,0000
若 fd=5, 執(zhí)行 FD_SET(fd, &set); 后 set 變?yōu)?0001,0000(第 5 位置為 1)
再加入 fd=2, fd=1,則 set 變?yōu)?0001,0011
執(zhí)行 select(6, &set, 0, 0, 0) 阻塞等待
若 fd=1, fd=2 上都發(fā)生可讀事件,則 select 返回,此時 set 變?yōu)?0000,0011 (注意:沒有事件發(fā)生的 fd=5 被清空)
基于上面的調(diào)用過程,可以得出 select 的特點:
可監(jiān)控的文件描述符個數(shù)取決于 sizeof(fd_set) 的值。假設(shè)服務(wù)器上 sizeof(fd_set)=512,每 bit 表示一個文件描述符,則服務(wù)器上支持的最大文件描述符是 512*8=4096。fd_set 的大小調(diào)整可參考 【原創(chuàng)】技術(shù)系列之 網(wǎng)絡(luò)模型(二) 中的模型 2,可以有效突破 select 可監(jiān)控的文件描述符上限
將 fd 加入 select 監(jiān)控集的同時,還要再使用一個數(shù)據(jù)結(jié)構(gòu) array 保存放到 select 監(jiān)控集中的 fd,一是用于在 select 返回后,array 作為源數(shù)據(jù)和 fd_set 進行 FD_ISSET 判斷。二是 select 返回后會把以前加入的但并無事件發(fā)生的 fd 清空,則每次開始 select 前都要重新從 array 取得 fd 逐一加入(FD_ZERO 最先),掃描 array 的同時取得 fd 最大值 maxfd,用于 select 的第一個參數(shù)
可見 select 模型必須在 select 前循環(huán) array(加 fd,取 maxfd),select 返回后循環(huán) array(FD_ISSET 判斷是否有事件發(fā)生)
所以,select 有如下的缺點:
最大并發(fā)數(shù)限制:使用 32 個整數(shù)的 32 位,即 32*32=1024 來標(biāo)識 fd,雖然可修改,但是有以下第 2, 3 點的瓶頸
每次調(diào)用 select,都需要把 fd 集合從用戶態(tài)拷貝到內(nèi)核態(tài),這個開銷在 fd 很多時會很大
性能衰減嚴重:每次 kernel 都需要線性掃描整個 fd_set,所以隨著監(jiān)控的描述符 fd 數(shù)量增長,其 I/O 性能會線性下降
poll 的實現(xiàn)和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 結(jié)構(gòu)而不是 select 的 fd_set 結(jié)構(gòu),poll 解決了最大文件描述符數(shù)量限制的問題,但是同樣需要從用戶態(tài)拷貝所有的 fd 到內(nèi)核態(tài),也需要線性遍歷所有的 fd 集合,所以它和 select 只是實現(xiàn)細節(jié)上的區(qū)分,并沒有本質(zhì)上的區(qū)別。
epoll
epoll 是 Linux kernel 2.6 之后引入的新 I/O 事件驅(qū)動技術(shù),I/O 多路復(fù)用的核心設(shè)計是 1 個線程處理所有連接的 等待消息準(zhǔn)備好 I/O 事件,這一點上 epoll 和 select&poll 是大同小異的。但 select&poll 錯誤預(yù)估了一件事,當(dāng)數(shù)十萬并發(fā)連接存在時,可能每一毫秒只有數(shù)百個活躍的連接,同時其余數(shù)十萬連接在這一毫秒是非活躍的。select&poll 的使用方法是這樣的:返回的活躍連接 == select(全部待監(jiān)控的連接) 。
什么時候會調(diào)用 select&poll 呢?在你認為需要找出有報文到達的活躍連接時,就應(yīng)該調(diào)用。所以,select&poll 在高并發(fā)時是會被頻繁調(diào)用的。這樣,這個頻繁調(diào)用的方法就很有必要看看它是否有效率,因為,它的輕微效率損失都會被 高頻 二字所放大。它有效率損失嗎?顯而易見,全部待監(jiān)控連接是數(shù)以十萬計的,返回的只是數(shù)百個活躍連接,這本身就是無效率的表現(xiàn)。被放大后就會發(fā)現(xiàn),處理并發(fā)上萬個連接時,select&poll 就完全力不從心了。這個時候就該 epoll 上場了,epoll 通過一些新的設(shè)計和優(yōu)化,基本上解決了 select&poll 的問題。
epoll 的 API 非常簡潔,涉及到的只有 3 個系統(tǒng)調(diào)用:
#include?<sys/epoll.h>?? int?epoll_create(int?size);?//?int?epoll_create1(int?flags); int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event); int?epoll_wait(int?epfd,?struct?epoll_event?*events,?int?maxevents,?int?timeout);其中,epoll_create 創(chuàng)建一個 epoll 實例并返回 epollfd;epoll_ctl 注冊 file descriptor 等待的 I/O 事件(比如 EPOLLIN、EPOLLOUT 等) 到 epoll 實例上;epoll_wait 則是阻塞監(jiān)聽 epoll 實例上所有的 file descriptor 的 I/O 事件,它接收一個用戶空間上的一塊內(nèi)存地址 (events 數(shù)組),kernel 會在有 I/O 事件發(fā)生的時候把文件描述符列表復(fù)制到這塊內(nèi)存地址上,然后 epoll_wait 解除阻塞并返回,最后用戶空間上的程序就可以對相應(yīng)的 fd 進行讀寫了:
#include?<unistd.h> ssize_t?read(int?fd,?void?*buf,?size_t?count); ssize_t?write(int?fd,?const?void?*buf,?size_t?count);epoll 的工作原理如下:
與 select&poll 相比,epoll 分清了高頻調(diào)用和低頻調(diào)用。例如,epoll_ctl 相對來說就是非頻繁調(diào)用的,而 epoll_wait 則是會被高頻調(diào)用的。所以 epoll 利用 epoll_ctl 來插入或者刪除一個 fd,實現(xiàn)用戶態(tài)到內(nèi)核態(tài)的數(shù)據(jù)拷貝,這確保了每一個 fd 在其生命周期只需要被拷貝一次,而不是每次調(diào)用 epoll_wait 的時候都拷貝一次。epoll_wait 則被設(shè)計成幾乎沒有入?yún)⒌恼{(diào)用,相比 select&poll 需要把全部監(jiān)聽的 fd 集合從用戶態(tài)拷貝至內(nèi)核態(tài)的做法,epoll 的效率就高出了一大截。
在實現(xiàn)上 epoll 采用紅黑樹來存儲所有監(jiān)聽的 fd,而紅黑樹本身插入和刪除性能比較穩(wěn)定,時間復(fù)雜度 O(logN)。通過 epoll_ctl 函數(shù)添加進來的 fd 都會被放在紅黑樹的某個節(jié)點內(nèi),所以,重復(fù)添加是沒有用的。當(dāng)把 fd 添加進來的時候時候會完成關(guān)鍵的一步:該 fd 會與相應(yīng)的設(shè)備(網(wǎng)卡)驅(qū)動程序建立回調(diào)關(guān)系,也就是在內(nèi)核中斷處理程序為它注冊一個回調(diào)函數(shù),在 fd 相應(yīng)的事件觸發(fā)(中斷)之后(設(shè)備就緒了),內(nèi)核就會調(diào)用這個回調(diào)函數(shù),該回調(diào)函數(shù)在內(nèi)核中被稱為:ep_poll_callback ,這個回調(diào)函數(shù)其實就是把這個 fd 添加到 rdllist 這個雙向鏈表(就緒鏈表)中。epoll_wait 實際上就是去檢查 rdllist 雙向鏈表中是否有就緒的 fd,當(dāng) rdllist 為空(無就緒 fd)時掛起當(dāng)前進程,直到 rdllist 非空時進程才被喚醒并返回。
相比于 select&poll 調(diào)用時會將全部監(jiān)聽的 fd 從用戶態(tài)空間拷貝至內(nèi)核態(tài)空間并線性掃描一遍找出就緒的 fd 再返回到用戶態(tài),epoll_wait 則是直接返回已就緒 fd,因此 epoll 的 I/O 性能不會像 select&poll 那樣隨著監(jiān)聽的 fd 數(shù)量增加而出現(xiàn)線性衰減,是一個非常高效的 I/O 事件驅(qū)動技術(shù)。
由于使用 epoll 的 I/O 多路復(fù)用需要用戶進程自己負責(zé) I/O 讀寫,從用戶進程的角度看,讀寫過程是阻塞的,所以 select&poll&epoll 本質(zhì)上都是同步 I/O 模型,而像 Windows 的 IOCP 這一類的異步 I/O,只需要在調(diào)用 WSARecv 或 WSASend 方法讀寫數(shù)據(jù)的時候把用戶空間的內(nèi)存 buffer 提交給 kernel,kernel 負責(zé)數(shù)據(jù)在用戶空間和內(nèi)核空間拷貝,完成之后就會通知用戶進程,整個過程不需要用戶進程參與,所以是真正的異步 I/O。
延伸
另外,我看到有些文章說 epoll 之所以性能高是因為利用了 Linux 的 mmap 內(nèi)存映射讓內(nèi)核和用戶進程共享了一片物理內(nèi)存,用來存放就緒 fd 列表和它們的數(shù)據(jù) buffer,所以用戶進程在 epoll_wait 返回之后用戶進程就可以直接從共享內(nèi)存那里讀取/寫入數(shù)據(jù)了,這讓我很疑惑,因為首先看 epoll_wait 的函數(shù)聲明:
int?epoll_wait(int?epfd,?struct?epoll_event?*events,?int?maxevents,?int?timeout);第二個參數(shù):就緒事件列表,是需要在用戶空間分配內(nèi)存然后再傳給 epoll_wait 的,如果內(nèi)核會用 mmap 設(shè)置共享內(nèi)存,直接傳遞一個指針進去就行了,根本不需要在用戶態(tài)分配內(nèi)存,多此一舉。其次,內(nèi)核和用戶進程通過 mmap 共享內(nèi)存是一件極度危險的事情,內(nèi)核無法確定這塊共享內(nèi)存什么時候會被回收,而且這樣也會賦予用戶進程直接操作內(nèi)核數(shù)據(jù)的權(quán)限和入口,非常容易出現(xiàn)大的系統(tǒng)漏洞,因此一般極少會這么做。所以我很懷疑 epoll 是不是真的在 Linux kernel 里用了 mmap,我就去看了下最新版本(5.3.9)的 Linux kernel 源碼:
/**?Implement?the?event?wait?interface?for?the?eventpoll?file.?It?is?the?kernel*?part?of?the?user?space?epoll_wait(2).*/ static?int?do_epoll_wait(int?epfd,?struct?epoll_event?__user?*events,int?maxevents,?int?timeout) {.../*?Time?to?fish?for?events?...?*/error?=?ep_poll(ep,?events,?maxevents,?timeout); }//?如果?epoll_wait?入?yún)r設(shè)定?timeout?==?0,?那么直接通過?ep_events_available?判斷當(dāng)前是否有用戶感興趣的事件發(fā)生,如果有則通過?ep_send_events?進行處理 //?如果設(shè)置 timeout >?0,并且當(dāng)前沒有用戶關(guān)注的事件發(fā)生,則進行休眠,并添加到 ep->wq 等待隊列的頭部;對等待事件描述符設(shè)置 WQ_FLAG_EXCLUSIVE 標(biāo)志 //?ep_poll?被事件喚醒后會重新檢查是否有關(guān)注事件,如果對應(yīng)的事件已經(jīng)被搶走,那么?ep_poll?會繼續(xù)休眠等待 static?int?ep_poll(struct?eventpoll?*ep,?struct?epoll_event?__user?*events,?int?maxevents,?long?timeout) {...send_events:/**?Try?to?transfer?events?to?user?space.?In?case?we?get?0?events?and*?there's?still?timeout?left?over,?we?go?trying?again?in?search?of*?more?luck.*///?如果一切正常,?有?event?發(fā)生,?就開始準(zhǔn)備數(shù)據(jù)?copy?給用戶空間了//?如果有就緒的事件發(fā)生,那么就調(diào)用?ep_send_events?將就緒的事件?copy?到用戶態(tài)內(nèi)存中,//?然后返回到用戶態(tài),否則判斷是否超時,如果沒有超時就繼續(xù)等待就緒事件發(fā)生,如果超時就返回用戶態(tài)。//?從?ep_poll?函數(shù)的實現(xiàn)可以看到,如果有就緒事件發(fā)生,則調(diào)用?ep_send_events?函數(shù)做進一步處理if?(!res?&&?eavail?&&!(res?=?ep_send_events(ep,?events,?maxevents))?&&?!timed_out)goto?fetch_events;... }//?ep_send_events?函數(shù)是用來向用戶空間拷貝就緒?fd?列表的,它將用戶傳入的就緒?fd?列表內(nèi)存簡單封裝到 // ep_send_events_data 結(jié)構(gòu)中,然后調(diào)用 ep_scan_ready_list 將就緒隊列中的事件寫入用戶空間的內(nèi)存; //?用戶進程就可以訪問到這些數(shù)據(jù)進行處理 static?int?ep_send_events(struct?eventpoll?*ep,struct?epoll_event?__user?*events,?int?maxevents) {struct?ep_send_events_data?esed;esed.maxevents?=?maxevents;esed.events?=?events;//?調(diào)用?ep_scan_ready_list?函數(shù)檢查?epoll?實例?eventpoll?中的?rdllist?就緒鏈表,//?并注冊一個回調(diào)函數(shù)?ep_send_events_proc,如果有就緒?fd,則調(diào)用?ep_send_events_proc?進行處理ep_scan_ready_list(ep,?ep_send_events_proc,?&esed,?0,?false);return?esed.res; }//?調(diào)用?ep_scan_ready_list?的時候會傳遞指向?ep_send_events_proc?函數(shù)的函數(shù)指針作為回調(diào)函數(shù), //?一旦有就緒?fd,就會調(diào)用?ep_send_events_proc?函數(shù) static?__poll_t?ep_send_events_proc(struct?eventpoll?*ep,?struct?list_head?*head,?void?*priv) {.../**?If?the?event?mask?intersect?the?caller-requested?one,*?deliver?the?event?to?userspace.?Again,?ep_scan_ready_list()*?is?holding?ep->mtx,?so?no?operations?coming?from?userspace*?can?change?the?item.*/revents?=?ep_item_poll(epi,?&pt,?1);//?如果?revents?為?0,說明沒有就緒的事件,跳過,否則就將就緒事件拷貝到用戶態(tài)內(nèi)存中if?(!revents)continue;//?將當(dāng)前就緒的事件和用戶進程傳入的數(shù)據(jù)都通過?__put_user?拷貝回用戶空間,//?也就是調(diào)用?epoll_wait?之時用戶進程傳入的?fd?列表的內(nèi)存if?(__put_user(revents,?&uevent->events)?||?__put_user(epi->event.data,?&uevent->data))?{list_add(&epi->rdllink,?head);ep_pm_stay_awake(epi);if?(!esed->res)esed->res?=?-EFAULT;return?0;}... }從 do_epoll_wait 開始層層跳轉(zhuǎn),我們可以很清楚地看到最后內(nèi)核是通過 __put_user 函數(shù)把就緒 fd 列表和事件返回到用戶空間,而 __put_user 正是內(nèi)核用來拷貝數(shù)據(jù)到用戶空間的標(biāo)準(zhǔn)函數(shù)。此外,我并沒有在 Linux kernel 的源碼中和 epoll 相關(guān)的代碼里找到 mmap 系統(tǒng)調(diào)用做內(nèi)存映射的邏輯,所以基本可以得出結(jié)論:epoll 在 Linux kernel 里并沒有使用 mmap 來做用戶空間和內(nèi)核空間的內(nèi)存共享,所以那些說 epoll 使用了 mmap 的文章都是誤解。
Go netpoller 核心
Go netpoller 基本原理
Go netpoller 通過在底層對 epoll/kqueue/iocp 的封裝,從而實現(xiàn)了使用同步編程模式達到異步執(zhí)行的效果。總結(jié)來說,所有的網(wǎng)絡(luò)操作都以網(wǎng)絡(luò)描述符 netFD 為中心實現(xiàn)。netFD 與底層 PollDesc 結(jié)構(gòu)綁定,當(dāng)在一個 netFD 上讀寫遇到 EAGAIN 錯誤時,就將當(dāng)前 goroutine 存儲到這個 netFD 對應(yīng)的 PollDesc 中,同時調(diào)用 gopark 把當(dāng)前 goroutine 給 park 住,直到這個 netFD 上再次發(fā)生讀寫事件,才將此 goroutine 給 ready 激活重新運行。顯然,在底層通知 goroutine 再次發(fā)生讀寫等事件的方式就是 epoll/kqueue/iocp 等事件驅(qū)動機制。
總所周知,Go 是一門跨平臺的編程語言,而不同平臺針對特定的功能有不用的實現(xiàn),這當(dāng)然也包括了 I/O 多路復(fù)用技術(shù),比如 Linux 里的 I/O 多路復(fù)用有 select、poll 和 epoll,而 freeBSD 或者 MacOS 里則是 kqueue,而 Windows 里則是基于異步 I/O 實現(xiàn)的 iocp,等等;因此,Go 為了實現(xiàn)底層 I/O 多路復(fù)用的跨平臺,分別基于上述的這些不同平臺的系統(tǒng)調(diào)用實現(xiàn)了多版本的 netpollers,具體的源碼路徑如下:
src/runtime/netpoll_epoll.go
src/runtime/netpoll_kqueue.go
src/runtime/netpoll_solaris.go
src/runtime/netpoll_windows.go
src/runtime/netpoll_aix.go
src/runtime/netpoll_fake.go
本文的解析基于 epoll 版本,如果讀者對其他平臺的 netpoller 底層實現(xiàn)感興趣,可以在閱讀完本文后自行翻閱其他 netpoller 源碼,所有實現(xiàn)版本的機制和原理基本類似,所以了解了 epoll 版本的實現(xiàn)后再去學(xué)習(xí)其他版本實現(xiàn)應(yīng)該沒什么障礙。
接下來讓我們通過分析最新的 Go 源碼(v1.15.3),全面剖析一下整個 Go netpoller 的運行機制和流程。
數(shù)據(jù)結(jié)構(gòu)
netFD
net.Listen("tcp", ":8888") 方法返回了一個 *TCPListener,它是一個實現(xiàn)了 net.Listener 接口的 struct,而通過 listener.Accept() 接收的新連接 *TCPConn 則是一個實現(xiàn)了 net.Conn 接口的 struct,它內(nèi)嵌了 net.conn struct。仔細閱讀上面的源碼可以發(fā)現(xiàn),不管是 Listener 的 Accept 還是 Conn 的 Read/Write 方法,都是基于一個 netFD 的數(shù)據(jù)結(jié)構(gòu)的操作, netFD 是一個網(wǎng)絡(luò)描述符,類似于 Linux 的文件描述符的概念,netFD 中包含一個 poll.FD 數(shù)據(jù)結(jié)構(gòu),而 poll.FD 中包含兩個重要的數(shù)據(jù)結(jié)構(gòu) Sysfd 和 pollDesc,前者是真正的系統(tǒng)文件描述符,后者對是底層事件驅(qū)動的封裝,所有的讀寫超時等操作都是通過調(diào)用后者的對應(yīng)方法實現(xiàn)的。
netFD 和 poll.FD 的源碼:
//?Network?file?descriptor. type?netFD?struct?{pfd?poll.FD//?immutable?until?Closefamily??????intsotype??????intisConnected?bool?//?handshake?completed?or?use?of?association?with?peernet?????????stringladdr???????Addrraddr???????Addr }//?FD?is?a?file?descriptor.?The?net?and?os?packages?use?this?type?as?a //?field?of?a?larger?type?representing?a?network?connection?or?OS?file. type?FD?struct?{//?Lock?sysfd?and?serialize?access?to?Read?and?Write?methods.fdmu?fdMutex//?System?file?descriptor.?Immutable?until?Close.Sysfd?int//?I/O?poller.pd?pollDesc//?Writev?cache.iovecs?*[]syscall.Iovec//?Semaphore?signaled?when?file?is?closed.csema?uint32//?Non-zero?if?this?file?has?been?set?to?blocking?mode.isBlocking?uint32//?Whether?this?is?a?streaming?descriptor,?as?opposed?to?a//?packet-based?descriptor?like?a?UDP?socket.?Immutable.IsStream?bool//?Whether?a?zero?byte?read?indicates?EOF.?This?is?false?for?a//?message?based?socket?connection.ZeroReadIsEOF?bool//?Whether?this?is?a?file?rather?than?a?network?socket.isFile?bool }pollDesc
前面提到了 pollDesc 是底層事件驅(qū)動的封裝,netFD 通過它來完成各種 I/O 相關(guān)的操作,它的定義如下:
type?pollDesc?struct?{runtimeCtx?uintptr }這里的 struct 只包含了一個指針,而通過 pollDesc 的 init 方法,我們可以找到它具體的定義是在 runtime.pollDesc 這里:
func?(pd?*pollDesc)?init(fd?*FD)?error?{serverInit.Do(runtime_pollServerInit)ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}pd.runtimeCtx?=?ctxreturn?nil }//?Network?poller?descriptor. // //?No?heap?pointers. // //go:notinheap type?pollDesc?struct?{link?*pollDesc?//?in?pollcache,?protected?by?pollcache.lock//?The?lock?protects?pollOpen,?pollSetDeadline,?pollUnblock?and?deadlineimpl?operations.//?This?fully?covers?seq,?rt?and?wt?variables.?fd?is?constant?throughout?the?PollDesc?lifetime.//?pollReset,?pollWait,?pollWaitCanceled?and?runtime·netpollready?(IO?readiness?notification)//?proceed?w/o?taking?the?lock.?So?closing,?everr,?rg,?rd,?wg?and?wd?are?manipulated//?in?a?lock-free?way?by?all?operations.//?NOTE(dvyukov):?the?following?code?uses?uintptr?to?store?*g?(rg/wg),//?that?will?blow?up?when?GC?starts?moving?objects.lock????mutex?//?protects?the?following?fieldsfd??????uintptrclosing?booleverr???bool????//?marks?event?scanning?error?happeneduser????uint32??//?user?settable?cookierseq????uintptr?//?protects?from?stale?read?timersrg??????uintptr?//?pdReady,?pdWait,?G?waiting?for?read?or?nilrt??????timer???//?read?deadline?timer?(set?if?rt.f?!=?nil)rd??????int64???//?read?deadlinewseq????uintptr?//?protects?from?stale?write?timerswg??????uintptr?//?pdReady,?pdWait,?G?waiting?for?write?or?nilwt??????timer???//?write?deadline?timerwd??????int64???//?write?deadline }這里重點關(guān)注里面的 rg 和 wg,這里兩個 uintptr "萬能指針"類型,取值分別可能是 pdReady、pdWait、等待 file descriptor 就緒的 goroutine 也就是 g 數(shù)據(jù)結(jié)構(gòu)以及 nil,它們是實現(xiàn)喚醒 goroutine 的關(guān)鍵。
runtime.pollDesc 包含自身類型的一個指針,用來保存下一個 runtime.pollDesc 的地址,以此來實現(xiàn)鏈表,可以減少數(shù)據(jù)結(jié)構(gòu)的大小,所有的 runtime.pollDesc 保存在 runtime.pollCache 結(jié)構(gòu)中,定義如下:
type?pollCache?struct?{lock??mutexfirst?*pollDesc//?PollDesc?objects?must?be?type-stable,//?because?we?can?get?ready?notification?from?epoll/kqueue//?after?the?descriptor?is?closed/reused.//?Stale?notifications?are?detected?using?seq?variable,//?seq?is?incremented?when?deadlines?are?changed?or?descriptor?is?reused. }因為 runtime.pollCache 是一個在 runtime 包里的全局變量,因此需要用一個互斥鎖來避免 data race 問題,從它的名字也能看出這是一個用于緩存的數(shù)據(jù)結(jié)構(gòu),也就是用來提高性能的,具體如何實現(xiàn)呢?
const?pollBlockSize?=?4?*?1024func?(c?*pollCache)?alloc()?*pollDesc?{lock(&c.lock)if?c.first?==?nil?{const?pdSize?=?unsafe.Sizeof(pollDesc{})n?:=?pollBlockSize?/?pdSizeif?n?==?0?{n?=?1}//?Must?be?in?non-GC?memory?because?can?be?referenced//?only?from?epoll/kqueue?internals.mem?:=?persistentalloc(n*pdSize,?0,?&memstats.other_sys)for?i?:=?uintptr(0);?i?<?n;?i++?{pd?:=?(*pollDesc)(add(mem,?i*pdSize))pd.link?=?c.firstc.first?=?pd}}pd?:=?c.firstc.first?=?pd.linklockInit(&pd.lock,?lockRankPollDesc)unlock(&c.lock)return?pd }Go runtime 會在調(diào)用 poll_runtime_pollOpen 往 epoll 實例注冊 fd 之時首次調(diào)用 runtime.pollCache.alloc方法時批量初始化大小 4KB 的 runtime.pollDesc 結(jié)構(gòu)體的鏈表,初始化過程中會調(diào)用 runtime.persistentalloc 來為這些數(shù)據(jù)結(jié)構(gòu)分配不會被 GC 回收的內(nèi)存,確保這些數(shù)據(jù)結(jié)構(gòu)只能被 epoll和kqueue 在內(nèi)核空間去引用。
再往后每次調(diào)用這個方法則會先判斷鏈表頭是否已經(jīng)分配過值了,若是,則直接返回表頭這個 pollDesc,這種批量初始化數(shù)據(jù)進行緩存而后每次都直接從緩存取數(shù)據(jù)的方式是一種很常見的性能優(yōu)化手段,在這里這種方式可以有效地提升 netpoller 的吞吐量。
Go runtime 會在關(guān)閉 pollDesc 之時調(diào)用 runtime.pollCache.free 釋放內(nèi)存:
func?(c?*pollCache)?free(pd?*pollDesc)?{lock(&c.lock)pd.link?=?c.firstc.first?=?pdunlock(&c.lock) }實現(xiàn)原理
使用 Go 編寫一個典型的 TCP echo server:
package?mainimport?("log""net" )func?main()?{listen,?err?:=?net.Listen("tcp",?":8888")if?err?!=?nil?{log.Println("listen?error:?",?err)return}for?{conn,?err?:=?listen.Accept()if?err?!=?nil?{log.Println("accept?error:?",?err)break}//?start?a?new?goroutine?to?handle?the?new?connection.go?HandleConn(conn)} }func?HandleConn(conn?net.Conn)?{defer?conn.Close()packet?:=?make([]byte,?1024)for?{//?block?here?if?socket?is?not?available?for?reading?data.n,?err?:=?conn.Read(packet)if?err?!=?nil?{log.Println("read?socket?error:?",?err)return}//?same?as?above,?block?here?if?socket?is?not?available?for?writing._,?_?=?conn.Write(packet[:n])} }上面是一個基于 Go 原生網(wǎng)絡(luò)模型(基于 netpoller)編寫的一個 TCP server,模式是 goroutine-per-connection ,在這種模式下,開發(fā)者使用的是同步的模式去編寫異步的邏輯而且對于開發(fā)者來說 I/O 是否阻塞是無感知的,也就是說開發(fā)者無需考慮 goroutines 甚至更底層的線程、進程的調(diào)度和上下文切換。而 Go netpoller 最底層的事件驅(qū)動技術(shù)肯定是基于 epoll/kqueue/iocp 這一類的 I/O 事件驅(qū)動技術(shù),只不過是把這些調(diào)度和上下文切換的工作轉(zhuǎn)移到了 runtime 的 Go scheduler,讓它來負責(zé)調(diào)度 goroutines,從而極大地降低了程序員的心智負擔(dān)!
Go 的這種同步模式的網(wǎng)絡(luò)服務(wù)器的基本架構(gòu)通常如下:
上面的示例代碼中相關(guān)的在源碼里的幾個數(shù)據(jù)結(jié)構(gòu)和方法:
//?TCPListener?is?a?TCP?network?listener.?Clients?should?typically //?use?variables?of?type?Listener?instead?of?assuming?TCP. type?TCPListener?struct?{fd?*netFDlc?ListenConfig }//?Accept?implements?the?Accept?method?in?the?Listener?interface;?it //?waits?for?the?next?call?and?returns?a?generic?Conn. func?(l?*TCPListener)?Accept()?(Conn,?error)?{if?!l.ok()?{return?nil,?syscall.EINVAL}c,?err?:=?l.accept()if?err?!=?nil?{return?nil,?&OpError{Op:?"accept",?Net:?l.fd.net,?Source:?nil,?Addr:?l.fd.laddr,?Err:?err}}return?c,?nil }func?(ln?*TCPListener)?accept()?(*TCPConn,?error)?{fd,?err?:=?ln.fd.accept()if?err?!=?nil?{return?nil,?err}tc?:=?newTCPConn(fd)if?ln.lc.KeepAlive?>=?0?{setKeepAlive(fd,?true)ka?:=?ln.lc.KeepAliveif?ln.lc.KeepAlive?==?0?{ka?=?defaultTCPKeepAlive}setKeepAlivePeriod(fd,?ka)}return?tc,?nil }//?TCPConn?is?an?implementation?of?the?Conn?interface?for?TCP?network //?connections. type?TCPConn?struct?{conn }//?Conn type?conn?struct?{fd?*netFD }type?conn?struct?{fd?*netFD }func?(c?*conn)?ok()?bool?{?return?c?!=?nil?&&?c.fd?!=?nil?}//?Implementation?of?the?Conn?interface.//?Read?implements?the?Conn?Read?method. func?(c?*conn)?Read(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Read(b)if?err?!=?nil?&&?err?!=?io.EOF?{err?=?&OpError{Op:?"read",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }//?Write?implements?the?Conn?Write?method. func?(c?*conn)?Write(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Write(b)if?err?!=?nil?{err?=?&OpError{Op:?"write",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }net.Listen
調(diào)用 net.Listen 之后,底層會通過 Linux 的系統(tǒng)調(diào)用 socket 方法創(chuàng)建一個 fd 分配給 listener,并用以來初始化 listener 的 netFD ,接著調(diào)用 netFD 的 listenStream 方法完成對 socket 的 bind&listen 操作以及對 netFD 的初始化(主要是對 netFD 里的 pollDesc 的初始化),調(diào)用鏈?zhǔn)?runtime.runtime_pollServerInit --> runtime.poll_runtime_pollServerInit --> runtime.netpollGenericInit,主要做的事情是:
調(diào)用 epollcreate1 創(chuàng)建一個 epoll 實例 epfd,作為整個 runtime 的唯一 event-loop 使用;
調(diào)用 runtime.nonblockingPipe 創(chuàng)建一個用于和 epoll 實例通信的管道,這里為什么不用更新且更輕量的 eventfd 呢?我個人猜測是為了兼容更多以及更老的系統(tǒng)版本;
將 netpollBreakRd 通知信號量封裝成 epollevent 事件結(jié)構(gòu)體注冊進 epoll 實例。
相關(guān)源碼如下:
//?調(diào)用?linux?系統(tǒng)調(diào)用?socket?創(chuàng)建?listener?fd?并設(shè)置為為阻塞?I/O s,?err?:=?socketFunc(family,?sotype|syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC,?proto) //?On?Linux?the?SOCK_NONBLOCK?and?SOCK_CLOEXEC?flags?were //?introduced?in?2.6.27?kernel?and?on?FreeBSD?both?flags?were //?introduced?in?10?kernel.?If?we?get?an?EINVAL?error?on?Linux //?or?EPROTONOSUPPORT?error?on?FreeBSD,?fall?back?to?using //?socket?without?them.socketFunc????????func(int,?int,?int)?(int,?error)??=?syscall.Socket//?用上面創(chuàng)建的?listener?fd?初始化?listener?netFD if?fd,?err?=?newFD(s,?family,?sotype,?net);?err?!=?nil?{poll.CloseFunc(s)return?nil,?err }//?對?listener?fd?進行?bind&listen?操作,并且調(diào)用?init?方法完成初始化 func?(fd?*netFD)?listenStream(laddr?sockaddr,?backlog?int,?ctrlFn?func(string,?string,?syscall.RawConn)?error)?error?{...//?完成綁定操作if?err?=?syscall.Bind(fd.pfd.Sysfd,?lsa);?err?!=?nil?{return?os.NewSyscallError("bind",?err)}//?完成監(jiān)聽操作if?err?=?listenFunc(fd.pfd.Sysfd,?backlog);?err?!=?nil?{return?os.NewSyscallError("listen",?err)}//?調(diào)用?init,內(nèi)部會調(diào)用?poll.FD.Init,最后調(diào)用?pollDesc.initif?err?=?fd.init();?err?!=?nil?{return?err}lsa,?_?=?syscall.Getsockname(fd.pfd.Sysfd)fd.setAddr(fd.addrFunc()(lsa),?nil)return?nil }//?使用?sync.Once?來確保一個?listener?只持有一個?epoll?實例 var?serverInit?sync.Once//?netFD.init?會調(diào)用?poll.FD.Init?并最終調(diào)用到?pollDesc.init, //?它會創(chuàng)建?epoll?實例并把?listener?fd?加入監(jiān)聽隊列 func?(pd?*pollDesc)?init(fd?*FD)?error?{//?runtime_pollServerInit?通過?`go:linkname`?鏈接到具體的實現(xiàn)函數(shù)?poll_runtime_pollServerInit,//?接著再調(diào)用?netpollGenericInit,然后會根據(jù)不同的系統(tǒng)平臺去調(diào)用特定的?netpollinit?來創(chuàng)建?epoll?實例serverInit.Do(runtime_pollServerInit)//?runtime_pollOpen?內(nèi)部調(diào)用了?netpollopen?來將?listener?fd?注冊到?//?epoll?實例中,另外,它會初始化一個?pollDesc?并返回ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}//?把真正初始化完成的?pollDesc?實例賦值給當(dāng)前的?pollDesc?代表自身的指針,//?后續(xù)使用直接通過該指針操作pd.runtimeCtx?=?ctxreturn?nil }var?(//?全局唯一的?epoll?fd,只在?listener?fd?初始化之時被指定一次epfd?int32?=?-1?//?epoll?descriptor )//?netpollinit?會創(chuàng)建一個?epoll?實例,然后把?epoll?fd?賦值給?epfd, //?后續(xù)?listener?以及它?accept?的所有?sockets?有關(guān)?epoll?的操作都是基于這個全局的?epfd func?netpollinit()?{epfd?=?epollcreate1(_EPOLL_CLOEXEC)if?epfd?<?0?{epfd?=?epollcreate(1024)if?epfd?<?0?{println("runtime:?epollcreate?failed?with",?-epfd)throw("runtime:?netpollinit?failed")}closeonexec(epfd)}r,?w,?errno?:=?nonblockingPipe()if?errno?!=?0?{println("runtime:?pipe?failed?with",?-errno)throw("runtime:?pipe?failed")}ev?:=?epollevent{events:?_EPOLLIN,}*(**uintptr)(unsafe.Pointer(&ev.data))?=?&netpollBreakRderrno?=?epollctl(epfd,?_EPOLL_CTL_ADD,?r,?&ev)if?errno?!=?0?{println("runtime:?epollctl?failed?with",?-errno)throw("runtime:?epollctl?failed")}netpollBreakRd?=?uintptr(r)netpollBreakWr?=?uintptr(w) }//?netpollopen?會被?runtime_pollOpen?調(diào)用,注冊?fd?到?epoll?實例, //?注意這里使用的是?epoll?的?ET?模式,同時會利用萬能指針把?pollDesc?保存到?epollevent?的一個?8?位的字節(jié)數(shù)組?data?里 func?netpollopen(fd?uintptr,?pd?*pollDesc)?int32?{var?ev?epolleventev.events?=?_EPOLLIN?|?_EPOLLOUT?|?_EPOLLRDHUP?|?_EPOLLET*(**pollDesc)(unsafe.Pointer(&ev.data))?=?pdreturn?-epollctl(epfd,?_EPOLL_CTL_ADD,?int32(fd),?&ev) }我們前面提到的 epoll 的三個基本調(diào)用,Go 在源碼里實現(xiàn)了對那三個調(diào)用的封裝:
#include?<sys/epoll.h>?? int?epoll_create(int?size);?? int?epoll_ctl(int?epfd,?int?op,?int?fd,?struct?epoll_event?*event);?? int?epoll_wait(int?epfd,?struct?epoll_event?*?events,?int?maxevents,?int?timeout);//?Go?對上面三個調(diào)用的封裝 func?netpollinit() func?netpollopen(fd?uintptr,?pd?*pollDesc)?int32 func?netpoll(block?bool)?gListnetFD 就是通過這三個封裝來對 epoll 進行創(chuàng)建實例、注冊 fd 和等待事件操作的。
Listener.Accept()
netpoll accept socket 的工作流程如下:
服務(wù)端的 netFD 在 listen 時會創(chuàng)建 epoll 的實例,并將 listenerFD 加入 epoll 的事件隊列
netFD 在 accept 時將返回的 connFD 也加入 epoll 的事件隊列
netFD 在讀寫時出現(xiàn) syscall.EAGAIN 錯誤,通過 pollDesc 的 waitRead 方法將當(dāng)前的 goroutine park 住,直到 ready,從 pollDesc 的 waitRead 中返回
Listener.Accept() 接收來自客戶端的新連接,具體還是調(diào)用 netFD.accept 方法來完成這個功能:
//?Accept?implements?the?Accept?method?in?the?Listener?interface;?it //?waits?for?the?next?call?and?returns?a?generic?Conn. func?(l?*TCPListener)?Accept()?(Conn,?error)?{if?!l.ok()?{return?nil,?syscall.EINVAL}c,?err?:=?l.accept()if?err?!=?nil?{return?nil,?&OpError{Op:?"accept",?Net:?l.fd.net,?Source:?nil,?Addr:?l.fd.laddr,?Err:?err}}return?c,?nil }func?(ln?*TCPListener)?accept()?(*TCPConn,?error)?{fd,?err?:=?ln.fd.accept()if?err?!=?nil?{return?nil,?err}tc?:=?newTCPConn(fd)if?ln.lc.KeepAlive?>=?0?{setKeepAlive(fd,?true)ka?:=?ln.lc.KeepAliveif?ln.lc.KeepAlive?==?0?{ka?=?defaultTCPKeepAlive}setKeepAlivePeriod(fd,?ka)}return?tc,?nil }func?(fd?*netFD)?accept()?(netfd?*netFD,?err?error)?{//?調(diào)用?poll.FD?的?Accept?方法接受新的?socket?連接,返回?socket?的?fdd,?rsa,?errcall,?err?:=?fd.pfd.Accept()if?err?!=?nil?{if?errcall?!=?""?{err?=?wrapSyscallError(errcall,?err)}return?nil,?err}//?以?socket?fd?構(gòu)造一個新的?netFD,代表這個新的?socketif?netfd,?err?=?newFD(d,?fd.family,?fd.sotype,?fd.net);?err?!=?nil?{poll.CloseFunc(d)return?nil,?err}//?調(diào)用?netFD?的?init?方法完成初始化if?err?=?netfd.init();?err?!=?nil?{fd.Close()return?nil,?err}lsa,?_?:=?syscall.Getsockname(netfd.pfd.Sysfd)netfd.setAddr(netfd.addrFunc()(lsa),?netfd.addrFunc()(rsa))return?netfd,?nil }netFD.accept 方法里會再調(diào)用 poll.FD.Accept ,最后會使用 Linux 的系統(tǒng)調(diào)用 accept 來完成新連接的接收,并且會把 accept 的 socket 設(shè)置成非阻塞 I/O 模式:
//?Accept?wraps?the?accept?network?call. func?(fd?*FD)?Accept()?(int,?syscall.Sockaddr,?string,?error)?{if?err?:=?fd.readLock();?err?!=?nil?{return?-1,?nil,?"",?err}defer?fd.readUnlock()if?err?:=?fd.pd.prepareRead(fd.isFile);?err?!=?nil?{return?-1,?nil,?"",?err}for?{//?使用?linux?系統(tǒng)調(diào)用?accept?接收新連接,創(chuàng)建對應(yīng)的?sockets,?rsa,?errcall,?err?:=?accept(fd.Sysfd)//?因為?listener?fd?在創(chuàng)建的時候已經(jīng)設(shè)置成非阻塞的了,//?所以 accept 方法會直接返回,不管有沒有新連接到來;如果 err == nil 則表示正常建立新連接,直接返回if?err?==?nil?{return?s,?rsa,?"",?err}//?如果?err?!=?nil,則判斷?err?==?syscall.EAGAIN,符合條件則進入?pollDesc.waitRead?方法switch?err?{case?syscall.EAGAIN:if?fd.pd.pollable()?{//?如果當(dāng)前沒有發(fā)生期待的?I/O?事件,那么?waitRead?會通過?park?goroutine?讓邏輯?block?在這里if?err?=?fd.pd.waitRead(fd.isFile);?err?==?nil?{continue}}case?syscall.ECONNABORTED://?This?means?that?a?socket?on?the?listen//?queue?was?closed?before?we?Accept()ed?it;//?it's?a?silly?error,?so?try?again.continue}return?-1,?nil,?errcall,?err} }//?使用?linux?的?accept?系統(tǒng)調(diào)用接收新連接并把這個?socket?fd?設(shè)置成非阻塞?I/O ns,?sa,?err?:=?Accept4Func(s,?syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC) //?On?Linux?the?accept4?system?call?was?introduced?in?2.6.28 //?kernel?and?on?FreeBSD?it?was?introduced?in?10?kernel.?If?we //?get?an?ENOSYS?error?on?both?Linux?and?FreeBSD,?or?EINVAL //?error?on?Linux,?fall?back?to?using?accept.//?Accept4Func?is?used?to?hook?the?accept4?call. var?Accept4Func?func(int,?int)?(int,?syscall.Sockaddr,?error)?=?syscall.Accept4pollDesc.waitRead 方法主要負責(zé)檢測當(dāng)前這個 pollDesc 的上層 netFD 對應(yīng)的 fd 是否有『期待的』I/O 事件發(fā)生,如果有就直接返回,否則就 park 住當(dāng)前的 goroutine 并持續(xù)等待直至對應(yīng)的 fd 上發(fā)生可讀/可寫或者其他『期待的』I/O 事件為止,然后它就會返回到外層的 for 循環(huán),讓 goroutine 繼續(xù)執(zhí)行邏輯。
poll.FD.Accept() 返回之后,會構(gòu)造一個對應(yīng)這個新 socket 的 netFD,然后調(diào)用 init() 方法完成初始化,這個 init 過程和前面 net.Listen() 是一樣的,調(diào)用鏈:netFD.init() --> poll.FD.Init() --> poll.pollDesc.init(),最終又會走到這里:
var?serverInit?sync.Oncefunc?(pd?*pollDesc)?init(fd?*FD)?error?{serverInit.Do(runtime_pollServerInit)ctx,?errno?:=?runtime_pollOpen(uintptr(fd.Sysfd))if?errno?!=?0?{if?ctx?!=?0?{runtime_pollUnblock(ctx)runtime_pollClose(ctx)}return?syscall.Errno(errno)}pd.runtimeCtx?=?ctxreturn?nil }然后把這個 socket fd 注冊到 listener 的 epoll 實例的事件隊列中去,等待 I/O 事件。
Conn.Read/Conn.Write
我們先來看看 Conn.Read 方法是如何實現(xiàn)的,原理其實和 Listener.Accept 是一樣的,具體調(diào)用鏈還是首先調(diào)用 conn 的 netFD.Read ,然后內(nèi)部再調(diào)用 poll.FD.Read ,最后使用 Linux 的系統(tǒng)調(diào)用 read: syscall.Read 完成數(shù)據(jù)讀取:
//?Implementation?of?the?Conn?interface.//?Read?implements?the?Conn?Read?method. func?(c?*conn)?Read(b?[]byte)?(int,?error)?{if?!c.ok()?{return?0,?syscall.EINVAL}n,?err?:=?c.fd.Read(b)if?err?!=?nil?&&?err?!=?io.EOF?{err?=?&OpError{Op:?"read",?Net:?c.fd.net,?Source:?c.fd.laddr,?Addr:?c.fd.raddr,?Err:?err}}return?n,?err }func?(fd?*netFD)?Read(p?[]byte)?(n?int,?err?error)?{n,?err?=?fd.pfd.Read(p)runtime.KeepAlive(fd)return?n,?wrapSyscallError("read",?err) }//?Read?implements?io.Reader. func?(fd?*FD)?Read(p?[]byte)?(int,?error)?{if?err?:=?fd.readLock();?err?!=?nil?{return?0,?err}defer?fd.readUnlock()if?len(p)?==?0?{//?If?the?caller?wanted?a?zero?byte?read,?return?immediately//?without?trying?(but?after?acquiring?the?readLock).//?Otherwise?syscall.Read?returns?0,?nil?which?looks?like//?io.EOF.//?TODO(bradfitz):?make?it?wait?for?readability??(Issue?15735)return?0,?nil}if?err?:=?fd.pd.prepareRead(fd.isFile);?err?!=?nil?{return?0,?err}if?fd.IsStream?&&?len(p)?>?maxRW?{p?=?p[:maxRW]}for?{//?嘗試從該?socket?讀取數(shù)據(jù),因為?socket?在被?listener?accept?的時候設(shè)置成//?了非阻塞?I/O,所以這里同樣也是直接返回,不管有沒有可讀的數(shù)據(jù)n,?err?:=?syscall.Read(fd.Sysfd,?p)if?err?!=?nil?{n?=?0//?err?==?syscall.EAGAIN?表示當(dāng)前沒有期待的?I/O?事件發(fā)生,也就是?socket?不可讀if?err?==?syscall.EAGAIN?&&?fd.pd.pollable()?{//?如果當(dāng)前沒有發(fā)生期待的?I/O?事件,那么?waitRead?//?會通過?park?goroutine?讓邏輯?block?在這里if?err?=?fd.pd.waitRead(fd.isFile);?err?==?nil?{continue}}//?On?MacOS?we?can?see?EINTR?here?if?the?user//?pressed?^Z.??See?issue?#22838.if?runtime.GOOS?==?"darwin"?&&?err?==?syscall.EINTR?{continue}}err?=?fd.eofError(n,?err)return?n,?err} }conn.Write 和 conn.Read 的原理是一致的,它也是通過類似 pollDesc.waitRead 的 pollDesc.waitWrite 來 park 住 goroutine 直至期待的 I/O 事件發(fā)生才返回恢復(fù)執(zhí)行。
pollDesc.waitRead/pollDesc.waitWrite
pollDesc.waitRead 內(nèi)部調(diào)用了 poll.runtime_pollWait --> runtime.poll_runtime_pollWait 來達成無 I/O 事件時 park 住 goroutine 的目的:
//go:linkname?poll_runtime_pollWait?internal/poll.runtime_pollWait func?poll_runtime_pollWait(pd?*pollDesc,?mode?int)?int?{err?:=?netpollcheckerr(pd,?int32(mode))if?err?!=?pollNoError?{return?err}//?As?for?now?only?Solaris,?illumos,?and?AIX?use?level-triggered?IO.if?GOOS?==?"solaris"?||?GOOS?==?"illumos"?||?GOOS?==?"aix"?{netpollarm(pd,?mode)}//?進入?netpollblock?并且判斷是否有期待的?I/O?事件發(fā)生,//?這里的?for?循環(huán)是為了一直等到?io?readyfor?!netpollblock(pd,?int32(mode),?false)?{err?=?netpollcheckerr(pd,?int32(mode))if?err?!=?0?{return?err}//?Can?happen?if?timeout?has?fired?and?unblocked?us,//?but?before?we?had?a?chance?to?run,?timeout?has?been?reset.//?Pretend?it?has?not?happened?and?retry.}return?0 }//?returns?true?if?IO?is?ready,?or?false?if?timedout?or?closed //?waitio?-?wait?only?for?completed?IO,?ignore?errors func?netpollblock(pd?*pollDesc,?mode?int32,?waitio?bool)?bool?{//?gpp?保存的是?goroutine?的數(shù)據(jù)結(jié)構(gòu)?g,這里會根據(jù)?mode?的值決定是?rg?還是?wg,//?前面提到過,rg?和?wg?是用來保存等待?I/O?就緒的?gorouine?的,后面調(diào)用?gopark?之后,//?會把當(dāng)前的?goroutine?的抽象數(shù)據(jù)結(jié)構(gòu)?g?存入?gpp?這個指針,也就是?rg?或者?wggpp?:=?&pd.rgif?mode?==?'w'?{gpp?=?&pd.wg}//?set?the?gpp?semaphore?to?WAIT//?這個?for?循環(huán)是為了等待?io?ready?或者?io?waitfor?{old?:=?*gpp//?gpp?==?pdReady?表示此時已有期待的?I/O?事件發(fā)生,//?可以直接返回?unblock?當(dāng)前?goroutine?并執(zhí)行響應(yīng)的?I/O?操作if?old?==?pdReady?{*gpp?=?0return?true}if?old?!=?0?{throw("runtime:?double?wait")}//?如果沒有期待的?I/O?事件發(fā)生,則通過原子操作把?gpp?的值置為?pdWait?并退出?for?循環(huán)if?atomic.Casuintptr(gpp,?0,?pdWait)?{break}}//?need?to?recheck?error?states?after?setting?gpp?to?WAIT//?this?is?necessary?because?runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl//?do?the?opposite:?store?to?closing/rd/wd,?membarrier,?load?of?rg/wg//?waitio?此時是?false,netpollcheckerr?方法會檢查當(dāng)前?pollDesc?對應(yīng)的?fd?是否是正常的,//?通常來說??netpollcheckerr(pd,?mode)?==?0?是成立的,所以這里會執(zhí)行?gopark?//?把當(dāng)前?goroutine?給?park?住,直至對應(yīng)的?fd?上發(fā)生可讀/可寫或者其他『期待的』I/O?事件為止,//?然后?unpark?返回,在?gopark?內(nèi)部會把當(dāng)前?goroutine?的抽象數(shù)據(jù)結(jié)構(gòu)?g?存入//?gpp(pollDesc.rg/pollDesc.wg)?指針里,以便在后面的?netpoll?函數(shù)取出?pollDesc?之后,//?把?g?添加到鏈表里返回,接著重新調(diào)度?goroutineif?waitio?||?netpollcheckerr(pd,?mode)?==?0?{//?注冊?netpollblockcommit?回調(diào)給?gopark,在?gopark?內(nèi)部會執(zhí)行它,保存當(dāng)前?goroutine?到?gppgopark(netpollblockcommit,?unsafe.Pointer(gpp),?waitReasonIOWait,?traceEvGoBlockNet,?5)}//?be?careful?to?not?lose?concurrent?READY?notificationold?:=?atomic.Xchguintptr(gpp,?0)if?old?>?pdWait?{throw("runtime:?corrupted?polldesc")}return?old?==?pdReady }//?gopark?會停住當(dāng)前的?goroutine?并且調(diào)用傳遞進來的回調(diào)函數(shù)?unlockf,從上面的源碼我們可以知道這個函數(shù)是 //?netpollblockcommit func?gopark(unlockf?func(*g,?unsafe.Pointer)?bool,?lock?unsafe.Pointer,?reason?waitReason,?traceEv?byte,?traceskip?int)?{if?reason?!=?waitReasonSleep?{checkTimeouts()?//?timeouts?may?expire?while?two?goroutines?keep?the?scheduler?busy}mp?:=?acquirem()gp?:=?mp.curgstatus?:=?readgstatus(gp)if?status?!=?_Grunning?&&?status?!=?_Gscanrunning?{throw("gopark:?bad?g?status")}mp.waitlock?=?lockmp.waitunlockf?=?unlockfgp.waitreason?=?reasonmp.waittraceev?=?traceEvmp.waittraceskip?=?traceskipreleasem(mp)//?can't?do?anything?that?might?move?the?G?between?Ms?here.//?gopark?最終會調(diào)用?park_m,在這個函數(shù)內(nèi)部會調(diào)用?unlockf,也就是?netpollblockcommit,//?然后會把當(dāng)前的?goroutine,也就是?g?數(shù)據(jù)結(jié)構(gòu)保存到?pollDesc?的?rg?或者?wg?指針里mcall(park_m) }//?park?continuation?on?g0. func?park_m(gp?*g)?{_g_?:=?getg()if?trace.enabled?{traceGoPark(_g_.m.waittraceev,?_g_.m.waittraceskip)}casgstatus(gp,?_Grunning,?_Gwaiting)dropg()if?fn?:=?_g_.m.waitunlockf;?fn?!=?nil?{//?調(diào)用?netpollblockcommit,把當(dāng)前的?goroutine,//?也就是?g?數(shù)據(jù)結(jié)構(gòu)保存到?pollDesc?的?rg?或者?wg?指針里ok?:=?fn(gp,?_g_.m.waitlock)_g_.m.waitunlockf?=?nil_g_.m.waitlock?=?nilif?!ok?{if?trace.enabled?{traceGoUnpark(gp,?2)}casgstatus(gp,?_Gwaiting,?_Grunnable)execute(gp,?true)?//?Schedule?it?back,?never?returns.}}schedule() }//?netpollblockcommit?在?gopark?函數(shù)里被調(diào)用 func?netpollblockcommit(gp?*g,?gpp?unsafe.Pointer)?bool?{//?通過原子操作把當(dāng)前?goroutine?抽象的數(shù)據(jù)結(jié)構(gòu)?g,也就是這里的參數(shù)?gp?存入?gpp?指針,//?此時?gpp?的值是?pollDesc?的?rg?或者?wg?指針r?:=?atomic.Casuintptr((*uintptr)(gpp),?pdWait,?uintptr(unsafe.Pointer(gp)))if?r?{//?Bump?the?count?of?goroutines?waiting?for?the?poller.//?The?scheduler?uses?this?to?decide?whether?to?block//?waiting?for?the?poller?if?there?is?nothing?else?to?do.atomic.Xadd(&netpollWaiters,?1)}return?r }pollDesc.waitWrite 的內(nèi)部實現(xiàn)原理和 pollDesc.waitRead 是一樣的,都是基于 poll.runtime_pollWait --> runtime.poll_runtime_pollWait,這里就不再贅述。
netpoll
前面已經(jīng)從源碼的層面分析完了 netpoll 是如何通過 park goroutine 從而達到阻塞 Accept/Read/Write 的效果,而通過調(diào)用 gopark,goroutine 會被放置在某個等待隊列中,這里是放到了 epoll 的 "interest list" 里,底層數(shù)據(jù)結(jié)構(gòu)是由紅黑樹實現(xiàn)的 ?eventpoll.rbr,此時 G 的狀態(tài)由 _Grunning為_Gwaitting ,因此 G 必須被手動喚醒(通過 goready ),否則會丟失任務(wù),應(yīng)用層阻塞通常使用這種方式。
所以我們現(xiàn)在可以來從整體的層面來概括 Go 的網(wǎng)絡(luò)業(yè)務(wù) goroutine 是如何被規(guī)劃調(diào)度的了:
首先,client 連接 server 的時候,listener 通過 accept 調(diào)用接收新 connection,每一個新 connection 都啟動一個 goroutine 處理,accept 調(diào)用會把該 connection 的 fd 連帶所在的 goroutine 上下文信息封裝注冊到 epoll 的監(jiān)聽列表里去,當(dāng) goroutine 調(diào)用 conn.Read 或者 conn.Write 等需要阻塞等待的函數(shù)時,會被 gopark 給封存起來并使之休眠,讓 P 去執(zhí)行本地調(diào)度隊列里的下一個可執(zhí)行的 goroutine,往后 Go scheduler 會在循環(huán)調(diào)度的 runtime.schedule() 函數(shù)以及 sysmon 監(jiān)控線程中調(diào)用 runtime.nepoll 以獲取可運行的 goroutine 列表并通過調(diào)用 injectglist 把剩下的 g 放入全局調(diào)度隊列或者當(dāng)前 P 本地調(diào)度隊列去重新執(zhí)行。
那么當(dāng) I/O 事件發(fā)生之后,netpoller 是通過什么方式喚醒那些在 I/O wait 的 goroutine 的?答案是通過 runtime.netpoll。
runtime.netpoll 的核心邏輯是:
根據(jù)調(diào)用方的入?yún)?delay,設(shè)置對應(yīng)的調(diào)用 epollwait 的 timeout 值;
調(diào)用 epollwait 等待發(fā)生了可讀/可寫事件的 fd;
循環(huán) epollwait 返回的事件列表,處理對應(yīng)的事件類型, 組裝可運行的 goroutine 鏈表并返回。
Go 在多種場景下都可能會調(diào)用 netpoll 檢查文件描述符狀態(tài),netpoll 里會調(diào)用 epoll_wait 從 epoll 的 eventpoll.rdllist 就緒雙向鏈表返回,從而得到 I/O 就緒的 socket fd 列表,并根據(jù)取出最初調(diào)用 epoll_ctl 時保存的上下文信息,恢復(fù) g。所以執(zhí)行完netpoll 之后,會返回一個就緒 fd 列表對應(yīng)的 goroutine 鏈表,接下來將就緒的 goroutine 通過調(diào)用 injectglist 加入到全局調(diào)度隊列或者 P 的本地調(diào)度隊列中,啟動 M 綁定 P 去執(zhí)行。
具體調(diào)用 netpoll 的地方,首先在 Go runtime scheduler 循環(huán)調(diào)度 goroutines 之時就有可能會調(diào)用 netpoll 獲取到已就緒的 fd 對應(yīng)的 goroutine 來調(diào)度執(zhí)行。
首先 Go scheduler 的核心方法 runtime.schedule() 里會調(diào)用一個叫 runtime.findrunable() 的方法獲取可運行的 goroutine 來執(zhí)行,而在 runtime.findrunable() 方法里就調(diào)用了 runtime.netpoll 獲取已就緒的 fd 列表對應(yīng)的 goroutine 列表:
//?One?round?of?scheduler:?find?a?runnable?goroutine?and?execute?it. //?Never?returns. func?schedule()?{...if?gp?==?nil?{gp,?inheritTime?=?findrunnable()?//?blocks?until?work?is?available}... }//?Finds?a?runnable?goroutine?to?execute. //?Tries?to?steal?from?other?P's,?get?g?from?global?queue,?poll?network. func?findrunnable()?(gp?*g,?inheritTime?bool)?{...//?Poll?network.if?netpollinited()?&&?(atomic.Load(&netpollWaiters)?>?0?||?pollUntil?!=?0)?&&?atomic.Xchg64(&sched.lastpoll,?0)?!=?0?{atomic.Store64(&sched.pollUntil,?uint64(pollUntil))if?_g_.m.p?!=?0?{throw("findrunnable:?netpoll?with?p")}if?_g_.m.spinning?{throw("findrunnable:?netpoll?with?spinning")}if?faketime?!=?0?{//?When?using?fake?time,?just?poll.delta?=?0}list?:=?netpoll(delta)?//?同步阻塞調(diào)用?netpoll,直至有可用的?goroutineatomic.Store64(&sched.pollUntil,?0)atomic.Store64(&sched.lastpoll,?uint64(nanotime()))if?faketime?!=?0?&&?list.empty()?{//?Using?fake?time?and?nothing?is?ready;?stop?M.//?When?all?M's?stop,?checkdead?will?call?timejump.stopm()goto?top}lock(&sched.lock)_p_?=?pidleget()?//?查找是否有空閑的?P?可以來就緒的?goroutineunlock(&sched.lock)if?_p_?==?nil?{injectglist(&list)?//?如果當(dāng)前沒有空閑的?P,則把就緒的?goroutine?放入全局調(diào)度隊列等待被執(zhí)行}?else?{//?如果當(dāng)前有空閑的?P,則?pop?出一個?g,返回給調(diào)度器去執(zhí)行,//?并通過調(diào)用?injectglist?把剩下的?g?放入全局調(diào)度隊列或者當(dāng)前?P?本地調(diào)度隊列acquirep(_p_)if?!list.empty()?{gp?:=?list.pop()injectglist(&list)casgstatus(gp,?_Gwaiting,?_Grunnable)if?trace.enabled?{traceGoUnpark(gp,?0)}return?gp,?false}if?wasSpinning?{_g_.m.spinning?=?trueatomic.Xadd(&sched.nmspinning,?1)}goto?top}}?else?if?pollUntil?!=?0?&&?netpollinited()?{pollerPollUntil?:=?int64(atomic.Load64(&sched.pollUntil))if?pollerPollUntil?==?0?||?pollerPollUntil?>?pollUntil?{netpollBreak()}}stopm()goto?top }另外, sysmon 監(jiān)控線程會在循環(huán)過程中檢查距離上一次 runtime.netpoll 被調(diào)用是否超過了 10ms,若是則會去調(diào)用它拿到可運行的 goroutine 列表并通過調(diào)用 injectglist 把 g 列表放入全局調(diào)度隊列或者當(dāng)前 P 本地調(diào)度隊列等待被執(zhí)行:
//?Always?runs?without?a?P,?so?write?barriers?are?not?allowed. // //go:nowritebarrierrec func?sysmon()?{...//?poll?network?if?not?polled?for?more?than?10mslastpoll?:=?int64(atomic.Load64(&sched.lastpoll))if?netpollinited()?&&?lastpoll?!=?0?&&?lastpoll+10*1000*1000?<?now?{atomic.Cas64(&sched.lastpoll,?uint64(lastpoll),?uint64(now))list?:=?netpoll(0)?//?non-blocking?-?returns?list?of?goroutinesif?!list.empty()?{//?Need?to?decrement?number?of?idle?locked?M's//?(pretending?that?one?more?is?running)?before?injectglist.//?Otherwise?it?can?lead?to?the?following?situation://?injectglist?grabs?all?P's?but?before?it?starts?M's?to?run?the?P's,//?another?M?returns?from?syscall,?finishes?running?its?G,//?observes?that?there?is?no?work?to?do?and?no?other?running?M's//?and?reports?deadlock.incidlelocked(-1)injectglist(&list)incidlelocked(1)}}... }Go runtime 在程序啟動的時候會創(chuàng)建一個獨立的 M 作為監(jiān)控線程,叫 sysmon ,這個線程為系統(tǒng)級的 daemon 線程,無需 P 即可運行, sysmon 每 20us~10ms 運行一次。sysmon 中以輪詢的方式執(zhí)行以下操作(如上面的代碼所示):
以非阻塞的方式調(diào)用 runtime.netpoll ,從中找出能從網(wǎng)絡(luò) I/O 中喚醒的 g 列表,并通過調(diào)用 injectglist 把 g 列表放入全局調(diào)度隊列或者當(dāng)前 P 本地調(diào)度隊列等待被執(zhí)行,調(diào)度觸發(fā)時,有可能從這個全局 runnable 調(diào)度隊列獲取 g。然后再循環(huán)調(diào)用 startm ,直到所有 P 都不處于 _Pidle 狀態(tài)。
調(diào)用 retake ,搶占長時間處于 _Psyscall 狀態(tài)的 P。
綜上,Go 借助于 epoll/kqueue/iocp 和 runtime scheduler 等的幫助,設(shè)計出了自己的 I/O 多路復(fù)用 netpoller,成功地讓 Listener.Accept / conn.Read / conn.Write 等方法從開發(fā)者的角度看來是同步模式。
Go netpoller 的價值
通過前面對源碼的分析,我們現(xiàn)在知道 Go netpoller 依托于 runtime scheduler,為開發(fā)者提供了一種強大的同步網(wǎng)絡(luò)編程模式;然而,Go netpoller 存在的意義卻遠不止于此,Go netpoller I/O 多路復(fù)用搭配 Non-blocking I/O 而打造出來的這個原生網(wǎng)絡(luò)模型,它最大的價值是把網(wǎng)絡(luò) I/O 的控制權(quán)牢牢掌握在 Go 自己的 runtime 里,關(guān)于這一點我們需要從 Go 的 runtime scheduler 說起,Go 的 G-P-M 調(diào)度模型如下:
G 在運行過程中如果被阻塞在某個 system call 操作上,那么不光 G 會阻塞,執(zhí)行該 G 的 M 也會解綁 P(實質(zhì)是被 sysmon 搶走了),與 G 一起進入 sleep 狀態(tài)。如果此時有 idle 的 M,則 P 與其綁定繼續(xù)執(zhí)行其他 G;如果沒有 idle M,但仍然有其他 G 要去執(zhí)行,那么就會創(chuàng)建一個新的 M。當(dāng)阻塞在 system call 上的 G 完成 syscall 調(diào)用后,G 會去嘗試獲取一個可用的 P,如果沒有可用的 P,那么 G 會被標(biāo)記為 _Grunnable 并把它放入全局的 runqueue 中等待調(diào)度,之前的那個 sleep 的 M 將再次進入 sleep。
現(xiàn)在清楚為什么 netpoll 為什么一定要使用非阻塞 I/O 了吧?就是為了避免讓操作網(wǎng)絡(luò) I/O 的 goroutine 陷入到系統(tǒng)調(diào)用從而進入內(nèi)核態(tài),因為一旦進入內(nèi)核態(tài),整個程序的控制權(quán)就會發(fā)生轉(zhuǎn)移(到內(nèi)核),不再屬于用戶進程了,那么也就無法借助于 Go 強大的 runtime scheduler 來調(diào)度業(yè)務(wù)程序的并發(fā)了;而有了 netpoll 之后,借助于非阻塞 I/O ,G 就再也不會因為系統(tǒng)調(diào)用的讀寫而 (長時間) 陷入內(nèi)核態(tài),當(dāng) G 被阻塞在某個 network I/O 操作上時,實際上它不是因為陷入內(nèi)核態(tài)被阻塞住了,而是被 Go runtime 調(diào)用 gopark 給 park 住了,此時 G 會被放置到某個 wait queue 中,而 M 會嘗試運行下一個 _Grunnable 的 G,如果此時沒有 _Grunnable 的 G 供 M 運行,那么 M 將解綁 P,并進入 sleep 狀態(tài)。當(dāng) I/O available,在 epoll 的 eventpoll.rdr 中等待的 G 會被放到 eventpoll.rdllist 鏈表里并通過 netpoll 中的 epoll_wait 系統(tǒng)調(diào)用返回放置到全局調(diào)度隊列或者 P 的本地調(diào)度隊列,標(biāo)記為 _Grunnable ,等待 P 綁定 M 恢復(fù)執(zhí)行。
Goroutine 的調(diào)度
這一小節(jié)主要是講處理網(wǎng)絡(luò) I/O 的 goroutines 阻塞之后,Go scheduler 具體是如何像前面幾個章節(jié)所說的那樣,避免讓操作網(wǎng)絡(luò) I/O 的 goroutine 陷入到系統(tǒng)調(diào)用從而進入內(nèi)核態(tài)的,而是封存 goroutine 然后讓出 CPU 的使用權(quán)從而令 P 可以去調(diào)度本地調(diào)度隊列里的下一個 goroutine 的。
溫馨提示:這一小節(jié)屬于延伸閱讀,涉及到的知識點更偏系統(tǒng)底層,需要有一定的匯編語言基礎(chǔ)才能通讀,另外,這一節(jié)對 Go scheduler 的講解僅僅涉及核心的一部分,不會把整個調(diào)度器都講一遍(事實上如果真要解析 Go scheduler 的話恐怕重開一篇幾萬字的文章才能基本講清楚。。。),所以也要求讀者對 Go 的并發(fā)調(diào)度器有足夠的了解,因此這一節(jié)可能會稍顯深奧。當(dāng)然這一節(jié)也可選擇不讀,因為通過前面的整個解析,我相信讀者應(yīng)該已經(jīng)能夠基本掌握 Go netpoller 處理網(wǎng)絡(luò) I/O 的核心細節(jié)了,以及能從宏觀層面了解 netpoller 對業(yè)務(wù) goroutines 的基本調(diào)度了。而這一節(jié)主要是通過對 goroutines 調(diào)度細節(jié)的剖析,能夠加深讀者對整個 Go netpoller 的徹底理解,接上前面幾個章節(jié),形成一個完整的閉環(huán)。如果對調(diào)度的底層細節(jié)沒興趣的話這也可以直接跳過這一節(jié),對理解 Go netpoller 的基本原理影響不大,不過還是建議有條件的讀者可以看看。
從源碼可知,Go scheduler 的調(diào)度 goroutine 過程中所調(diào)用的核心函數(shù)鏈如下:
runtime.schedule?-->?runtime.execute?-->?runtime.gogo?-->?goroutine?code?-->?runtime.goexit?-->?runtime.goexit1?-->?runtime.mcall?-->?runtime.goexit0?-->?runtime.scheduleGo scheduler 會不斷循環(huán)調(diào)用 runtime.schedule() 去調(diào)度 goroutines,而每個 goroutine 執(zhí)行完成并退出之后,會再次調(diào)用 runtime.schedule(),使得調(diào)度器回到調(diào)度循環(huán)去執(zhí)行其他的 goroutine,不斷循環(huán),永不停歇。
當(dāng)我們使用 go 關(guān)鍵字啟動一個新 goroutine 時,最終會調(diào)用 runtime.newproc --> runtime.newproc1,來得到 g,runtime.newproc1 會先從 P 的 gfree 緩存鏈表中查找可用的 g,若緩存未生效,則會新創(chuàng)建 g 給當(dāng)前的業(yè)務(wù)函數(shù),最后這個 g 會被傳給 runtime.gogo 去真正執(zhí)行。
這里首先需要了解一個 gobuf 的結(jié)構(gòu)體,它用來保存 goroutine 的調(diào)度信息,是 runtime.gogo 的入?yún)?#xff1a;
//?gobuf?存儲?goroutine?調(diào)度上下文信息的結(jié)構(gòu)體 type?gobuf?struct?{//?The?offsets?of?sp,?pc,?and?g?are?known?to?(hard-coded?in)?libmach.////?ctxt?is?unusual?with?respect?to?GC:?it?may?be?a//?heap-allocated?funcval,?so?GC?needs?to?track?it,?but?it//?needs?to?be?set?and?cleared?from?assembly,?where?it's//?difficult?to?have?write?barriers.?However,?ctxt?is?really?a//?saved,?live?register,?and?we?only?ever?exchange?it?between//?the?real?register?and?the?gobuf.?Hence,?we?treat?it?as?a//?root?during?stack?scanning,?which?means?assembly?that?saves//?and?restores?it?doesn't?need?write?barriers.?It's?still//?typed?as?a?pointer?so?that?any?other?writes?from?Go?get//?write?barriers.sp???uintptr?//?Stack?Pointer?棧指針pc???uintptr?//?Program?Counter?程序計數(shù)器g????guintptr?//?持有當(dāng)前?gobuf?的?goroutinectxt?unsafe.Pointerret??sys.Uintreglr???uintptrbp???uintptr?//?for?GOEXPERIMENT=framepointer }執(zhí)行 runtime.execute(),進而調(diào)用 runtime.gogo:
func?execute(gp?*g,?inheritTime?bool)?{_g_?:=?getg()//?Assign?gp.m?before?entering?_Grunning?so?running?Gs?have?an//?M._g_.m.curg?=?gpgp.m?=?_g_.mcasgstatus(gp,?_Grunnable,?_Grunning)gp.waitsince?=?0gp.preempt?=?falsegp.stackguard0?=?gp.stack.lo?+?_StackGuardif?!inheritTime?{_g_.m.p.ptr().schedtick++}//?Check?whether?the?profiler?needs?to?be?turned?on?or?off.hz?:=?sched.profilehzif?_g_.m.profilehz?!=?hz?{setThreadCPUProfiler(hz)}if?trace.enabled?{//?GoSysExit?has?to?happen?when?we?have?a?P,?but?before?GoStart.//?So?we?emit?it?here.if?gp.syscallsp?!=?0?&&?gp.sysblocktraced?{traceGoSysExit(gp.sysexitticks)}traceGoStart()}//?gp.sched?就是?gobufgogo(&gp.sched) }這里還需要了解一個概念:g0,Go G-P-M 調(diào)度模型中,g 代表 goroutine,而實際上一共有三種 g:
執(zhí)行用戶代碼的 g;
執(zhí)行調(diào)度器代碼的 g,也即是 g0;
執(zhí)行 runtime.main 初始化工作的 main goroutine;
第一種 g 就是使用 go 關(guān)鍵字啟動的 goroutine,也是我們接觸最多的一類 g;第三種 g 是調(diào)度器啟動之后用來執(zhí)行的一系列初始化工作的,包括但不限于啟動 sysmon 監(jiān)控線程、內(nèi)存初始化和啟動 GC 等等工作;第二種 g 叫 g0,用來執(zhí)行調(diào)度器代碼,g0 在底層和其他 g 是一樣的數(shù)據(jù)結(jié)構(gòu),但是性質(zhì)上有很大的區(qū)別,首先 g0 的棧大小是固定的,比如在 Linux 或者其他 Unix-like 的系統(tǒng)上一般是固定 8MB,不能動態(tài)伸縮,而普通的 g 初始棧大小是 2KB,可按需擴展,g0 其實就是線程棧,我們知道每個線程被創(chuàng)建出來之時都需要操作系統(tǒng)為之分配一個初始固定的線程棧,就是前面說的 8MB 大小的棧,g0 棧就代表了這個線程棧,因此每一個 m 都需要綁定一個 g0 來執(zhí)行調(diào)度器代碼,然后跳轉(zhuǎn)到執(zhí)行用戶代碼的地方。
runtime.gogo 是真正去執(zhí)行 goroutine 代碼的函數(shù),這個函數(shù)由匯編實現(xiàn),為什么需要用匯編?因為 gogo 的工作是完成線程 M 上的堆棧切換:從系統(tǒng)堆棧 g0 切換成 goroutine gp,也就是 CPU 使用權(quán)和堆棧的切換,這種切換本質(zhì)上是對 CPU 的 PC、SP 等寄存器和堆棧指針的更新,而這一類精度的底層操作別說是 Go,就算是最貼近底層的 C 也無法做到,這種程度的操作已超出所有高級語言的范疇,因此只能借助于匯編來實現(xiàn)。
runtime.gogo 在不同的 CPU 架構(gòu)平臺上的實現(xiàn)各不相同,但是核心原理殊途同歸,我們這里選用 amd64 架構(gòu)的匯編實現(xiàn)來分析,我會在關(guān)鍵的地方加上解釋:
// func gogo(buf *gobuf) // restore state from Gobuf; longjmp TEXT runtime·gogo(SB), NOSPLIT, $16-8// 將第一個 FP 偽寄存器所指向的 gobuf 的第一個參數(shù)存入 BX 寄存器, // gobuf 的一個參數(shù)即是 SP 指針MOVQ buf+0(FP), BXMOVQ gobuf_g(BX), DX // 將 gp.sched.g 保存到 DX 寄存器MOVQ 0(DX), CX // make sure g != nil// 將 tls (thread local storage) 保存到 CX 寄存器,然后把 gp.sched.g 放到 tls[0],// 這樣以后調(diào)用 getg() 之時就可以通過 TLS 直接獲取到當(dāng)前 goroutine 的 g 結(jié)構(gòu)體實例,// 進而可以得到 g 所在的 m 和 p,TLS 里一開始存儲的是系統(tǒng)堆棧 g0 的地址get_tls(CX)MOVQ DX, g(CX)// 下面的指令則是對函數(shù)棧的 BP/SP 寄存器(指針)的存取,// 最后進入到指定的代碼區(qū)域,執(zhí)行函數(shù)棧幀MOVQ gobuf_sp(BX), SP // restore SPMOVQ gobuf_ret(BX), AXMOVQ gobuf_ctxt(BX), DXMOVQ gobuf_bp(BX), BP// 這里是在清空 gp.sched,因為前面已經(jīng)把 gobuf 里的字段值都存入了寄存器,// 所以 gp.sched 就可以提前清空了,不需要等到后面 GC 來回收,減輕 GC 的負擔(dān)MOVQ $0, gobuf_sp(BX) // clear to help garbage collectorMOVQ $0, gobuf_ret(BX)MOVQ $0, gobuf_ctxt(BX)MOVQ $0, gobuf_bp(BX)// 把 gp.sched.pc 值放入 BX 寄存器// PC 指針指向 gogo 退出時需要執(zhí)行的函數(shù)地址MOVQ gobuf_pc(BX), BX// 用 BX 寄存器里的值去修改 CPU 的 IP 寄存器,// 這樣就可以根據(jù) CS:IP 寄存器的段地址+偏移量跳轉(zhuǎn)到 BX 寄存器里的地址,也就是 gp.sched.pcJMP BXruntime.gogo 函數(shù)接收 gp.sched 這個 gobuf 結(jié)構(gòu)體實例,其中保存了函數(shù)棧寄存器 SP/PC/BP,如果熟悉操作系統(tǒng)原理的話可以知道這些寄存器是 CPU 進行函數(shù)調(diào)用和返回時切換對應(yīng)的函數(shù)棧幀所需的寄存器,而 goroutine 的執(zhí)行和函數(shù)調(diào)用的原理是一致的,也是 CPU 寄存器的切換過程,所以這里的幾個寄存器當(dāng)前存的就是 G 的函數(shù)執(zhí)行棧,當(dāng) goroutine 在處理網(wǎng)絡(luò) I/O 之時,如果恰好處于 I/O 就緒的狀態(tài)的話,則正常完成 runtime.gogo,并在最后跳轉(zhuǎn)到特定的地址,那么這個地址是哪里呢?
我們知道 CPU 執(zhí)行函數(shù)的時候需要知道函數(shù)在內(nèi)存里的代碼段地址和偏移量,然后才能去取來函數(shù)棧執(zhí)行,而典型的提供代碼段地址和偏移量的寄存器就是 CS 和 IP 寄存器,而 JMP BX 指令則是用 BX 寄存器去更新 IP 寄存器,而 BX 寄存器里的值是 gp.sched.pc,那么這個 PC 指針究竟是指向哪里呢?讓我們來看另一處源碼。
眾所周知,啟動一個新的 goroutine 是通過 go 關(guān)鍵字來完成的,而 go compiler 會在編譯期間利用 cmd/compile/internal/gc.state.stmt 和 cmd/compile/internal/gc.state.call 這兩個函數(shù)將 go 關(guān)鍵字翻譯成 runtime.newproc 函數(shù)調(diào)用,而 runtime.newproc 接收了函數(shù)指針和其大小之后,會獲取 goroutine 和調(diào)用處的程序計數(shù)器,接著再調(diào)用 runtime.newproc1:
//?Create?a?new?g?in?state?_Grunnable,?starting?at?fn,?with?narg?bytes //?of?arguments?starting?at?argp.?callerpc?is?the?address?of?the?go //?statement?that?created?this.?The?caller?is?responsible?for?adding //?the?new?g?to?the?scheduler. // //?This?must?run?on?the?system?stack?because?it's?the?continuation?of //?newproc,?which?cannot?split?the?stack. // //go:systemstack func?newproc1(fn?*funcval,?argp?unsafe.Pointer,?narg?int32,?callergp?*g,?callerpc?uintptr)?*g?{...memclrNoHeapPointers(unsafe.Pointer(&newg.sched),?unsafe.Sizeof(newg.sched))newg.sched.sp?=?spnewg.stktopsp?=?sp//?把?goexit?函數(shù)地址存入?gobuf?的?PC?指針里newg.sched.pc?=?funcPC(goexit)?+?sys.PCQuantum?//?+PCQuantum?so?that?previous?instruction?is?in?same?functionnewg.sched.g?=?guintptr(unsafe.Pointer(newg))gostartcallfn(&newg.sched,?fn)newg.gopc?=?callerpcnewg.ancestors?=?saveAncestors(callergp)newg.startpc?=?fn.fnif?_g_.m.curg?!=?nil?{newg.labels?=?_g_.m.curg.labels}if?isSystemGoroutine(newg,?false)?{atomic.Xadd(&sched.ngsys,?+1)}casgstatus(newg,?_Gdead,?_Grunnable)... }這里可以看到,newg.sched.pc 被設(shè)置了 runtime.goexit 的函數(shù)地址,newg 就是后面 runtime.gogo 執(zhí)行的 goroutine,因此 runtime.gogo 最后的匯編指令 JMP BX是跳轉(zhuǎn)到了 runtime.goexit,讓我們來繼續(xù)看看這個函數(shù)做了什么:
// The top-most function running on a goroutine // returns to goexit+PCQuantum. Defined as ABIInternal // so as to make it identifiable to traceback (this // function it used as a sentinel; traceback wants to // see the func PC, not a wrapper PC). TEXT runtime·goexit<ABIInternal>(SB),NOSPLIT,$0-0BYTE $0x90 // NOPCALL runtime·goexit1(SB) // does not return// traceback from goexit1 must hit code range of goexitBYTE $0x90 // NOP這個函數(shù)也是匯編實現(xiàn)的,但是非常簡單,就是直接調(diào)用 runtime·goexit1:
//?Finishes?execution?of?the?current?goroutine. func?goexit1()?{if?raceenabled?{racegoend()}if?trace.enabled?{traceGoEnd()}mcall(goexit0) }調(diào)用 runtime.mcall函數(shù):
// func mcall(fn func(*g)) // Switch to m->g0's stack, call fn(g). // Fn must never return. It should gogo(&g->sched) // to keep running g.// 切換回 g0 的系統(tǒng)堆棧,執(zhí)行 fn(g) TEXT runtime·mcall(SB), NOSPLIT, $0-8// 取入?yún)?funcval 對象的指針存入 DI 寄存器,此時 fn.fn 是 goexit0 的地址MOVQ fn+0(FP), DIget_tls(CX)MOVQ g(CX), AX // save state in g->schedMOVQ 0(SP), BX // caller's PCMOVQ BX, (g_sched+gobuf_pc)(AX)LEAQ fn+0(FP), BX // caller's SPMOVQ BX, (g_sched+gobuf_sp)(AX)MOVQ AX, (g_sched+gobuf_g)(AX)MOVQ BP, (g_sched+gobuf_bp)(AX)// switch to m->g0 & its stack, call fnMOVQ g(CX), BXMOVQ g_m(BX), BX// 把 g0 的棧指針存入 SI 寄存器,后面需要用到MOVQ m_g0(BX), SICMPQ SI, AX // if g == m->g0 call badmcallJNE 3(PC)MOVQ $runtime·badmcall(SB), AXJMP AX// 這兩個指令是把 g0 地址存入到 TLS 里,// 然后從 SI 寄存器取出 g0 的棧指針,// 替換掉 SP 寄存器里存的當(dāng)前 g 的棧指針MOVQ SI, g(CX) // g = m->g0MOVQ (g_sched+gobuf_sp)(SI), SP // sp = m->g0->sched.spPUSHQ AXMOVQ DI, DX// 入口處的第一個指令已經(jīng)把 funcval 實例對象的指針存入了 DI 寄存器,// 0(DI) 表示取出 DI 的第一個成員,即 goexit0 函數(shù)地址,再存入 DIMOVQ 0(DI), DICALL DI // 調(diào)用 DI 寄存器里的地址,即 goexit0POPQ AXMOVQ $runtime·badmcall2(SB), AXJMP AXRET可以看到 runtime.mcall 函數(shù)的主要邏輯是從當(dāng)前 goroutine 切換回 g0 的系統(tǒng)堆棧,然后調(diào)用 fn(g),此處的 g 即是當(dāng)前運行的 goroutine,這個方法會保存當(dāng)前運行的 G 的 PC/SP 到 g->sched 里,以便該 G 可以在以后被重新恢復(fù)執(zhí)行,因為也涉及到寄存器和堆棧指針的操作,所以也需要使用匯編實現(xiàn),該函數(shù)最后會在 g0 系統(tǒng)堆棧下執(zhí)行 runtime.goexit0:
func?goexit0(gp?*g)?{_g_?:=?getg()casgstatus(gp,?_Grunning,?_Gdead)if?isSystemGoroutine(gp,?false)?{atomic.Xadd(&sched.ngsys,?-1)}gp.m?=?nillocked?:=?gp.lockedm?!=?0gp.lockedm?=?0_g_.m.lockedg?=?0gp.preemptStop?=?falsegp.paniconfault?=?falsegp._defer?=?nil?//?should?be?true?already?but?just?in?case.gp._panic?=?nil?//?non-nil?for?Goexit?during?panic.?points?at?stack-allocated?data.gp.writebuf?=?nilgp.waitreason?=?0gp.param?=?nilgp.labels?=?nilgp.timer?=?nilif?gcBlackenEnabled?!=?0?&&?gp.gcAssistBytes?>?0?{//?Flush?assist?credit?to?the?global?pool.?This?gives//?better?information?to?pacing?if?the?application?is//?rapidly?creating?an?exiting?goroutines.scanCredit?:=?int64(gcController.assistWorkPerByte?*?float64(gp.gcAssistBytes))atomic.Xaddint64(&gcController.bgScanCredit,?scanCredit)gp.gcAssistBytes?=?0}dropg()if?GOARCH?==?"wasm"?{?//?no?threads?yet?on?wasmgfput(_g_.m.p.ptr(),?gp)schedule()?//?never?returns}if?_g_.m.lockedInt?!=?0?{print("invalid?m->lockedInt?=?",?_g_.m.lockedInt,?"\n")throw("internal?lockOSThread?error")}gfput(_g_.m.p.ptr(),?gp)if?locked?{//?The?goroutine?may?have?locked?this?thread?because//?it?put?it?in?an?unusual?kernel?state.?Kill?it//?rather?than?returning?it?to?the?thread?pool.//?Return?to?mstart,?which?will?release?the?P?and?exit//?the?thread.if?GOOS?!=?"plan9"?{?//?See?golang.org/issue/22227.gogo(&_g_.m.g0.sched)}?else?{//?Clear?lockedExt?on?plan9?since?we?may?end?up?re-using//?this?thread._g_.m.lockedExt?=?0}}schedule() }runtime.goexit0 的主要工作是就是
利用 CAS 操作把 g 的狀態(tài)從 _Grunning 更新為 _Gdead;
對 g 做一些清理操作,把一些字段值置空;
調(diào)用 runtime.dropg 解綁 g 和 m;
把 g 放入 p 存儲 g 的 gfree 鏈表作為緩存,后續(xù)如果需要啟動新的 goroutine 則可以直接從鏈表里取而不用重新初始化分配內(nèi)存。
最后,調(diào)用 runtime.schedule() 再次進入調(diào)度循環(huán)去調(diào)度新的 goroutines,永不停歇。
另一方面,如果 goroutine 處于 I/O 不可用狀態(tài),我們前面已經(jīng)分析過 netpoller 利用非阻塞 I/O + I/O 多路復(fù)用避免了陷入系統(tǒng)調(diào)用,所以此時會調(diào)用 runtime.gopark 并把 goroutine 暫時封存在用戶態(tài)空間,并休眠當(dāng)前的 goroutine,因此不會阻塞 runtime.gogo 的匯編執(zhí)行,而是通過 runtime.mcall 調(diào)用 runtime.park_m:
func?gopark(unlockf?func(*g,?unsafe.Pointer)?bool,?lock?unsafe.Pointer,?reason?waitReason,?traceEv?byte,?traceskip?int)?{if?reason?!=?waitReasonSleep?{checkTimeouts()?//?timeouts?may?expire?while?two?goroutines?keep?the?scheduler?busy}mp?:=?acquirem()gp?:=?mp.curgstatus?:=?readgstatus(gp)if?status?!=?_Grunning?&&?status?!=?_Gscanrunning?{throw("gopark:?bad?g?status")}mp.waitlock?=?lockmp.waitunlockf?=?unlockfgp.waitreason?=?reasonmp.waittraceev?=?traceEvmp.waittraceskip?=?traceskipreleasem(mp)//?can't?do?anything?that?might?move?the?G?between?Ms?here.mcall(park_m) }func?park_m(gp?*g)?{_g_?:=?getg()if?trace.enabled?{traceGoPark(_g_.m.waittraceev,?_g_.m.waittraceskip)}casgstatus(gp,?_Grunning,?_Gwaiting)dropg()if?fn?:=?_g_.m.waitunlockf;?fn?!=?nil?{ok?:=?fn(gp,?_g_.m.waitlock)_g_.m.waitunlockf?=?nil_g_.m.waitlock?=?nilif?!ok?{if?trace.enabled?{traceGoUnpark(gp,?2)}casgstatus(gp,?_Gwaiting,?_Grunnable)execute(gp,?true)?//?Schedule?it?back,?never?returns.}}schedule() }runtime.mcall 方法我們在前面已經(jīng)介紹過,它主要的工作就是是從當(dāng)前 goroutine 切換回 g0 的系統(tǒng)堆棧,然后調(diào)用 fn(g),而此時 runtime.mcall 調(diào)用執(zhí)行的是 runtime.park_m,這個方法里會利用 CAS 把當(dāng)前運行的 goroutine -- gp 的狀態(tài) 從 _Grunning 切換到 _Gwaiting,表明該 goroutine 已進入到等待喚醒狀態(tài),此時封存和休眠 G 的操作就完成了,只需等待就緒之后被重新喚醒執(zhí)行即可。最后調(diào)用 runtime.schedule() 再次進入調(diào)度循環(huán),去執(zhí)行下一個 goroutine,充分利用 CPU。
至此,我們完成了對 Go netpoller 原理剖析的整個閉環(huán)。
Go netpoller 的問題
Go netpoller 的設(shè)計不可謂不精巧、性能也不可謂不高,配合 goroutine 開發(fā)網(wǎng)絡(luò)應(yīng)用的時候就一個字:爽。因此 Go 的網(wǎng)絡(luò)編程模式是及其簡潔高效的,然而,沒有任何一種設(shè)計和架構(gòu)是完美的, goroutine-per-connection 這種模式雖然簡單高效,但是在某些極端的場景下也會暴露出問題:goroutine 雖然非常輕量,它的自定義棧內(nèi)存初始值僅為 2KB,后面按需擴容;海量連接的業(yè)務(wù)場景下, goroutine-per-connection ,此時 goroutine 數(shù)量以及消耗的資源就會呈線性趨勢暴漲,雖然 Go scheduler 內(nèi)部做了 g 的緩存鏈表,可以一定程度上緩解高頻創(chuàng)建銷毀 goroutine 的壓力,但是對于瞬時性暴漲的長連接場景就無能為力了,大量的 goroutines 會被不斷創(chuàng)建出來,從而對 Go runtime scheduler 造成極大的調(diào)度壓力和侵占系統(tǒng)資源,然后資源被侵占又反過來影響 Go scheduler 的調(diào)度,進而導(dǎo)致性能下降。
Reactor 網(wǎng)絡(luò)模型
目前 Linux 平臺上主流的高性能網(wǎng)絡(luò)庫/框架中,大都采用 Reactor 模式,比如 netty、libevent、libev、ACE,POE(Perl)、Twisted(Python)等。
Reactor 模式本質(zhì)上指的是使用 I/O 多路復(fù)用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O) 的模式。
通常設(shè)置一個主線程負責(zé)做 event-loop 事件循環(huán)和 I/O 讀寫,通過 select/poll/epoll_wait 等系統(tǒng)調(diào)用監(jiān)聽 I/O 事件,業(yè)務(wù)邏輯提交給其他工作線程去做。而所謂『非阻塞 I/O』的核心思想是指避免阻塞在 read() 或者 write() 或者其他的 I/O 系統(tǒng)調(diào)用上,這樣可以最大限度的復(fù)用 event-loop 線程,讓一個線程能服務(wù)于多個 sockets。在 Reactor 模式中,I/O 線程只能阻塞在 I/O multiplexing 函數(shù)上(select/poll/epoll_wait)。
Reactor 模式的基本工作流程如下:
Server 端完成在 bind&listen 之后,將 listenfd 注冊到 epollfd 中,最后進入 event-loop 事件循環(huán)。循環(huán)過程中會調(diào)用 select/poll/epoll_wait 阻塞等待,若有在 listenfd 上的新連接事件則解除阻塞返回,并調(diào)用 socket.accept 接收新連接 connfd,并將 connfd 加入到 epollfd 的 I/O 復(fù)用(監(jiān)聽)隊列。
當(dāng) connfd 上發(fā)生可讀/可寫事件也會解除 select/poll/epoll_wait 的阻塞等待,然后進行 I/O 讀寫操作,這里讀寫 I/O 都是非阻塞 I/O,這樣才不會阻塞 event-loop 的下一個循環(huán)。然而,這樣容易割裂業(yè)務(wù)邏輯,不易理解和維護。
調(diào)用 read 讀取數(shù)據(jù)之后進行解碼并放入隊列中,等待工作線程處理。
工作線程處理完數(shù)據(jù)之后,返回到 event-loop 線程,由這個線程負責(zé)調(diào)用 write 把數(shù)據(jù)寫回 client。
accept 連接以及 conn 上的讀寫操作若是在主線程完成,則要求是非阻塞 I/O,因為 Reactor 模式一條最重要的原則就是:I/O 操作不能阻塞 event-loop 事件循環(huán)。實際上 event loop 可能也可以是多線程的,只是一個線程里只有一個 select/poll/epoll_wait。
上面提到了 Go netpoller 在某些場景下可能因為創(chuàng)建太多的 goroutine 而過多地消耗系統(tǒng)資源,而在現(xiàn)實世界的網(wǎng)絡(luò)業(yè)務(wù)中,服務(wù)器持有的海量連接中在極短的時間窗口內(nèi)只有極少數(shù)是 active 而大多數(shù)則是 idle,就像這樣(非真實數(shù)據(jù),僅僅是為了比喻):
那么為每一個連接指派一個 goroutine 就顯得太過奢侈了,而 Reactor 模式這種利用 I/O 多路復(fù)用進而只需要使用少量線程即可管理海量連接的設(shè)計就可以在這樣網(wǎng)絡(luò)業(yè)務(wù)中大顯身手了:
MultiReactors.png
在絕大部分應(yīng)用場景下,我推薦大家還是遵循 Go 的 best practices,使用原生的 Go 網(wǎng)絡(luò)庫來構(gòu)建自己的網(wǎng)絡(luò)應(yīng)用。然而,在某些極度追求性能、壓榨系統(tǒng)資源以及技術(shù)棧必須是原生 Go (不考慮 C/C++ 寫中間層而 Go 寫業(yè)務(wù)層)的業(yè)務(wù)場景下,我們可以考慮自己構(gòu)建 Reactor 網(wǎng)絡(luò)模型。
gnet
gnet 是一個基于事件驅(qū)動的高性能和輕量級網(wǎng)絡(luò)框架。它直接使用 epoll 和 kqueue 系統(tǒng)調(diào)用而非標(biāo)準(zhǔn) Go 網(wǎng)絡(luò)包:net 來構(gòu)建網(wǎng)絡(luò)應(yīng)用,它的工作原理類似兩個開源的網(wǎng)絡(luò)庫:netty 和 libuv,這也使得gnet 達到了一個遠超 Go net 的性能表現(xiàn)。
gnet 設(shè)計開發(fā)的初衷不是為了取代 Go 的標(biāo)準(zhǔn)網(wǎng)絡(luò)庫:net,而是為了創(chuàng)造出一個類似于 Redis、Haproxy 能高效處理網(wǎng)絡(luò)包的 Go 語言網(wǎng)絡(luò)服務(wù)器框架。
gnet 的賣點在于它是一個高性能、輕量級、非阻塞的純 Go 實現(xiàn)的傳輸層(TCP/UDP/Unix Domain Socket)網(wǎng)絡(luò)框架,開發(fā)者可以使用 gnet 來實現(xiàn)自己的應(yīng)用層網(wǎng)絡(luò)協(xié)議(HTTP、RPC、Redis、WebSocket 等等),從而構(gòu)建出自己的應(yīng)用層網(wǎng)絡(luò)應(yīng)用:比如在 gnet 上實現(xiàn) HTTP 協(xié)議就可以創(chuàng)建出一個 HTTP 服務(wù)器 或者 Web 開發(fā)框架,實現(xiàn) Redis 協(xié)議就可以創(chuàng)建出自己的 Redis 服務(wù)器等等。
gnet,在某些極端的網(wǎng)絡(luò)業(yè)務(wù)場景,比如海量連接、高頻短連接、網(wǎng)絡(luò)小包等等場景,gnet 在性能和資源占用上都遠超 Go 原生的 net 包(基于 netpoller)。
gnet 已經(jīng)實現(xiàn)了 Multi-Reactors 和 Multi-Reactors + Goroutine Pool 兩種網(wǎng)絡(luò)模型,也得益于這些網(wǎng)絡(luò)模型,使得 gnet 成為一個高性能和低損耗的 Go 網(wǎng)絡(luò)框架:
MultiReactors.png
multireactorsthreadpool.png???? 功能
[x] 高性能 的基于多線程/Go程網(wǎng)絡(luò)模型的 event-loop 事件驅(qū)動
[x] 內(nèi)置 goroutine 池,由開源庫 ants 提供支持
[x] 內(nèi)置 bytes 內(nèi)存池,由開源庫 bytebufferpool 提供支持
[x] 整個生命周期是無鎖的
[x] 簡單易用的 APIs
[x] 基于 Ring-Buffer 的高效且可重用的內(nèi)存 buffer
[x] 支持多種網(wǎng)絡(luò)協(xié)議/IPC 機制:TCP、UDP 和 Unix Domain Socket
[x] 支持多種負載均衡算法:Round-Robin(輪詢)、Source-Addr-Hash(源地址哈希) 和 Least-Connections(最少連接數(shù))
[x] 支持兩種事件驅(qū)動機制:Linux 里的 epoll 以及 FreeBSD/DragonFly/Darwin 里的 kqueue
[x] 支持異步寫操作
[x] 靈活的事件定時器
[x] SO_REUSEPORT 端口重用
[x] 內(nèi)置多種編解碼器,支持對 TCP 數(shù)據(jù)流分包:LineBasedFrameCodec, DelimiterBasedFrameCodec, FixedLengthFrameCodec 和 LengthFieldBasedFrameCodec,參考自 netty codec,而且支持自定制編解碼器
[x] 支持 Windows 平臺,基于 IOCP 事件驅(qū)動機制 Go 標(biāo)準(zhǔn)網(wǎng)絡(luò)庫
[ ] 實現(xiàn) gnet 客戶端
參考&延伸閱讀
The Go netpoller
Nonblocking I/O
epoll(7) — Linux manual page
I/O Multiplexing: The select and poll Functions
The method to epoll’s madness
Scalable Go Scheduler Design Doc
Scheduling In Go : Part I - OS Scheduler
Scheduling In Go : Part II - Go Scheduler
Scheduling In Go : Part III - Concurrency
Goroutines, Nonblocking I/O, And Memory Usage
IO多路復(fù)用與Go網(wǎng)絡(luò)庫的實現(xiàn)
關(guān)于select函數(shù)中timeval和fd_set重新設(shè)置的問題
A Million WebSockets and Go
Going Infinite, handling 1M websockets connections in Go
字節(jié)跳動在 Go 網(wǎng)絡(luò)庫上的實踐
總結(jié)
以上是生活随笔為你收集整理的Go netpoller 网络模型之源码全面解析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 最新开源:3TS腾讯事务处理技术验证系统
- 下一篇: 微信AI从识物到通用图像搜索的探索揭秘