万字长文 | 漫谈libco协程设计及实现
libco是微信后臺(tái)大規(guī)模使用的c/c++協(xié)程庫(kù),2013年至今穩(wěn)定運(yùn)行在微信后臺(tái)的數(shù)萬(wàn)臺(tái)機(jī)器上,使得微信后端服務(wù)能同時(shí)hold大量請(qǐng)求,被譽(yù)為微信服務(wù)器穩(wěn)定性的基石。libco在2013年的時(shí)候作為騰訊六大開源項(xiàng)目首次開源。libco源碼地址。
libco首先能解決CPU利用率與IO利用率不平衡,比用多線程解決IO阻塞CPU問題更高效。因?yàn)橛脩魬B(tài)協(xié)程切換比線程切換性能高:線程切換保存恢復(fù)的數(shù)據(jù)更多,需要用戶態(tài)和內(nèi)核態(tài)切換。其次libco又避免了異步調(diào)用和回調(diào)分離導(dǎo)致的代碼結(jié)構(gòu)破碎。
libco采用epoll多路復(fù)用使得一個(gè)線程處理多個(gè)socket連接,采用鉤子函數(shù)hook住socket族函數(shù),采用時(shí)間輪盤處理等待超時(shí)事件,采用協(xié)程棧保存、恢復(fù)每個(gè)協(xié)程上下文環(huán)境。
為了讓大家更容易閱讀libco源碼,本文以源碼為主介紹libco,內(nèi)容偏底層細(xì)節(jié)。更多個(gè)人文章,歡迎關(guān)注作者博客。
1.?協(xié)程切換
1.1?函數(shù)棧
首先復(fù)習(xí)下進(jìn)程的地址空間,如圖1所示,與本文相關(guān)的有代碼段、堆、棧。代碼段包含應(yīng)用程序的匯編代碼,指令寄存器eip存的是代碼段中某一條匯編指令地址,cpu從eip中取出匯編指令的地址,并在代碼段中找到對(duì)應(yīng)匯編指令開始執(zhí)行。CPU執(zhí)行指令時(shí)在棧里存參數(shù)、局部變量等數(shù)據(jù)。代碼通過malloc、new在堆上申請(qǐng)內(nèi)存空間。
圖1
圖2所示C代碼,通過gcc -m32 test.c -o test.o在i386下編譯,然后執(zhí)行g(shù)db test.o。disas main可看到圖3所示的main函數(shù)匯編碼,disas sum可看到圖4所示sum函數(shù)的匯編代碼。調(diào)用sum時(shí),main和sum的函數(shù)棧如圖5所示。圖5的表共有兩列,第一列為內(nèi)存地址,第二列為該地址存的內(nèi)容,除了用“...”省略的內(nèi)存地址,其他每一行均比上一行低4byte,因?yàn)闂5刂窂母叩降驮鲩L(zhǎng)。
從圖5可以看出:一,每個(gè)函數(shù)的棧在ebp棧底指針和esp棧頂指針之間;二,存在調(diào)用關(guān)系的兩個(gè)函數(shù)的棧內(nèi)存地址是相鄰的;三,ebp指針指的位置存儲(chǔ)的是上級(jí)函數(shù)的ebp地址,例如sum的ebp 0xffffd598位置存的是main的ebp0xffffd5c8,目的是sum執(zhí)行后可恢復(fù)main的ebp,而main的esp可通過sum的ebp + 4恢復(fù);四,sum的ebp + 4位置,即main的esp位置存的是sum執(zhí)行后的返回地址0x08048415,該地址不在圖1中的棧(Stack)里,而在圖1中的代碼段里,sum執(zhí)行后,leave指令恢復(fù)ebp、esp,ret指令將esp處的內(nèi)容0x08048415放到寄存器eip,cpu從eip里取出下一條待執(zhí)行的指令地址,并根據(jù)指令地址從代碼段里獲取指令執(zhí)行;五,sum的參數(shù)y、x按高地址到低地址,依次存在sum的ebp + 12、ebp + 8位置處。
圖2
圖3 main函數(shù)匯編碼
圖4 sum函數(shù)匯編碼
圖5 32位系統(tǒng)函數(shù)棧
1.2?協(xié)程棧
共享?xiàng)O挛慕榻B,此處介紹非共享?xiàng)!T诜枪蚕項(xiàng)DJ较?#xff0c;每個(gè)非主協(xié)程有自己的棧,而該棧是在堆上分配的,并不是系統(tǒng)棧,但主協(xié)程的棧仍然是系統(tǒng)棧,每?jī)蓚€(gè)協(xié)程的棧地址不相鄰。協(xié)程棧切換分為第1次、第k次(k>=2)換到目的協(xié)程TargetCoroutine。
因?yàn)橹鲄f(xié)程即當(dāng)前線程的第1次運(yùn)行是系統(tǒng)調(diào)度的,后續(xù)才由用戶調(diào)度,而非主協(xié)程每次都由用戶調(diào)度。所以每次主協(xié)程切回的行為都一樣,且和非主協(xié)程第k次(k>=2)的切回行為一致。
第1次切到TargetCoroutine之前, coctx_make(圖6)將函數(shù)地址pfn寫入?yún)f(xié)程變量regs[ kEIP ],pfn即為CoRoutineFunc的指針。CoRoutineFunc函數(shù)(圖7)在第448行調(diào)進(jìn)用戶自定義的協(xié)程函數(shù)UserCoRoutineFunc(圖8)。圖6中ss_sp為128K協(xié)程棧低地址,ss_size為128K將ss_sp+ss_size – sizeof(coctx_param_t)–sizeof(void*)作為esp開始位置,記錄在regs[kESP]。因?yàn)闂母叩降驮鲩L(zhǎng),所以真正的棧空間從高地址ss_sp + ss_size?– sizeof(void*) – sizeof(coctx_param_t)增長(zhǎng)到低地址ss_sp。這部分空間雖然是協(xié)程棧,但實(shí)際是通過stack_mem->stack_buffer= (char*)malloc(stack_size)申請(qǐng)的堆空間。CoRoutineFunc、其調(diào)用的函數(shù)、其調(diào)用的函數(shù)再調(diào)用的函數(shù)…的函數(shù)棧均在該128K的堆空間里。
圖6
圖7
圖8
?
第1次切換到TargetCoroutine時(shí)。圖9第50行將esp指向TargetCoroutine.coctx_t.regs,此時(shí)esp指向的地址如圖10所示。第54行從regs[0]彈出返回地址,即CoRoutineFunc函數(shù)地址0x08043212。第65行彈出esp地址即ss_sp+ss_size–sizeof(coctx_param_t)–sizeof(void*)。第67行pushl %eax做兩件事情:
一,將esp地址減4,即esp = ss_sp + ss_size – sizeof(coctx_param_t) – sizeof(void*) – 4;
二,在esp新位置壓入CoRoutineFunc函數(shù)地址。第71行ret從esp取出CoRoutineFunc函數(shù)地址放入eip寄存器,cpu從eip寄存器取出CoRoutineFunc函數(shù)第一條指令地址開始執(zhí)行指令,CoRoutineFunc函數(shù)第一條指令的地址就是CoRoutineFunc函數(shù)地址。
圖6中沒有對(duì)regs[kEBP]賦初值,因此切到TargetCoroutine時(shí)彈出的ebp是0,導(dǎo)致CoRoutineFunc函數(shù)棧存的上級(jí)函數(shù)的ebp是0,但沒有影響。CoRoutineFunc(圖7)只能執(zhí)行到第454行:co_yield_env(env),然后切到其他協(xié)程,不會(huì)執(zhí)行到456行。上級(jí)函數(shù)的ebp是在CoRoutineFunc執(zhí)行后,用于恢復(fù)上級(jí)函數(shù)的esp,但在這里CoRoutineFunc函數(shù)在return 0之前已經(jīng)切到其他協(xié)程,因此上級(jí)函數(shù)的ebp是0不會(huì)導(dǎo)致錯(cuò)誤。
圖9
圖10?第1次切到TargetCoroutine
TargetCoroutine在第k-1次被coctx_swap切出時(shí),TargetCoroutine是圖2.3.2.4的源協(xié)程curr。切出TargetCoroutine時(shí)會(huì)調(diào)進(jìn)coctx_swap,在執(zhí)行圖9第34行之前,函數(shù)棧如圖11所示。然后將co_swap(注意不是coctx_swap,因?yàn)檫@里沒有像圖4的第3、4、5行修改ebp和esp)棧頂?shù)刂?#43;4即0xffd2dc14通過第38行寫入到regs[kESP]。將co_swap(不是coctx_swap,原因同上)棧底地址ebp即0xffd2dc3c通過第40行寫入regs[kEBP]。將stCoRoutineEnv_t* curr_env = co_get_curr_thread_env()的第一條匯編指令地址0x08043212通過第47行寫入regs[ kEIP ]。此時(shí)regs數(shù)組內(nèi)容如圖12所示,在此期間esp的變化如圖12左側(cè)所示。
圖11 co_swap函數(shù)棧
圖12?第k(k>=2)次切到TargetCoroutine
第k(k>=2)次切回TargetCoroutine時(shí),圖9第54行彈出eax,即stCoRoutineEnv_t* curr_env = co_get_curr_thread_env()的第一條匯編指令地址0x08043212。第61行彈出ebp即圖2.3.2.6所示的0xffd2dc3c。圖2.3.2.4第65行彈出esp,即圖11所示的0xffd2dc14(不是0xffd2dc10,因?yàn)閳D2.3.2.4第38行的壓入的eax是0xffd2dc10 + 4)。第67行做兩件事情,首先esp減4得到切出時(shí)的棧地址0xffd2dc10,再將eax存的匯編指令地址0x08043212寫到esp即0xffd2dc10處。最后ret從esp取出匯編指令地址0x08043212放入eip寄存器,cpu從eip寄存器取出指令地址開始執(zhí)行指令。因?yàn)門argetCoroutine切回時(shí),首先執(zhí)行 stCoRoutineEnv_t*? curr_env=co_get_curr_thread_env(),而TargetCoroutine在之前切出時(shí),最后執(zhí)行的代碼是coctx_swap(&(curr->ctx),&(pending_co->ctx) ),因此協(xié)程在切回后能接著之前切出時(shí)的代碼繼續(xù)執(zhí)行。
對(duì)比圖10和12發(fā)現(xiàn):第1次切到TargetCoroutine時(shí)ebp、esp、返回地址、以及其他寄存器和第k(k>=2)次均不同。
libco在stCoRoutineEnv_t定義了pCallStack數(shù)組,大小為128,數(shù)組里的每個(gè)元素均為協(xié)程。pCallStack用于獲取當(dāng)前協(xié)程pCallStack[iCallStackSize - 1];獲取當(dāng)前協(xié)程掛起后該切到的協(xié)程pCallStack[iCallStackSize - 2]。pCallStack存的是遞歸調(diào)用(暫且稱之為遞歸,并不是遞歸)的協(xié)程,pCallStack[0]一定是主協(xié)程。例如主協(xié)程調(diào)用協(xié)程1,協(xié)程1調(diào)用協(xié)程2...協(xié)程k-1調(diào)用協(xié)程k,這種遞歸關(guān)系的k最大為127,調(diào)到協(xié)程127時(shí),此時(shí)pCallStack[0]存主協(xié)程,pCallStack[1]存協(xié)程1...pCallStack[k]存協(xié)程k..pCallStack[127]存協(xié)程127。但遞歸如此之深的協(xié)程實(shí)際中不會(huì)遇到,更多的場(chǎng)景應(yīng)該是主協(xié)程調(diào)用協(xié)程1,協(xié)程1掛起切回主協(xié)程,主協(xié)程再調(diào)用協(xié)程2,協(xié)程2掛起切回主協(xié)程,主協(xié)程再調(diào)用協(xié)程3...因此主協(xié)程調(diào)到協(xié)程k時(shí),pCallStack[0]是主協(xié)程,pCallStack[1]是協(xié)程k,其他元素為空;協(xié)程k掛起切回主協(xié)程時(shí),pCallStack[0]是主協(xié)程,其他元素為空。因此128大小的pCallStack足夠上萬(wàn)甚至更多協(xié)程使用。
1.3?主協(xié)程
主協(xié)程即為當(dāng)前線程,用stCoRoutine_t. cIsMain標(biāo)記。主協(xié)程的棧在圖1所示的棧(Stack)區(qū),而其他協(xié)程的棧在圖1所示的堆(Heap)區(qū)。圖9中切出主協(xié)程時(shí)38-47行寄存器存在regs數(shù)組里(不在128K的協(xié)程棧里,另外申請(qǐng)的堆空間)。切回主協(xié)程時(shí),第61、65行彈出的ebp、esp指向的是系統(tǒng)棧里的內(nèi)存,因此主協(xié)程的棧始終在系統(tǒng)棧上,用不到128K的協(xié)程棧。
那是否有必要將當(dāng)前協(xié)程標(biāo)記為主協(xié)程?圖2.1.3.1的第1024、1033行是代碼僅有的兩處需要判斷主協(xié)程。如果聲明了協(xié)程私有變量,但沒有創(chuàng)建其他協(xié)程時(shí),co為NULL,此時(shí)通過1026行獲取主協(xié)程私有變量。但在程序運(yùn)行到某段代碼時(shí)開始創(chuàng)建協(xié)程,如果不標(biāo)記主協(xié)程,因?yàn)閏o不為NULL,代碼會(huì)通過第1028行獲取主協(xié)程私有變量,此時(shí)因?yàn)槟貌坏揭郧霸O(shè)置的主協(xié)程私有變量而導(dǎo)致錯(cuò)誤。例如若將圖2.1.3.1第1024、1033行的co->cIsMain條件刪掉,圖2.1.3.2第24行輸出的主協(xié)程私有變量為11,而第30行輸出0。
圖13
圖14
?
1.4?共享?xiàng)?/strong>
每個(gè)協(xié)程申請(qǐng)一個(gè)固定128K的棧,在協(xié)程數(shù)量很大時(shí),存在內(nèi)存壓力。因此libco引入共享?xiàng)DJ?#xff0c;示例代碼可參看example_copystack.cpp。共享?xiàng)?duì)主協(xié)程沒有影響,共享?xiàng)H匀皇窃诙焉?#xff0c;而主協(xié)程的棧在系統(tǒng)棧上。
采用共享?xiàng)r(shí),每個(gè)協(xié)程的棧從共享?xiàng)?匠鰰r(shí),需要分配空間存儲(chǔ),但按需分配空間。因?yàn)榻^大部分協(xié)程的棧空間都遠(yuǎn)低于128K,因此拷出時(shí)只需分配很小的空間,相比私有棧能節(jié)省大量?jī)?nèi)存。共享?xiàng)?梢灾婚_一個(gè),但為了避免頻繁換入換出,一般開多個(gè)共享?xiàng)!C總€(gè)共享?xiàng)?梢陨暾?qǐng)大空間,降低棧溢出的風(fēng)險(xiǎn)。
假設(shè)開10個(gè)共享?xiàng)?#xff0c;每個(gè)協(xié)程模10映射到對(duì)應(yīng)的共享?xiàng)!<僭O(shè)協(xié)程調(diào)用順序?yàn)橹鲄f(xié)程、協(xié)程2、協(xié)程3、協(xié)程12。協(xié)程2切到協(xié)程3時(shí),因?yàn)閰f(xié)程2、3使用的共享?xiàng)7謩e是第2、3個(gè)共享?xiàng)?#xff0c;沒有沖突,所以協(xié)程2的棧內(nèi)容仍然保留在第2個(gè)共享?xiàng)?#xff0c;并不拷出來(lái),但協(xié)程2的寄存器會(huì)被coctx_swap保存在regs數(shù)組。調(diào)用到協(xié)程12時(shí),協(xié)程12和協(xié)程2都映射到第2個(gè)共享?xiàng)?#xff0c;因此需要將協(xié)程2的棧內(nèi)容拷出,并將協(xié)程12的棧內(nèi)容拷貝到第2個(gè)共享?xiàng)V小K怨蚕項(xiàng)6嗔丝匠鰠f(xié)程2的棧、拷進(jìn)協(xié)程12的棧兩個(gè)操作,即用拷貝共享?xiàng)5臅r(shí)間換取每個(gè)協(xié)程棧的空間。
圖15在609行將curr的stack_sp指向c的地址,記錄當(dāng)前協(xié)程棧的低位置,當(dāng)前協(xié)程棧的高位置是ss_sp + ss_size – sizeof(coctx_param_t) – sizeof(void*),存的是CoRoutineFunc函數(shù)第一條匯編指令。curr協(xié)程拷出協(xié)程時(shí),需要拷貝從curr->stack_sp(即&c)到ss_sp + ss_size?–?sizeof(void*) – sizeof(coctx_param_t)的棧內(nèi)容。以后從其他協(xié)程再切換回curr協(xié)程時(shí),如果共享?xiàng)@镉衏urr協(xié)程,則只通過coctx_swap恢復(fù)寄存器即可;否則如圖15第657行所示,需要將curr保存在curr->save_buffer的協(xié)程棧復(fù)制到從curr->stack_sp到ss_sp + ss_size?– sizeof(void*) – sizeof(coctx_param_t)的內(nèi)存空間。
圖15從curr協(xié)程切到pending_co協(xié)程時(shí),如果是共享?xiàng)DJ?#xff0c;先拿到pending_co的共享?xiàng)tack_mem里已有的協(xié)程occupy_co,如果occupy_co非空且不是pending_co,則保存已有的協(xié)程save_stack_buffer(occupy_co),將stack_mem指向的協(xié)程換為pending_co。并將pending_co和occupy_co均保存在env里,不能用局部變量記錄,因?yàn)榫植孔兞吭赾octx_swap之前屬于curr協(xié)程,但coctx_swap后協(xié)程棧已經(jīng)被切換,curr的所有局部變量無(wú)法被pending_co訪問。如果occupy_co和pending_co不是同一個(gè)協(xié)程,需要將occupy_co在共享?xiàng)@锏臄?shù)據(jù)拷貝到occupy_co->save_buffer。協(xié)程的數(shù)據(jù)除了在棧里還分布在寄存器,如果occupy_co不是curr,則在occupy_co之前被切換到其他協(xié)程時(shí)寄存器已經(jīng)被coctx_swap保存;否則,則其寄存器在本次執(zhí)行coctx_swap被保存。
圖15
1.5?線程切換VS?協(xié)程切換
IO密集時(shí)也可以使用多個(gè)線程。但線程有兩個(gè)不足:一,切換代價(jià)大(保存恢復(fù)上下文、線程調(diào)度);二,占用資源多。
線程往往需要對(duì)公共數(shù)據(jù)加鎖,鎖會(huì)導(dǎo)致線程調(diào)度。因?yàn)橛脩舻木€程是在用戶態(tài)執(zhí)行,而線程調(diào)度和管理是在內(nèi)核態(tài)實(shí)現(xiàn),所以線程調(diào)度需要從用戶態(tài)轉(zhuǎn)到內(nèi)核態(tài),再?gòu)膬?nèi)核態(tài)轉(zhuǎn)到用戶態(tài)。切到內(nèi)核態(tài)時(shí)需要保存用戶態(tài)上下文,再切到用戶態(tài)時(shí),需要恢復(fù)用戶態(tài)上下文,而線程的用戶態(tài)上下文比協(xié)程上下文大得多。另外線程調(diào)度也需要耗時(shí)。
線程棧默認(rèn)為1M大于協(xié)程棧128K,另外線程還需要各種struct存一些狀態(tài),實(shí)測(cè)每個(gè)pthread_create創(chuàng)建的線程大約占8M左右內(nèi)存,因此線程占用資源也遠(yuǎn)比協(xié)程多。
?
2.?協(xié)程控制
2.1 epoll多路復(fù)用IO模型
協(xié)程使用epoll多路復(fù)用IO模型。常見的同步調(diào)用是同步阻塞模型,異步調(diào)用是異步IO模型。以read為例說(shuō)明以下三種IO模型的區(qū)別,read分為兩個(gè)階段:一,等待數(shù)據(jù);二,將數(shù)據(jù)從kernel拷貝到用戶線程。
同步調(diào)用read使用同步阻塞IO,kernel等待數(shù)據(jù)到達(dá),再將數(shù)據(jù)拷貝到用戶線程,這兩個(gè)階段用戶線程都被阻塞。異步調(diào)用read使用異步IO模型,用戶線程調(diào)用read后,立刻可以去做其它事, kernel等待數(shù)據(jù)準(zhǔn)備完成,然后將數(shù)據(jù)拷貝到用戶內(nèi)存,都完成后,kernel通知用戶read完成,然后用戶線程直接使用數(shù)據(jù),兩個(gè)階段用戶線程都不被阻塞。而協(xié)程調(diào)用read使用多路復(fù)用IO模型,用戶線程調(diào)用read后,第一階段也不會(huì)被阻塞,但第二個(gè)階段會(huì)被阻塞,epoll多路復(fù)用IO模型可以在一個(gè)線程管理多個(gè)socket。
同步調(diào)用在兩個(gè)階段都會(huì)阻塞用戶線程,因此效率低。雖然可以為每個(gè)連接開個(gè)線程,但連接數(shù)多時(shí),線程太多導(dǎo)致性能壓力,也可以開固定數(shù)目的線程池,但如果存在大量長(zhǎng)連接,線程資源不被釋放,后續(xù)的連接得不到處理。異步調(diào)用時(shí),因?yàn)閮蓚€(gè)階段都不阻塞用戶線程,因此效率最高,但異步的調(diào)用邏輯和回調(diào)邏輯需要分開,在異步調(diào)用多時(shí),代碼結(jié)構(gòu)不清晰。而協(xié)程的epoll多路復(fù)用IO模型,雖然會(huì)阻塞第二個(gè)階段,但因?yàn)榈诙A段讀數(shù)據(jù)耗時(shí)很少,因此效率略低于異步調(diào)用。協(xié)程最大的優(yōu)點(diǎn)是在接近異步效率的同時(shí),可以使用同步的寫法(僅僅是同步的寫法,不是同步調(diào)用)。例如read函數(shù)的調(diào)用代碼后,緊接著可以寫處理數(shù)據(jù)的邏輯,不用再定義回調(diào)函數(shù)。調(diào)用read后協(xié)程掛起,其他協(xié)程被調(diào)用,數(shù)據(jù)就緒后在read后面處理數(shù)據(jù)。
系統(tǒng)select/poll、epoll函數(shù)都提供多路IO復(fù)用方案,協(xié)程使用的是epoll。select、poll類似,監(jiān)視writefds、readfds、exceptfds文件描述符(fd)。調(diào)用select/poll會(huì)阻塞,直到有fd就緒(可讀、可寫、有except),或超時(shí)。select/poll返回后,通過遍歷fd集合,找到就緒的fs,并開始讀寫。poll用鏈表存儲(chǔ)fd,而select用fd標(biāo)記位存儲(chǔ),因此poll沒有最大連接數(shù)限制,而select限制為1024。select/poll共有的缺點(diǎn)是:一,返回后需要遍歷fd集合找到就緒的fd,但fd集合就緒的描述符很少;二,select/poll均需將fd集合在用戶態(tài)和內(nèi)核態(tài)之間來(lái)回拷貝。epoll的引入是為了解決select/poll上述兩個(gè)缺點(diǎn),epoll提供三個(gè)函數(shù)epoll_create、epoll_ctl、epoll_wait。epoll_create在內(nèi)核的高速cache中建一棵紅黑樹以及就緒鏈表(activeList)。epoll_ctl的add在紅黑樹上添加fd結(jié)點(diǎn),并且注冊(cè)fd的回調(diào)函數(shù),內(nèi)核在檢測(cè)到某fd就緒時(shí)會(huì)調(diào)用回調(diào)函數(shù)將fd添加到activeList。epoll_wait將activeList中的fd返回。epoll_ctl每次只往內(nèi)核添加紅黑樹節(jié)點(diǎn),不需像select/poll拷貝所有fd到內(nèi)核,epoll_wait通過共享內(nèi)存從內(nèi)核傳遞就緒fd到用戶,不需像select/poll拷貝出所有fd并遍歷所有fd找到就緒的。
2.2?非阻塞IO
協(xié)程的epoll多路復(fù)用IO模型使用的是非阻塞IO,發(fā)起read操作后,可立即掛起協(xié)程,并調(diào)度其他協(xié)程。非阻塞IO是通過fcntl函數(shù)設(shè)置O_NONBLOCK,影響socket族read、write、connect、sendto、recvfrom、send、recv函數(shù)。因?yàn)閞ead、recv、recvfrom實(shí)現(xiàn)類似,write、send實(shí)現(xiàn)類似。因此接下來(lái)介紹read、write、connect三個(gè)函數(shù)。
2.2.1 鉤子函數(shù)read
如圖16所示,用戶自定義的read函數(shù)hook住系統(tǒng)read函數(shù)。Read時(shí)分三種情況:一,用戶未開啟hook,直接在337行調(diào)用系統(tǒng)read;二,如果用戶指定了O_NONBLOCK,也直接在343行調(diào)用系統(tǒng)read,此時(shí)是非阻塞的;三,如果用戶既開啟了hook,又沒有指定O_NONBLOCK,如果libco不做任何處理,即使通過353行poll了,但如果協(xié)程是超時(shí)返回,第355行還是會(huì)被阻塞。所以只要用戶開啟hook,socket函數(shù)(圖17)會(huì)在234行調(diào)用fcntl函數(shù),最終調(diào)用圖18的第696行,悄悄的設(shè)置O_NONBLOCK,但user_flag并沒有記錄O_NONBLOCK,這樣即可和第二種情況區(qū)分。然后圖16第353行調(diào)用poll將當(dāng)前協(xié)程掛起,直到有數(shù)據(jù)可讀或者超時(shí),協(xié)程才會(huì)重新調(diào)度。在第二種情況下,不能先將協(xié)程掛起,等待就緒后再切回,因?yàn)橛脩麸@示的設(shè)置O_NONBLOCK是為了立即返回,如果掛起,就緒或超時(shí)后再切回,與用戶需要立即獲得返回結(jié)果的初衷違背。
圖16
圖17
圖18
2.2.2 鉤子函數(shù)write
非阻塞write在發(fā)送緩沖區(qū)沒有空間時(shí)直接返回,發(fā)送緩沖區(qū)有空間時(shí),則拷貝全部或部分(空間不夠)數(shù)據(jù),返回實(shí)際拷貝的字節(jié)數(shù)。
如圖19所示,分三種情況:一,用戶未開啟hook,在372行調(diào)用系統(tǒng)write;二,如果用戶指定了O_NONBLOCK,在378行調(diào)用系統(tǒng)write,此時(shí)是非阻塞的,這種情況與第三種情況區(qū)分的原因見2.2.2.1;三,如果用戶既開啟了hook,又沒有指定O_NONBLOCK,由2.2.2.1可知,libco已悄悄設(shè)置O_NONBLOCK,只要數(shù)據(jù)沒有完全寫入緩沖區(qū),就通過第402行poll將協(xié)程掛起,等待有空余空間喚醒協(xié)程或者超時(shí)喚醒。writeret記錄本次調(diào)用寫入的字節(jié)數(shù),wrotelen記錄總共寫入的字節(jié)數(shù),如果本次寫入沒有空間或出錯(cuò),則直接返回。因?yàn)閣rite明確知道要寫入數(shù)據(jù)的長(zhǎng)度nbyte,而一次可能無(wú)法寫入全部數(shù)據(jù),所以write在while循環(huán)里不斷寫數(shù)據(jù),直到數(shù)據(jù)寫完、寫出錯(cuò),才會(huì)停止寫數(shù)據(jù)。而2.2.2.1的read不知要讀多少數(shù)據(jù),因此讀一次就返回。
圖19
2.2.3 鉤子函數(shù)connect
圖20所示為libco自定義connect函數(shù)片段。如果用戶啟用hook,且未設(shè)置O_NONBLOCK,libco悄悄幫用戶設(shè)置了O_NONBLOCK,但調(diào)用connect后不能立即返回,因?yàn)閏onnect有三次握手的過程,內(nèi)核中對(duì)三次握手的超時(shí)限制是75秒,超時(shí)則會(huì)失敗。libco設(shè)置O_NONBLOCK后,立即調(diào)用系統(tǒng)connect可能會(huì)失敗,因此在圖20中循環(huán)三次,每次設(shè)置超時(shí)時(shí)間25秒,然后掛起協(xié)程,等待connect成功或超時(shí)。
圖20
2.3 epoll激活協(xié)程
協(xié)程hook住了底層socket族函數(shù),設(shè)置了O_NONBLOCK,調(diào)用socket族函數(shù)后,調(diào)用poll注冊(cè)epoll事件并掛起協(xié)程,讓其他協(xié)程執(zhí)行,所有協(xié)程都掛起后通過epoll,在主協(xié)程里檢查注冊(cè)的IO事件,若就緒或超時(shí)則切到對(duì)應(yīng)協(xié)程。
?poll最終會(huì)調(diào)進(jìn)co_poll_inner,圖21、圖22分別為co_poll_inner函數(shù)上、下部分。該函數(shù)的參數(shù)ctx為epoll的環(huán)境變量,包含:epoll描述符iEpollFd,超時(shí)隊(duì)列pTimeOut,已超時(shí)隊(duì)列pstTimeoutList,就緒隊(duì)列pstActiveList,epoll_wait就緒事件集合result。其余參數(shù)fds為socket文件描述符集合,nfds為socket文件描述符數(shù)目,timeout超時(shí)時(shí)間,pollfunc為系統(tǒng)poll函數(shù)。
第908行epfd是epoll_create創(chuàng)建的epoll描述符,相當(dāng)于紅黑樹、就緒鏈表的標(biāo)識(shí),后文通過epfd管理所有fd。919到927行是在nfds少于3個(gè)時(shí)直接在棧上分配空間(更快),否則在堆上分配。第930行記錄就緒fd的回調(diào)函數(shù)OnPollProcessEvent,該回調(diào)函數(shù)會(huì)切回對(duì)應(yīng)的協(xié)程。第940行記錄切回協(xié)程之前的預(yù)處理函數(shù)OnPollPreparePfn。第948行將每個(gè)fd通過epoll_ctl加入紅黑樹。第968行將arg加入超時(shí)隊(duì)列pTimeOut,在980行掛起協(xié)程。等到所有協(xié)程均掛起,主協(xié)程開始運(yùn)行。
圖21
圖22
圖23為主協(xié)程調(diào)用的co_eventloop函數(shù)片段。co_epoll_wait找出所有就緒fd,調(diào)用pfnPrepare即OnPollPreparePfn,將就緒fd從超時(shí)隊(duì)列pTimeOut移除,并加入就緒隊(duì)列pstActiveList。801行用不到。TakeAllTimeout拿出超時(shí)隊(duì)列里所有超時(shí)元素,并在第817行加入pstActiveList。824行到833行將807行取出的未超時(shí)事件再加回超時(shí)隊(duì)列,因?yàn)門akeAllTimeout拿出的不一定都是超時(shí)事件,超時(shí)隊(duì)列底層實(shí)現(xiàn)是60000大小的循環(huán)數(shù)組,存放每毫秒(共60000毫秒)的超時(shí)事件,每個(gè)數(shù)組的元素均是一條鏈表,循環(huán)數(shù)組的目的是便于通過下標(biāo)找到所有超時(shí)鏈表。例如超時(shí)時(shí)間是10毫秒的所有事件均記錄在數(shù)組下標(biāo)為9(在循環(huán)數(shù)組實(shí)際的下標(biāo)可能不是9,僅舉個(gè)例子)的鏈表里,所有超時(shí)時(shí)間大于60000毫秒的事件均記錄在數(shù)組下標(biāo)為59999的鏈表里。如果取出超時(shí)時(shí)間是60000毫秒的事件,TakeAllTimeout會(huì)把超時(shí)時(shí)間大于60000毫秒的也取出來(lái),因此需要再把超時(shí)時(shí)間大于60000毫秒的重新加回超時(shí)隊(duì)列。在第836行是在協(xié)程超時(shí)或fd就緒時(shí)調(diào)用pfnProcess即OnPollProcessEvent切回協(xié)程。協(xié)程切回后,由上文2.1.2可知,協(xié)程接著co_yield_env里co_swap里的stCoRoutineEnv_t* curr_env = co_get_curr_thread_env()繼續(xù)執(zhí)行,co_yield_env執(zhí)行完后從圖22的第981行繼續(xù)執(zhí)行。第986行應(yīng)該是多余的,因?yàn)閜fnPrepare已經(jīng)從超時(shí)隊(duì)列刪除事件。992行調(diào)用epoll刪除當(dāng)前協(xié)程的就緒fd。
注意到co_poll_inner傳入的fd數(shù)組,而arg只是鏈表中的一個(gè)元素。假設(shè)co_poll_inner傳入10個(gè)文件描述符,如果只有1個(gè)fd就緒,OnPollPreparePfn從pTimeOut刪除arg,則10個(gè)文件fd都從超時(shí)隊(duì)列刪除,在圖22切回協(xié)程時(shí)在第987到995行將10個(gè)描述符都從紅黑樹刪除,然后應(yīng)用層需要將9個(gè)未就緒的fd重新調(diào)用co_poll_inner再加入紅黑樹。如果每次只就緒一個(gè)fd,這樣共需要加入紅黑樹:10 + 9+ 8 +… +1次,效率低于10次poll,每次只poll一個(gè)fd。co_poll_inner提供傳入fd數(shù)組的原因是,co_poll_inner是poll調(diào)用的,而poll是hook的系統(tǒng)函數(shù),不能改變系統(tǒng)的語(yǔ)義。系統(tǒng)poll支持?jǐn)?shù)組的原因是,調(diào)用系統(tǒng)poll一次,可檢查多個(gè)fd,比調(diào)用系統(tǒng)poll多次,每次檢查一個(gè)fd,效率更高。因此系統(tǒng)poll適合一次poll多個(gè)fd,但libco自定義的鉤子函數(shù)poll不適合一次poll多個(gè)fd,所以libco使用poll時(shí)需避免一次poll多個(gè)fd。
圖23
2.4?信號(hào)量激活協(xié)程
?libco提供信號(hào)量機(jī)制,類似golang的channel。example_cond.cpp舉了生產(chǎn)者和消費(fèi)者的例子,一個(gè)協(xié)程做生產(chǎn)者,另一個(gè)做消費(fèi)者,生產(chǎn)者產(chǎn)生數(shù)據(jù)后,喚醒消費(fèi)者。
主協(xié)程首先創(chuàng)建消費(fèi)者協(xié)程,并通過co_resume切換到消費(fèi)者協(xié)程函數(shù)Consumer,co_resume會(huì)調(diào)用到coctx_swap保存主協(xié)程的棧內(nèi)容,并將消費(fèi)者協(xié)程初始化在regs里的esp、ebp、返回地址等數(shù)據(jù)彈到寄存器和消費(fèi)者協(xié)程棧里,此時(shí)開始調(diào)度消費(fèi)者協(xié)程。Consumer在檢查到task_queue為空時(shí),將消費(fèi)者協(xié)程通過co_cond_timedwait,加入超時(shí)隊(duì)列pTimeout,并加入信號(hào)量隊(duì)列env->cond,然后通過co_yield_ct切換回主協(xié)程,co_yield_ct調(diào)用到coctx_swap函數(shù),保存消費(fèi)者協(xié)程,并恢復(fù)主協(xié)程。主協(xié)程接著創(chuàng)建生產(chǎn)者協(xié)程,通過co_resume切換到生產(chǎn)者協(xié)程。生產(chǎn)者協(xié)程函數(shù)Producer在task_queue里添加數(shù)據(jù)后,通過co_cond_signal從pTimeOut、env->cond里刪掉消費(fèi)者協(xié)程,并加入就緒隊(duì)列pstActiveList,然后通過poll掛起將生產(chǎn)者協(xié)程加入超時(shí)隊(duì)列pTimeout,并在co_poll_inner通過co_yield_env切回到主協(xié)程。主協(xié)程在co_eventloop遍歷就緒隊(duì)列pstActiveList,調(diào)度消費(fèi)者協(xié)程消費(fèi)task_queue里數(shù)據(jù)。
2.5?超時(shí)激活協(xié)程
當(dāng)前協(xié)程通過語(yǔ)句poll(NULL, 0, duration),可設(shè)置協(xié)程的超時(shí)時(shí)間間隔duration。poll是被hook住的函數(shù),執(zhí)行poll之后,當(dāng)前協(xié)程會(huì)被加到超時(shí)隊(duì)列pTimeOut,并被切換到其他協(xié)程,所有協(xié)程掛起后,主協(xié)程掃描超時(shí)隊(duì)列,找到超時(shí)的協(xié)程,并切換。因此可用poll實(shí)現(xiàn)協(xié)程的睡眠。注意不可用sleep,因?yàn)閟leep會(huì)睡眠線程,線程睡眠了,協(xié)程無(wú)法被調(diào)度,所有的協(xié)程也都不會(huì)執(zhí)行了。
2.6?回調(diào)激活協(xié)程
使用libco需要hook住socket族函數(shù),但業(yè)務(wù)代碼一般使用現(xiàn)成的非libco網(wǎng)絡(luò)庫(kù),如果改該網(wǎng)絡(luò)庫(kù)使其hook住socket族函數(shù),工作量太大。因此業(yè)務(wù)側(cè)可在協(xié)程里異步調(diào)用,異步調(diào)用后掛起協(xié)程,所有的異步回調(diào)使用同一函數(shù),在同一回調(diào)函數(shù)里,根據(jù)異步調(diào)用時(shí)的標(biāo)記決定喚醒哪個(gè)協(xié)程。該方案也可做到不分離異步調(diào)用和處理異步調(diào)用返回的數(shù)據(jù)。但需要業(yè)務(wù)側(cè)添加一個(gè)統(tǒng)一的異步回調(diào)函數(shù),并在該函數(shù)里根據(jù)標(biāo)識(shí)調(diào)度協(xié)程。
3.?協(xié)程池
協(xié)程池的好處是不用每次使用協(xié)程時(shí)都創(chuàng)建新的協(xié)程。創(chuàng)建新協(xié)程主要開銷有兩個(gè):一,需要malloc協(xié)程環(huán)境stCoRoutine_t,stCoRoutine_t有4K大小的協(xié)程私有變量數(shù)組;二,協(xié)程棧128K。每次創(chuàng)建新的協(xié)程要分配這么大的空間需要有時(shí)間開銷,另外頻繁申請(qǐng)、銷毀會(huì)導(dǎo)致內(nèi)存碎片的產(chǎn)生。即使在共享?xiàng)DJ较虏挥脼槊總€(gè)協(xié)程申請(qǐng)協(xié)程棧,也會(huì)有第一部分stCoRoutine_t的開銷。每次從協(xié)程池取出協(xié)程后,將stCoRoutine_t.pfn重新初始化為用戶自定義的協(xié)程函數(shù)即可。
4.?協(xié)程私有變量
協(xié)程私有變量為每個(gè)協(xié)程獨(dú)享,不會(huì)被其他協(xié)程修改,可參看example_specific.cpp。協(xié)程私有變量通過宏CO_ROUTINE_SPECIFIC定義,然后重載操作符->實(shí)現(xiàn)set/get。該宏首先定義線程私有變量,聲明pthread_key_t類型的變量,并通過pthread_once_t保證pthread_key_t類型變量只被create一次。如果當(dāng)前沒有創(chuàng)建過協(xié)程,或者當(dāng)前協(xié)程是主協(xié)程,則通過co_pthread_setspecific/pthread_getspecific來(lái)set/get線程私有變量。否則在當(dāng)前協(xié)程的aSpec數(shù)組里set/get協(xié)程私有變量,pthread_key_t類型的變量作為數(shù)組下標(biāo)。
pthread_once控制pthread_key_create在多線程環(huán)境中只會(huì)執(zhí)行一次。_routine_init_##name控制一個(gè)線程的多個(gè)協(xié)程只會(huì)調(diào)用pthread_once一次,避免每次set、get變量時(shí)均調(diào)用pthread_once。
因?yàn)閰f(xié)程私有變量需要重載操作符->,因此CO_ROUTINE_SPECIFIC第一個(gè)參數(shù)必須為結(jié)構(gòu)體或類。雖然aSpec有1024個(gè)容量,但通過pthread_key_create創(chuàng)建的key是從1開始,因此協(xié)程私有變量實(shí)際可容納1023個(gè)元素。
1.?共享?xiàng)O聝?nèi)容篡改
圖24所示代碼,協(xié)程RoutineFuncA、RoutineFuncB共用一個(gè)共享?xiàng)!_\(yùn)行代碼,第7行輸出1,第14行輸出2,但在第14行之前只在RoutineFuncA里將global_ptr指向的內(nèi)容設(shè)置為1。原因是在RoutineFuncA里global_ptr指向routineidA的地址,而在RoutineFuncB里,因?yàn)槭枪蚕項(xiàng)?#xff0c;所以routineidB和routineidA的地址一樣,而該地址處的值被第13行修改為2,因此第14行打印出來(lái)2。所以共享?xiàng)DJ较?#xff0c;需避免協(xié)程之前傳遞地址,因?yàn)榈刂返膬?nèi)容會(huì)被篡改。
圖24
2. poll效率
libco自定義的鉤子函數(shù)poll,雖然支持傳入fd數(shù)組,但一次poll多個(gè)fd的效率,不如多次poll每次poll一個(gè)fd的效率,原因見2.3 epoll激活協(xié)程。
3.棧溢出
每個(gè)協(xié)程棧使用128K的堆內(nèi)存,128K是malloc使用brk和mmap分配堆內(nèi)存的分界線。但128K的空間可能不夠一個(gè)協(xié)程使用,導(dǎo)致協(xié)程棧溢出。協(xié)程棧溢出問題,有如下三種解決方案。
3.1?堆內(nèi)存
大內(nèi)存不在協(xié)程棧上分配,通過malloc、new在堆上分配,可以使用編譯參數(shù)-Wstack-usage檢查使用較大棧空間的變量。另外棧以外空間的非法讀寫不一定會(huì)Crash,因此在128K協(xié)程棧上下添加保護(hù)頁(yè),并通過mprotect設(shè)置保護(hù)頁(yè)權(quán)限,非法讀寫保護(hù)頁(yè)時(shí),程序會(huì)立即Crash。推薦使用該方案。
3.2?自動(dòng)擴(kuò)容協(xié)程棧
在協(xié)程調(diào)用的所有函數(shù)入口檢查棧使用率,如果棧使用率超過閾值,就自動(dòng)擴(kuò)容協(xié)程棧。在進(jìn)入函數(shù)時(shí),聲明一個(gè)臨時(shí)變量,獲取該變量的地址varaddr,因?yàn)閰f(xié)程棧空間高地址為stack_mem->stack_bp,低地址為stack_mem->stack_buffer,因此使用率為(stack_mem->stack_bp -varaddr) / (stack_mem->stack_bp - stack_mem->stack_buffer)。如果使用率超過閾值,重新申請(qǐng)大空間的新協(xié)程棧,并將舊協(xié)程棧拷貝到新協(xié)程棧。?
但如果協(xié)程函數(shù)里使用了指針,比如指針ptr指向舊協(xié)程棧內(nèi)存地址0xffd344c0,棧拷貝后,訪問ptr的內(nèi)容仍然是訪問舊協(xié)程棧0xffd344c0,導(dǎo)致非法訪問。在協(xié)程函數(shù)里使用指針的概率很大,比如聲明數(shù)組,因此該方案風(fēng)險(xiǎn)較大。
?golang支持協(xié)程棧的自動(dòng)擴(kuò)容,1.3之前是分段棧,即老棧保留,另外再開辟新棧,老棧新棧一起使用,超出老棧的數(shù)據(jù)用新棧保存。使用分段棧存在hot split問題,所以1.3及之后采用連續(xù)棧,老棧不夠用時(shí),申請(qǐng)大空間的新棧,并將老棧數(shù)據(jù)拷貝到新棧。拷貝老棧到新棧時(shí),golang也面臨指針失效的問題,原文參考,但golang的編譯器會(huì)管理每個(gè)指針的位置,原文參考?。只要知道每個(gè)指針在老棧的位置,算出相對(duì)協(xié)程棧頂偏移量,即可將指針遷移到新棧的位置。
3.3?虛擬內(nèi)存
因?yàn)閙alloc使用的是虛擬內(nèi)存,而物理內(nèi)存按需分配,協(xié)程占用的內(nèi)存并不是malloc的內(nèi)存大小,而是實(shí)際使用到的內(nèi)存大小,因此可將malloc(128K)改為malloc(8M)。但在非協(xié)程池模式下,頻繁mmap 8M的堆內(nèi)存會(huì)導(dǎo)致大量缺頁(yè)中斷。在協(xié)程池模式下,假設(shè)池子有10個(gè)協(xié)程,10個(gè)協(xié)程輪流處理同一個(gè)用戶自定義的協(xié)程函數(shù),而該函數(shù)若最終會(huì)使用8M的物理內(nèi)存,最終導(dǎo)致協(xié)程池里的所有協(xié)程實(shí)際使用內(nèi)存都為8M,在協(xié)程池大的情況下,會(huì)迅速耗盡內(nèi)存。因此在回收協(xié)程池里的協(xié)程時(shí),需要檢測(cè)物理內(nèi)存實(shí)際使用量(方法同3.2.2),超過128K時(shí),需要銷毀協(xié)程,重建128K的協(xié)程加入?yún)f(xié)程池。
總結(jié)
以上是生活随笔為你收集整理的万字长文 | 漫谈libco协程设计及实现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 直播预告 |【数据挖掘主题报告】多样流量
- 下一篇: 直播马上开始│走进腾讯云物联网