Dubbo负载均衡与集群容错
文章目錄
- 負載均衡與集群容錯
- Invoker
- 服務目錄
- RegistryDirectory
- StaticDirectory
- 服務路由
- Cluster
- 負載均衡
負載均衡與集群容錯
Invoker
在Dubbo中Invoker就是一個具有調用功能的對象,在服務提供端就是實際的服務實現,只是將服務實現封裝起來變成一個Invoker。
在服務消費端,從注冊中心得到服務提供者的信息之后,將一條條信息封裝為Invoker,這個Invoker就具備了遠程調用的能力。
綜上,Dubbo就是創建了一個統一的模型,將可調用(可執行體)的服務對象都統一封裝為Invoker。
而ClusterInvoker就是將多個服務引入的Invoker封裝起來,對外統一暴露一個Invoker,并且賦予這些Invoker集群容錯的功能。
服務目錄
服務目錄,即Directory,實際上它就是多個Invoker的集合,服務提供端一般都會集群分布,同樣的服務會有多個提供者,因此需要一個服務目錄來統一存放它們,需要調用服務的時候便從這個服務目錄中進行挑選。
同時服務目錄還是實現了NotifyListener接口,當集群中新增了一臺服務提供者或者下線了一臺服務提供者,目錄都會對服務提供者進行更新,新增或者刪除對應的Invoker。
從上圖中,可以看到用了一個抽象類AbstractDirectory來實現 Directory接口,抽象類中運用到了模板方法模式,將一些公共方法和邏輯寫好,作為一個骨架,然后具體實現由了兩個子類來完成,兩個子類分別為StaticDirectory和RegistryDirectory。
RegistryDirectory
RegistryDirectory實現了NotifyListener接口,可以監聽注冊中心的變化,當注冊中心配置發生變化時,服務目錄也可以收到變更通知,然后根據更新之后的配置刷新Invoker列表。
由此可知RegistryDirectory共有三個作用:
- 獲取Invoker列表
- 監聽注冊中心
- 刷新Invoker列表
獲取Invoker列表
RegistryDirectory實現了父類AbstractDirectory的抽象方法doList(),該方法可以得到Invoker列表
public List<Invoker<T>> doList(Invocation invocation) {if (this.forbidden) {throw new RpcException(....);} else {List<Invoker<T>> invokers = null;Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; //獲取方法調用名和Invoker的映射表if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {String methodName = RpcUtils.getMethodName(invocation);Object[] args = RpcUtils.getArguments(invocation);//以下就是根據方法名和方法參數獲取可調用的Invokerif (args != null && args.length > 0 && args[0] != null && (args[0] instanceof String || args[0].getClass().isEnum())) {invokers = (List)localMethodInvokerMap.get(methodName + "." + args[0]);}if (invokers == null) {invokers = (List)localMethodInvokerMap.get(methodName);}if (invokers == null) {invokers = (List)localMethodInvokerMap.get("*");}if (invokers == null) {Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();if (iterator.hasNext()) {invokers = (List)iterator.next();}}}return (List)(invokers == null ? new ArrayList(0) : invokers);} }監聽注冊中心
通過實現NotifyListener接口可以感知注冊中心的數據變更。
RegistryDirectory定義了三個集合invokerUrls routerUrls configuratorUrls分別處理對應的配置然后轉化成對象。
public synchronized void notify(List<URL> urls) {List<URL> invokerUrls = new ArrayList();List<URL> routerUrls = new ArrayList();List<URL> configuratorUrls = new ArrayList();Iterator i$ = urls.iterator();while(true) {while(true) {while(i$.hasNext()) {//....根據urls填充上述三個列表}if (configuratorUrls != null && !configuratorUrls.isEmpty()) {this.configurators = toConfigurators(configuratorUrls); //根據urls轉化為configurators配置}List localConfigurators;if (routerUrls != null && !routerUrls.isEmpty()) {localConfigurators = this.toRouters(routerUrls);if (localConfigurators != null) {this.setRouters(localConfigurators); //根據urls轉化為routers配置}}localConfigurators = this.configurators;this.overrideDirectoryUrl = this.directoryUrl;Configurator configurator;if (localConfigurators != null && !localConfigurators.isEmpty()) {for(Iterator i$ = localConfigurators.iterator(); i$.hasNext(); this.overrideDirectoryUrl = configurator.configure(this.overrideDirectoryUrl)) {configurator = (Configurator)i$.next();}}this.refreshInvoker(invokerUrls); //根據invokerUrls刷新invoker列表return;}} }刷新Invoker列表
private void refreshInvoker(List<URL> invokerUrls) {//如果invokerUrls只有一個URL并且協議是empty,那么清除所有invokerif (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && "empty".equals(((URL)invokerUrls.get(0)).getProtocol())) {this.forbidden = true;this.methodInvokerMap = null;this.destroyAllInvokers();} else {this.forbidden = false;Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; //獲取舊的Invoker列表if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {invokerUrls.addAll(this.cachedInvokerUrls);} else {this.cachedInvokerUrls = new HashSet();this.cachedInvokerUrls.addAll(invokerUrls);}if (invokerUrls.isEmpty()) {return;}//根據URL生成InvokerMapMap<String, Invoker<T>> newUrlInvokerMap = this.toInvokers(invokerUrls);//根據新的InvokerMap生成方法名和Invoker列表對應的MapMap<String, List<Invoker<T>>> newMethodInvokerMap = this.toMethodInvokers(newUrlInvokerMap);if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls.toString()));return;}this.methodInvokerMap = this.multiGroup ? this.toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;this.urlInvokerMap = newUrlInvokerMap;try {this.destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); //銷毀無效的Invoker} catch (Exception var6) {logger.warn("destroyUnusedInvokers error. ", var6);}} }上述操作就是根據invokerUrls數量以及協議頭是否為empty來判斷是否禁用所有invokers,如果不禁用的話將invokerUrls轉化為Invoker,并且得到<url,Invoker>的映射關系。
再進一步進行轉化,得到<methodName,List>的映射關系,再將同一組的Invoker進行合并,將合并結果賦值給methodInvokerMap,這個methodInvokerMap就是在doList中使用到的Map。
最后刷新InvokerMap,銷毀無效的Invoker。
StaticDirectory
StaticDirectory是靜態目錄,所有Invoker是固定的不會刪減的,并且所有Invoker由構造器來傳入。
內部邏輯也相當簡單,只定義了一個列表用于存儲Invokers。實現父類的方法也只是將這些Invokers原封不動地返回。
private final List<Invoker<T>> invokers;protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {return this.invokers; }服務路由
服務路由規定了服務消費者可以調用哪些服務提供者,Dubbo常用的是條件路由ConditionRouter。
條件路由由兩個條件組成,格式為[服務消費者匹配條件] => [服務提供者匹配條件],例如172.26.29.15 => 172.27.19.89規定了只有IP為172.26.29.15的服務消費者才可以訪問IP為172.27.19.89的服務提供者,不可以調用其他的服務。
路由一樣是通過RegistryDirectory中的notify()更新的,在調用toMethodInvokers()的時候會進行服務器級別的路由和方法級別的路由。
Cluster
在前面的流程中我們已經通過Directory獲取了服務目錄,并且通過路由獲取了一個或多個Invoker,但是對于服務消費者還是需要進行選擇,篩選出一個Invoker進行調用。
Dubbo默認的Cluster實現有多種,如下:
- FailoverCluster
- FailfastCluster
- FailsafeCluster
- FailbackCluster
- BroadcastCluster
- AvailableCluster
每個Cluster內部返回的都是xxxClusterInvoker,例如FailoverCluster:
public class FailoverCluster implements Cluster {public static final String NAME = "failover";public FailoverCluster() {}public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new FailoverClusterInvoker(directory);} }FailoverClusterInvoker
FailoverClusterInvoker實現的功能是失敗調用(有重試次數)自動切換。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyinvokers = invokers;this.checkInvokers(invokers, invocation);//重試次數int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;if (len <= 0) {len = 1;}RpcException le = null;List<Invoker<T>> invoked = new ArrayList(invokers.size());Set<String> providers = new HashSet(len);//根據重試次數循環調用for(int i = 0; i < len; ++i) {if (i > 0) {this.checkWhetherDestroyed();copyinvokers = this.list(invocation);this.checkInvokers(copyinvokers, invocation);}//負載均衡篩選出一個Invoker作本次調用Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);//將使用過的Invoker保存起來,下次重試時做過濾用invoked.add(invoker);//記錄到上下文中RpcContext.getContext().setInvokers(invoked);try {//發起調用Result result = invoker.invoke(invocation);if (le != null && logger.isWarnEnabled()) {logger.warn("....");}Result var12 = result;return var12;} catch (RpcException var17) { //catch異常 繼續下次循環重試if (var17.isBiz()) {throw var17;}le = var17;} catch (Throwable var18) {le = new RpcException(var18.getMessage(), var18);} finally {providers.add(invoker.getUrl().getAddress());}}throw new RpcException(....); }上述方法中,首先獲取重試次數len,根據重試次數進行循環調用,調用發生異常會被catch住,然后重新調用。
每次循環會通過負載均衡選出一個Invoker,然后利用這個Invoker進行遠程調用,每次選出的Invoker會記錄下來,在下次調用的select()中會將使用上次調用的Invoker進行重試,如果上一次沒有調用或者上次調用的Invoker下線了,那么會重新進行負載均衡進行選擇。
FailfastClusterInvoker
FailfastClusterInvoker只會進行一次遠程調用,如果失敗后立馬拋出異常。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {this.checkInvokers(invokers, invocation);Invoker invoker = this.select(loadbalance, invocation, invokers, (List)null); //負載均衡選擇Invokertry {return invoker.invoke(invocation); //發起遠程調用} catch (Throwable var6) { //失敗調用直接將錯誤拋出if (var6 instanceof RpcException && ((RpcException)var6).isBiz()) {throw (RpcException)var6;} else {throw new RpcException(....);}} }FailsafeClusterInvoker
FailsafeClusterInvoker是一種安全失敗的cluster,調用發生錯誤僅僅是記錄一下日志,然后就返回了空結果。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {try {this.checkInvokers(invokers, invocation);//負載均衡選出Invoker后直接進行調用Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null); return invoker.invoke(invocation);} catch (Throwable var5) { //調用錯誤只是打印日志logger.error("Failsafe ignore exception: " + var5.getMessage(), var5);return new RpcResult();} }FailbackClusterInvoker
FailbackClusterInvoker調用失敗后,會記錄下本次調用,然后返回一個空結果給服務消費者,并且會通過一個定時任務對失敗的調用進行重試。適用于執行消息通知等最大努力場景。
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {try {this.checkInvokers(invokers, invocation);//負載均衡選出InvokerInvoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)null);//執行調用,執行成功返回調用結果return invoker.invoke(invocation);} catch (Throwable var5) {//調用失敗logger.error("....");//記錄下本次失敗調用this.addFailed(invocation, this);//返回空結果return new RpcResult();} }private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {if (this.retryFuture == null) {synchronized(this) {//如果未創建重試本次調用的定時任務if (this.retryFuture == null) {//創建定時任務this.retryFuture = this.scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {public void run() {try {//定時進行重試FailbackClusterInvoker.this.retryFailed();} catch (Throwable var2) {FailbackClusterInvoker.logger.error("....", var2);}}}, 5000L, 5000L, TimeUnit.MILLISECONDS);}}}//將invocation和router存入mapthis.failed.put(invocation, router); }void retryFailed() {if (this.failed.size() != 0) {Iterator i$ = (new HashMap(this.failed)).entrySet().iterator();while(i$.hasNext()) {Entry<Invocation, AbstractClusterInvoker<?>> entry = (Entry)i$.next();Invocation invocation = (Invocation)entry.getKey();Invoker invoker = (Invoker)entry.getValue();try {//進行重試調用invoker.invoke(invocation);//調用成功未產生異常則移除本次失敗調用的記錄,銷毀定時任務this.failed.remove(invocation);} catch (Throwable var6) {logger.error("....", var6);}}} }邏輯比較簡單,大致就是當調用錯誤時返回空結果,并記錄下本次失敗調用到failed<invocation,router>中,并且會創建一個定時任務定時地去調用failed中記錄的失敗調用,如果調用成功了就從failed中移除這個調用。
ForkingClusterInvoker
ForkingClusterInvoker運行時,會將所有Invoker都放入線程池中并發調用,只要有一個Invoker調用成功了就返回結果,doInvoker方法立即停止運行。
適用于對實時性比較高的讀寫操作。
public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {Result var19;try {this.checkInvokers(invokers, invocation);int forks = this.getUrl().getParameter("forks", 2);int timeout = this.getUrl().getParameter("timeout", 1000);final Object selected;if (forks > 0 && forks < invokers.size()) {selected = new ArrayList();for(int i = 0; i < forks; ++i) {Invoker<T> invoker = this.select(loadbalance, invocation, invokers, (List)selected);if (!((List)selected).contains(invoker)) {//選擇好的Invoker放入這個selected列表((List)selected).add(invoker);}}} else {selected = invokers;}RpcContext.getContext().setInvokers((List)selected);final AtomicInteger count = new AtomicInteger();//阻塞隊列final BlockingQueue<Object> ref = new LinkedBlockingQueue();Iterator i$ = ((List)selected).iterator();while(i$.hasNext()) {final Invoker<T> invoker = (Invoker)i$.next();this.executor.execute(new Runnable() {public void run() {try {Result result = invoker.invoke(invocation);ref.offer(result);} catch (Throwable var3) {int value = count.incrementAndGet();if (value >= ((List)selected).size()) { //等待所有調用都產生異常才入隊ref.offer(var3);}}}});}try {//阻塞獲取結果Object ret = ref.poll((long)timeout, TimeUnit.MILLISECONDS);if (ret instanceof Throwable) {Throwable e = (Throwable)ret;throw new RpcException(....);}var19 = (Result)ret;} catch (InterruptedException var14) {throw new RpcException(....);}} finally {RpcContext.getContext().clearAttachments();}return var19; }BroadcastClusterInvoker
BroadcastClusterInvoker運行時會將所有Invoker逐個調用,在最后判斷中如果有一個調用產生錯誤,則拋出異常。
適用于通知所有提供者更新緩存或日志等本地資源的場景。
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {this.checkInvokers(invokers, invocation);RpcContext.getContext().setInvokers(invokers);RpcException exception = null;Result result = null;Iterator i$ = invokers.iterator();while(i$.hasNext()) {Invoker invoker = (Invoker)i$.next();try {result = invoker.invoke(invocation);} catch (RpcException var9) {exception = var9;logger.warn(var9.getMessage(), var9);} catch (Throwable var10) {exception = new RpcException(var10.getMessage(), var10);logger.warn(var10.getMessage(), var10);}}//如果調用過程中發生過錯誤 拋出異常if (exception != null) {throw exception;} else {//返回調用結果return result;} }AbstractClusterInvoker
AbstractClusterInvoker是上述所有類的父類,內部結構較為簡單。AvailableCluster內部返回結果就是AvailableClusterInvoker。
public class AvailableCluster implements Cluster {public static final String NAME = "available";public AvailableCluster() {}public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new AbstractClusterInvoker<T>(directory) {public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {Iterator i$ = invokers.iterator();Invoker invoker;do { //循環判斷:哪個invoker能用就調用哪個if (!i$.hasNext()) {throw new RpcException("No provider available in " + invokers);}invoker = (Invoker)i$.next();} while(!invoker.isAvailable());return invoker.invoke(invocation);}};} }總結
上述中有很多種集群的實現,各適用于不同的場景,加了Cluster這個中間層,向服務消費者屏蔽了集群調用的細節,并且支持不同場景使用不同的模式。
負載均衡
Dubbo中的負載均衡,即LoadBalance,服務提供者一般都是集群分布,所以需要Dubbo選擇出合適的服務提供者來給服務消費者調用。
Dubbo中提供了多種負載均衡算法:
- RandomLoadBalance
- LeastActiveLoadBalance
- ConsistentHashLoadBalance
- RoundRobinLoadBalance
AbstractLoadBalance
實現類都繼承了于這個類,該類實現了LoadBalance,使用模板方法模式,將一些公用的邏輯封裝好,而具體的實現由子類自定義。
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {if (invokers != null && !invokers.isEmpty()) {//子類實現return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);} else {return null;} }protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> var1, URL var2, Invocation var3);服務剛啟動需要預熱,不能突然讓服務負載過高,需要進行服務的降權。
protected int getWeight(Invoker<?> invoker, Invocation invocation) {int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), "weight", 100); //獲得權重if (weight > 0) {long timestamp = invoker.getUrl().getParameter("remote.timestamp", 0L); //啟動時間if (timestamp > 0L) {int uptime = (int)(System.currentTimeMillis() - timestamp); //計算已啟動時長int warmup = invoker.getUrl().getParameter("warmup", 600000);if (uptime > 0 && uptime < warmup) {weight = calculateWarmupWeight(uptime, warmup, weight); //降權}}}return weight; }RandomLoadBalance
使用了加權隨機算法,假設現在有三個節點A,B,C,然后賦予這幾個節點一定權重,分別為1,2,3,那么可計算得到總權重為6,那么這幾個節點被訪問的可能性分別為1/6,2/6,3/6。
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size(); //Invoker個數int totalWeight = 0; //總權重boolean sameWeight = true; //權重是否相同int offset; int i;for(offset = 0; offset < length; ++offset) { i = this.getWeight((Invoker)invokers.get(offset), invocation); //得到權重totalWeight += i; //計算總權重//是否權重都相同if (sameWeight && offset > 0 && i != this.getWeight((Invoker)invokers.get(offset - 1), invocation)) {sameWeight = false; }}if (totalWeight > 0 && !sameWeight) {offset = this.random.nextInt(totalWeight); //獲得隨機偏移量//判斷偏移量落在哪個片段上for(i = 0; i < length; ++i) {offset -= this.getWeight((Invoker)invokers.get(i), invocation);if (offset < 0) {return (Invoker)invokers.get(i); }}}return (Invoker)invokers.get(this.random.nextInt(length)); }LeastActiveLoadBalance
最少活躍數負載均衡,接收一個請求后,請求活躍數+1,處理完一個請求后,請求活躍數-1,請求活躍數少既說明現在服務器壓力小也說明該服務器處理請求快,沒有堆積什么請求。
總的流程是先遍歷Invokers列表,尋找當前請求活躍數最少的Invoker,如果有多個Invoker具有相同的最小請求活躍數,則根據他們的權重來進行篩選。
ConsistentHashLoadBalance
將服務器的IP等信息生成一個Hash值,將這個值映射到Hash圓環上作為某個節點,當查找節點時,通過一個Key來順時針查找。
Dubbo還引入了160個虛擬節點,使得數據更加分散,避免請求積壓在某個節點上。
并且Hash值是方法級別的,一個服務的每個方法都有一個ConsistentHashSelector,根據參數值來計算得出Hash值,
RoundRobinLoadBalance
加權輪詢負載均衡,這種輪詢是平滑的,假設A和B的權重為10:30,那么輪詢的結果可能是A、B、B、A、A、B、B、B…,40次調用下來A調用了10次,B調用了30次。
總結
服務引入時,會將多個遠程調用塞入Directory,然后通過Cluster來封裝,同時根據需要提供各種容錯功能,最終統一暴露一個Invoker給服務消費者,服務消費者調用的時候會從目錄得到Invoker列表,經過路由的過濾以及負載均衡最終得到一個Invoker發起調用。
以上。
如有不足或錯誤歡迎指正。
總結
以上是生活随笔為你收集整理的Dubbo负载均衡与集群容错的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Dubbo服务引用过程
- 下一篇: SpringtBoot+SpringSe