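Deep Dive: Spring Cloud + Alibaba Sentinel Source Code and Principles
As microservices become mainstream, the stability of calls between services matters more and more. Sentinel takes traffic as its entry point and protects service stability along several dimensions: flow control, circuit breaking and degradation, and system load protection.
Authors | 向寒 / 孙玄
Source | 架构之美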
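About Sentinel
1. Theory
The following two theoretical foundations are distilled from years of experience with distributed systems:
(1) The relationship between microservices and governance
(2) The hill-climbing theory
Today's topic has two main parts:
Sentinel design principles
Source code analysis of the Sentinel runtime flow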
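Sentinel Design Principles
1. Features
Rich application scenarios: a decade of Alibaba Double 11 experience, covering flash sales, the sustained traffic peak at midnight on Double 11, hot-item detection, warm-up, message queue peak shaving and valley filling, cluster flow control, and real-time circuit breaking of unavailable downstream applications.
Broad open-source ecosystem: out-of-the-box integration modules for other open-source frameworks and libraries such as Dubbo, Spring Cloud, gRPC, Zuul, and Reactor.
Complete SPI extension points: simple, easy-to-use SPI interfaces; custom logic can be added quickly by implementing them.
Comprehensive real-time monitoring: per-second data for a single machine of a connected application, plus aggregated runtime status for clusters of fewer than 500 machines.
2. Core concepts
(1) Resource: the object being rate limited
In the code below, the /user/select endpoint is a resource (registered under the name "select"):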
@GetMapping("/user/select")
@SentinelResource(value = "select", blockHandler = "exceptionHandler")
public TUser select(@RequestParam Integer userId) {
    log.info("post /user/select userid=" + userId);
    return userService.select(userId);
}
In other words, a resource is an API decorated with the SentinelResource annotation:
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
public @interface SentinelResource {
    String value() default "";

    EntryType entryType() default EntryType.OUT;

    int resourceType() default 0;

    String blockHandler() default "";

    Class<?>[] blockHandlerClass() default {};

    String fallback() default "";
    ......
}
(2) Entry: Sentinel creates an Entry for each resource.
(3) Slot chain: each Entry has a slot chain that records rate-limiting and other control information; this is what implements the green part of the figure below.
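Source Code Analysis of the Sentinel Runtime Flow
The figure here is the global flow diagram from the official site. Next, we break the process down through the source code:
1. Entry point
SphU.entry("methodA", EntryType.IN); // entry point
Core code
2. Entry logic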
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    // Fetch the Context instance bound to the current thread (stored in a ThreadLocal).
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }

    if (context == null) {
        // Using default context.
        // If the caller did not call ContextUtil.enter(..) before SphU.entry(..),
        // a context with the default name is initialized here.
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }

    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }

    // Build the slot chain (the core step).
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        // Start invoking the chain.
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
3. Context information
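Context
A Context is the Sentinel context held by the current thread.
On entering Sentinel's logic, the current thread's Context is fetched first and created if absent. When the task finishes, the thread's Context is cleared. A Context represents the call-chain context and spans every Entry in one invocation chain.
A Context holds the entrance node (entranceNode), the current node of this invocation chain (curNode), the call origin (origin), and similar information. The Context name is the name of the call-chain entrance.
Node
A Node is the statistics wrapper for one resource marked with @SentinelResource.
The Context records the entrance node for the resource calls made on the current thread.
Through the entrance node's childList we can trace how resources are invoked. Each node corresponds to one @SentinelResource resource together with its statistics, for example passQps, blockQps, and rt.
Entry
An Entry is the credential that shows whether a call passed rate limiting. If entry() returns normally, the caller may access the service protected by Sentinel; otherwise Sentinel throws a BlockException.
It also stores basic information about this entry() call, including the resource's Context, its Node, and the corresponding chain of responsibility. After the resource call completes, the same Entry is needed for cleanup: exiting the chain of responsibility, updating the node statistics, and clearing the current thread's Context.
By the time the Context is built, the part shown in the figure below is already complete: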
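4. Core flow
Two points deserve attention here:
ProcessorSlot chain = lookProcessChain(resourceWrapper); builds the chain.
chain.entry(context, resourceWrapper, null, count, prioritized, args); invokes the chain.
First, let's see how the chain is built.
5. Getting the slot chain
If one already exists for the resource, return it directly;
if not, create it.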
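The "return it if present, otherwise create it" step can be sketched in plain Java. This is an illustration only: `ChainCache` and its members are hypothetical names, and the sketch assumes the lookup uses the same double-checked, copy-on-write pattern that the NodeSelectorSlot listing later in this article uses, plus the size cap that the `MAX_SLOT_CHAIN_SIZE` comment in `entryWithPriority` describes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical per-resource cache; a sketch, not Sentinel's actual class. */
final class ChainCache<V> {
    private final int maxSize;
    private final Supplier<V> factory;
    private final Object lock = new Object();
    // Replaced wholesale on every insert, so readers never observe a map mid-mutation.
    private volatile Map<String, V> chains = new HashMap<>();

    ChainCache(int maxSize, Supplier<V> factory) {
        this.maxSize = maxSize;
        this.factory = factory;
    }

    /** Returns the cached value, creates it on first use, or returns null once the cache is full. */
    V lookup(String resource) {
        V chain = chains.get(resource);           // fast path: already built
        if (chain == null) {
            synchronized (lock) {
                chain = chains.get(resource);     // re-check under the lock
                if (chain == null) {
                    if (chains.size() >= maxSize) {
                        return null;              // caller then skips rule checking
                    }
                    chain = factory.get();
                    Map<String, V> copy = new HashMap<>(chains);
                    copy.put(resource, chain);
                    chains = copy;                // publish the new snapshot
                }
            }
        }
        return chain;
    }
}
```

Returning null when the cap is reached matches the `chain == null` branch in `entryWithPriority`, where the Entry is created without any rule checking.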
6. Creating the slot chain
SlotChainProvider.newSlotChain();
// Extended through the SPI mechanism; the default is DefaultSlotChainBuilder
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
7. Loading ProcessorSlot via SPI
SlotChainBuilder is extended through the SPI mechanism, with DefaultSlotChainBuilder as the default implementation. In the sentinel-core module of the Sentinel source, the file META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder holds the default entry:
So DefaultSlotChainBuilder builds the chain by default, which leads us to the DefaultSlotChainBuilder.build() method.
8. DefaultSlotChainBuilder
public ProcessorSlotChain build() {
    // Define the head of the chain.
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();

    // Note: the instances of ProcessorSlot should be different, since they are not stateless.
    // Load the ProcessorSlot implementations via SPI from
    // META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot, sorted in the specified order.
    List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
    // Iterate and build the chain.
    for (ProcessorSlot slot : sortedSlotList) {
        if (!(slot instanceof AbstractLinkedProcessorSlot)) {
            RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
            continue;
        }
        // The list is already sorted, so each slot is simply appended to the end.
        chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
    }

    return chain;
}
9. Iterating over the ProcessorSlots
Here too the SPI mechanism is used, reading the file META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot:
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
The chain is made up of these slots, and their order is determined by the value of the @SpiOrder annotation on each slot:
NodeSelectorSlot -> ClusterBuilderSlot -> LogSlot -> StatisticSlot -> AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot
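To make the ordering rule concrete, here is a minimal sketch that sorts slots by an integer order value and appends them in that order. `OrderedChainSketch` and `Slot` are hypothetical stand-ins, not Sentinel classes; the order values in the usage note are the @SpiOrder values quoted in this article, and the sketch assumes lower values run first, which is what the listed chain order implies.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class OrderedChainSketch {
    /** Hypothetical slot descriptor: a name plus an order value playing the role of @SpiOrder. */
    static final class Slot {
        final String name;
        final int order;

        Slot(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    /** Sorts ascending by order and returns the slot names in chain order. */
    static List<String> buildChain(List<Slot> discovered) {
        List<Slot> sorted = new ArrayList<>(discovered);
        sorted.sort(Comparator.comparingInt((Slot s) -> s.order));
        List<String> chain = new ArrayList<>();
        for (Slot s : sorted) {
            chain.add(s.name);   // already sorted, so appending preserves the order
        }
        return chain;
    }
}
```

Passing FlowSlot (-2000), NodeSelectorSlot (-10000), and StatisticSlot (-7000) in any order yields NodeSelectorSlot, StatisticSlot, FlowSlot, so discovery order in the SPI file does not have to match execution order.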
Invoking the Chain
chain.entry(…)
With the chain built, the next step is to invoke it.
Back to CtSph#entryWithPriority
1. NodeSelectorSlot
NodeSelectorSlot(@SpiOrder(-10000))
Execution goes straight into the entry method of NodeSelectorSlot.
According to the official documentation, NodeSelectorSlot does the following:
It collects the paths of resources and stores these invocation paths in a tree structure, which is later used for rate limiting and degradation by invocation path.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
    throws Throwable {

    // Double-checked locking plus a cache
    DefaultNode node = map.get(context.getName());
    if (node == null) {
        synchronized (this) {
            node = map.get(context.getName());
            if (node == null) {
                node = new DefaultNode(resourceWrapper, null);
                HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                cacheMap.putAll(map);
                cacheMap.put(context.getName(), node);
                map = cacheMap;
                // Build invocation tree
                ((DefaultNode) context.getLastNode()).addChild(node);
            }

        }
    }

    context.setCurNode(node);
    // Move on to the next slot in the chain
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
2. ClusterBuilderSlot
ClusterBuilderSlot(@SpiOrder(-9000))
According to the official documentation, ClusterBuilderSlot does the following:
This slot builds the resource's ClusterNode and the call-origin nodes. A ClusterNode keeps the runtime statistics of a resource (response time, QPS, block count, thread count, exception count, and so on) together with a list of per-origin statistics. The origin name is the origin passed to ContextUtil.enter(contextName, origin).
3. LogSlot
LogSlot(@SpiOrder(-8000))
This class does nothing on the way down the chain. Only when a BlockException is thrown and propagates back up, layer by layer, does it write some log information:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
    throws Throwable {
    try {
        fireEntry(context, resourceWrapper, obj, count, prioritized, args);
    } catch (BlockException e) {
        // A BlockException was thrown: write the log record here
        EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
            context.getOrigin(), count);
        throw e;
    } catch (Throwable e) {
        RecordLog.warn("Unexpected entry exception", e);
    }
}
4. StatisticSlot
StatisticSlot(@SpiOrder(-7000))
Official documentation:
StatisticSlot records and aggregates runtime metrics along different dimensions.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // Do some checking.
        // Let the rest of the chain run first; the steps below execute only after it returns.
        fireEntry(context, resourceWrapper, node, count, prioritized, args);

        // Request passed, add thread count and pass count.
        node.increaseThreadNum();
        node.addPassRequest(count);

        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
            context.getCurEntry().getOriginNode().addPassRequest(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
            Constants.ENTRY_NODE.addPassRequest(count);
        }

        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (PriorityWaitException ex) {
        node.increaseThreadNum();
        if (context.getCurEntry().getOriginNode() != null) {
            // Add count for origin node.
            context.getCurEntry().getOriginNode().increaseThreadNum();
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseThreadNum();
        }
        // Handle pass event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onPass(context, resourceWrapper, node, count, args);
        }
    } catch (BlockException e) {
        // Blocked, set block exception to current entry.
        context.getCurEntry().setBlockError(e);

        // Add block count.
        node.increaseBlockQps(count);
        if (context.getCurEntry().getOriginNode() != null) {
            context.getCurEntry().getOriginNode().increaseBlockQps(count);
        }

        if (resourceWrapper.getEntryType() == EntryType.IN) {
            // Add count for global inbound entry node for global statistics.
            Constants.ENTRY_NODE.increaseBlockQps(count);
        }

        // Handle block event with registered entry callback handlers.
        for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
            handler.onBlocked(e, context, resourceWrapper, node, count, args);
        }

        throw e;
    } catch (Throwable e) {
        // Unexpected internal error, set error to current entry.
        context.getCurEntry().setError(e);

        throw e;
    }
}
StatisticSlot first lets the chain continue downward and records statistics only after all later slots have finished.
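The "delegate first, count afterwards" idea can be isolated in a few lines. The sketch below is a simplified model, not Sentinel code: `StatisticAfterDelegate`, `Downstream`, and `BlockedException` are hypothetical names, and the two counters stand in for the pass and block statistics kept on the node.

```java
import java.util.concurrent.atomic.AtomicLong;

final class StatisticAfterDelegate {
    /** Hypothetical stand-in for BlockException. */
    static final class BlockedException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    /** The rest of the chain: throws when some rule rejects the request. */
    interface Downstream {
        void entry() throws BlockedException;
    }

    final AtomicLong pass = new AtomicLong();
    final AtomicLong block = new AtomicLong();

    void entry(Downstream next) throws BlockedException {
        try {
            next.entry();             // run the rule-checking slots first
            pass.incrementAndGet();   // reached only if nothing blocked
        } catch (BlockedException e) {
            block.incrementAndGet();  // record the rejection, then propagate it
            throw e;
        }
    }
}
```

Because the counters are updated after the downstream call returns or throws, the rule-checking slots that come later in the chain decide the outcome, while the statistics slot that comes earlier records it.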
5. AuthoritySlot
AuthoritySlot(@SpiOrder(-6000))
Official documentation:
AuthoritySlot applies blacklist and whitelist control based on the configured lists and the call-origin information.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
    throws Throwable {
    // Blacklist / whitelist authority check
    checkBlackWhiteAuthority(resourceWrapper, context);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
    Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();

    if (authorityRules == null) {
        return;
    }

    Set<AuthorityRule> rules = authorityRules.get(resource.getName());
    if (rules == null) {
        return;
    }

    for (AuthorityRule rule : rules) {
        if (!AuthorityRuleChecker.passCheck(rule, context)) {
            throw new AuthorityException(context.getOrigin(), rule);
        }
    }
}
6. SystemSlot
SystemSlot(@SpiOrder(-5000))
Official documentation:
SystemSlot dynamically adjusts calls to entrance resources according to the overall state of the current system. The principle is to keep inbound traffic and the system's estimated capacity in a dynamic balance.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // System rule check
    SystemRuleManager.checkSystem(resourceWrapper);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
7. FlowSlot: the flow rule engine
FlowSlot(@SpiOrder(-2000))
Official documentation:
This slot applies the preset flow rules in a fixed order, using the resource statistics gathered by the earlier slots. If a resource has two or more flow rules, they are checked in the following order until all of them pass or one of them takes effect:
rules for a specified application, that is, rules that limit a particular caller;
rules whose caller is other;
rules whose caller is default.
Entry point
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // Check the flow rules
    checkFlow(resourceWrapper, context, node, count, prioritized);

    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
    throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
1. Checking all rules
This calls the FlowRuleChecker.checkFlow(…) method.
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    // Look up the rules for this resource name
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        // Iterate over the rules and check each one in turn
        for (FlowRule rule : rules) {
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}
2. Checking a single rule
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                            boolean prioritized) {
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
    // Cluster flow control
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    // Local (single-node) check
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
3. Flow check in non-cluster mode
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    // Select the statistics node according to the request information and the rule's strategy
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    // Get the controller for the current rule and call canPass to decide.
    // rule.getRater() returns an implementation of the TrafficShapingController interface (strategy pattern);
    // which implementation is used depends on the configured control behavior.
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
The method first finds the node that holds the statistics for this rule, based on the request and the rule's strategy. It then obtains the controller for the rule and lets the controller's canPass(…) method decide.
4. Selecting the node
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // The limit app should not be empty.
    String limitApp = rule.getLimitApp();
    int strategy = rule.getStrategy();
    String origin = context.getOrigin();

    // Match on the call origin; in this branch origin must not be default or other
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        // STRATEGY_DIRECT means only the resource itself is considered
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }

        // Otherwise decide by the reference (related or chain) strategy
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) { // limitApp is default
        if (strategy == RuleConstant.STRATEGY_DIRECT) { // STRATEGY_DIRECT: use the clusterNode
            // Return the cluster node.
            return node.getClusterNode();
        }

        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) { // limitApp is other and the origin is not covered by any specific rule
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        return selectReferenceNode(rule, context, node);
    }
    return null;
}
5. Traffic shaping controllers
rule.getRater() returns a controller whose interface is TrafficShapingController. The class diagram of its implementations is as follows:
The class diagram shows a clear strategy pattern, with one implementation per traffic-control strategy.
1. Default strategy
DefaultController is Sentinel's default strategy: if a request would exceed the threshold, it is rejected immediately.
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // Tokens already used in the current statistics window
    int curCount = avgUsedTokens(node);
    if (curCount + acquireCount > count) {
        // A prioritized request under QPS-based limiting may borrow from a future sliding window
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            // Borrow in advance from the next sliding window
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}

private int avgUsedTokens(Node node) {
    if (node == null) {
        return DEFAULT_AVG_USED_TOKENS;
    }
    // Thread-count limiting: return node.curThreadNum(), the current thread count.
    // QPS limiting: return node.passQps(), the QPS that has already passed.
    return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}

private void sleep(long timeMillis) {
    try {
        Thread.sleep(timeMillis);
    } catch (InterruptedException e) {
        // Ignore.
    }
}
2. Uniform-rate queueing strategy
RateLimiterController
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // Pass when acquire count is less or equal than 0.
    if (acquireCount <= 0) {
        return true;
    }
    // Reject when count is less or equal than 0.
    // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
    if (count <= 0) {
        return false;
    }

    long currentTime = TimeUtil.currentTimeMillis();
    // Calculate the interval between every two requests.
    long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

    // Expected pass time of this request = time of the last pass + interval
    long expectedTime = costTime + latestPassedTime.get();

    // If the expected time is not later than now, the request can pass right away
    if (expectedTime <= currentTime) {
        // Contention may exist here, but it's okay.
        latestPassedTime.set(currentTime);
        return true;
    } else {
        // Calculate the time to wait.
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        // If the wait exceeds the maximum queueing time, the request cannot queue and is rejected
        if (waitTime > maxQueueingTimeMs) {
            return false;
        } else {
            long oldTime = latestPassedTime.addAndGet(costTime);
            try {
                // Recalculate the wait time
                waitTime = oldTime - TimeUtil.currentTimeMillis();
                // If it now exceeds the maximum queueing time, reject and roll latestPassedTime back
                if (waitTime > maxQueueingTimeMs) {
                    latestPassedTime.addAndGet(-costTime);
                    return false;
                }
                // in race condition waitTime may <= 0
                // Block the thread for the wait time
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                }
                return true;
            } catch (InterruptedException e) {
            }
        }
    }
    return false;
}
As the code shows, uniform-rate queueing works with a virtual queue. The threshold determines the interval between requests, and the time of the last pass plus that interval gives the time at which the next request may pass. If the current time is already later than that, the interval has elapsed and the request passes. Otherwise the request has to wait until the current time reaches the expected time. The waiting is implemented by putting the thread to sleep, so the "virtual queue" is really just a set of sleeping threads; no actual queue data structure is involved.
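The pacing arithmetic can be checked in isolation with a deterministic sketch. It is a simplification under stated assumptions: `PacingSketch` is a hypothetical class, the clock is passed in as `nowMs` instead of being read from `TimeUtil`, one permit is requested per call, the method returns the wait time instead of sleeping, and a `synchronized` method replaces the atomic add-and-roll-back used in the listing above.

```java
final class PacingSketch {
    private final double count;          // permitted requests per second
    private final long maxQueueingMs;    // longest a request may wait in the virtual queue
    private long latestPassedTime = -1;  // scheduled pass time of the last admitted request

    PacingSketch(double count, long maxQueueingMs) {
        this.count = count;
        this.maxQueueingMs = maxQueueingMs;
    }

    /** Returns how long the caller should wait in ms (0 = pass now), or -1 if rejected. */
    synchronized long tryAcquire(long nowMs) {
        long costTime = Math.round(1000.0 / count);   // interval between two requests
        long expectedTime = latestPassedTime + costTime;
        if (expectedTime <= nowMs) {
            latestPassedTime = nowMs;                 // the interval has already elapsed
            return 0;
        }
        long waitTime = expectedTime - nowMs;
        if (waitTime > maxQueueingMs) {
            return -1;                                // queue is "full": reject
        }
        latestPassedTime = expectedTime;              // reserve the next slot
        return waitTime;
    }
}
```

With `count = 10` (one request per 100 ms) and `maxQueueingMs = 250`, four calls at the same instant return 0, 100, 200, and -1: the first passes, two are queued behind it, and the fourth would have to wait 300 ms, so it is rejected.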
3. Warm-up / cold-start strategy
WarmUpController
First, the fields and constructors of WarmUpController:
// Threshold
protected double count;
/**
 * Cold-start factor, 3 by default {@link SentinelConfig#coldFactor()}
 */
private int coldFactor;
// Token count at the turning point
protected int warningToken = 0;
// Maximum token count
private int maxToken;
// Slope of the line, which indicates how quickly the allowed traffic changes
protected double slope;

// Accumulated tokens. The more tokens are stored, the lower the utilization: traffic is low and the system is cold.
protected AtomicLong storedTokens = new AtomicLong(0);
// Time of the last token update
protected AtomicLong lastFilledTime = new AtomicLong(0);

public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
    construct(count, warmUpPeriodInSec, coldFactor);
}

public WarmUpController(double count, int warmUpPeriodInSec) {
    construct(count, warmUpPeriodInSec, 3);
}

/**
 * @param count             threshold set by the user (assumed to be 100 in the comments below)
 * @param warmUpPeriodInSec 10 by default
 * @param coldFactor        3 by default
 */
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {

    if (coldFactor <= 1) {
        throw new IllegalArgumentException("Cold factor should be larger than 1");
    }

    this.count = count;

    this.coldFactor = coldFactor;

    // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
    // With count = 100 requests per second, stableInterval = 1000ms / 100 = 10ms,
    // and coldInterval = stableInterval * coldFactor = 10 * 3 = 30ms.
    // warningToken = 10 * 100 / (3 - 1) = 500
    // Check: thresholdPermits = 0.5 * warmupPeriod / stableInterval = 0.5 * 10000ms / 10ms = 500
    warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);

    // maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval)
    // maxPermits = 500 + 2 * 10000ms / (10ms + 30ms) = 1000
    // maxToken = 500 + (2 * 10 * 100 / (1.0 + 3)) = 1000
    // maxPermits = maxToken
    maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

    // slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
    // slope = (3 - 1.0) / 100 / (1000 - 500) = 0.00004
    slope = (coldFactor - 1.0) / count / (maxToken - warningToken);

}
Field descriptions:
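count: the QPS threshold set by the user.
coldFactor: the cold-start factor, 3 by default, obtained through SentinelConfig.coldFactor(). That method falls back to the default of 3 when the configured value is less than or equal to 1, because such a factor is meaningless: there would be no warm-up at all.
warningToken: the token count at the turning point. While the stored tokens are at or above this value, the system is treated as cold or warming up, and the allowed QPS is kept below count.
maxToken: the maximum number of tokens.
slope: the slope of the line.
storedTokens: the number of tokens currently stored.
lastFilledTime: the time of the last token update.
Overall idea: when the stored tokens are at their maximum, traffic has been low and the system is cold. While requests arrive at a low, normal rate they pass, and the consumed tokens are replenished. When a burst arrives, tokens are consumed faster than they are refilled. As long as the remaining tokens are still at or above the warning line (restToken >= warningToken), the warm-up process applies: the allowed QPS starts at roughly count / coldFactor and rises gradually as the stored tokens fall. Once the stored tokens drop below warningToken, the allowed QPS reaches the user-set threshold, warm-up is complete, and requests are handled in the normal phase.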
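A short worked calculation makes the numbers tangible. The sketch below reuses the three formulas from `construct(...)` and the QPS formula from `canPass(...)`; `WarmUpMath` and `allowedQps` are hypothetical names introduced for illustration, and the example values (count = 100, warm-up period = 10 s, cold factor = 3) are the ones assumed in the source comments.

```java
final class WarmUpMath {
    final double count;
    final int warningToken;
    final int maxToken;
    final double slope;

    WarmUpMath(double count, int warmUpPeriodInSec, int coldFactor) {
        this.count = count;
        // Same expressions as in construct(...)
        this.warningToken = (int) (warmUpPeriodInSec * count) / (coldFactor - 1);
        this.maxToken = warningToken + (int) (2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
        this.slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    /** QPS ceiling for a given number of stored tokens. */
    double allowedQps(long restToken) {
        if (restToken < warningToken) {
            return count;                           // warm: the full threshold applies
        }
        long aboveToken = restToken - warningToken; // distance above the warning line
        return 1.0 / (aboveToken * slope + 1.0 / count);
    }

    public static void main(String[] args) {
        WarmUpMath w = new WarmUpMath(100, 10, 3);
        System.out.println("warningToken=" + w.warningToken + " maxToken=" + w.maxToken);
        for (long tokens : new long[] {1000, 750, 500, 100}) {
            System.out.printf("storedTokens=%d -> allowedQps=%.2f%n", tokens, w.allowedQps(tokens));
        }
    }
}
```

For these inputs warningToken is 500, maxToken is 1000, and slope is 0.00004. With a full store of 1000 tokens the ceiling is about 33.3 QPS (count / coldFactor), at 750 tokens it is about 50 QPS, and from 500 tokens downward it is the full 100 QPS.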
The source code is analyzed below:
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // QPS that has passed so far
    long passQps = (long) node.passQps();

    // QPS of the previous sliding window
    long previousQps = (long) node.previousPassQps();
    // Sync tokens: in the cold state or after warm-up has finished, tokens may be added
    syncToken(previousQps);

    // Compute the allowed QPS from the slope
    long restToken = storedTokens.get();
    if (restToken >= warningToken) { // At or above the warning line: still cold or warming up, so the QPS ceiling is below count
        // Distance above the warning line
        long aboveToken = restToken - warningToken;
        // current interval = restToken*slope+1/count
        // The smaller aboveToken is, the smaller the interval and the warmer the system.
        // As aboveToken shrinks, warningQps grows.
        double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        if (passQps + acquireCount <= warningQps) { // As warningQps grows, more traffic is admitted
            return true;
        }
    } else {
        if (passQps + acquireCount <= count) {
            return true;
        }
    }

    return false;
}

/**
 * Sync tokens
 * @param passQps
 */
protected void syncToken(long passQps) {
    long currentTime = TimeUtil.currentTimeMillis();
    // Zero the last three digits, e.g. 1601456312835 - 1601456312835 % 1000 = 1601456312000
    currentTime = currentTime - currentTime % 1000;
    // Time of the last token update
    long oldLastFillTime = lastFilledTime.get();
    if (currentTime <= oldLastFillTime) {
        return;
    }

    // Current token count
    long oldValue = storedTokens.get();
    // New token count
    long newValue = coolDownTokens(currentTime, passQps);

    // Update the accumulated tokens
    if (storedTokens.compareAndSet(oldValue, newValue)) {
        // Subtract the QPS of the previous window to get the remaining tokens
        long currentValue = storedTokens.addAndGet(0 - passQps);
        if (currentValue < 0) {
            // Clamp to 0 if the remainder is negative
            storedTokens.set(0L);
        }
        // Record the token update time
        lastFilledTime.set(currentTime);
    }
}

private long coolDownTokens(long currentTime, long passQps) {
    // Tokens currently held
    long oldValue = storedTokens.get();
    long newValue = oldValue;

    // Preconditions for adding tokens:
    if (oldValue < warningToken) { // Below the warning line: warm-up has finished, so tokens are generated again
        // With count = 100: oldValue + seconds since the last update * count, that is, count tokens per second.
        // currentTime and lastFilledTime.get() both have their milliseconds stripped.
        newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
    } else if (oldValue > warningToken) { // Cold or currently warming up
        if (passQps < (int)count / coldFactor) { // Traffic is still low (cold): keep refilling tokens
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        }
        // During warm-up no tokens are added, which limits how steeply traffic can climb
    }
    // The token count may not exceed maxToken
    return Math.min(newValue, maxToken);
}
4. Warm-up with uniform-rate queueing
WarmUpRateLimiterController
This combines uniform-rate queueing with warm-up and is not covered in depth here. Once the two strategies above are understood, this one is fairly clear as well.
8. DegradeSlot
Official documentation:
This slot uses the average response time (RT) and the exception ratio of a resource to decide whether the resource should be automatically circuit-broken for the following period.
Source code analysis:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // Degrade check
    performChecking(context, resourceWrapper);

    // Continue if custom slots follow
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void performChecking(Context context, ResourceWrapper r) throws BlockException {
    // Get the circuit breakers of the current resource from DegradeRuleManager
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    if (circuitBreakers == null || circuitBreakers.isEmpty()) {
        return;
    }
    // If any circuit breaker refuses the request, throw a DegradeException
    for (CircuitBreaker cb : circuitBreakers) {
        if (!cb.tryPass(context)) {
            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}
A key class here is DegradeRuleManager. It holds the circuit breakers built from all degrade rules in a Map<String, List<CircuitBreaker>>. When they are needed, the list of circuit breakers is fetched from this map by resource name.
So how are the rules loaded? DegradeRuleManager has a static initializer that runs when the class is loaded:
private static final RulePropertyListener LISTENER = new RulePropertyListener();
private static SentinelProperty<List<DegradeRule>> currentProperty
    = new DynamicSentinelProperty<>();

static {
    currentProperty.addListener(LISTENER);
}
Following currentProperty.addListener(LISTENER), we arrive at the addListener(…) method of DynamicSentinelProperty:
@Override
public void addListener(PropertyListener<T> listener) {
    listeners.add(listener);
    listener.configLoad(value);
}
It calls the listener's configLoad(…) method, which ends up in the reloadFrom(…) method of RulePropertyListener. The parsing itself simply groups the rules by resource name and stores them in a map.
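Grouping rules by resource name is a one-pass operation. The following sketch shows the shape of it with a hypothetical minimal `Rule` type and a hypothetical `RuleIndex` helper; the real manager builds circuit breakers from DegradeRule objects, which this illustration does not attempt to reproduce.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RuleIndex {
    /** Hypothetical minimal rule: a resource name and a threshold. */
    static final class Rule {
        final String resource;
        final double threshold;

        Rule(String resource, double threshold) {
            this.resource = resource;
            this.threshold = threshold;
        }
    }

    /** Groups a flat rule list into resource name -> rules for that resource. */
    static Map<String, List<Rule>> byResource(List<Rule> rules) {
        Map<String, List<Rule>> index = new HashMap<>();
        for (Rule r : rules) {
            // Create the per-resource list on first sight, then append.
            index.computeIfAbsent(r.resource, k -> new ArrayList<>()).add(r);
        }
        return index;
    }
}
```

A lookup for a resource without rules returns null, which corresponds to the early return in `performChecking` when no circuit breakers are registered for the resource.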
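FlowSlot Rule Engine: Rate-Limiting Algorithm Principles
1. How the sliding window works
Each time window allows at most 100 QPS;
20 and 80 are the actual QPS figures at that moment;
a time window is split into two halves, a first half and a second half;
if the peak across the second half of window 1 and the first half of window 2 exceeds 100 QPS, part of the traffic is lost.
That is not what we want, so let's look at the counter-based sliding window.
2. How the counter-based sliding window works
It is an optimization of the sliding window algorithm;
traffic is discarded only when the sum of two adjacent halves exceeds the total threshold.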
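A counter-based sliding window can be sketched with a small ring of buckets. This is a generic illustration, not Sentinel's implementation: `SlidingWindowCounter` is a hypothetical class, the clock is passed in explicitly so the behavior is deterministic, and a one-second window split into two halves with a limit of 100 mirrors the example above.

```java
import java.util.Arrays;

final class SlidingWindowCounter {
    private final long bucketMs;
    private final int buckets;
    private final long limit;
    private final long[] starts;   // start time of the period each bucket currently holds
    private final long[] counts;   // requests admitted in that period

    SlidingWindowCounter(long windowMs, int buckets, long limit) {
        this.bucketMs = windowMs / buckets;
        this.buckets = buckets;
        this.limit = limit;
        this.starts = new long[buckets];
        this.counts = new long[buckets];
        Arrays.fill(starts, -1);
    }

    synchronized boolean tryAcquire(long nowMs) {
        long bucketStart = nowMs - nowMs % bucketMs;
        int idx = (int) ((nowMs / bucketMs) % buckets);
        if (starts[idx] != bucketStart) {   // bucket holds an old period: recycle it
            starts[idx] = bucketStart;
            counts[idx] = 0;
        }
        // Sum every bucket that still falls inside the window ending at the current bucket.
        long windowStart = bucketStart - (buckets - 1) * bucketMs;
        long total = 0;
        for (int i = 0; i < buckets; i++) {
            if (starts[i] >= windowStart) {
                total += counts[i];
            }
        }
        if (total + 1 > limit) {
            return false;                   // adjacent halves together would exceed the limit
        }
        counts[idx]++;
        return true;
    }
}
```

If 80 requests arrive in the second half of one second and another 80 arrive in the first half of the next, a fixed one-second window would admit all 160, while this counter admits only 20 of the second batch because the two adjacent halves are summed.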
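3. Token bucket algorithm
The token bucket holds all the tokens;
tokens are issued at a fixed rate;
if the bucket is full, newly issued tokens are discarded;
a request that obtains a token may access the resource;
a request that cannot obtain one is dropped.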
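The five points above map directly onto a small class. The sketch is a textbook token bucket under stated assumptions, not the controller Sentinel ships: `TokenBucket` is a hypothetical name, the clock is supplied by the caller, tokens are refilled lazily on each call, and one token is consumed per request.

```java
final class TokenBucket {
    private final long capacity;       // the bucket never holds more than this
    private final double refillPerMs;  // issue rate, in tokens per millisecond
    private double tokens;
    private long lastRefillMs;

    TokenBucket(long capacity, double tokensPerSecond, long nowMs) {
        this.capacity = capacity;
        this.refillPerMs = tokensPerSecond / 1000.0;
        this.tokens = capacity;        // start with a full bucket
        this.lastRefillMs = nowMs;
    }

    synchronized boolean tryAcquire(long nowMs) {
        if (nowMs > lastRefillMs) {
            // Issue the tokens for the elapsed time; anything above capacity is discarded.
            tokens = Math.min(capacity, tokens + (nowMs - lastRefillMs) * refillPerMs);
            lastRefillMs = nowMs;
        }
        if (tokens >= 1.0) {
            tokens -= 1.0;             // the request takes a token and proceeds
            return true;
        }
        return false;                  // no token available: the request is dropped
    }
}
```

With a capacity of 2 and a rate of one token per second, two requests at time 0 pass and a third is dropped; after 1.5 seconds one more passes. However long the bucket then sits idle, it refills to at most 2 tokens, which bounds the size of any later burst.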
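Illustrated Summary
1. Overall flow
(1) A request arrives at the web container;
(2) Sentinel AOP intercepts every Sentinel resource;
(3) if the resource's rules pass, the normal flow executes;
(4) if not, a flow-control error message is returned.
2. Execution flow of the Sentinel AOP aspect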