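Alibaba's Double 11 Traffic Guard: Reading the Sentinel-Go Source Code
Author | Yu Yu, lead of the apache/dubbo-go project
The author leads the apache/dubbo-go project. dubbogo now ships with sentinel-go built in; to use it on its own, see the article "Using sentinel in dubbo-go". For any other questions, join the dubbogo community [DingTalk group 23331795].
Introduction: This article analyzes Sentinel, the flow-control middleware open-sourced by Alibaba Group. It natively supports several languages, including Java, Go, and C++; this article covers only the Go implementation. Unless stated otherwise, "sentinel" below refers to Sentinel-Go.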
1 Basic concepts: Resource and Rule
1.1 Resource

    // ResourceType represents classification of the resources
    type ResourceType int32

    const (
        ResTypeCommon ResourceType = iota
        ResTypeWeb
        ResTypeRPC
    )

    // TrafficType describes the traffic type: Inbound or Outbound
    type TrafficType int32

    const (
        // Inbound represents the inbound traffic (e.g. provider)
        Inbound TrafficType = iota
        // Outbound represents the outbound traffic (e.g. consumer)
        Outbound
    )

    // ResourceWrapper represents the invocation
    type ResourceWrapper struct {
        // global unique resource name
        name string
        // resource classification
        classification ResourceType
        // Inbound or Outbound
        flowType TrafficType
    }

A Resource (ResourceWrapper) stores the application scenario, ResourceType, and the direction of the traffic to be controlled, FlowType (TrafficType).
1.2 Entry

    // EntryOptions represents the options of a Sentinel resource entry.
    type EntryOptions struct {
        resourceType base.ResourceType
        entryType    base.TrafficType
        acquireCount uint32
        slotChain    *base.SlotChain
    }

    type EntryContext struct {
        entry *SentinelEntry
        // Use to calculate RT
        startTime uint64
        Resource  *ResourceWrapper
        StatNode  StatNode
        Input     *SentinelInput
        // the result of rule slots check
        RuleCheckResult *TokenResult
    }

    type SentinelEntry struct {
        res *ResourceWrapper
        // one entry bounds with one context
        ctx *EntryContext
        sc  *SlotChain
    }

The entry entity SentinelEntry associates a Resource (ResourceWrapper) with its set of flow-control rules, the SlotChain. Each entry has a context, EntryContext, which stores the flow-control parameters used while each Rule is checked, together with the resulting verdict.
Note that SentinelEntry.sc comes from EntryOptions.slotChain, which holds the global SlotChain object api/slot_chain.go:globalSlotChain.
As for what a SlotChain is: it is the collection of all flow-control components that sentinel provides. Each component can simply be thought of as a Slot; see [3.5 SlotChain] for details.
Some of sentinel's variable and function names are hard to read. EntryOptions.acquireCount, for example, gives no hint of its meaning; only the comment on core/api.go:WithAcquireCount() makes it clear that it is the number of batched actions. If one RPC request calls one service interface on the server, the value is 1 [which is also the default]; if it calls three service interfaces, the value is 3. A better name would therefore be EntryOptions.batchCount. To keep the change minimal, core/api.go:WithAcquireCount() can be kept while adding core/api.go:WithBatchCount() with identical behavior. This improvement has been submitted as pr 263.
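To make the renaming proposal concrete, here is a minimal sketch of the functional-option style it relies on. The lower-case names entryOptions, withAcquireCount, withBatchCount, and newEntryOptions are hypothetical stand-ins written for this illustration; they are not Sentinel's actual API.

```go
package main

import "fmt"

// entryOptions is a hypothetical stand-in for Sentinel's EntryOptions.
type entryOptions struct {
	acquireCount uint32
}

// entryOption mutates entryOptions (the functional-option pattern).
type entryOption func(*entryOptions)

// withAcquireCount sets how many batched actions one entry accounts for.
func withAcquireCount(n uint32) entryOption {
	return func(o *entryOptions) { o.acquireCount = n }
}

// withBatchCount is an alias with a more descriptive name; it keeps the
// old option intact, which is the "minimal change" idea from the text.
func withBatchCount(n uint32) entryOption {
	return withAcquireCount(n)
}

// newEntryOptions applies the options on top of the default count of 1.
func newEntryOptions(opts ...entryOption) entryOptions {
	o := entryOptions{acquireCount: 1}
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	fmt.Println(newEntryOptions().acquireCount)                  // 1
	fmt.Println(newEntryOptions(withBatchCount(3)).acquireCount) // 3
}
```

Both options write the same field, so existing callers of the old name keep working while new callers can use the clearer one.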
1.3 Rule

    type TokenCalculateStrategy int32

    const (
        Direct TokenCalculateStrategy = iota
        WarmUp
    )

    type ControlBehavior int32

    const (
        Reject ControlBehavior = iota
        Throttling
    )

    // Rule describes the strategy of flow control, the flow control strategy is based on QPS statistic metric
    type Rule struct {
        // Resource represents the resource name.
        Resource        string          `json:"resource"`
        ControlBehavior ControlBehavior `json:"controlBehavior"`
        // Threshold means the threshold during StatIntervalInMs
        // If StatIntervalInMs is 1000(1 second), Threshold means QPS
        Threshold         float64 `json:"threshold"`
        MaxQueueingTimeMs uint32  `json:"maxQueueingTimeMs"`
        // StatIntervalInMs indicates the statistic interval and it's the optional setting for flow Rule.
        // If user doesn't set StatIntervalInMs, that means using default metric statistic of resource.
        // If the StatIntervalInMs user specifies can not reuse the global statistic of resource,
        // sentinel will generate independent statistic structure for this rule.
        StatIntervalInMs uint32 `json:"statIntervalInMs"`
    }

A Rule records a Resource's rate-limit threshold (Threshold), the length of the statistics window (StatIntervalInMs), and the action taken once the limit is triggered (ControlBehavior).
The core interface behind a Rule is RuleCheckSlot; StatSlot, by contrast, collects sentinel's own runtime metrics.
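Because the Rule struct carries JSON tags, a rule can be described as plain JSON. The sketch below copies only the fields and tags shown above into a local rule type and decodes one rule with the standard library. The resource name "GET:/orders" and the helper parseRules are made up for this example, and nothing here calls Sentinel itself.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type controlBehavior int32

const (
	reject controlBehavior = iota
	throttling
)

// rule is a local copy of the fields and JSON tags quoted above.
type rule struct {
	Resource          string          `json:"resource"`
	ControlBehavior   controlBehavior `json:"controlBehavior"`
	Threshold         float64         `json:"threshold"`
	MaxQueueingTimeMs uint32          `json:"maxQueueingTimeMs"`
	StatIntervalInMs  uint32          `json:"statIntervalInMs"`
}

// parseRules decodes a JSON array of rules.
func parseRules(data []byte) ([]rule, error) {
	var rules []rule
	if err := json.Unmarshal(data, &rules); err != nil {
		return nil, err
	}
	return rules, nil
}

func main() {
	data := []byte(`[{"resource":"GET:/orders","controlBehavior":0,"threshold":100,"statIntervalInMs":1000}]`)
	rules, err := parseRules(data)
	if err != nil {
		panic(err)
	}
	for _, r := range rules {
		// With a 1000 ms interval, the threshold reads as QPS.
		fmt.Printf("%s: at most %.0f requests per %d ms\n", r.Resource, r.Threshold, r.StatIntervalInMs)
	}
}
```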
1.4 Flow
This section analyzes rate limiting (core/flow) within flow control, and uses its processing flow to outline sentinel's overall skeleton.
1.4.1 TrafficShapingController
TrafficShapingController, as its name suggests, is the traffic shaping controller: the component that actually enforces flow control.

    // core/flow/traffic_shaping.go

    // TrafficShapingCalculator calculates the actual traffic shaping threshold
    // based on the threshold of rule and the traffic shaping strategy.
    type TrafficShapingCalculator interface {
        CalculateAllowedTokens(acquireCount uint32, flag int32) float64
    }

    type DirectTrafficShapingCalculator struct {
        threshold float64
    }

    func (d *DirectTrafficShapingCalculator) CalculateAllowedTokens(uint32, int32) float64 {
        return d.threshold
    }

The TrafficShapingCalculator interface computes the upper limit for rate limiting. If the warm-up feature is not used, its implementation need not be studied in depth; one implementation, DirectTrafficShapingCalculator, simply returns Rule.Threshold [the limit set by the user].

    // TrafficShapingChecker performs checking according to current metrics and the traffic
    // shaping strategy, then yield the token result.
    type TrafficShapingChecker interface {
        DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult
    }

    type RejectTrafficShapingChecker struct {
        rule *Rule
    }

    func (d *RejectTrafficShapingChecker) DoCheck(resStat base.StatNode, acquireCount uint32, threshold float64) *base.TokenResult {
        metricReadonlyStat := d.BoundOwner().boundStat.readOnlyMetric
        if metricReadonlyStat == nil {
            return nil
        }
        curCount := float64(metricReadonlyStat.GetSum(base.MetricEventPass))
        if curCount+float64(acquireCount) > threshold {
            return base.NewTokenResultBlockedWithCause(base.BlockTypeFlow, "", d.rule, curCount)
        }
        return nil
    }

RejectTrafficShapingChecker uses Rule.Threshold to decide whether the Resource has exceeded its limit in the current time window. Its TokenResultStatus can only be Pass or Blocked.
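sentinel flow also has a uniform-rate limiter, ThrottlingChecker, whose goal is to have requests executed at an even pace. It divides a time window [e.g. 1s] into finer micro-windows according to the threshold, and at most one request runs in each micro-window. Its TokenResultStatus can be Pass, Blocked, or Wait, with the following meanings:
- Pass: the limit is not exceeded within the micro-window; the request passes;
- Wait: the limit is exceeded within the micro-window; the request is deferred by some number of micro-windows and has to wait in the meantime;
- Blocked: the limit is exceeded within the micro-window and the wait would be longer than the maximum the user is willing to wait [Rule.MaxQueueingTimeMs]; the request is rejected.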
In the Direct + Reject scenario these interfaces add little. The main flow of the core function TrafficShapingController.PerformChecking() is:
- 1 read the Resource's current metric value [curCount] from TrafficShapingController.boundStat;
- 2 if curCount + batchNum (acquireCount) > Rule.Threshold, reject; otherwise pass.
In the rate-limiting scenario, the four members of TrafficShapingController have the following meanings:
- flowCalculator computes the rate-limit threshold;
- flowChecker performs the rate-limit check;
- rule stores the rate-limit rule;
- boundStat stores the check results and time-window parameters, which serve as the basis for the next check.
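The way the four members cooperate can be sketched as follows. This is a simplified model, not the library code: the interfaces are reduced to one method each, the passed field stands in for boundStat, and the window never resets, so the sketch only shows the calculator-then-checker order and the comparison against the threshold.

```go
package main

import "fmt"

// calculator yields the effective threshold (stand-in for flowCalculator).
type calculator interface {
	allowedTokens(acquireCount uint32) float64
}

// directCalculator returns the user-configured threshold unchanged.
type directCalculator struct{ threshold float64 }

func (d directCalculator) allowedTokens(uint32) float64 { return d.threshold }

// checker turns the current count and threshold into a verdict
// (stand-in for flowChecker).
type checker interface {
	check(passed float64, acquireCount uint32, threshold float64) bool
}

// rejectChecker admits a request only if it stays within the threshold.
type rejectChecker struct{}

func (rejectChecker) check(passed float64, acquireCount uint32, threshold float64) bool {
	return passed+float64(acquireCount) <= threshold
}

// controller is a hypothetical, simplified TrafficShapingController.
type controller struct {
	calc   calculator
	chk    checker
	passed float64 // stand-in for boundStat: passes counted in the current window
}

// performChecking asks the calculator for the threshold, then the checker
// for the verdict. In Sentinel the pass count is recorded by a StatSlot;
// here it is updated inline to keep the sketch short.
func (c *controller) performChecking(acquireCount uint32) bool {
	threshold := c.calc.allowedTokens(acquireCount)
	if !c.chk.check(c.passed, acquireCount, threshold) {
		return false
	}
	c.passed += float64(acquireCount)
	return true
}

func main() {
	c := &controller{calc: directCalculator{threshold: 3}, chk: rejectChecker{}}
	for i := 1; i <= 4; i++ {
		fmt.Printf("request %d allowed: %v\n", i, c.performChecking(1))
	}
}
```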
1.4.2 TrafficControllerMap
When a rate-limit check runs, the TrafficShapingControllers for the Resource have to be looked up by the Resource name.

    // TrafficControllerMap represents the map storage for TrafficShapingController.
    type TrafficControllerMap map[string][]*TrafficShapingController

    // core/flow/rule_manager.go
    tcMap = make(TrafficControllerMap)

The package-level private global tcMap stores all Rules: the key is the Resource name and the value is the list of TrafficShapingControllers for that Resource.
The user-facing function core/flow/rule_manager.go:LoadRules() builds a TrafficShapingController for each user-defined Rule and stores it in tcMap. It calls generateStatFor(*Rule) to build TrafficShapingController.boundStat.
In the rate-limiting scenario, the core of generateStatFor(*Rule) is (abridged):

    func generateStatFor(rule *Rule) (*standaloneStatistic, error) {
        var retStat standaloneStatistic
        resNode := stat.GetOrCreateResourceNode(rule.Resource, base.ResTypeCommon)
        // default case, use the resource's default statistic
        readStat := resNode.DefaultMetric()
        retStat.reuseResourceStat = true
        retStat.readOnlyMetric = readStat
        retStat.writeOnlyMetric = nil
        return &retStat, nil
    }

2 Metrics
A Resource's Metrics are the basis for evaluating Rules.
2.1 Atomic time wheel: AtomicBucketWrapArray
The Sentinel library is feature-rich, but whether for rate limiting or circuit breaking, its storage foundation is a sliding time window. It contains many optimizations, such as a lock-free, fixed-length time wheel.
A sliding window can be implemented in many ways; the time wheel is one of the simpler ones, and several rate-limiting methods can be built on top of it.
[Figure: overall block diagram of the time wheel]
1 BucketWrap
The basic unit of the time wheel is a bucket [a time window].

    // BucketWrap represent a slot to record metrics
    // In order to reduce the usage of memory, BucketWrap don't hold length of BucketWrap
    // The length of BucketWrap could be seen in LeapArray.
    // The scope of time is [startTime, startTime+bucketLength)
    // The size of BucketWrap is 24(8+16) bytes
    type BucketWrap struct {
        // The start timestamp of this statistic bucket wrapper.
        BucketStart uint64
        // The actual data structure to record the metrics (e.g. MetricBucket).
        Value atomic.Value
    }

Note: Value refers to the actual bucket indirectly instead of embedding a concrete type, because the AtomicBucketWrapArray built on BucketWrap is used by several sentinel flow-control components, each with its own parameters. For example:
- 1 slowRequestCounter, the underlying data of the slowRequestLeapArray used by core/circuitbreaker/circuit_breaker.go:slowRtCircuitBreaker
- 2 errorCounter, the underlying data of the errorCounterLeapArray used by core/circuitbreaker/circuit_breaker.go:errorRatioCircuitBreaker
1.1 MetricBucket
BucketWrap can be seen as a template for time buckets; the concrete bucket is MetricBucket, defined as follows:

    // MetricBucket represents the entity to record metrics per minimum time unit (i.e. the bucket time span).
    // Note that all operations of the MetricBucket are required to be thread-safe.
    type MetricBucket struct {
        // Value of statistic
        counter [base.MetricEventTotal]int64
        minRt   int64
    }

MetricBucket stores five kinds of metric:

    // There are five events to record
    // pass + block == Total
    const (
        // sentinel rules check pass
        MetricEventPass MetricEvent = iota
        // sentinel rules check block
        MetricEventBlock
        MetricEventComplete
        // Biz error, used for circuit breaker
        MetricEventError
        // request execute rt, unit is millisecond
        MetricEventRt
        // hack for the number of event
        MetricEventTotal
    )

2 AtomicBucketWrapArray
Each bucket records only its start time and metric values. Shared values such as the bucket length are not stored per bucket: the buckets are held together in an AtomicBucketWrapArray, and, as the BucketWrap comment notes, the bucket length is kept in LeapArray. AtomicBucketWrapArray is defined as follows:

    // atomic BucketWrap array to resolve race condition
    // AtomicBucketWrapArray can not append or delete element after initializing
    type AtomicBucketWrapArray struct {
        // The base address for real data array
        base unsafe.Pointer
        // The length of slice(array), it can not be modified.
        length int
        data   []*BucketWrap
    }

AtomicBucketWrapArray.base points to the first element of the data region of the AtomicBucketWrapArray.data slice. Because data is a fixed-length slice, base stores the starting address of that memory region directly to speed up access.
In addition, AtomicBucketWrapArray.data stores pointers to BucketWrap rather than BucketWrap values.
NewAtomicBucketWrapArrayWithTime() warms the array up by creating all the time buckets in advance.
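The idea of a fixed-length array whose slots are pre-filled and then swapped atomically can be sketched without unsafe by using atomic.Pointer, which requires Go 1.19 or later. The bucketArray type and its methods below are hypothetical and only mirror the behavior described above; the pre-fill order (starting at the slot of the current time and wrapping around) is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// bucket is a minimal stand-in for BucketWrap.
type bucket struct {
	start uint64
}

// bucketArray is a fixed-length array of atomically replaceable slots.
type bucketArray struct {
	slots []atomic.Pointer[bucket]
}

// newBucketArray creates every bucket up front, beginning with the
// bucket that contains now and wrapping around the array.
func newBucketArray(length int, bucketLengthMs, now uint64) *bucketArray {
	arr := &bucketArray{slots: make([]atomic.Pointer[bucket], length)}
	idx := int(now/bucketLengthMs) % length
	start := now - now%bucketLengthMs
	for i := 0; i < length; i++ {
		arr.slots[idx].Store(&bucket{start: start})
		start += bucketLengthMs
		idx = (idx + 1) % length
	}
	return arr
}

// get returns the bucket in slot i, or nil if i is out of range.
func (a *bucketArray) get(i int) *bucket {
	if i < 0 || i >= len(a.slots) {
		return nil
	}
	return a.slots[i].Load()
}

// compareAndSet swaps slot i only if it still holds old.
func (a *bucketArray) compareAndSet(i int, old, replacement *bucket) bool {
	if i < 0 || i >= len(a.slots) {
		return false
	}
	return a.slots[i].CompareAndSwap(old, replacement)
}

func main() {
	arr := newBucketArray(5, 200, 888)
	for i := 0; i < 5; i++ {
		fmt.Printf("slot %d starts at %d\n", i, arr.get(i).start)
	}
	old := arr.get(4)
	fmt.Println(arr.compareAndSet(4, old, &bucket{start: 1800})) // old is current: swap succeeds
	fmt.Println(arr.compareAndSet(4, old, &bucket{start: 2800})) // old is stale: swap fails
}
```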
2.2 Time wheel
1 leapArray

    // Give a diagram to illustrate
    // Suppose current time is 888, bucketLengthInMs is 200ms,
    // intervalInMs is 1000ms, LeapArray will build the below windows
    //   B0      B1      B2      B3      B4
    //   |_______|_______|_______|_______|_______|
    //  1000    1200    1400    1600    800    (1000)
    //                                      ^
    //                                   time=888
    type LeapArray struct {
        bucketLengthInMs uint32
        sampleCount      uint32
        intervalInMs     uint32
        array            *AtomicBucketWrapArray
        // update lock
        updateLock mutex
    }

LeapArray's fields:
- bucketLengthInMs is the length of one time bucket, in milliseconds;
- sampleCount is the number of time buckets;
- intervalInMs is the length of the whole time window, in milliseconds.
The ASCII diagram in its comment illustrates the meaning of each field well.
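The numbers in the diagram can be reproduced with two small helpers. The function names echo those called in the library code, but the bodies below are written for this illustration under the assumption that a timestamp maps to a slot by integer division and modulo; sampleCount is 5 here because intervalInMs / bucketLengthInMs = 1000 / 200.

```go
package main

import "fmt"

// calculateTimeIdx maps a timestamp (ms) to a slot index in the wheel.
func calculateTimeIdx(now uint64, bucketLengthInMs, sampleCount uint32) int {
	return int(now/uint64(bucketLengthInMs)) % int(sampleCount)
}

// calculateStartTime rounds a timestamp (ms) down to its bucket start.
func calculateStartTime(now uint64, bucketLengthInMs uint32) uint64 {
	return now - now%uint64(bucketLengthInMs)
}

func main() {
	const bucketLengthInMs, sampleCount = 200, 5
	for _, now := range []uint64{888, 1000, 1999} {
		fmt.Printf("time=%d -> B%d, bucket start=%d\n",
			now,
			calculateTimeIdx(now, bucketLengthInMs, sampleCount),
			calculateStartTime(now, bucketLengthInMs))
	}
}
```

For time=888 this yields slot B4 with start 800, matching the diagram; time=1999 lands in B4 again but with start 1800, which is the "next cycle" case that forces a bucket reset.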
The core function of LeapArray is LeapArray.currentBucketOfTime(), which returns the time bucket (BucketWrap) that corresponds to a given point in time:

    func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
        if now <= 0 {
            return nil, errors.New("Current time is less than 0.")
        }

        idx := la.calculateTimeIdx(now)
        bucketStart := calculateStartTime(now, la.bucketLengthInMs)

        for { //spin to get the current BucketWrap
            old := la.array.get(idx)
            if old == nil {
                // because la.array.data had initiated when new la.array
                // theoretically, here is not reachable
                newWrap := &BucketWrap{
                    BucketStart: bucketStart,
                    Value:       atomic.Value{},
                }
                newWrap.Value.Store(bg.NewEmptyBucket())
                if la.array.compareAndSet(idx, nil, newWrap) {
                    return newWrap, nil
                } else {
                    runtime.Gosched()
                }
            } else if bucketStart == atomic.LoadUint64(&old.BucketStart) {
                return old, nil
            } else if bucketStart > atomic.LoadUint64(&old.BucketStart) {
                // current time has been next cycle of LeapArray and LeapArray doesn't count in last cycle.
                // reset BucketWrap
                if la.updateLock.TryLock() {
                    old = bg.ResetBucketTo(old, bucketStart)
                    la.updateLock.Unlock()
                    return old, nil
                } else {
                    runtime.Gosched()
                }
            } else if bucketStart < atomic.LoadUint64(&old.BucketStart) {
                // TODO: reserve for some special case (e.g. when occupying "future" buckets).
                return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
            }
        }
    }

The core logic of its for-loop is:
- 1 fetch the time bucket old for the given point in time;
- 2 if old is nil, create a new bucket and try to store it in the time wheel with an atomic operation; retry if the store fails;
- 3 if old is exactly the bucket that contains the current time, return it;
- 4 if old starts earlier than the current bucket, try to take the update lock (TryLock, an optimistic approach) and reset the bucket's start time and other values; return once the locked update succeeds;
- 5 if old starts later than the current bucket, the system clock has jumped backwards; return an error.
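The same case analysis can be followed more easily in a simplified version that trades the lock-free spin for a plain mutex. The window type below is a hypothetical teaching sketch, not the LeapArray implementation: it keeps a single pass counter per bucket and handles cases 3, 4, and 5 from the list above (case 2 cannot occur because all buckets are allocated up front).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// bucket holds the pass count for one time slice.
type bucket struct {
	start uint64
	pass  int64
}

// window is a mutex-protected ring of buckets.
type window struct {
	mu           sync.Mutex
	bucketLength uint64
	buckets      []bucket
}

func newWindow(sampleCount int, bucketLength uint64) *window {
	return &window{bucketLength: bucketLength, buckets: make([]bucket, sampleCount)}
}

// addPass adds n to the bucket that contains now and returns the
// bucket's count afterwards.
func (w *window) addPass(now uint64, n int64) (int64, error) {
	idx := int(now/w.bucketLength) % len(w.buckets)
	start := now - now%w.bucketLength

	w.mu.Lock()
	defer w.mu.Unlock()
	b := &w.buckets[idx]
	switch {
	case b.start == start:
		// Case 3: the bucket already covers the current time.
	case b.start < start:
		// Case 4: the bucket belongs to an earlier cycle, so reset it.
		b.start, b.pass = start, 0
	default:
		// Case 5: the timestamp is older than the data in the slot.
		return 0, errors.New("provided time is behind the bucket start")
	}
	b.pass += n
	return b.pass, nil
}

func main() {
	w := newWindow(5, 200)
	fmt.Println(w.addPass(888, 2))  // first use of slot 4, bucket start 800
	fmt.Println(w.addPass(900, 1))  // same bucket, count accumulates
	fmt.Println(w.addPass(1888, 1)) // next cycle reuses slot 4 after a reset
	fmt.Println(w.addPass(888, 1))  // clock went backwards: error
}
```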
2 BucketLeapArray
leapArray implements the main body of the sliding time window; the type exposed for use is BucketLeapArray:

    // The implementation of sliding window based on LeapArray (as the sliding window infrastructure)
    // and MetricBucket (as the data type). The MetricBucket is used to record statistic
    // metrics per minimum time unit (i.e. the bucket time span).
    type BucketLeapArray struct {
        data     LeapArray
        dataType string
    }

As the comment on this struct shows, the concrete entity behind its time-window BucketWrap is MetricBucket.
2.3 Reading and writing Metric data
SlidingWindowMetric

    // SlidingWindowMetric represents the sliding window metric wrapper.
    // It does not store any data and is the wrapper of BucketLeapArray to adapt to different internal bucket
    // SlidingWindowMetric is used for SentinelRules and BucketLeapArray is used for monitor
    // BucketLeapArray is per resource, and SlidingWindowMetric support only read operation.
    type SlidingWindowMetric struct {
        bucketLengthInMs uint32
        sampleCount      uint32
        intervalInMs     uint32
        real             *BucketLeapArray
    }

SlidingWindowMetric is a wrapper around BucketLeapArray that exposes only read-only methods.
ResourceNode

    type BaseStatNode struct {
        sampleCount  uint32
        intervalMs   uint32
        goroutineNum int32
        arr          *sbase.BucketLeapArray
        metric       *sbase.SlidingWindowMetric
    }

    type ResourceNode struct {
        BaseStatNode
        resourceName string
        resourceType base.ResourceType
    }

    // core/stat/node_storage.go
    type ResourceNodeMap map[string]*ResourceNode

    var (
        inboundNode = NewResourceNode(base.TotalInBoundResourceName, base.ResTypeCommon)
        resNodeMap  = make(ResourceNodeMap)
        rnsMux      = new(sync.RWMutex)
    )

BaseStatNode exposes both read and write methods: data is written to BaseStatNode.arr, while reads go through BaseStatNode.metric. BaseStatNode.arr is created in NewBaseStatNode(), and the pointer SlidingWindowMetric.real points to it as well.
ResourceNode, as the name suggests, represents a resource together with its Metrics storage, ResourceNode.BaseStatNode.
The global variable resNodeMap stores the Metrics data of all resources.
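3 Rate-limiting flow
This section analyzes only the most basic traffic-shaping feature of the Sentinel library: rate limiting. There are many rate-limiting algorithms; users can rely on the built-in ones or add their own.
The rate-limiting process has three steps:
- 1 build the EntryContext for the given Resource, storing its Metrics, the start time, and so on; Sentinel calls this StatPrepareSlot;
- 2 decide, according to the Resource's rate-limiting algorithm, whether it should be limited, and produce the verdict; Sentinel calls this RuleCheckSlot;
- Note: this algorithm is a collection of check methods (SlotChain);
- 3 after the verdict, besides the user acting on it, Sentinel performs its own action based on the verdict and records metrics such as the time (RT) taken by the whole process; Sentinel calls this StatSlot.
[Figure: overall flow of the three steps]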
3.1 Slot
For the three steps of a check, three corresponding Slots are defined as follows:

    // StatPrepareSlot is responsible for some preparation before statistic
    // For example: init structure and so on
    type StatPrepareSlot interface {
        // Prepare function do some initialization
        // Such as: init statistic structure、node and etc
        // The result of preparing would store in EntryContext
        // All StatPrepareSlots execute in sequence
        // Prepare function should not throw panic.
        Prepare(ctx *EntryContext)
    }

    // RuleCheckSlot is rule based checking strategy
    // All checking rule must implement this interface.
    type RuleCheckSlot interface {
        // Check function do some validation
        // It can break off the slot pipeline
        // Each TokenResult will return check result
        // The upper logic will control pipeline according to SlotResult.
        Check(ctx *EntryContext) *TokenResult
    }

    // StatSlot is responsible for counting all custom biz metrics.
    // StatSlot would not handle any panic, and pass up all panic to slot chain
    type StatSlot interface {
        // OnEntryPass function will be invoked when StatPrepareSlots and RuleCheckSlots execute pass
        // StatSlots will do some statistic logic, such as QPS、log、etc
        OnEntryPassed(ctx *EntryContext)
        // OnEntryBlocked function will be invoked when StatPrepareSlots and RuleCheckSlots fail to execute
        // It may be inbound flow control or outbound cir
        // StatSlots will do some statistic logic, such as QPS、log、etc
        // blockError introduce the block detail
        OnEntryBlocked(ctx *EntryContext, blockError *BlockError)
        // OnCompleted function will be invoked when chain exits.
        // The semantics of OnCompleted is the entry passed and completed
        // Note: blocked entry will not call this function
        OnCompleted(ctx *EntryContext)
    }

Setting Prepare and Stat aside, a slot can simply be regarded as one flow-control component provided by sentinel.
Note that, according to the comments, StatSlot.OnCompleted runs only when RuleCheckSlot.Check passes. It is used to compute metrics such as the RT from the start of the request to its end.
3.2 Prepare

    // core/base/slot_chain.go

    // StatPrepareSlot is responsible for some preparation before statistic
    // For example: init structure and so on
    type StatPrepareSlot interface {
        // Prepare function do some initialization
        // Such as: init statistic structure、node and etc
        // The result of preparing would store in EntryContext
        // All StatPrepareSlots execute in sequence
        // Prepare function should not throw panic.
        Prepare(ctx *EntryContext)
    }

    // core/stat/stat_prepare_slot.go
    type ResourceNodePrepareSlot struct {
    }

    func (s *ResourceNodePrepareSlot) Prepare(ctx *base.EntryContext) {
        node := GetOrCreateResourceNode(ctx.Resource.Name(), ctx.Resource.Classification())
        // Set the resource node to the context.
        ctx.StatNode = node
    }

As explained above, Prepare mainly constructs the ResourceNode used to store the Resource's Metrics. The StatNodes of all Resources are kept in the package-level global core/stat/node_storage.go:resNodeMap [type: map[string]*ResourceNode]. GetOrCreateResourceNode looks up the StatNode for a Resource name in resNodeMap; if none exists, it creates one and stores it in resNodeMap.
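3.3 Check
RuleCheckSlot.Check() runs as follows:
- 1 fetch the full set of Rules for the Resource by its name;
- 2 iterate over the Rules and check the Resource against each in turn; as soon as any Rule decides that the Resource must be limited [Blocked], return that result; otherwise let the request through.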
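The two steps can be sketched with a map from resource name to a list of controllers, similar in shape to tcMap. All types here are simplified stand-ins invented for the example: each controller carries only a threshold and a current pass count, and check returns the first blocking rule or nil when every rule passes.

```go
package main

import "fmt"

// rule is a reduced stand-in for a flow rule.
type rule struct {
	resource  string
	threshold float64
}

// controller pairs a rule with the passes counted in its window.
type controller struct {
	rule   rule
	passed float64
}

// allow reports whether acquiring n more tokens stays within the threshold.
func (c *controller) allow(n uint32) bool {
	return c.passed+float64(n) <= c.rule.threshold
}

// controllerMap maps a resource name to its controllers.
type controllerMap map[string][]*controller

// buildControllers groups one controller per rule by resource name.
func buildControllers(rules []rule) controllerMap {
	m := make(controllerMap)
	for _, r := range rules {
		m[r.resource] = append(m[r.resource], &controller{rule: r})
	}
	return m
}

// check walks the resource's controllers in order and returns the first
// rule that blocks, or nil when all of them pass.
func (m controllerMap) check(resource string, n uint32) *rule {
	for _, c := range m[resource] {
		if !c.allow(n) {
			return &c.rule
		}
	}
	return nil
}

func main() {
	m := buildControllers([]rule{
		{resource: "orders", threshold: 5},
		{resource: "orders", threshold: 2},
	})
	for _, c := range m["orders"] {
		c.passed = 2 // pretend two requests already passed in this window
	}
	if blockedBy := m.check("orders", 1); blockedBy != nil {
		fmt.Printf("blocked by rule with threshold %.0f\n", blockedBy.threshold)
	}
	fmt.Println(m.check("unknown", 1) == nil) // no rules: the request passes
}
```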
3.4 Exit
After sentinel has checked a Resource, the subsequent logic runs in this order:
- 1 if RuleCheckSlot.Check() passes, run StatSlot.OnEntryPassed(); if it rejects, run StatSlot.OnEntryBlocked();
- 2 if RuleCheckSlot.Check() passes, run the Action itself;
- 3 if RuleCheckSlot.Check() passes, run SentinelEntry.Exit() --> SlotChain.exit() --> StatSlot.OnCompleted().
The call chain of step 3 is as follows:
StatSlot.OnCompleted()

    // core/flow/standalone_stat_slot.go
    type StandaloneStatSlot struct {
    }

    func (s StandaloneStatSlot) OnEntryPassed(ctx *base.EntryContext) {
        res := ctx.Resource.Name()
        for _, tc := range getTrafficControllerListFor(res) {
            if !tc.boundStat.reuseResourceStat {
                if tc.boundStat.writeOnlyMetric != nil {
                    tc.boundStat.writeOnlyMetric.AddCount(base.MetricEventPass, int64(ctx.Input.AcquireCount))
                }
            }
        }
    }

    func (s StandaloneStatSlot) OnEntryBlocked(ctx *base.EntryContext, blockError *base.BlockError) {
        // Do nothing
    }

    func (s StandaloneStatSlot) OnCompleted(ctx *base.EntryContext) {
        // Do nothing
    }

SlotChain.exit()

    // core/base/slot_chain.go
    type SlotChain struct {
    }

    func (sc *SlotChain) exit(ctx *EntryContext) {
        // The OnCompleted is called only when entry passed
        if ctx.IsBlocked() {
            return
        }
        for _, s := range sc.stats {
            s.OnCompleted(ctx)
        }
    }

SentinelEntry.Exit()

    // core/base/entry.go
    type SentinelEntry struct {
        ctx     *EntryContext
        sc      *SlotChain
        exitCtl sync.Once
    }

    func (e *SentinelEntry) Exit() {
        e.exitCtl.Do(func() {
            if e.sc != nil {
                e.sc.exit(e.ctx)
            }
        })
    }

As the code above shows, StatSlot.OnCompleted() is called after the Action [e.g. one RPC request-response invocation] has finished. A component that needs the time spent on an Action (RT) can compute it in its StatSlot.OnCompleted() from EntryContext.startTime.
3.5 SlotChain
Sentinel is in essence a flow-control package. Besides rate limiting, it provides many other flow-control components, such as adaptive traffic protection, circuit breaking and degradation, cold start, and global traffic metrics. Sentinel-Go defines a SlotChain entity that holds all of these components.

    // core/base/slot_chain.go

    // SlotChain hold all system slots and customized slot.
    // SlotChain support plug-in slots developed by developer.
    type SlotChain struct {
        statPres   []StatPrepareSlot
        ruleChecks []RuleCheckSlot
        stats      []StatSlot
    }

    // The entrance of slot chain
    // Return the TokenResult and nil if internal panic.
    func (sc *SlotChain) Entry(ctx *EntryContext) *TokenResult {
        // execute prepare slot
        sps := sc.statPres
        if len(sps) > 0 {
            for _, s := range sps {
                s.Prepare(ctx)
            }
        }

        // execute rule based checking slot
        rcs := sc.ruleChecks
        var ruleCheckRet *TokenResult
        if len(rcs) > 0 {
            for _, s := range rcs {
                sr := s.Check(ctx)
                if sr == nil {
                    // nil equals to check pass
                    continue
                }
                // check slot result
                if sr.IsBlocked() {
                    ruleCheckRet = sr
                    break
                }
            }
        }
        if ruleCheckRet == nil {
            ctx.RuleCheckResult.ResetToPass()
        } else {
            ctx.RuleCheckResult = ruleCheckRet
        }

        // execute statistic slot
        ss := sc.stats
        ruleCheckRet = ctx.RuleCheckResult
        if len(ss) > 0 {
            for _, s := range ss {
                // indicate the result of rule based checking slot.
                if !ruleCheckRet.IsBlocked() {
                    s.OnEntryPassed(ctx)
                } else {
                    // The block error should not be nil.
                    s.OnEntryBlocked(ctx, ruleCheckRet.blockErr)
                }
            }
        }
        return ruleCheckRet
    }

    func (sc *SlotChain) exit(ctx *EntryContext) {
        if ctx == nil || ctx.Entry() == nil {
            logging.Error(errors.New("nil EntryContext or SentinelEntry"), "")
            return
        }
        // The OnCompleted is called only when entry passed
        if ctx.IsBlocked() {
            return
        }
        for _, s := range sc.stats {
            s.OnCompleted(ctx)
        }
        // relieve the context here
    }

Suggestion: for a given Resource, Sentinel cannot know which components it uses, so at runtime it runs every component's Rule check against the Resource's EntryContext in turn. Why not offer users an interface for choosing the set of flow-control components to use, so as to reduce the number of RuleCheckSlot.Check() calls in SlotChain.Entry()? This improvement has been submitted as pr 264 [update: the code has been merged, and according to the maintainer's reply after load testing, the overall efficiency of sentinel-go improved by 15%].
globalSlotChain
Sentinel-Go defines a package-level private global of type SlotChain, globalSlotChain, which stores all of its flow-control component objects. Sample code follows. Since this article focuses on the rate-limiting component, only its registration is shown.

    // api/slot_chain.go
    func BuildDefaultSlotChain() *base.SlotChain {
        sc := base.NewSlotChain()
        sc.AddStatPrepareSlotLast(&stat.ResourceNodePrepareSlot{})
        sc.AddRuleCheckSlotLast(&flow.Slot{})
        sc.AddStatSlotLast(&flow.StandaloneStatSlot{})
        return sc
    }

    var globalSlotChain = BuildDefaultSlotChain()

Entry
In api/api.go:Entry(), Sentinel-Go's most important public entry point, globalSlotChain is used as the SlotChain parameter of EntryOptions.

    // api/api.go

    // Entry is the basic API of Sentinel.
    func Entry(resource string, opts ...EntryOption) (*base.SentinelEntry, *base.BlockError) {
        options := entryOptsPool.Get().(*EntryOptions)
        options.slotChain = globalSlotChain
        return entry(resource, options)
    }

Sentinel's evolution depends on contributions from the community. The Sentinel Go 1.0 GA release is coming soon and will bring more cloud-native features. Interested developers are very welcome to contribute and help shape future releases. All forms of contribution are encouraged, including but not limited to:
- bug fix
- new features/improvements
- dashboard
- document/website
- test cases
Developers can pick issues of interest from the good first issue list on GitHub to join the discussion and contribute. We pay close attention to active contributors; core contributors will be nominated as Committers and help lead the community. Questions and suggestions are welcome through GitHub issues, Gitter, or the DingTalk group (group number: 30150716). Now start hacking!
- Sentinel Go repo: https://github.com/alibaba/sentinel-golang
- Enterprise users are welcome to register: https://github.com/alibaba/Sentinel/issues/18
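About the author
Yu Yu (github @AlexStocks) leads the apache/dubbo-go project. A programmer with more than ten years of hands-on experience in server-side infrastructure development, he currently works on container orchestration and service mesh in the Trusted Native department at Ant Financial. An open-source enthusiast, he started by contributing code to Redis in 2015 and has since improved well-known projects such as Muduo, Pika, Dubbo, and Dubbo-go.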