【dubbo源码解析】 --- dubbo集群容错(cluster)、负载均衡(loadbalance)底层原理探析 + 扩展自己的集群容错、负载均衡组件
本文對(duì)應(yīng)源碼地址:https://github.com/nieandsun/dubbo-study
文章目錄
- 1 集群容錯(cuò)和負(fù)載均衡的概念
- 2 dubbo集群容錯(cuò) + 負(fù)載均衡底層原理
- 3 簡(jiǎn)單測(cè)試
- 4 自己擴(kuò)展一個(gè)dubbo集群容錯(cuò)組件和負(fù)載均衡組件
- 4.1 擴(kuò)展一個(gè)集群容錯(cuò)組件(Cluster)
- 4.2 擴(kuò)展一個(gè)loadbalance組件
- 4.3 不要忘了在MATE-INF/dubbo文件夾下指定這些SPI擴(kuò)展組件
- 4.3 測(cè)試
1 集群容錯(cuò)和負(fù)載均衡的概念
摘自dubbo官方博客:http://dubbo.apache.org/zh-cn/blog/dubbo-loadbalance.html
摘自dubbo官方博客:http://dubbo.apache.org/zh-cn/blog/dubbo-loadbalance.html
2 dubbo集群容錯(cuò) + 負(fù)載均衡底層原理
首先應(yīng)該明確的是集群容錯(cuò) + 負(fù)載均衡其實(shí)都是由消費(fèi)端的邏輯。
由《【dubbo源碼解析】— dubbo的服務(wù)暴露+服務(wù)消費(fèi)(RPC調(diào)用)底層原理深入探析》文章的知識(shí)可知,在不引入注冊(cè)中心時(shí),在消費(fèi)端, protocol.refer 得到 invoker 對(duì)象, 并拿著該Invoker做一個(gè)【可以調(diào)用服務(wù)端的服務(wù)】的代理對(duì)象作為@Reference標(biāo)注的對(duì)象。
那加入注冊(cè)中心邏輯后,dubbo是如何在RegistryProtocol對(duì)象中將集群容錯(cuò)功能掛入到Dubbo的RPC鏈條中的呢?
Dubbo 在這里玩了個(gè)心眼。 真正的過程走得百繞千回, 看如下這段代碼:
RegistryProtocol. doRefer 方法:
原來的 protocol 被傳進(jìn)RegistryDirectory 類中去了, doRefer 返回的 invoker對(duì)象, 是 cluster.join(directory)返回的 invoker。
而cluster 是一個(gè)擴(kuò)展接口, 因此, 這個(gè)接口方法最終執(zhí)行的對(duì)象, 是根據(jù)容錯(cuò)策略自適配出來的對(duì)象, 如果 url 中不指定則默認(rèn)是 failover 再看 failover 實(shí)現(xiàn)類的邏輯, 非常簡(jiǎn)單, 只是返回一個(gè)FailoverClusterInvoker 對(duì)象:
public class FailoverCluster implements Cluster {public static final String NAME = "failover";public FailoverCluster() {}public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new FailoverClusterInvoker(directory);} }再看下FailoverClusterInvoker中invoke方法調(diào)用后的的具體實(shí)現(xiàn)代碼doInvoke的邏輯:
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {//...... (省略部分代碼)//容錯(cuò)次數(shù)int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;if (len <= 0) {len = 1;}//...... (省略部分代碼)for(int i = 0; i < len; ++i) {//容錯(cuò)策略//Reselect before retry to avoid a change of candidate `invokers`.//NOTE: if `invokers` changed, then `invoked` also lose accuracy.if (i > 0) { checkWhetherDestroyed();copyinvokers = list(invocation);// check againcheckInvokers(copyinvokers, invocation);}Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);//記錄已經(jīng)被選擇的Invoker --> 用作進(jìn)行負(fù)載判斷invoked.add(invoker);//...... (省略部分代碼)try {Result result = invoker.invoke(invocation);//...... (省略部分代碼)Result var12 = result;return var12;}//...... (省略部分代碼)}}此 invoker 的邏輯:
(1)按重試次數(shù) for 循環(huán), 只要不是正常返回, 則再試一次
(2)調(diào)用 select 方法,大致邏輯為:先通過SPI機(jī)制取得一個(gè) loadbalance 對(duì)象—>然后根據(jù)獲取的loadbalance 對(duì)象select出來一個(gè)封裝了【可以調(diào)用服務(wù)端的服務(wù)】的Invoker —> 然后執(zhí)行此 invoker 對(duì)象得到結(jié)果。
需要注意的是:
此 select 方法, 是從一組 invoker(即文章《【dubbo源碼解析】— dubbo的服務(wù)注冊(cè)與發(fā)現(xiàn)機(jī)制底層原理探析》中講到的RegistryDirectory對(duì)象的urlInvokerMap容器 中緩存的根據(jù)每一個(gè)服務(wù)端的URL生成的Invoker對(duì)象) 中選擇出來的一個(gè) invoker。
畫個(gè)簡(jiǎn)圖來描述上訴具體邏輯如下:
3 簡(jiǎn)單測(cè)試
注意: 如果消費(fèi)端未配置集群容錯(cuò) + 負(fù)載均衡策略的話,消費(fèi)端會(huì)通過注冊(cè)中心獲取到服務(wù)端配置的參數(shù),也就是說這些參數(shù)服務(wù)端都會(huì)以URL的形式給注冊(cè)中心,然后消費(fèi)端根據(jù)獲取到的URL來根據(jù)SPI機(jī)制選擇到底使用哪種策略。
服務(wù)端:
ExtensionLoader<Protocol> protocolLoader = ExtensionLoader.getExtensionLoader(Protocol.class); ExtensionLoader<ProxyFactory> proxyLoader = ExtensionLoader.getExtensionLoader(ProxyFactory.class);//注冊(cè)中心服務(wù)--zk final URL registryUrl = URL.valueOf("registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?registry=zookeeper");//支持的協(xié)議:dubbo,http,hessian,rmi URL serviceUrl = URL.valueOf("dubbo://127.0.0.1:9001/com.nrsc.service.InvokerDemoService");@Test public void serverRpc() throws IOException {InvokerDemoService service = new InvokerDemoServiceImpl("yoyo");//生成代理工廠// --- 由URL確定到底是動(dòng)態(tài)代理工廠(JdkProxyFactory)還是靜態(tài)代理工廠(JavassistProxyFactory)// --- 默認(rèn)情況下為靜態(tài)代理工廠ProxyFactory proxy = proxyLoader.getAdaptiveExtension();//下面這兩句話完全可以放在外面 ---> 如果寫在外面,這里的代碼就和上文講到的RPC完整鏈條的代碼一致了//這里為了測(cè)試消費(fèi)端可以動(dòng)態(tài)監(jiān)測(cè)到服務(wù)端的發(fā)布/下線,所以寫在了這里serviceUrl = serviceUrl.setPort(9001);//url中加入負(fù)載均衡和集群容錯(cuò)參數(shù)//serviceUrl = serviceUrl.addParameter("loadbalance", "consistenthash");//serviceUrl = serviceUrl.addParameter("cluster", "failfast");//啟動(dòng)自己擴(kuò)展的loadbalance和cluster組件//serviceUrl = serviceUrl.addParameter("loadbalance", "first");//serviceUrl = serviceUrl.addParameter("cluster", "failsms");URL newRegistryUrl = registryUrl.addParameter(Constants.EXPORT_KEY, serviceUrl.toFullString());Invoker<InvokerDemoService> serviceInvoker = proxy.getInvoker(service, InvokerDemoService.class, newRegistryUrl);//獲取具體的協(xié)議Protocol protocol = protocolLoader.getAdaptiveExtension();Exporter<InvokerDemoService> exporter = protocol.export(serviceInvoker);System.out.println("server 啟動(dòng)協(xié)議:" + serviceUrl.getProtocol());// 保證服務(wù)一直開著System.in.read();exporter.unexport(); }/***** 除了接口外,其他和serverRpc1()一樣,主要用來測(cè)試消費(fèi)端可以動(dòng)態(tài)監(jiān)測(cè)到服務(wù)端的發(fā)布/下線* @throws IOException*/ @Test public void serverRpc2() throws IOException {InvokerDemoService service = new InvokerDemoServiceImpl("nrsc");//生成代理工廠// --- 由URL確定到底是動(dòng)態(tài)代理工廠(JdkProxyFactory)還是靜態(tài)代理工廠(JavassistProxyFactory)// --- 默認(rèn)情況下為靜態(tài)代理工廠ProxyFactory proxy = proxyLoader.getAdaptiveExtension();//下面這兩句話完全可以放在外面 ---> 如果寫在外面,這里的代碼就和上文講到的RPC完整鏈條的代碼一致了//這里為了測(cè)試消費(fèi)端可以動(dòng)態(tài)監(jiān)測(cè)到服務(wù)端的發(fā)布/下線,所以寫在了這里serviceUrl = serviceUrl.setPort(9002);//url中加入負(fù)載均衡和集群容錯(cuò)參數(shù)//serviceUrl = serviceUrl.addParameter("loadbalance", "consistenthash");//serviceUrl = serviceUrl.addParameter("cluster", "failfast");//啟動(dòng)自己擴(kuò)展的loadbalance和cluster組件//serviceUrl = serviceUrl.addParameter("loadbalance", "first");//serviceUrl = serviceUrl.addParameter("cluster", "failsms");URL newRegistryUrl = registryUrl.addParameter(Constants.EXPORT_KEY, serviceUrl.toFullString());Invoker<InvokerDemoService> serviceInvoker = proxy.getInvoker(service, InvokerDemoService.class, newRegistryUrl);//獲取具體的協(xié)議Protocol protocol = protocolLoader.getAdaptiveExtension();Exporter<InvokerDemoService> exporter = protocol.export(serviceInvoker);System.out.println("server 啟動(dòng)協(xié)議:" + serviceUrl.getProtocol());// 保證服務(wù)一直開著System.in.read();exporter.unexport(); }消費(fèi)端:
@Test public void clientRpc() throws IOException {Protocol protocol = protocolLoader.getAdaptiveExtension();//生成代理工廠ProxyFactory proxy = proxyLoader.getAdaptiveExtension();//由代理工廠生成Invoker對(duì)象Invoker<InvokerDemoService> referInvoker = protocol.refer(InvokerDemoService.class, registryUrl);//生成DemoService的代理類InvokerDemoService service = proxy.getProxy(referInvoker);for (int i = 0; i < 6; i++) {String result = service.sayHello(registryUrl.getProtocol() + "調(diào)用");System.out.println(result);}// 保證服務(wù)一直開著 ,測(cè)試消費(fèi)端可以動(dòng)態(tài)監(jiān)測(cè)到服務(wù)端的發(fā)布/下線//System.in.read(); }測(cè)試結(jié)果1:
分別啟用兩個(gè)服務(wù)端,然后啟用消費(fèi)端,在未配置集群容錯(cuò)+ 負(fù)載均衡的情況下調(diào)用結(jié)果和結(jié)論如下:
測(cè)試結(jié)果2:
放開服務(wù)端代碼中關(guān)于集群容錯(cuò)+ 負(fù)載均衡的注釋代碼,重新啟用兩個(gè)服務(wù)端,然后啟用消費(fèi)端,調(diào)用結(jié)果和結(jié)論如下:
4 自己擴(kuò)展一個(gè)dubbo集群容錯(cuò)組件和負(fù)載均衡組件
4.1 擴(kuò)展一個(gè)集群容錯(cuò)組件(Cluster)
由dubbo源碼可知要想擴(kuò)展一個(gè)集群容錯(cuò)組件,需要做兩件事
- (1)實(shí)現(xiàn)Cluster接口,并重寫其Join方法
- (2)在Join方法里返回一個(gè)XXXClusterInvoker 用于包裝含有【可以調(diào)用服務(wù)端的服務(wù)】的Invoker ,且集群容錯(cuò)的邏輯,就寫在XXXClusterInvoker 的Invoker方法里
這里為了簡(jiǎn)便期間,我以 FailfastClusterInvoker作為XXXClusterInvoker ,則自定的Cluster代碼如下:
/**** 當(dāng)然如果想新擴(kuò)展一個(gè)Cluster組件,肯定還要配套弄一個(gè)XXXClusterInvoker,這里就直接使用FailfastClusterInvoker代替了*/ public class FailSmsCluster implements Cluster {@Overridepublic <T> Invoker<T> join(Directory<T> directory) throws RpcException {sendSms();return new FailfastClusterInvoker<>(directory);}private void sendSms() {System.out.println("send sms notify!");} }4.2 擴(kuò)展一個(gè)loadbalance組件
擴(kuò)展loadbalance組件,要比擴(kuò)展Cluster組件簡(jiǎn)單的多,只需要實(shí)現(xiàn)LoadBalance接口,并重寫其select方法就可以了,這里提供一個(gè)比較簡(jiǎn)單的loadbalance組件 — 只選擇第一個(gè)注冊(cè)的Invoker。
public class FirstLoadBalance implements LoadBalance {@Overridepublic <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {System.out.println("FirstLoadBalance : Select the first invoker...");return invokers.get(0);} }4.3 不要忘了在MATE-INF/dubbo文件夾下指定這些SPI擴(kuò)展組件
***
4.3 測(cè)試
打開服務(wù)端的如下代碼,并重啟兩個(gè)服務(wù)端 —>然后再重啟消費(fèi)端
//啟動(dòng)自己擴(kuò)展的loadbalance和cluster組件serviceUrl = serviceUrl.addParameter("loadbalance", "first");serviceUrl = serviceUrl.addParameter("cluster", "failsms");調(diào)用結(jié)果和結(jié)論如下:
end!
總結(jié)
以上是生活随笔為你收集整理的【dubbo源码解析】 --- dubbo集群容错(cluster)、负载均衡(loadbalance)底层原理探析 + 扩展自己的集群容错、负载均衡组件的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: irqbalance详解(其一)
- 下一篇: 【小工具】hdfs balance脚本(