dubbo源码分析系列——dubbo-cluster模块源码分析
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
模塊功能介紹
該模塊的使用介紹請(qǐng)參考dubbo官方用戶手冊(cè)如下章節(jié)內(nèi)容。
- 集群容錯(cuò)
- 負(fù)載均衡
- 路由規(guī)則
- 配置規(guī)則
- 注冊(cè)中心參考手冊(cè)
其中注冊(cè)中心其實(shí)是對(duì)于目錄服務(wù)的一種實(shí)現(xiàn)方式,本文不會(huì)對(duì)注冊(cè)中心進(jìn)行詳細(xì)講解。
?
核心類圖
核心源碼分析
核心接口概念及關(guān)系
各節(jié)點(diǎn)關(guān)系:
- 這里的Invoker是Provider的一個(gè)可調(diào)用Service的抽象,Invoker封裝了Provider地址及Service接口信息。
- Directory代表多個(gè)Invoker,可以把它看成List<Invoker>,但與List不同的是,它的值可能是動(dòng)態(tài)變化的,比如注冊(cè)中心推送變更。
- Cluster將Directory中的多個(gè)Invoker偽裝成一個(gè)Invoker,對(duì)上層透明,偽裝過程包含了容錯(cuò)邏輯,調(diào)用失敗后,重試另一個(gè)。
- Router負(fù)責(zé)從多個(gè)Invoker中按路由規(guī)則選出子集,比如讀寫分離,應(yīng)用隔離等。
- LoadBalance負(fù)責(zé)從多個(gè)Invoker中選出具體的一個(gè)用于本次調(diào)用,選的過程包含了負(fù)載均衡算法,調(diào)用失敗后,需要重選。
由于每種接口都有多種實(shí)現(xiàn)類,篇幅和時(shí)間有限,我們選擇其中最為典型的一種來進(jìn)行源碼分析。
Cluster
擴(kuò)展接口介紹
集群的源碼如下。
package com.alibaba.dubbo.rpc.cluster;import com.alibaba.dubbo.common.extension.Adaptive; import com.alibaba.dubbo.common.extension.SPI; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.support.FailoverCluster;/*** Cluster. (SPI, Singleton, ThreadSafe)* * <a href="http://en.wikipedia.org/wiki/Computer_cluster">Cluster</a>* <a href="http://en.wikipedia.org/wiki/Fault-tolerant_system">Fault-Tolerant</a>* * @author william.liangf*/ @SPI(FailoverCluster.NAME) public interface Cluster {/*** Merge the directory invokers to a virtual invoker.* * @param <T>* @param directory* @return cluster invoker* @throws RpcException*/@Adaptive<T> Invoker<T> join(Directory<T> directory) throws RpcException;}該接口只有一個(gè)方法,就是將directory對(duì)象中的多個(gè)invoker的集合整合成一個(gè)invoker對(duì)象。該方法被ReferenceConfig類的createProxy方法調(diào)用,調(diào)用它的代碼如下。
// 對(duì)有注冊(cè)中心的Cluster 只用 AvailableClusterURL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers));Cluster內(nèi)置有9個(gè)擴(kuò)展實(shí)現(xiàn)類,都實(shí)現(xiàn)了不同的集群容錯(cuò)策略,我們只分析默認(rèn)的自動(dòng)故障轉(zhuǎn)移的擴(kuò)展實(shí)現(xiàn)FailoverCluster。
FailoverCluster
源碼如下,只是構(gòu)造了一個(gè)類型為FailoverClusterInvoker的invoker對(duì)象。
public class FailoverCluster implements Cluster {public final static String NAME = "failover";public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new FailoverClusterInvoker<T>(directory);}}我們進(jìn)入看看FailoverClusterInvoker的源碼。
?
該類又繼承自抽象實(shí)現(xiàn)類AbstractClusterInvoker,使用該類的一些方法,因此也要結(jié)合該類的源碼一起看。
/** Copyright 1999-2011 Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* * http://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package com.alibaba.dubbo.rpc.cluster.support;import java.util.ArrayList; import java.util.List;import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.Version; import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Result; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Directory; import com.alibaba.dubbo.rpc.cluster.LoadBalance; import com.alibaba.dubbo.rpc.support.RpcUtils;/*** AbstractClusterInvoker* * @author william.liangf* @author chao.liuc*/ public abstract class AbstractClusterInvoker<T> implements Invoker<T> {private static final Logger logger = LoggerFactory.getLogger(AbstractClusterInvoker.class);protected final Directory<T> directory;protected final boolean availablecheck;private volatile boolean destroyed = false;private volatile Invoker<T> stickyInvoker = null;public AbstractClusterInvoker(Directory<T> directory) {this(directory, directory.getUrl());}public AbstractClusterInvoker(Directory<T> directory, URL url) {if (directory == null)throw new IllegalArgumentException("service directory == null");this.directory = directory ;//sticky 需要檢測(cè) avaliablecheck this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK) ;}public Class<T> getInterface() {return directory.getInterface();}public URL getUrl() {return directory.getUrl();}public boolean isAvailable() {Invoker<T> invoker = stickyInvoker;if (invoker != null) {return invoker.isAvailable();}return directory.isAvailable();}public void destroy() {directory.destroy();destroyed = true;}/*** 使用loadbalance選擇invoker.</br>* a)先lb選擇,如果在selected列表中 或者 不可用且做檢驗(yàn)時(shí),進(jìn)入下一步(重選),否則直接返回</br>* b)重選驗(yàn)證規(guī)則:selected > available .保證重選出的結(jié)果盡量不在select中,并且是可用的 * * @param availablecheck 如果設(shè)置true,在選擇的時(shí)候先選invoker.available == true* @param selected 已選過的invoker.注意:輸入保證不重復(fù)* */protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (invokers == null || invokers.size() == 0)return null;String methodName = invocation == null ? "" : invocation.getMethodName();boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ;{//ignore overloaded methodif ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){stickyInvoker = null;}//ignore cucurrent problemif (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){if (availablecheck && stickyInvoker.isAvailable()){return stickyInvoker;}}}Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);if (sticky){stickyInvoker = invoker;}return invoker;}private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {if (invokers == null || invokers.size() == 0)return null;if (invokers.size() == 1)return invokers.get(0);// 如果只有兩個(gè)invoker,退化成輪循if (invokers.size() == 2 && selected != null && selected.size() > 0) {return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);}Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);//如果 selected中包含(優(yōu)先判斷) 或者 不可用&&availablecheck=true 則重試.if( (selected != null && selected.contains(invoker))||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){try{Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);if(rinvoker != null){invoker = rinvoker;}else{//看下第一次選的位置,如果不是最后,選+1位置.int index = invokers.indexOf(invoker);try{//最后在避免碰撞invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;}catch (Exception e) {logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);}}}catch (Throwable t){logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);}}return invoker;} /*** 重選,先從非selected的列表中選擇,沒有在從selected列表中選擇.* @param loadbalance* @param invocation* @param invokers* @param selected* @return* @throws RpcException*/private Invoker<T> reselect(LoadBalance loadbalance,Invocation invocation,List<Invoker<T>> invokers, List<Invoker<T>> selected ,boolean availablecheck)throws RpcException {//預(yù)先分配一個(gè),這個(gè)列表是一定會(huì)用到的.List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size());//先從非select中選if( availablecheck ){ //選isAvailable 的非selectfor(Invoker<T> invoker : invokers){if(invoker.isAvailable()){if(selected ==null || !selected.contains(invoker)){reselectInvokers.add(invoker);}}}if(reselectInvokers.size()>0){return loadbalance.select(reselectInvokers, getUrl(), invocation);}}else{ //選全部非selectfor(Invoker<T> invoker : invokers){if(selected ==null || !selected.contains(invoker)){reselectInvokers.add(invoker);}}if(reselectInvokers.size()>0){return loadbalance.select(reselectInvokers, getUrl(), invocation);}}//最后從select中選可用的. {if(selected != null){for(Invoker<T> invoker : selected){if((invoker.isAvailable()) //優(yōu)先選available && !reselectInvokers.contains(invoker)){reselectInvokers.add(invoker);}}}if(reselectInvokers.size()>0){return loadbalance.select(reselectInvokers, getUrl(), invocation);}}return null;}public Result invoke(final Invocation invocation) throws RpcException {checkWheatherDestoried();LoadBalance loadbalance;List<Invoker<T>> invokers = list(invocation);if (invokers != null && invokers.size() > 0) {loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl().getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));} else {loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);}RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);return doInvoke(invocation, invokers, loadbalance);}protected void checkWheatherDestoried() {if(destroyed){throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost()+ " use dubbo version " + Version.getVersion()+ " is now destroyed! Can not invoke any more.");}}@Overridepublic String toString() {return getInterface() + " -> " + getUrl().toString();}protected void checkInvokers(List<Invoker<T>> invokers, Invocation invocation) {if (invokers == null || invokers.size() == 0) {throw new RpcException("Failed to invoke the method "+ invocation.getMethodName() + " in the service " + getInterface().getName() + ". No provider available for the service " + directory.getUrl().getServiceKey()+ " from registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost()+ " using the dubbo version " + Version.getVersion()+ ". Please check if the providers have been started and registered.");}}protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,LoadBalance loadbalance) throws RpcException;protected List<Invoker<T>> list(Invocation invocation) throws RpcException {List<Invoker<T>> invokers = directory.list(invocation);return invokers;} }源碼實(shí)現(xiàn)分析。
LoadBalance
負(fù)載均衡器
擴(kuò)展接口定義
@SPI(RandomLoadBalance.NAME) public interface LoadBalance {/*** select one invoker in list.* * @param invokers invokers.* @param url refer url* @param invocation invocation.* @return selected invoker.*/@Adaptive("loadbalance")<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}上述源碼所示,負(fù)載均衡只定義了一個(gè)方法,就是在候選的invokers中選擇一個(gè)invoker對(duì)象出來。默認(rèn)的擴(kuò)展實(shí)現(xiàn)是random。那我么就分析RandomLoadBalance的源碼。
RandomLoadBalance
/** Copyright 1999-2011 Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* * http://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package com.alibaba.dubbo.rpc.cluster.loadbalance;import java.util.List; import java.util.Random;import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker;/*** random load balance.** @author qianlei* @author william.liangf*/ public class RandomLoadBalance extends AbstractLoadBalance {public static final String NAME = "random";private final Random random = new Random();protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {int length = invokers.size(); // 總個(gè)數(shù)int totalWeight = 0; // 總權(quán)重boolean sameWeight = true; // 權(quán)重是否都一樣for (int i = 0; i < length; i++) {int weight = getWeight(invokers.get(i), invocation);totalWeight += weight; // 累計(jì)總權(quán)重if (sameWeight && i > 0&& weight != getWeight(invokers.get(i - 1), invocation)) {sameWeight = false; // 計(jì)算所有權(quán)重是否一樣}}if (totalWeight > 0 && ! sameWeight) {// 如果權(quán)重不相同且權(quán)重大于0則按總權(quán)重?cái)?shù)隨機(jī)int offset = random.nextInt(totalWeight);// 并確定隨機(jī)值落在哪個(gè)片斷上for (int i = 0; i < length; i++) {offset -= getWeight(invokers.get(i), invocation);if (offset < 0) {return invokers.get(i);}}}// 如果權(quán)重相同或權(quán)重為0則均等隨機(jī)return invokers.get(random.nextInt(length));}}該類繼承了抽象類AbstractLoadBalance,因此我們也要結(jié)合該類一起分析。
/** Copyright 1999-2011 Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* * http://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package com.alibaba.dubbo.rpc.cluster.loadbalance;import java.util.List;import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.cluster.LoadBalance;/*** AbstractLoadBalance* * @author william.liangf*/ public abstract class AbstractLoadBalance implements LoadBalance {public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {if (invokers == null || invokers.size() == 0)return null;if (invokers.size() == 1)return invokers.get(0);return doSelect(invokers, url, invocation);}protected abstract <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation);protected int getWeight(Invoker<?> invoker, Invocation invocation) {int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);if (weight > 0) {long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L);if (timestamp > 0L) {int uptime = (int) (System.currentTimeMillis() - timestamp);int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);if (uptime > 0 && uptime < warmup) {weight = calculateWarmupWeight(uptime, warmup, weight);}}}return weight;}static int calculateWarmupWeight(int uptime, int warmup, int weight) {int ww = (int) ( (float) uptime / ( (float) warmup / (float) weight ) );return ww < 1 ? 1 : (ww > weight ? weight : ww);}}源碼分析如下:
Router
接口定義
public interface Router extends Comparable<Router> {/*** get the router url.* * @return url*/URL getUrl();/*** route.* * @param invokers* @param url refer url* @param invocation* @return routed invokers* @throws RpcException*/<T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;}路由器就定義了上述2個(gè)方法,核心方法是route,從大的invoker列表結(jié)合中根據(jù)規(guī)則過濾出一個(gè)子集合。我們這里只分析實(shí)現(xiàn)類ConditionRouter的源碼。
ConditionRouter
/** Copyright 1999-2012 Alibaba Group.* * Licensed under the Apache License, Version 2.0 (the "License");* you may not use this file except in compliance with the License.* You may obtain a copy of the License at* * http://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/ package com.alibaba.dubbo.rpc.cluster.router.condition;import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern;import com.alibaba.dubbo.common.Constants; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.logger.Logger; import com.alibaba.dubbo.common.logger.LoggerFactory; import com.alibaba.dubbo.common.utils.NetUtils; import com.alibaba.dubbo.common.utils.StringUtils; import com.alibaba.dubbo.common.utils.UrlUtils; import com.alibaba.dubbo.rpc.Invocation; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Router;/*** ConditionRouter* * @author william.liangf*/ public class ConditionRouter implements Router, Comparable<Router> {private static final Logger logger = LoggerFactory.getLogger(ConditionRouter.class);private final URL url;private final int priority;private final boolean force;private final Map<String, MatchPair> whenCondition;private final Map<String, MatchPair> thenCondition;public ConditionRouter(URL url) {this.url = url;this.priority = url.getParameter(Constants.PRIORITY_KEY, 0);this.force = url.getParameter(Constants.FORCE_KEY, false);try {String rule = url.getParameterAndDecoded(Constants.RULE_KEY);if (rule == null || rule.trim().length() == 0) {throw new IllegalArgumentException("Illegal route rule!");}rule = rule.replace("consumer.", "").replace("provider.", "");int i = rule.indexOf("=>");String whenRule = i < 0 ? null : rule.substring(0, i).trim();String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim();Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule);Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule);// NOTE: When條件是允許為空的,外部業(yè)務(wù)來保證類似的約束條件this.whenCondition = when;this.thenCondition = then;} catch (ParseException e) {throw new IllegalStateException(e.getMessage(), e);}}public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation)throws RpcException {if (invokers == null || invokers.size() == 0) {return invokers;}try {if (! matchWhen(url)) {return invokers;}List<Invoker<T>> result = new ArrayList<Invoker<T>>();if (thenCondition == null) {logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey());return result;}for (Invoker<T> invoker : invokers) {if (matchThen(invoker.getUrl(), url)) {result.add(invoker);}}if (result.size() > 0) {return result;} else if (force) {logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(Constants.RULE_KEY));return result;}} catch (Throwable t) {logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t);}return invokers;}public URL getUrl() {return url;}public int compareTo(Router o) {if (o == null || o.getClass() != ConditionRouter.class) {return 1;}ConditionRouter c = (ConditionRouter) o;return this.priority == c.priority ? url.toFullString().compareTo(c.url.toFullString()) : (this.priority > c.priority ? 1 : -1);}public boolean matchWhen(URL url) {return matchCondition(whenCondition, url, null);}public boolean matchThen(URL url, URL param) {return thenCondition != null && matchCondition(thenCondition, url, param);}private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param) {Map<String, String> sample = url.toMap();for (Map.Entry<String, String> entry : sample.entrySet()) {String key = entry.getKey();MatchPair pair = condition.get(key);if (pair != null && ! pair.isMatch(entry.getValue(), param)) {return false;}}return true;}private static Pattern ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)");private static Map<String, MatchPair> parseRule(String rule)throws ParseException {Map<String, MatchPair> condition = new HashMap<String, MatchPair>();if(StringUtils.isBlank(rule)) {return condition;} // 匹配或不匹配Key-Value對(duì)MatchPair pair = null;// 多個(gè)Value值Set<String> values = null;final Matcher matcher = ROUTE_PATTERN.matcher(rule);while (matcher.find()) { // 逐個(gè)匹配String separator = matcher.group(1);String content = matcher.group(2);// 表達(dá)式開始if (separator == null || separator.length() == 0) {pair = new MatchPair();condition.put(content, pair);}// KV開始else if ("&".equals(separator)) {if (condition.get(content) == null) {pair = new MatchPair();condition.put(content, pair);} else {condition.put(content, pair);}}// KV的Value部分開始else if ("=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule \""+ rule + "\", The error char '" + separator+ "' at index " + matcher.start() + " before \""+ content + "\".", matcher.start());values = pair.matches;values.add(content);}// KV的Value部分開始else if ("!=".equals(separator)) {if (pair == null)throw new ParseException("Illegal route rule \""+ rule + "\", The error char '" + separator+ "' at index " + matcher.start() + " before \""+ content + "\".", matcher.start());values = pair.mismatches;values.add(content);}// KV的Value部分的多個(gè)條目else if (",".equals(separator)) { // 如果為逗號(hào)表示if (values == null || values.size() == 0)throw new ParseException("Illegal route rule \""+ rule + "\", The error char '" + separator+ "' at index " + matcher.start() + " before \""+ content + "\".", matcher.start());values.add(content);} else {throw new ParseException("Illegal route rule \"" + rule+ "\", The error char '" + separator + "' at index "+ matcher.start() + " before \"" + content + "\".", matcher.start());}}return condition;}private static final class MatchPair {final Set<String> matches = new HashSet<String>();final Set<String> mismatches = new HashSet<String>();public boolean isMatch(String value, URL param) {for (String match : matches) {if (! UrlUtils.isMatchGlobPattern(match, value, param)) {return false;}}for (String mismatch : mismatches) {if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) {return false;}}return true;}} }該源碼實(shí)現(xiàn)了如下條件路由器功能。
基于條件表達(dá)式的路由規(guī)則,如:
| host = 10.20.153.10 => host = 10.20.153.11 |
規(guī)則:
- "=>"之前的為消費(fèi)者匹配條件,所有參數(shù)和消費(fèi)者的URL進(jìn)行對(duì)比,當(dāng)消費(fèi)者滿足匹配條件時(shí),對(duì)該消費(fèi)者執(zhí)行后面的過濾規(guī)則。
- "=>"之后為提供者地址列表的過濾條件,所有參數(shù)和提供者的URL進(jìn)行對(duì)比,消費(fèi)者最終只拿到過濾后的地址列表。
- 如果匹配條件為空,表示對(duì)所有消費(fèi)方應(yīng)用,如:=> host != 10.20.153.11
- 如果過濾條件為空,表示禁止訪問,如:host = 10.20.153.10 =>
表達(dá)式:
- 參數(shù)支持:
- 服務(wù)調(diào)用信息,如:method,?argument?等?(暫不支持參數(shù)路由)
- URL本身的字段,如:protocol, host, port 等
- 以及URL上的所有參數(shù),如:application, organization 等
- 條件支持:
- 等號(hào)"="表示"匹配",如:host = 10.20.153.10
- 不等號(hào)"!="表示"不匹配",如:host != 10.20.153.10
- 值支持:
- 以逗號(hào)","分隔多個(gè)值,如:host != 10.20.153.10,10.20.153.11
- 以星號(hào)"*"結(jié)尾,表示通配,如:host != 10.20.*
- 以美元符"$"開頭,表示引用消費(fèi)者參數(shù),如:host = $host
Directory
接口定義
public interface Directory<T> extends Node {/*** get service type.* * @return service type.*/Class<T> getInterface();/*** list invokers.* * @return invokers*/List<Invoker<T>> list(Invocation invocation) throws RpcException;}目錄服務(wù)定義了一個(gè)核心接口list,就是列舉出某個(gè)接口在目錄中的所有服務(wù)列表。
抽象實(shí)現(xiàn)AbstractDirectory
提供了一個(gè)抽象的目錄實(shí)現(xiàn)類,源碼如下。
/*** 增加router的Directory* * @author chao.liuc*/ public abstract class AbstractDirectory<T> implements Directory<T> {// 日志輸出private static final Logger logger = LoggerFactory.getLogger(AbstractDirectory.class);private final URL url ;private volatile boolean destroyed = false;private volatile URL consumerUrl ;private volatile List<Router> routers;public AbstractDirectory(URL url) {this(url, null);}public AbstractDirectory(URL url, List<Router> routers) {this(url, url, routers);}public AbstractDirectory(URL url, URL consumerUrl, List<Router> routers) {if (url == null)throw new IllegalArgumentException("url == null");this.url = url;this.consumerUrl = consumerUrl;setRouters(routers);}public List<Invoker<T>> list(Invocation invocation) throws RpcException {if (destroyed){throw new RpcException("Directory already destroyed .url: "+ getUrl());}List<Invoker<T>> invokers = doList(invocation);List<Router> localRouters = this.routers; // local referenceif (localRouters != null && localRouters.size() > 0) {for (Router router: localRouters){try {if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {invokers = router.route(invokers, getConsumerUrl(), invocation);}} catch (Throwable t) {logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);}}}return invokers;}public URL getUrl() {return url;}public List<Router> getRouters(){return routers;}public URL getConsumerUrl() {return consumerUrl;}public void setConsumerUrl(URL consumerUrl) {this.consumerUrl = consumerUrl;}protected void setRouters(List<Router> routers){// copy listrouters = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);// append url routerString routerkey = url.getParameter(Constants.ROUTER_KEY);if (routerkey != null && routerkey.length() > 0) {RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerkey);routers.add(routerFactory.getRouter(url));}// append mock invoker selectorrouters.add(new MockInvokersSelector());Collections.sort(routers);this.routers = routers;}public boolean isDestroyed() {return destroyed;}public void destroy(){destroyed = true;}protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException ;}list方法的實(shí)現(xiàn)邏輯是:先檢查目錄是否銷毀狀態(tài),若已經(jīng)銷毀則拋出異常;調(diào)用抽象方法doList實(shí)現(xiàn)真正的從目錄服務(wù)中獲取invoker列表,該方法需要子類實(shí)現(xiàn);循環(huán)對(duì)象中的路由器列表,若路由器url為null或者參數(shù)runtime為true則調(diào)用該路由器的route方法進(jìn)行路由,將返回的invoker列表替換為路由后的結(jié)果; 返回最終的invoker列表。
setRouters方法是設(shè)置路由器列表,除了參數(shù)參入的routers之外,還會(huì)追加2個(gè)默認(rèn)的路由器,一個(gè)是參數(shù)router指定的routerFactory獲得的router,另外一個(gè)是MockInvokersSelector對(duì)象;
默認(rèn)實(shí)現(xiàn)StaticDirectory
模塊還提供了一個(gè)默認(rèn)目錄實(shí)現(xiàn)類StaticDirectory,它是一個(gè)靜態(tài)的內(nèi)存緩存目錄服務(wù)實(shí)現(xiàn)。源碼如下:
public class StaticDirectory<T> extends AbstractDirectory<T> {private final List<Invoker<T>> invokers;public StaticDirectory(List<Invoker<T>> invokers){this(null, invokers, null);}public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers){this(null, invokers, routers);}public StaticDirectory(URL url, List<Invoker<T>> invokers) {this(url, invokers, null);}public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {super(url == null && invokers != null && invokers.size() > 0 ? invokers.get(0).getUrl() : url, routers);if (invokers == null || invokers.size() == 0)throw new IllegalArgumentException("invokers == null");this.invokers = invokers;}public Class<T> getInterface() {return invokers.get(0).getInterface();}public boolean isAvailable() {if (isDestroyed()) {return false;}for (Invoker<T> invoker : invokers) {if (invoker.isAvailable()) {return true;}}return false;}public void destroy() {if(isDestroyed()) {return;}super.destroy();for (Invoker<T> invoker : invokers) {invoker.destroy();}invokers.clear();}@Overrideprotected List<Invoker<T>> doList(Invocation invocation) throws RpcException {return invokers;}}它的doList方法的實(shí)現(xiàn)是直接將屬性invokers的值返回,非常簡(jiǎn)單。
此外還有一個(gè)RegistryDirectory的實(shí)現(xiàn)類,該類是整合了注冊(cè)中心和目錄服務(wù)。
NEXT
因?yàn)榭紤]到本模塊與dubbo-registry相關(guān)性較大,接下來我們將研究dubbo-registry-api和dubbo-registry-default模塊的源碼。
?
轉(zhuǎn)載于:https://my.oschina.net/ywbrj042/blog/689818
總結(jié)
以上是生活随笔為你收集整理的dubbo源码分析系列——dubbo-cluster模块源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CentOS6.x下配置sendmail
- 下一篇: Server2012Web方式修改用户名