semaphore 的原理与实现
Semaphore
數據結構
//?Go?語言中暴露的?semaphore?實現 //?具體的用法是提供?sleep?和?wakeup?原語 //?以使其能夠在其它同步原語中的競爭情況下使用 //?因此這里的?semaphore?和?Linux?中的?futex?目標是一致的 //?只不過語義上更簡單一些 // //?也就是說,不要認為這些是信號量 //?把這里的東西看作?sleep?和?wakeup?實現的一種方式 //?每一個?sleep?都會和一個?wakeup?配對 //?即使在發生?race?時,wakeup?在?sleep?之前時也是如此 // //?See?Mullender?and?Cox,?``Semaphores?in?Plan?9,'' //?http://swtch.com/semaphore.pdf//?為?sync.Mutex?準備的異步信號量//?semaRoot?持有一棵?地址各不相同的?sudog(s.elem)?的平衡樹 //?每一個?sudog?都反過來指向(通過?s.waitlink)一個在同一個地址上等待的其它?sudog?們 //?同一地址的 sudog 的內部列表上的操作時間復雜度都是 O(1)。頂層 semaRoot 列表的掃描 //?的時間復雜度是 O(log n),n 是被哈希到同一個 semaRoot 的不同地址的總數,每一個地址上都會有一些 goroutine 被阻塞。 //?訪問?golang.org/issue/17953?來查看一個在引入二級列表之前性能較差的程序樣例,test/locklinear.go //?中有一個復現這個樣例的測試 type?semaRoot?struct?{lock??mutextreap?*sudog?//?root?of?balanced?tree?of?unique?waiters.nwait?uint32?//?Number?of?waiters.?Read?w/o?the?lock. }//?Prime?to?not?correlate?with?any?user?patterns. const?semTabSize?=?251var?semtable?[semTabSize]struct?{root?semaRootpad??[sys.CacheLineSize?-?unsafe.Sizeof(semaRoot{})]byte }func?semroot(addr?*uint32)?*semaRoot?{return?&semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root } ┌─────┬─────┬─────┬─────┬─────┬────────────────────────┬─────┐????????????????? │??0??│??1??│??2??│??3??│??4??│?????????.....??????????│?250?│????????????????? └─────┴─────┴─────┴─────┴─────┴────────────────────────┴─────┘?????????????????│??????????????????????????????????????????????????????│????????????????????│??????????????????????????????????????????????????????│????????????????????└──┐???????????????????????????????????????????????????└─┐??????????????????│?????????????????????????????????????????????????????│??????????????????│?????????????????????????????????????????????????????│??????????????????▼?????????????????????????????????????????????????????▼??????????????????┌─────────┐???????????????????????????????????????????┌─────────┐?????????????│?struct??│???????????????????????????????????????????│?struct??│?????????????├─────────┴─────────┐?????????????????????????????????├─────────┴─────────┐???│???root?semaRoot???│──┐??????????????????????????????│???root?semaRoot???│──┐├───────────────────┤??│??????????????????????????????├───────────────────┤??││????????pad????????│??│??????????????????????????????│????????pad????????│??│└───────────────────┘??│??????????????????????????????└───────────────────┘??││?????????????????????????????????????????????????????│┌────────────────┘????????????????????????????????????┌────────────────┘│?????????????????????????????????????????????????????│?????????????????│?????????????????????????????????????????????????????│?????????????????▼?????????????????????????????????????????????????????▼?????????????????┌──────────┐??????????????????????????????????????????┌──────────┐????????????│?semaRoot?│??????????????????????????????????????????│?semaRoot?│????????????├──────────┴────────┐?????????????????????????????????├──────────┴────────┐???│????lock?mutex?????│?????????????????????????????????│????lock?mutex?????│???├───────────────────┤?????????????????????????????????├───────────────────┤???│???treap?*sudog????│?????????????????????????????????│???treap?*sudog????│???├───────────────────┤?????????????????????????????????├───────────────────┤???│???nwait?uint32????│?????????????????????????????????│???nwait?uint32????│???└───────────────────┘?????????????????????????????????└───────────────────┘???treap 結構:
?????????????????????????????????┌──────────┐????????????????????????????????????┌─┬─?│??sudog???│????????????????????????????????????│?│??├──────────┴────────────┐???????????????????????┌─────────────────────┼─┼──│??????prev?*sudog??????│???????????????????????│?????????????????????│?│??├───────────────────────┤???????????????????????│?????????????????????│?│??│??????next?*sudog??????│────┐??????????????????│?????????????????????│?│??├───────────────────────┤????│??????????????????│?????????????????????│?│??│?????parent?*sudog?????│????│??????????????????│?????????????????????│?│??├───────────────────────┤????│??????????????????│?????????????????????│?│??│??elem?unsafe.Pointer??│????│??????????????????│?????????????????????│?│??├───────────────────────┤????│??????????????????│?????????????????????│?│??│?????ticket?uint32?????│????│??????????????????│?????????????????????│?│??└───────────────────────┘????│??????????????????│?????????????????????│?│???????????????????????????????│??????????????????│?????????????????????│?│???????????????????????????????│??????????????????│?????????????????????│?│???????????????????????????????│??????????????????│?????????????????????│?│???????????????????????????????│??????????????????│?????????????????????│?│???????????????????????????????│??????????????????│?????????????????????│?│???????????????????????????????│??????????????????▼?????????????????????│?│???????????????????????????????▼?????????????????? ┌──────────┐????????????????│?│?????????????????????????┌──────────┐????????????? │??sudog???│????????????????│?│?????????????????????????│??sudog???│????????????? ├──────────┴────────────┐???│?│?????????????????????????├──────────┴────────────┐ │??????prev?*sudog??????│???│?│?????????????????????????│??????prev?*sudog??????│ ├───────────────────────┤???│?│?????????????????????????├───────────────────────┤ │??????next?*sudog??????│???│?│?????????????????????????│??????next?*sudog??????│ ├───────────────────────┤???│?│?????????????????????????├───────────────────────┤ │?????parent?*sudog?????│───┘?└─────────────────────────│?????parent?*sudog?????│ ├───────────────────────┤???????????????????????????????├───────────────────────┤ │??elem?unsafe.Pointer??│???????????????????????????????│??elem?unsafe.Pointer??│ ├───────────────────────┤???????????????????????????????├───────────────────────┤ │?????ticket?uint32?????│???????????????????????????????│?????ticket?uint32?????│ └───────────────────────┘???????????????????????????????└───────────────────────┘在這個 treap 結構里,從 elem 的視角(其實就是 lock 的 addr)來看,這個結構是個二叉搜索樹。從 ticket 的角度來看,整個結構就是一個小頂堆。
所以才叫樹堆(treap)。
相同 addr,即對同一個 mutex 上鎖的 g,會阻塞在同一個地址上。這些阻塞在同一個地址上的 goroutine 會被打包成 sudog,組成一個鏈表。用 sudog 的 waitlink 相連:
┌──────────┐?????????????????????????┌──────────┐??????????????????????????┌──────────┐????????????? │??sudog???│??????????????????┌─────?│??sudog???│???????????????????┌─────?│??sudog???│????????????? ├──────────┴────────────┐?????│??????├──────────┴────────────┐??????│??????├──────────┴────────────┐ │????waitlink?*sudog????│─────┘??????│????waitlink?*sudog????│──────┘??????│????waitlink?*sudog????│ ├───────────────────────┤????????????├───────────────────────┤?????????????├───────────────────────┤ │????waittail?*sudog????│????????????│????waittail?*sudog????│?????????????│????waittail?*sudog????│ └───────────────────────┘????????????└───────────────────────┘?????????????└───────────────────────┘中間的元素的?waittail?都會指向最后一個元素:
┌──────────┐??????????????????????????????????????????????????????????????????????????????????????????? │??sudog???│??????????????????????????????????????????????????????????????????????????????????????????? ├──────────┴────────────┐?????????????????????????????????????????????????????????????????????????????? │????waitlink?*sudog????│?????????????????????????????????????????????????????????????????????????????? ├───────────────────────┤?????????????????????????????????????????????????????????????????????????????? │????waittail?*sudog????│───────────────────────────────────────────────────────────┐?????????????????? └───────────────────────┘???????????????????????????????????????????????????????????│??????????????????┌──────────┐??????????????????????????????????????│??????????????????│??sudog???│??????????????????????????????????????│??????????????????├──────────┴────────────┐?????????????????????????│??????????????????│????waitlink?*sudog????│?????????????????????????│??????????????????├───────────────────────┤?????????????????????????│??????????????????│????waittail?*sudog????│─────────────────────────┤??????????????????└───────────────────────┘?????????????????????????▼??????????????????┌──────────┐?????????????│??sudog???│?????????????├──────────┴────────────┐│????waitlink?*sudog????│├───────────────────────┤│????waittail?*sudog????│└───────────────────────┘對外封裝
在 sema.go 里實現的內容,用?go:linkname?導出給 sync、poll 庫來使用,也是在鏈接期做了些手腳:
//go:linkname?sync_runtime_Semacquire?sync.runtime_Semacquire func?sync_runtime_Semacquire(addr?*uint32)?{semacquire1(addr,?false,?semaBlockProfile) }//go:linkname?poll_runtime_Semacquire?internal/poll.runtime_Semacquire func?poll_runtime_Semacquire(addr?*uint32)?{semacquire1(addr,?false,?semaBlockProfile) }//go:linkname?sync_runtime_Semrelease?sync.runtime_Semrelease func?sync_runtime_Semrelease(addr?*uint32,?handoff?bool)?{semrelease1(addr,?handoff) }//go:linkname?sync_runtime_SemacquireMutex?sync.runtime_SemacquireMutex func?sync_runtime_SemacquireMutex(addr?*uint32,?lifo?bool)?{semacquire1(addr,?lifo,?semaBlockProfile|semaMutexProfile) }//go:linkname?poll_runtime_Semrelease?internal/poll.runtime_Semrelease func?poll_runtime_Semrelease(addr?*uint32)?{semrelease(addr) }實現
sem 本身支持 acquire 和 release,其實就是 OS 里常說的 P 操作和 V 操作。
公共部分
func?cansemacquire(addr?*uint32)?bool?{for?{v?:=?atomic.Load(addr)if?v?==?0?{return?false}if?atomic.Cas(addr,?v,?v-1)?{return?true}} }acquire 過程
type?semaProfileFlags?intconst?(semaBlockProfile?semaProfileFlags?=?1?<<?iotasemaMutexProfile )//?Called?from?runtime. func?semacquire(addr?*uint32)?{semacquire1(addr,?false,?0) }func?semacquire1(addr?*uint32,?lifo?bool,?profile?semaProfileFlags)?{gp?:=?getg()if?gp?!=?gp.m.curg?{throw("semacquire?not?on?the?G?stack")}//?低成本的情況if?cansemacquire(addr)?{return}//?高成本的情況://????增加?waiter?count?的值//????再嘗試調用一次?cansemacquire,成本了就直接返回//????沒成功就把自己作為一個?waiter?入隊//????sleep//????(之后?waiter?的?descriptor?被?signaler?用?dequeue?踢出)s?:=?acquireSudog()root?:=?semroot(addr)t0?:=?int64(0)s.releasetime?=?0s.acquiretime?=?0s.ticket?=?0for?{lock(&root.lock)//?給?nwait?加一,這樣后來的就不會在?semrelease?中進低成本的路徑了atomic.Xadd(&root.nwait,?1)//?檢查?cansemacquire?避免錯過了喚醒if?cansemacquire(addr)?{atomic.Xadd(&root.nwait,?-1)unlock(&root.lock)break}//?在?cansemacquire?之后的?semrelease?都可以知道我們正在等待//?(上面設置了?nwait),所以會直接進入?sleep//?注:?這里說的?sleep?其實就是?goparkunlockroot.queue(addr,?s,?lifo)goparkunlock(&root.lock,?"semacquire",?traceEvGoBlockSync,?4)if?s.ticket?!=?0?||?cansemacquire(addr)?{break}}if?s.releasetime?>?0?{blockevent(s.releasetime-t0,?3)}releaseSudog(s) }release 過程
func?semrelease(addr?*uint32)?{semrelease1(addr,?false) }func?semrelease1(addr?*uint32,?handoff?bool)?{root?:=?semroot(addr)atomic.Xadd(addr,?1)//?低成本情況:?沒有?waiter?//?這個?atomic?的檢查必須發生在?xadd?之前,以避免錯誤喚醒//?(具體參見?semacquire?中的循環)if?atomic.Load(&root.nwait)?==?0?{return}//?高成本情況:?搜索?waiter?并喚醒它lock(&root.lock)if?atomic.Load(&root.nwait)?==?0?{//?count?值已經被另一個?goroutine?消費了//?所以我們不需要喚醒其它?goroutine?了unlock(&root.lock)return}s,?t0?:=?root.dequeue(addr)if?s?!=?nil?{atomic.Xadd(&root.nwait,?-1)}unlock(&root.lock)if?s?!=?nil?{?//?可能會很慢,所以先解鎖acquiretime?:=?s.acquiretimeif?acquiretime?!=?0?{mutexevent(t0-acquiretime,?3)}if?s.ticket?!=?0?{throw("corrupted?semaphore?ticket")}if?handoff?&&?cansemacquire(addr)?{s.ticket?=?1}readyWithTime(s,?5)} }func?readyWithTime(s?*sudog,?traceskip?int)?{if?s.releasetime?!=?0?{s.releasetime?=?cputicks()}goready(s.g,?traceskip) }treap 結構
sudog 按照地址 hash 到 251 個 bucket 中的其中一個,每一個 bucket 都是一棵 treap。而相同 addr 上的 sudog 會形成一個鏈表。
為啥同一個地址的 sudog 不需要展開放在 treap 中呢?顯然,sudog 喚醒的時候,block 在同一個 addr 上的 goroutine,說明都是加的同一把鎖,這些 goroutine 被喚醒肯定是一起被喚醒的,相同地址的 g 并不需要查找才能找到,只要決定是先進隊列的被喚醒(fifo)還是后進隊列的被喚醒(lifo)就可以了。
//?queue?函數會把?s?添加到?semaRoot?上阻塞的?goroutine?們中 //?實際上就是把?s?添加到其地址對應的?treap?上 func?(root?*semaRoot)?queue(addr?*uint32,?s?*sudog,?lifo?bool)?{s.g?=?getg()s.elem?=?unsafe.Pointer(addr)s.next?=?nils.prev?=?nilvar?last?*sudogpt?:=?&root.treapfor?t?:=?*pt;?t?!=?nil;?t?=?*pt?{if?t.elem?==?unsafe.Pointer(addr)?{//?Already?have?addr?in?list.if?lifo?{//?treap?中在?t?的位置用?s?覆蓋掉?t*pt?=?ss.ticket?=?t.tickets.acquiretime?=?t.acquiretimes.parent?=?t.parents.prev?=?t.prevs.next?=?t.nextif?s.prev?!=?nil?{s.prev.parent?=?s}if?s.next?!=?nil?{s.next.parent?=?s}//?把?t?放在?s?的?wait?list?的第一個位置s.waitlink?=?ts.waittail?=?t.waittailif?s.waittail?==?nil?{s.waittail?=?t}t.parent?=?nilt.prev?=?nilt.next?=?nilt.waittail?=?nil}?else?{//?把?s?添加到?t?的等待列表的末尾if?t.waittail?==?nil?{t.waitlink?=?s}?else?{t.waittail.waitlink?=?s}t.waittail?=?ss.waitlink?=?nil}return}last?=?tif?uintptr(unsafe.Pointer(addr))?<?uintptr(t.elem)?{pt?=?&t.prev}?else?{pt?=?&t.next}}//?把?s?作為樹的新的葉子插入進去//?平衡樹使用?ticket?作為堆的權重值,這個?ticket?是隨機生成的//?也就是說,這個結構以元素地址來看的話,是一個二叉搜索樹//?同時用?ticket?值使其同時又是一個小頂堆,滿足//?s.ticket?<=?both?s.prev.ticket?and?s.next.ticket.//?https://en.wikipedia.org/wiki/Treap//?http://faculty.washington.edu/aragon/pubs/rst89.pdf////?s.ticket?會在一些地方和?0?相比,因此只設置最低位的?bit//?這樣不會明顯地影響?treap?的質量?s.ticket?=?fastrand()?|?1s.parent?=?last*pt?=?s//?按照?ticket?來進行旋轉,以滿足?treap?的性質for?s.parent?!=?nil?&&?s.parent.ticket?>?s.ticket?{if?s.parent.prev?==?s?{root.rotateRight(s.parent)}?else?{if?s.parent.next?!=?s?{panic("semaRoot?queue")}root.rotateLeft(s.parent)}} }//?dequeue?會搜索到阻塞在?addr?地址的?semaRoot?中的第一個?goroutine //?如果這個?sudog?需要進行?profile,dequeue?會返回它被喚醒的時間(now),否則的話?now?為?0 func?(root?*semaRoot)?dequeue(addr?*uint32)?(found?*sudog,?now?int64)?{ps?:=?&root.treaps?:=?*psfor?;?s?!=?nil;?s?=?*ps?{if?s.elem?==?unsafe.Pointer(addr)?{goto?Found}if?uintptr(unsafe.Pointer(addr))?<?uintptr(s.elem)?{ps?=?&s.prev}?else?{ps?=?&s.next}}return?nil,?0Found:now?=?int64(0)if?s.acquiretime?!=?0?{now?=?cputicks()}if?t?:=?s.waitlink;?t?!=?nil?{//?替換掉同樣在 addr 上等待的 t。*ps?=?tt.ticket?=?s.tickett.parent?=?s.parentt.prev?=?s.previf?t.prev?!=?nil?{t.prev.parent?=?t}t.next?=?s.nextif?t.next?!=?nil?{t.next.parent?=?t}if?t.waitlink?!=?nil?{t.waittail?=?s.waittail}?else?{t.waittail?=?nil}t.acquiretime?=?nows.waitlink?=?nils.waittail?=?nil}?else?{//?向下旋轉?s?到葉節點,以進行刪除,同時要考慮優先級for?s.next?!=?nil?||?s.prev?!=?nil?{if?s.next?==?nil?||?s.prev?!=?nil?&&?s.prev.ticket?<?s.next.ticket?{root.rotateRight(s)}?else?{root.rotateLeft(s)}}//?Remove?s,?now?a?leaf.//?刪除?s,現在是葉子節點了if?s.parent?!=?nil?{if?s.parent.prev?==?s?{s.parent.prev?=?nil}?else?{s.parent.next?=?nil}}?else?{root.treap?=?nil}}s.parent?=?nils.elem?=?nils.next?=?nils.prev?=?nils.ticket?=?0return?s,?now }總結
以上是生活随笔為你收集整理的semaphore 的原理与实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 只会使用 WaitGroup?你应该学习
- 下一篇: 曹大带我学 Go(1)——调度的本质