Dubbo 源码分析 - 服务导出
1.服務導出過程
本篇文章,我們來研究一下 Dubbo 導出服務的過程。Dubbo 服務導出過程始于 Spring 容器發布刷新事件,Dubbo 在接收到事件后,會立即執行服務導出邏輯。整個邏輯大致可分為三個部分,第一是前置工作,主要用于檢查參數,組裝 URL。第二是導出服務,包含導出服務到本地 (JVM),和導出服務到遠程兩個過程。第三是向注冊中心注冊服務,用于服務發現。本篇文章將會對這三個部分代碼進行詳細的分析,在分析之前,我們先來了解一下服務的導出過程。
Dubbo 支持兩種服務導出方式,分別延遲導出和立即導出。延遲導出的入口是 ServiceBean 的 afterPropertiesSet 方法,立即導出的入口是 ServiceBean 的 onApplicationEvent 方法。本文打算分析服務延遲導出過程,因此不會分析 afterPropertiesSet 方法。下面從 onApplicationEvent 方法說起,該方法收到 Spring 容器的刷新事件后,會調用 export 方法執行服務導出操作。服務導出之前,要進行對一系列的配置進行檢查,以及生成 URL。準備工作做完,隨后開始導出服務。首先導出到本地,然后再導出到遠程。導出到本地就是將服務導出到 JVM 中,此過程比較簡單。導出到遠程的過程則要復雜的多,以 dubbo 協議為例,DubboProtocol 類的 export 方法將會被調用。該方法主要用于創建 Exporter 和 ExchangeServer。ExchangeServer 本身并不具備通信能力,需要借助更底層的 Server 實現通信功能。因此,在創建 ExchangeServer 實例時,需要先創建 NettyServer 或者 MinaServer 實例,并將實例作為參數傳給 ExchangeServer 實現類的構造方法。ExchangeServer 實例創建完成后,導出服務到遠程的過程也就接近尾聲了。服務導出結束后,服務消費者即可通過直聯的方式消費服務。當然,一般我們不會使用直聯的方式消費服務。所以,在服務導出結束后,緊接著要做的事情是向注冊中心注冊服務。此時,客戶端即可從注冊中心發現服務。
以上就是 Dubbo 服務導出的過程,比較復雜。下面開始分析源碼,從源碼的角度展現整個過程。
?2.源碼分析
一場 Dubbo 源碼分析的馬拉松比賽即將開始,現在我們站在賽道的起點進行熱身準備。本次比賽的起點位置位于 ServiceBean 的 onApplicationEvent 方法處。好了,發令槍響了,我將和一些朋友從 onApplicationEvent 方法處出發,探索 Dubbo 服務導出的全過程。下面我們來看一下 onApplicationEvent 方法的源碼。
| 1 2 3 4 5 6 7 | public void onApplicationEvent(ContextRefreshedEvent event) {// 是否有延遲導出 && 是否已導出 && 是不是已被取消導出if (isDelay() && !isExported() && !isUnexported()) {// 導出服務export();} } |
onApplicationEvent 是一個事件響應方法,該方法會在收到 Spring 上下文刷新事件后執行。這個方法首先會根據條件決定是否導出服務,比如有些服務設置了延時導出,那么此時就不應該在此處導出。還有一些服務已經被導出了,或者當前服務被取消導出了,此時也不能再次導出相關服務。注意這里的 isDelay 方法,這個方法字面意思是“是否延遲導出服務”,返回 true 表示延遲導出,false 表示不延遲導出。但是該方法真實意思卻并非如此,當方法返回 true 時,表示無需延遲導出。返回 false 時,表示需要延遲導出。與字面意思恰恰相反,讓人覺得很奇怪。下面我們來看一下這個方法的邏輯。
| 1 2 3 4 5 6 7 8 9 10 11 12 | // -☆- ServiceBean private boolean isDelay() {// 獲取 delayInteger delay = getDelay();ProviderConfig provider = getProvider();if (delay == null && provider != null) {// 如果前面獲取的 delay 為空,這里繼續獲取delay = provider.getDelay();}// 判斷 delay 是否為空,或者等于 -1return supportedApplicationListener && (delay == null || delay == -1); } |
暫時忽略 supportedApplicationListener 這個條件,當 delay 為空,或者等于-1時,該方法返回 true,而不是 false。這個方法的返回值讓人有點困惑,因此我重構了該方法的代碼,并給 Dubbo 提了一個 Pull Request,最終這個 PR 被合到了 Dubbo 主分支中。詳細請參見?Dubbo #2686。
現在解釋一下 supportedApplicationListener 變量含義,該變量用于表示當前的 Spring 容器是否支持 ApplicationListener,這個值初始為 false。在 Spring 容器將自己設置到 ServiceBean 中時,ServiceBean 的 setApplicationContext 方法會檢測 Spring 容器是否支持 ApplicationListener。若支持,則將 supportedApplicationListener 置為 true。代碼就不分析了,大家自行查閱了解。
ServiceBean 是 Dubbo 與 Spring 框架進行整合的關鍵,可以看做是兩個框架之間的橋梁。具有同樣作用的類還有 ReferenceBean。ServiceBean 實現了 Spring 的一些拓展接口,有 FactoryBean、ApplicationContextAware、ApplicationListener、DisposableBean 和 BeanNameAware。這些接口我在?Spring 源碼分析系列文章中介紹過,大家可以參考一下,這里就不贅述了。
現在我們知道了 Dubbo 服務導出過程的起點。那么接下來,我們快馬加鞭,繼續進行比賽。賽程預告,下一站是“服務導出的前置工作”。
?2.1 前置工作
前置工作主要包含兩個部分,分別是配置檢查,以及 URL 裝配。在導出服務之前,Dubbo 需要檢查用戶的配置是否合理,或者為用戶補充缺省配置。配置檢查完成后,接下來需要根據這些配置組裝 URL。在 Dubbo 中,URL 的作用十分重要。Dubbo 使用 URL 作為配置載體,所有的拓展點都是通過 URL 獲取配置。這一點,官方文檔中有所說明。
采用 URL 作為配置信息的統一格式,所有擴展點都通過傳遞 URL 攜帶配置信息。
接下來,我們先來分析配置檢查部分的源碼,隨后再來分析 URL 組裝部分的源碼。
?2.1.1 檢查配置
本節我們接著前面的源碼向下分析,前面說過 onApplicationEvent 方法在經過一些判斷后,會決定是否調用 export 方法導出服務。那么下面我們從 export 方法開始進行分析,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | public synchronized void export() {if (provider != null) {// 獲取 export 和 delay 配置if (export == null) {export = provider.getExport();}if (delay == null) {delay = provider.getDelay();}}// 如果 export 為 false,則不導出服務if (export != null && !export) {return;}if (delay != null && delay > 0) { // delay > 0,延時導出服務delayExportExecutor.schedule(new Runnable() {@Overridepublic void run() {doExport();}}, delay, TimeUnit.MILLISECONDS);} else { // 立即導出服務doExport();} } |
export 對兩個配置進行了檢查,并配置執行相應的動作。首先是 export,這個配置決定了是否導出服務。有時候我們只是想本地啟動服務進行一些調試工作,這個時候我們并不希望把本地啟動的服務暴露出去給別人調用。此時,我們就可以通過配置 export 禁止服務導出,比如:
| 1 | <dubbo:provider export="false" /> |
delay 見名知意了,用于延遲導出服務。下面,我們繼續分析源碼,這次要分析的是 doExport 方法。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 | protected synchronized void doExport() {if (unexported) {throw new IllegalStateException("Already unexported!");}if (exported) {return;}exported = true;// 檢測 interfaceName 是否合法if (interfaceName == null || interfaceName.length() == 0) {throw new IllegalStateException("interface not allow null!");}// 檢測 provider 是否為空,為空則新建一個,并通過系統變量為其初始化checkDefault();// 下面幾個 if 語句用于檢測 provider、application 等核心配置類對象是否為空,// 若為空,則嘗試從其他配置類對象中獲取相應的實例。if (provider != null) {if (application == null) {application = provider.getApplication();}if (module == null) {module = provider.getModule();}if (registries == null) {...}if (monitor == null) {...}if (protocols == null) {...}}if (module != null) {if (registries == null) {registries = module.getRegistries();}if (monitor == null) {...}}if (application != null) {if (registries == null) {registries = application.getRegistries();}if (monitor == null) {...}}// 檢測 ref 是否泛化服務類型if (ref instanceof GenericService) {// 設置 interfaceClass 為 GenericService.classinterfaceClass = GenericService.class;if (StringUtils.isEmpty(generic)) {// 設置 generic = "true"generic = Boolean.TRUE.toString();}} else { // ref 非 GenericService 類型try {interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader());} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}// 對 interfaceClass,以及 <dubbo:method> 必要字段進行檢查checkInterfaceAndMethods(interfaceClass, methods);// 對 ref 合法性進行檢測checkRef();// 設置 generic = "false"generic = Boolean.FALSE.toString();}// local 屬性 Dubbo 官方文檔中沒有說明,不過 local 和 stub 在功能應該是一致的,用于配置本地存根if (local != null) {if ("true".equals(local)) {local = interfaceName + "Local";}Class<?> localClass;try {// 獲取本地存根類localClass = ClassHelper.forNameWithThreadContextClassLoader(local);} catch (ClassNotFoundException e) {throw new IllegalStateException(e.getMessage(), e);}// 檢測本地存根類是否可賦值給接口類,若不可賦值則會拋出異常,提醒使用者本地存根類類型不合法if (!interfaceClass.isAssignableFrom(localClass)) {throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceName);}}// stub 和 local 均用于配置本地存根if (stub != null) {// 此處的代碼和上一個 if 分支的代碼基本一致,這里省略了}// 檢測各種對象是否為空,為空則新建,或者拋出異常checkApplication();checkRegistry();checkProtocol();appendProperties(this);checkStubAndMock(interfaceClass);if (path == null || path.length() == 0) {path = interfaceName;}// 導出服務doExportUrls();// ProviderModel 表示服務提供者模型,此對象中存儲了和服務提供者相關的信息。// 比如服務的配置信息,服務實例等。每個被導出的服務對應一個 ProviderModel。// ApplicationModel 持有所有的 ProviderModel。ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); } |
以上就是配置檢查的相關分析,代碼比較多,需要大家耐心看一下。下面對配置檢查的邏輯進行簡單的總結,如下:
配置檢查并非本文重點,因此我不打算對 doExport 方法所調用的方法進行分析(doExportUrls 方法除外)。在這些方法中,除了 appendProperties 方法稍微復雜一些,其他方法都還好。因此,大家可自行進行分析。好了,其他的就不多說了,繼續向下分析。
?2.1.2 多協議多注冊中心導出服務
Dubbo 允許我們使用不同的協議導出服務,也允許我們向多個注冊中心注冊服務。Dubbo 在 doExportUrls 方法中對多協議,多注冊中心進行了支持。相關代碼如下:
| 1 2 3 4 5 6 7 8 | private void doExportUrls() {// 加載注冊中心鏈接List<URL> registryURLs = loadRegistries(true);// 遍歷 protocols,導出每個服務for (ProtocolConfig protocolConfig : protocols) {doExportUrlsFor1Protocol(protocolConfig, registryURLs);} } |
上面代碼比較簡單,首先是通過 loadRegistries 加載注冊中心鏈接,然后再遍歷 ProtocolConfig 集合導出每個服務。并在導出服務的過程中,將服務注冊到注冊中心處。下面,我們先來看一下 loadRegistries 方法的邏輯。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | protected List<URL> loadRegistries(boolean provider) {// 檢測是否存在注冊中心配置類,不存在則拋出異常checkRegistry();List<URL> registryList = new ArrayList<URL>();if (registries != null && !registries.isEmpty()) {for (RegistryConfig config : registries) {String address = config.getAddress();if (address == null || address.length() == 0) {// 若 address 為空,則將其設為 0.0.0.0address = Constants.ANYHOST_VALUE;}// 從系統屬性中加載注冊中心地址String sysaddress = System.getProperty("dubbo.registry.address");if (sysaddress != null && sysaddress.length() > 0) {address = sysaddress;}// 判斷 address 是否合法if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) {Map<String, String> map = new HashMap<String, String>();// 添加 ApplicationConfig 中的字段信息到 map 中appendParameters(map, application);// 添加 RegistryConfig 字段信息到 map 中appendParameters(map, config);map.put("path", RegistryService.class.getName());map.put("dubbo", Version.getProtocolVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}if (!map.containsKey("protocol")) {if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) {map.put("protocol", "remote");} else {map.put("protocol", "dubbo");}}// 解析得到 URL 列表,address 可能包含多個注冊中心 ip,// 因此解析得到的是一個 URL 列表List<URL> urls = UrlUtils.parseURLs(address, map);for (URL url : urls) {url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());// 將 URL 協議頭設置為 registryurl = url.setProtocol(Constants.REGISTRY_PROTOCOL);// 通過判斷條件,決定是否添加 url 到 registryList 中,條件如下:// (服務提供者 && register = true 或 null) // || (非服務提供者 && subscribe = true 或 null)if ((provider && url.getParameter(Constants.REGISTER_KEY, true))|| (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {registryList.add(url);}}}}}return registryList; } |
上面代碼不是很復雜,包含如下邏輯:
關于多協議多注冊中心導出服務就先分析到這,代碼不是很多,就不過多敘述了。接下來分析 URL 組裝過程。
?2.1.3 組裝 URL
配置檢查完畢后,緊接著要做的事情是根據配置,以及其他一些信息組裝 URL。前面說過,URL 是 Dubbo 配置的載體,通過 URL 可讓 Dubbo 的各種配置在各個模塊之間傳遞。URL 之于 Dubbo,猶如水之于魚,非常重要。大家在閱讀 Dubbo 服務導出相關源碼的過程中,要注意 URL 內容的變化。既然 URL 如此重要,那么下面我們來了解一下 URL 組裝的過程。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {String name = protocolConfig.getName();// 如果協議名為空,或空串,則將協議名變量設置為 dubboif (name == null || name.length() == 0) {name = "dubbo";}Map<String, String> map = new HashMap<String, String>();// 添加 side、版本、時間戳以及進程號等信息到 map 中map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));if (ConfigUtils.getPid() > 0) {map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));}// 通過反射將對象的字段信息到 map 中appendParameters(map, application);appendParameters(map, module);appendParameters(map, provider, Constants.DEFAULT_KEY);appendParameters(map, protocolConfig);appendParameters(map, this);// methods 為 MethodConfig 集合,MethodConfig 中存儲了 <dubbo:method> 標簽的配置信息if (methods != null && !methods.isEmpty()) {// 這段代碼用于添加 Callback 配置到 map 中,代碼太長,待會單獨分析}// 檢測 generic 是否為 "true",并根據檢測結果向 map 中添加不同的信息if (ProtocolUtils.isGeneric(generic)) {map.put(Constants.GENERIC_KEY, generic);map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {String revision = Version.getVersion(interfaceClass, version);if (revision != null && revision.length() > 0) {map.put("revision", revision);}// 為接口生成包裹類 Wrapper,Wrapper 中包含了接口的詳細信息,比如接口方法名數組,字段信息等String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();// 添加方法名到 map 中,如果包含多個方法名,則用逗號隔開,比如 method = init,destroyif (methods.length == 0) {logger.warn("NO method found in service interface ...");map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);} else {// 將逗號作為分隔符連接方法名,并將連接后的字符串放入 map 中map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));}}// 添加 token 到 map 中if (!ConfigUtils.isEmpty(token)) {if (ConfigUtils.isDefault(token)) {map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());} else {map.put(Constants.TOKEN_KEY, token);}}// 判斷協議名是否為 injvmif (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {protocolConfig.setRegister(false);map.put("notify", "false");}// 獲取上下文路徑String contextPath = protocolConfig.getContextpath();if ((contextPath == null || contextPath.length() == 0) && provider != null) {contextPath = provider.getContextpath();}// 獲取 host 和 portString host = this.findConfigedHosts(protocolConfig, registryURLs, map);Integer port = this.findConfigedPorts(protocolConfig, name, map);// 組裝 URLURL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);// 省略無關代碼 } |
上面的代碼首先是將一些信息,比如版本、時間戳、方法名以及各種配置對象的字段信息放入到 map 中,map 中的內容將作為 URL 的查詢字符串。構建好 map 后,緊接著是獲取上下文路徑、主機名以及端口號等信息。最后將 map 和主機名等數據傳給 URL 構造方法創建 URL 對象。需要注意的是,這里出現的 URL 并非 java.net.URL,而是 com.alibaba.dubbo.common.URL。
上面省略了一段代碼,這里簡單分析一下。這段代碼用于檢測 <dubbo:argument> 標簽中的配置信息,并將相關配置添加到 map 中。代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {// ...// methods 為 MethodConfig 集合,MethodConfig 中存儲了 <dubbo:method> 標簽的配置信息if (methods != null && !methods.isEmpty()) {for (MethodConfig method : methods) {// 添加 MethodConfig 對象的字段信息到 map 中,鍵 = 方法名.屬性名。// 比如存儲 <dubbo:method name="sayHello" retries="2"> 對應的 MethodConfig,// 鍵 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}appendParameters(map, method, method.getName());String retryKey = method.getName() + ".retry";if (map.containsKey(retryKey)) {String retryValue = map.remove(retryKey);// 檢測 MethodConfig retry 是否為 false,若是,則設置重試次數為0if ("false".equals(retryValue)) {map.put(method.getName() + ".retries", "0");}}// 獲取 ArgumentConfig 列表List<ArgumentConfig> arguments = method.getArguments();if (arguments != null && !arguments.isEmpty()) {for (ArgumentConfig argument : arguments) {// 檢測 type 屬性是否為空,或者空串(分支1 ??)if (argument.getType() != null && argument.getType().length() > 0) {Method[] methods = interfaceClass.getMethods();if (methods != null && methods.length > 0) {for (int i = 0; i < methods.length; i++) {String methodName = methods[i].getName();// 比對方法名,查找目標方法if (methodName.equals(method.getName())) {Class<?>[] argtypes = methods[i].getParameterTypes();if (argument.getIndex() != -1) {// 檢測 ArgumentConfig 中的 type 屬性與方法參數列表// 中的參數名稱是否一致,不一致則拋出異常(分支2 ??)if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {// 添加 ArgumentConfig 字段信息到 map 中,// 鍵前綴 = 方法名.index,比如:// map = {"sayHello.3": true}appendParameters(map, argument, method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("argument config error: ...");}} else { // 分支3 ??for (int j = 0; j < argtypes.length; j++) {Class<?> argclazz = argtypes[j];// 從參數類型列表中查找類型名稱為 argument.type 的參數if (argclazz.getName().equals(argument.getType())) {appendParameters(map, argument, method.getName() + "." + j);if (argument.getIndex() != -1 && argument.getIndex() != j) {throw new IllegalArgumentException("argument config error: ...");}}}}}}}// 用戶未配置 type 屬性,但配置了 index 屬性,且 index != -1} else if (argument.getIndex() != -1) { // 分支4 ??// 添加 ArgumentConfig 字段信息到 map 中appendParameters(map, argument, method.getName() + "." + argument.getIndex());} else {throw new IllegalArgumentException("argument config must set index or type");}}}}}// ... } |
上面這段代碼 for 循環和 if else 分支嵌套太多,導致層次太深,不利于閱讀,需要耐心看一下。大家在看這段代碼時,注意把幾個重要的條件分支找出來。只要理解了這幾個分支的意圖,就可以弄懂這段代碼。我在上面代碼中用??符號標識出了4個重要的分支,下面用偽代碼解釋一下這幾個分支的含義。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | // 獲取 ArgumentConfig 列表 for (遍歷 ArgumentConfig 列表) {if (type 不為 null,也不為空串) { // 分支11. 通過反射獲取 interfaceClass 的方法列表for (遍歷方法列表) {1. 比對方法名,查找目標方法2. 通過反射獲取目標方法的參數類型數組 argtypesif (index != -1) { // 分支21. 從 argtypes 數組中獲取下標 index 處的元素 argType2. 檢測 argType 的名稱與 ArgumentConfig 中的 type 屬性是否一致3. 添加 ArgumentConfig 字段信息到 map 中,或拋出異常} else { // 分支31. 遍歷參數類型數組 argtypes,查找 argument.type 類型的參數2. 添加 ArgumentConfig 字段信息到 map 中}}} else if (index != -1) { // 分支41. 添加 ArgumentConfig 字段信息到 map 中} } |
在本節分析的源碼中,appendParameters 這個方法出現的次數比較多,該方法用于將對象字段信息添加到 map 中。實現上則是通過反射獲取目標對象的 getter 方法,并調用該方法獲取屬性值。然后再通過 getter 方法名解析出屬性名,比如從方法名 getName 中可解析出屬性 name。如果用戶傳入了屬性名前綴,此時需要將屬性名加入前綴內容。最后將 <屬性名,屬性值> 鍵值對存入到 map 中就行了。限于篇幅原因,這里就不分析 appendParameters 方法的源碼了,大家請自行分析。
?2.2 導出 Dubbo 服務
前置工作做完,接下來就可以進行服務導出工作。服務導出,分為導出到本地 (JVM),和導出到遠程。在深入分析服務導出源碼前,我們先來從宏觀層面上看一下服務導出邏輯。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {// 省略無關代碼if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) {// 加載 ConfiguratorFactory,并生成 Configurator 配置 urlurl = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getExtension(url.getProtocol()).getConfigurator(url).configure(url);}String scope = url.getParameter(Constants.SCOPE_KEY);// 如果 scope = none,則什么都不做if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {// scope != remote,導出到本地if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {exportLocal(url);}// scope != local,導出到遠程if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {if (registryURLs != null && !registryURLs.isEmpty()) {for (URL registryURL : registryURLs) {url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));// 加載監視器鏈接URL monitorUrl = loadMonitor(registryURL);if (monitorUrl != null) {// 將監視器鏈接作為參數添加到 url 中url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());}String proxy = url.getParameter(Constants.PROXY_KEY);if (StringUtils.isNotEmpty(proxy)) {registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);}// 為服務提供類(ref)生成 InvokerInvoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));// DelegateProviderMetaDataInvoker 僅用于持有 Invoker 和 ServiceConfigDelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);// 導出服務,并生成 ExporterExporter<?> exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}} else { // 不存在注冊中心,僅導出服務Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);Exporter<?> exporter = protocol.export(wrapperInvoker);exporters.add(exporter);}}}this.urls.add(url); } |
上面代碼根據 url 中的 scope 參數決定服務導出方式,分別如下:
- scope = none,不導出服務
- scope != remote,導出到本地
- scope != local,導出到遠程
不管是導出到本地,還是遠程。進行服務導出之前,均需要先創建 Invoker。這是一個很重要的步驟,因此接下來我會先分析 Invoker 的創建過程。
?2.2.1 Invoker 創建過程
在 Dubbo 中,Invoker 是一個非常重要的模型。在服務提供端,以及服務引用端均會出現 Invoker。Dubbo 官方文檔中對 Invoker 進行了說明,這里引用一下。
Invoker 是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起 invoke 調用,它有可能是一個本地的實現,也可能是一個遠程的實現,也可能一個集群實現。
既然 Invoker 如此重要,那么我們很有必要搞清楚 Invoker 的用途。Invoker 是由 ProxyFactory 創建而來,Dubbo 默認的 ProxyFactory 實現類是 JavassistProxyFactory。下面我們到 JavassistProxyFactory 代碼中,探索 Invoker 的創建過程。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | -- JavassistProxyFactory public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {// 為目標類創建 Wrapperfinal Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);// 創建匿名 Invoker 類對象,并實現 doInvoke 方法。return new AbstractProxyInvoker<T>(proxy, type, url) {@Overrideprotected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable {// 調用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調用目標方法return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);}}; } |
如上,JavassistProxyFactory 創建了一個繼承自 AbstractProxyInvoker 類的匿名對象,并覆寫了抽象方法 doInvoke。覆寫后的 doInvoke 邏輯比較簡單,僅是將調用請求轉發給了 Wrapper 類的 invokeMethod 方法。Wrapper 用于“包裹”目標類,Wrapper 是一個抽象類,僅可通過 getWrapper(Class) 方法創建子類。在創建 Wrapper 子類的過程中,子類代碼生成邏輯會對 getWrapper 方法傳入的 Class 對象進行解析,拿到諸如類方法,類成員變量等信息。以及生成 invokeMethod 方法代碼,和其他一些方法代碼。代碼生成完畢后,通過 Javassist 生成 Class 對象,最后再通過反射創建 Wrapper 實例。相關的代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public static Wrapper getWrapper(Class<?> c) {while (ClassGenerator.isDynamicClass(c))c = c.getSuperclass();if (c == Object.class)return OBJECT_WRAPPER;// 訪存Wrapper ret = WRAPPER_MAP.get(c);if (ret == null) {// 緩存未命中,創建 Wrapperret = makeWrapper(c);// 寫入緩存WRAPPER_MAP.put(c, ret);}return ret; } |
getWrapper 方法只是包含了一些緩存操作邏輯,非重點。下面我們重點關注 makeWrapper 方法。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 | private static Wrapper makeWrapper(Class<?> c) {// 檢測 c 是否為私有類型,若是則拋出異常if (c.isPrimitive())throw new IllegalArgumentException("Can not create wrapper for primitive type: " + c);String name = c.getName();ClassLoader cl = ClassHelper.getClassLoader(c);// c1 用于存儲 setPropertyValue 方法代碼StringBuilder c1 = new StringBuilder("public void setPropertyValue(Object o, String n, Object v){ ");// c2 用于存儲 getPropertyValue 方法代碼StringBuilder c2 = new StringBuilder("public Object getPropertyValue(Object o, String n){ ");// c3 用于存儲 invokeMethod 方法代碼StringBuilder c3 = new StringBuilder("public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws " + InvocationTargetException.class.getName() + "{ ");// 生成類型轉換代碼及異常捕捉代碼,比如:// DemoService w; try { w = ((DemoServcie) $1); }}catch(Throwable e){ throw new IllegalArgumentException(e); }c1.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");c2.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");c3.append(name).append(" w; try{ w = ((").append(name).append(")$1); }catch(Throwable e){ throw new IllegalArgumentException(e); }");// pts 用于存儲成員變量名和類型Map<String, Class<?>> pts = new HashMap<String, Class<?>>();// ms 用于存儲方法描述信息(可理解為方法簽名)及 Method 實例Map<String, Method> ms = new LinkedHashMap<String, Method>();// mns 為方法名列表List<String> mns = new ArrayList<String>();// dmns 用于存儲定義在當前類中的方法的名稱List<String> dmns = new ArrayList<String>();// --------------------------------? 分割線1 ?-------------------------------------// 獲取 public 訪問級別的字段,并為所有字段生成條件判斷語句for (Field f : c.getFields()) {String fn = f.getName();Class<?> ft = f.getType();if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers()))// 忽略關鍵字 static 或 transient 修飾的變量continue;// 生成條件判斷及賦值語句,比如:// if( $2.equals("name") ) { w.name = (java.lang.String) $3; return;}// if( $2.equals("age") ) { w.age = ((Number) $3).intValue(); return;}c1.append(" if( $2.equals(\"").append(fn).append("\") ){ w.").append(fn).append("=").append(arg(ft, "$3")).append("; return; }");// 生成條件判斷及返回語句,比如:// if( $2.equals("name") ) { return ($w)w.name; }c2.append(" if( $2.equals(\"").append(fn).append("\") ){ return ($w)w.").append(fn).append("; }");// 存儲 <字段名, 字段類型> 鍵值對到 pts 中pts.put(fn, ft);}// --------------------------------? 分割線2 ?-------------------------------------Method[] methods = c.getMethods();// 檢測 c 中是否包含在當前類中聲明的方法boolean hasMethod = hasMethods(methods);if (hasMethod) {c3.append(" try{");}for (Method m : methods) {if (m.getDeclaringClass() == Object.class)// 忽略 Object 中定義的方法continue;String mn = m.getName();// 生成方法名判斷語句,示例如下:// if ( "sayHello".equals( $2 )c3.append(" if( \"").append(mn).append("\".equals( $2 ) ");int len = m.getParameterTypes().length;// 生成運行時傳入參數的數量與方法的參數列表長度判斷語句,示例如下:// && $3.length == 2c3.append(" && ").append(" $3.length == ").append(len);boolean override = false;for (Method m2 : methods) {// 檢測方法是否存在重載情況,條件為:方法對象不同 && 方法名相同if (m != m2 && m.getName().equals(m2.getName())) {override = true;break;}}// 對重載方法進行處理,考慮下面的方法:// 1. void sayHello(Integer, String)// 2. void sayHello(Integer, Integer)// 方法名相同,參數列表長度也相同,因此不能僅通過這兩項判斷兩個方法是否相等。// 需要進一步判斷方法的參數類型if (override) {if (len > 0) {for (int l = 0; l < len; l++) {// && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")c3.append(" && ").append(" $3[").append(l).append("].getName().equals(\"").append(m.getParameterTypes()[l].getName()).append("\")");}}}// 添加 ) {,完成方法判斷語句,此時生成的方法可能如下(已格式化):// if ("sayHello".equals($2) // && $3.length == 2// && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")) {c3.append(" ) { ");// 根據返回值類型生成目標方法調用語句if (m.getReturnType() == Void.TYPE)// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); return null;c3.append(" w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");").append(" return null;");else// return w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]);c3.append(" return ($w)w.").append(mn).append('(').append(args(m.getParameterTypes(), "$4")).append(");");// 添加 }, 當前”方法判斷條件“代碼生成完畢,示例代碼如下(已格式化):// if ("sayHello".equals($2) // && $3.length == 2// && $3[0].getName().equals("java.lang.Integer") // && $3[1].getName().equals("java.lang.String")) {//// w.sayHello((java.lang.Integer)$4[0], (java.lang.String)$4[1]); // return null;// }c3.append(" }");// 添加方法名到 mns 集合中mns.add(mn);// 檢測當前方法是否在 c 中被聲明的if (m.getDeclaringClass() == c)// 若是,則將當前方法名添加到 dmns 中dmns.add(mn);ms.put(ReflectUtils.getDesc(m), m);}if (hasMethod) {// 添加異常捕捉語句c3.append(" } catch(Throwable e) { ");c3.append(" throw new java.lang.reflect.InvocationTargetException(e); ");c3.append(" }");}// 添加 NoSuchMethodException 異常拋出代碼c3.append(" throw new " + NoSuchMethodException.class.getName() + "(\"Not found method \\\"\"+$2+\"\\\" in class " + c.getName() + ".\"); }");// --------------------------------? 分割線3 ?-------------------------------------Matcher matcher;// 處理 get/set 方法for (Map.Entry<String, Method> entry : ms.entrySet()) {String md = entry.getKey();Method method = (Method) entry.getValue();// 匹配以 get 開頭的方法if ((matcher = ReflectUtils.GETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {// 獲取屬性名String pn = propertyName(matcher.group(1));// 生成屬性判斷以及返回語句,示例如下:// if( $2.equals("name") ) { return ($w).w.getName(); }c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());// 匹配以 is/has/can 開頭的方法} else if ((matcher = ReflectUtils.IS_HAS_CAN_METHOD_DESC_PATTERN.matcher(md)).matches()) {String pn = propertyName(matcher.group(1));// 生成屬性判斷以及返回語句,示例如下:// if( $2.equals("dream") ) { return ($w).w.hasDream(); }c2.append(" if( $2.equals(\"").append(pn).append("\") ){ return ($w)w.").append(method.getName()).append("(); }");pts.put(pn, method.getReturnType());// 匹配以 set 開頭的方法} else if ((matcher = ReflectUtils.SETTER_METHOD_DESC_PATTERN.matcher(md)).matches()) {Class<?> pt = method.getParameterTypes()[0];String pn = propertyName(matcher.group(1));// 生成屬性判斷以及 setter 調用語句,示例如下:// if( $2.equals("name") ) { w.setName((java.lang.String)$3); return; }c1.append(" if( $2.equals(\"").append(pn).append("\") ){ w.").append(method.getName()).append("(").append(arg(pt, "$3")).append("); return; }");pts.put(pn, pt);}}// 添加 NoSuchPropertyException 異常拋出代碼c1.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");c2.append(" throw new " + NoSuchPropertyException.class.getName() + "(\"Not found property \\\"\"+$2+\"\\\" filed or setter method in class " + c.getName() + ".\"); }");// --------------------------------? 分割線4 ?-------------------------------------long id = WRAPPER_CLASS_COUNTER.getAndIncrement();// 創建類生成器ClassGenerator cc = ClassGenerator.newInstance(cl);// 設置類名及超類cc.setClassName((Modifier.isPublic(c.getModifiers()) ? Wrapper.class.getName() : c.getName() + "$sw") + id);cc.setSuperClass(Wrapper.class);// 添加默認構造方法cc.addDefaultConstructor();// 添加字段cc.addField("public static String[] pns;");cc.addField("public static " + Map.class.getName() + " pts;");cc.addField("public static String[] mns;");cc.addField("public static String[] dmns;");for (int i = 0, len = ms.size(); i < len; i++)cc.addField("public static Class[] mts" + i + ";");// 添加方法代碼cc.addMethod("public String[] getPropertyNames(){ return pns; }");cc.addMethod("public boolean hasProperty(String n){ return pts.containsKey($1); }");cc.addMethod("public Class getPropertyType(String n){ return (Class)pts.get($1); }");cc.addMethod("public String[] getMethodNames(){ return mns; }");cc.addMethod("public String[] getDeclaredMethodNames(){ return dmns; }");cc.addMethod(c1.toString());cc.addMethod(c2.toString());cc.addMethod(c3.toString());try {// 生成類Class<?> wc = cc.toClass();// 設置字段值wc.getField("pts").set(null, pts);wc.getField("pns").set(null, pts.keySet().toArray(new String[0]));wc.getField("mns").set(null, mns.toArray(new String[0]));wc.getField("dmns").set(null, dmns.toArray(new String[0]));int ix = 0;for (Method m : ms.values())wc.getField("mts" + ix++).set(null, m.getParameterTypes());// 創建 Wrapper 實例return (Wrapper) wc.newInstance();} catch (RuntimeException e) {throw e;} catch (Throwable e) {throw new RuntimeException(e.getMessage(), e);} finally {cc.release();ms.clear();mns.clear();dmns.clear();} } |
上面代碼很長,大家耐心看一下。我在上面代碼中做了大量的注釋,并按功能對代碼進行了分塊,以幫助大家理解代碼邏輯。下面對這段代碼進行講解。首先我們把目光移到分割線1之上的代碼,這段代碼主要用于進行一些初始化操作。比如創建 c1、c2、c3 以及 pts、ms、mns 等變量,以及向 c1、c2、c3 中添加方法定義和類型類型轉換代碼。接下來是分割線1到分割線2之間的代碼,這段代碼用于為 public 級別的字段生成條件判斷取值與賦值代碼。這段代碼不是很難看懂,就不多說了。繼續向下看,分割線2和分隔線3之間的代碼用于為定義在當前類中的方法生成判斷語句,和方法調用語句。因為需要對方法重載進行校驗,因此到這這段代碼看起來有點復雜。不過耐心開一下,也不是很難理解。接下來是分割線3和分隔線4之間的代碼,這段代碼用于處理 getter、setter 以及以 is/has/can 開頭的方法。處理方式是通過正則表達式獲取方法類型(get/set/is/…),以及屬性名。之后為屬性名生成判斷語句,然后為方法生成調用語句。最后我們再來看一下分隔線4以下的代碼,這段代碼通過 ClassGenerator 為剛剛生成的代碼構建 Class 類,并通過反射創建對象。ClassGenerator 是 Dubbo 自己封裝的,該類的核心是 toClass() 的重載方法 toClass(ClassLoader, ProtectionDomain),該方法通過 javassist 構建 Class。這里就不分析 toClass 方法了,大家請自行分析。
閱讀 Wrapper 類代碼需要對 javassist 框架有所了解。關于 javassist,大家如果不熟悉,請自行查閱資料,本節不打算介紹 javassist 相關內容。
好了,關于 Wrapper 類生成過程就分析到這。如果大家看的不是很明白,可以單獨為 Wrapper 創建單元測試,然后單步調試。并將生成的代碼拷貝出來,格式化后再進行觀察和理解。好了,本節先到這。
?2.2.2 導出服務到本地
本節我們來看一下服務導出相關的代碼,按照代碼執行順序,本節先來分析導出服務到本地的過程。相關代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 | private void exportLocal(URL url) {// 如果 URL 的協議頭等于 injvm,說明已經導出到本地了,無需再次導出if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {URL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL) // 設置協議頭為 injvm.setHost(LOCALHOST).setPort(0);ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));// 創建 Invoker,并導出服務,這里的 protocol 會在運行時調用 InjvmProtocol 的 export 方法Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local));exporters.add(exporter);} } |
exportLocal 方法比較簡單,首先根據 URL 協議頭決定是否導出服務。若需導出,則創建一個新的 URL 并將協議頭、主機名以及端口設置成新的值。然后創建 Invoker,并調用 InjvmProtocol 的 export 方法導出服務。下面我們來看一下 InjvmProtocol 的 export 方法都做了哪些事情。
| 1 2 3 4 | public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {// 創建 InjvmExporterreturn new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); } |
如上,InjvmProtocol 的 export 方法僅創建了一個 InjvmExporter,無其他邏輯。到此導出服務到本地就分析完了,接下來,我們繼續分析導出服務到遠程的過程。
?2.2.3 導出服務到遠程
與導出服務到本地相比,導出服務到遠程的過程要復雜不少,其包含了服務導出與服務注冊兩個過程。這兩個過程涉及到了大量的調用,因此比較復雜。不過不管再難,我們都要看一下,萬一看懂了呢。按照代碼執行順序,本節先來分析服務導出邏輯,服務注冊邏輯將在下一節進行分析。下面開始分析,我們把目光移動到 RegistryProtocol 的 export 方法上。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {// 導出服務final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);// 獲取注冊中心 URL,以 zookeeper 注冊中心為例,得到的示例 URL 如下:// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-providerURL registryUrl = getRegistryUrl(originInvoker);// 根據 URL 加載 Registry 實現類,比如 ZookeeperRegistryfinal Registry registry = getRegistry(originInvoker);// 獲取已注冊的服務提供者 URL,比如:// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHellofinal URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);// 獲取 register 參數boolean register = registeredProviderUrl.getParameter("register", true);// 向服務提供者與消費者注冊表中注冊服務提供者ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);// 根據 register 的值決定是否注冊服務if (register) {// 向注冊中心注冊服務register(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}// 獲取訂閱 URL,比如:// provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHellofinal URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);// 創建監聽器final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);// 向注冊中心進行訂閱 override 數據registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);// 創建并返回 DestroyableExporterreturn new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl); } |
上面代碼看起來比較復雜,主要做如下一些操作:
在以上操作中,除了創建并返回 DestroyableExporter 沒啥難度外,其他幾步操作都不是很簡單。這其中,導出服務和注冊服務是本章要重點分析的邏輯。 訂閱 override 數據這個是非重點內容,后面會簡單介紹一下。下面開始本節的分析,先來分析 doLocalExport 方法的邏輯,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {String key = getCacheKey(originInvoker);// 訪問緩存ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) {synchronized (bounds) {exporter = (ExporterChangeableWrapper<T>) bounds.get(key);if (exporter == null) {// 創建 Invoker 為委托類對象final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));// 調用 protocol 的 export 方法導出服務exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);// 寫緩存bounds.put(key, exporter);}}}return exporter; } |
上面的代碼是典型的雙重檢查,這個大家應該都知道。接下來,我們把重點放在 Protocol 的 export 方法上。假設運行時協議為 dubbo,此處的 protocol 會在運行時加載 DubboProtocol,并調用 DubboProtocol 的 export 方法。我們目光轉移到 DubboProtocol 的 export 方法上,相關分析如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();// 獲取服務標識,理解成服務坐標也行。由服務組名,服務名,服務版本號以及端口組成。比如:// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880String key = serviceKey(url);// 創建 DubboExporterDubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);// 將 <key, exporter> 鍵值對放入緩存中exporterMap.put(key, exporter);// 以下代碼應該和本地存根有關,代碼不難看懂,但具體用途暫時不清楚,先忽略Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {// 省略日志打印代碼} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}// 啟動服務器openServer(url);// 優化序列化optimizeSerialization(url);return exporter; } |
如上,我們重點關注 DubboExporter 的創建以及 openServer 方法,其他邏輯看不懂也沒關系,不影響理解服務導出過程。另外,DubboExporter 的代碼比較簡單,就不分析了。下面分析 openServer 方法。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | private void openServer(URL url) {// 獲取 host:port,并將其作為服務器實例的 key,用于標識當前的服務器實例String key = url.getAddress();boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);if (isServer) {// 訪問緩存ExchangeServer server = serverMap.get(key);if (server == null) {// 創建服務器實例serverMap.put(key, createServer(url));} else {// 服務器已創建,則根據 url 中的配置重置服務器server.reset(url);}} } |
如上,在同一臺機器上(單網卡),同一個端口上僅允許啟動一個服務器實例。若某個端口上已有服務器實例,此時則調用 reset 方法重置服務器的一些配置。考慮到篇幅問題,關于服務器實例重置的代碼就不分析了。接下來分析服務器實例的創建過程。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | private ExchangeServer createServer(URL url) {url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,// 添加心跳檢測配置到 url 中url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));// 獲取 server 參數,默認為 nettyString str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);// 通過 SPI 檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則拋出異常if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))throw new RpcException("Unsupported server type: " + str + ", url: " + url);// 添加編碼解碼器參數url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);ExchangeServer server;try {// 創建 ExchangeServerserver = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server...");}// 獲取 client 參數,可指定 netty,minastr = url.getParameter(Constants.CLIENT_KEY);if (str != null && str.length() > 0) {// 獲取所有的 Transporter 實現類名稱集合,比如 supportedTypes = [netty, mina]Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();// 檢測當前 Dubbo 所支持的 Transporter 實現類名稱列表中,// 是否包含 client 所表示的 Transporter,若不包含,則拋出異常if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type...");}}return server; } |
如上,createServer 包含三個核心的操作。第一是檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則拋出異常。第二是創建服務器實例。第三是檢測是否支持 client 參數所表示的 Transporter 拓展,不存在也是拋出異常。兩次檢測操作所對應的代碼比較直白了,無需多說。但創建服務器的操作目前還不是很清晰,我們繼續往下看。
| 1 2 3 4 5 6 7 8 9 10 11 12 | public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");// 獲取 Exchanger,默認為 HeaderExchanger。// 緊接著調用 HeaderExchanger 的 bind 方法創建 ExchangeServer 實例return getExchanger(url).bind(url, handler); } |
上面代碼比較簡單,就不多說了。下面看一下 HeaderExchanger 的 bind 方法。
| 1 2 3 4 5 6 7 | public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {// 創建 HeaderExchangeServer 實例,該方法包含了多步操作,本別如下:// 1. new HeaderExchangeHandler(handler)// 2. new DecodeHandler(new HeaderExchangeHandler(handler))// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); } |
HeaderExchanger 的 bind 方法包含的邏輯比較多,但目前我們僅需關心 Transporters 的 bind 方法邏輯即可。該方法的代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");}ChannelHandler handler;if (handlers.length == 1) {handler = handlers[0];} else {// 如果 handlers 元素數量大于1,則創建 ChannelHandler 分發器handler = new ChannelHandlerDispatcher(handlers);}// 獲取自適應 Transporter 實例,并調用實例方法return getTransporter().bind(url, handler); } |
如上,getTransporter() 方法獲取的 Transporter 是在運行時動態創建的,類名為 TransporterAdaptive,也就是自適應拓展類。我在[上一篇文章](http://www.tianxiaobo.com/2018/10/13/Dubbo-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-%E8%87%AA%E9%80%82%E5%BA%94%E6%8B%93%E5%B1%95%E5%8E%9F%E7%90%86/)中詳細分析了自適應拓展類的生成過程,對自適應拓展類不了解的同學可以參考我之前的文章,這里不再贅述。TransporterAdaptive 會在運行時根據傳入的 URL 參數決定加載什么類型的 Transporter,默認為 NettyTransporter。下面我們繼續跟下去,這次分析的是 NettyTransporter 的 bind 方法。
| 1 2 3 4 | public Server bind(URL url, ChannelHandler listener) throws RemotingException {// 創建 NettyServerreturn new NettyServer(url, listener); } |
這里僅有一句創建 NettyServer 的代碼,沒啥好講的,我們繼續向下看。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | public class NettyServer extends AbstractServer implements Server {public NettyServer(URL url, ChannelHandler handler) throws RemotingException {// 調用父類構造方法super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));} }public abstract class AbstractServer extends AbstractEndpoint implements Server {public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {// 調用父類構造方法,這里就不用跟進去了,沒什么復雜邏輯super(url, handler);localAddress = getUrl().toInetSocketAddress();// 獲取 ip 和端口String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {// 設置 ip 為 0.0.0.0bindIp = NetUtils.ANYHOST;}bindAddress = new InetSocketAddress(bindIp, bindPort);// 獲取最大可接受連接數this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);try {// 調用模板方法 doOpen 啟動服務器doOpen();} catch (Throwable t) {throw new RemotingException("Failed to bind ");}DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));}protected abstract void doOpen() throws Throwable;protected abstract void doClose() throws Throwable; } |
上面多數代碼為賦值代碼,不需要多講。我們重點關注 doOpen 抽象方法,該方法需要子類實現。下面回到 NettyServer 中。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory();// 創建 boss 和 worker 線程池ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));// 創建 ServerBootstrapbootstrap = new ServerBootstrap(channelFactory);final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);channels = nettyHandler.getChannels();bootstrap.setOption("child.tcpNoDelay", true);// 設置 PipelineFactorybootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ChannelPipeline pipeline = Channels.pipeline();pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});// 綁定到指定的 ip 和端口上channel = bootstrap.bind(getBindAddress()); } |
以上就是 NettyServer 創建的過程,dubbo 默認使用的 NettyServer 是基于 netty 3.x 版本實現的,比較老了。因此 Dubbo 中另外提供了 netty 4.x 版本的 NettyServer,大家可在使用 Dubbo 的過程中按需進行配置。
到此,關于服務導出的過程就分析完了。整個過程比較復雜,大家在分析的過程中耐心一些。并且多寫 Demo 進行進行調試,以便能夠更好的理解代碼邏輯。好了,本節內容先到這里,接下來分析服務導出的另一塊邏輯 – 服務注冊。
?2.2.4 服務注冊
本節我們來分析服務注冊過程,服務注冊操作對于 Dubbo 來說不是必需的,通過服務直連的方式就可以繞過注冊中心。但通常我們不會這么做,直連方式不利于服務治理,僅推薦在測試環境測試服務時使用。對于 Dubbo 來說,注冊中心雖不是必需,但卻是必要的。因此,關于注冊中心以及服務注冊相關邏輯,我們也需要搞懂。
本節內容以 Zookeeper 注冊中心作為分析目標,其他類型注冊中心大家可自行分析。下面從服務注冊的入口方法開始分析,我們把目光再次移到 RegistryProtocol 的 export 方法上。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 | public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {// ${導出服務}// 省略其他代碼boolean register = registeredProviderUrl.getParameter("register", true);if (register) {// 注冊服務register(registryUrl, registeredProviderUrl);ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);}final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);// 訂閱 override 數據registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);// 省略部分代碼 } |
RegistryProtocol 的 export 方法包含了服務導出,注冊,以及數據訂閱等邏輯。其中服務導出邏輯上一節已經分析過了,本節將分析服務注冊邏輯,數據訂閱邏輯將在下一節進行分析。下面開始本節的分析,相關代碼如下:
| 1 2 3 4 5 6 | public void register(URL registryUrl, URL registedProviderUrl) {// 獲取 RegistryRegistry registry = registryFactory.getRegistry(registryUrl);// 注冊服務registry.register(registedProviderUrl); } |
register 方法包含兩步操作,第一步是獲取注冊中心實例,第二步是向注冊中心注冊服務。接下來,我分兩節內容對這兩步操作進行分析。按照順序,先來分析獲取注冊中心的邏輯。
?2.2.4.1 創建注冊中心
本節內容以 Zookeeper 注冊中心為例進行分析。下面先來看一下 getRegistry 方法的源碼,這個方法由 AbstractRegistryFactory 實現。如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | public Registry getRegistry(URL url) {url = url.setPath(RegistryService.class.getName()).addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()).removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);String key = url.toServiceString();LOCK.lock();try {// 訪問緩存Registry registry = REGISTRIES.get(key);if (registry != null) {return registry;}// 緩存未命中,創建 Registry 實例registry = createRegistry(url);if (registry == null) {throw new IllegalStateException("Can not create registry...");}// 寫入緩存REGISTRIES.put(key, registry);return registry;} finally {LOCK.unlock();} }protected abstract Registry createRegistry(URL url); |
如上,getRegistry 方法先訪問緩存,緩存未命中則調用 createRegistry 創建 Registry,然后寫入緩存。這里的 createRegistry 是一個模板方法,由具體的子類實現。因此,下面我們到 ZookeeperRegistryFactory 中探究一番。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | public class ZookeeperRegistryFactory extends AbstractRegistryFactory {// zookeeperTransporter 由 SPI 在運行時注入,類型為 ZookeeperTransporter$Adaptiveprivate ZookeeperTransporter zookeeperTransporter;public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {this.zookeeperTransporter = zookeeperTransporter;}@Overridepublic Registry createRegistry(URL url) {// 創建 ZookeeperRegistryreturn new ZookeeperRegistry(url, zookeeperTransporter);} } |
ZookeeperRegistryFactory 的 createRegistry 方法僅包含一句代碼,無需解釋,繼續跟下去。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 | public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {super(url);if (url.isAnyHost()) {throw new IllegalStateException("registry address == null");}// 獲取組名,默認為 dubboString group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);if (!group.startsWith(Constants.PATH_SEPARATOR)) {// group = "/" + groupgroup = Constants.PATH_SEPARATOR + group;}this.root = group;// 創建 Zookeeper 客戶端,默認為 CuratorZookeeperTransporterzkClient = zookeeperTransporter.connect(url);// 添加狀態監聽器zkClient.addStateListener(new StateListener() {@Overridepublic void stateChanged(int state) {if (state == RECONNECTED) {try {recover();} catch (Exception e) {logger.error(e.getMessage(), e);}}}}); } |
在上面的代碼代碼中,我們重點關注 ZookeeperTransporter 的 connect 方法調用,這個方法用于創建 Zookeeper 客戶端。創建好 Zookeeper 客戶端,意味著注冊中心的創建過程就結束了。不過,顯然我們不能就此停止,難道大家沒有興趣了解一下 Zookeeper 客戶端的創建過程嗎?如果有,那么繼續向下看。沒有的話,直接跳到下一節。那我接著分析了。
前面說過,這里的 zookeeperTransporter 類型為自適應拓展類,因此 connect 方法會在被調用時決定加載什么類型的 ZookeeperTransporter 拓展,默認為 CuratorZookeeperTransporter。下面我們到 CuratorZookeeperTransporter 中看一看。
| 1 2 3 4 | public ZookeeperClient connect(URL url) {// 創建 CuratorZookeeperClientreturn new CuratorZookeeperClient(url); } |
上面方法僅用于創建 CuratorZookeeperClient 實例,沒什么好說的,繼續往下看。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {private final CuratorFramework client;public CuratorZookeeperClient(URL url) {super(url);try {// 創建 CuratorFramework 構造器CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString(url.getBackupAddress()).retryPolicy(new RetryNTimes(1, 1000)).connectionTimeoutMs(5000);String authority = url.getAuthority();if (authority != null && authority.length() > 0) {builder = builder.authorization("digest", authority.getBytes());}// 構建 CuratorFramework 實例client = builder.build();// 添加監聽器client.getConnectionStateListenable().addListener(new ConnectionStateListener() {@Overridepublic void stateChanged(CuratorFramework client, ConnectionState state) {if (state == ConnectionState.LOST) {CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);} else if (state == ConnectionState.CONNECTED) {CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);} else if (state == ConnectionState.RECONNECTED) {CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);}}});// 啟動客戶端client.start();} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}} } |
CuratorZookeeperClient 構造方法主要用于創建和啟動 CuratorFramework 實例。以上基本上都是 Curator 框架的代碼,大家如果對 Curator 框架不是很了解,可以參考 Curator 官方文檔,并寫點 Demo 跑跑。
本節分析了 ZookeeperRegistry 實例的創建過程,整個過程并不是很復雜。大家在看完分析后,可以自行調試,以加深印象。現在注冊中心實例創建好了,接下來要做的事情是向注冊中心注冊服務,我們繼續往下看。
?2.2.4.2 節點創建
以 Zookeeper 為例,所謂的服務注冊,本質上是將服務配置數據寫入到 Zookeeper 的某個路徑的節點下。為了驗證這個說法,下面我們將 Dobbo 官方提供提供的實例跑起來,然后通過 Zookeeper 可視化客戶端?ZooInspector?查看節點數據。如下:
從上圖中可以看到 com.alibaba.dubbo.demo.DemoService 這個服務對應的配置信息(存儲在 URL 中)最終被注冊到了 /dubbo/com.alibaba.dubbo.demo.DemoService/providers/ 節點下。搞懂了服務注冊的本質,那么接下來我們就可以去閱讀服務注冊的代碼了。服務注冊的接口為 register(URL),這個方法定義在 FailbackRegistry 抽象類中。方法代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | public void register(URL url) {super.register(url);failedRegistered.remove(url);failedUnregistered.remove(url);try {// 模板方法,由子類實現doRegister(url);} catch (Exception e) {Throwable t = e;// 獲取 check 參數,若 check = true 將會直接拋出異常boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)&& url.getParameter(Constants.CHECK_KEY, true)&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());boolean skipFailback = t instanceof SkipFailbackWrapperException;if (check || skipFailback) {if (skipFailback) {t = t.getCause();}throw new IllegalStateException("Failed to register");} else {logger.error("Failed to register");}// 記錄注冊失敗的鏈接failedRegistered.add(url);} }protected abstract void doRegister(URL url); |
如上,我們重點關注 doRegister 方法調用即可,其他的代碼先忽略。doRegister 方法是一個模板方法,因此我們到 FailbackRegistry 子類 ZookeeperRegistry 中進行分析。如下:
| 1 2 3 4 5 6 7 8 9 10 11 | protected void doRegister(URL url) {try {// 通過 Zookeeper 客戶端創建節點,節點路徑由 toUrlPath 方法生成,路徑格式如下:// /${group}/${serviceInterface}/providers/${url}// 比如// /dubbo/com.tianxiaobo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));} catch (Throwable e) {throw new RpcException("Failed to register...");} } |
如上,ZookeeperRegistry 在 doRegister 中調用了 Zookeeper 客戶端創建服務節點。節點路徑由 toUrlPath 方法生成,該方法邏輯不難理解,就不分析了。接下來分析 create 方法,如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | public void create(String path, boolean ephemeral) {if (!ephemeral) {// 如果要創建的節點類型非臨時節點,那么這里要檢測節點是否存在if (checkExists(path)) {return;}}int i = path.lastIndexOf('/');if (i > 0) {create(path.substring(0, i), false); // 遞歸創建上一級路徑}// 根據 ephemeral 的值創建臨時或持久節點if (ephemeral) {createEphemeral(path);} else {createPersistent(path);} } |
上面方法先是通過遞歸創建當前節點的上一級路徑,然后再根據 ephemeral 的值決定創建臨時還是持久節點。createEphemeral 和 createPersistent 這兩個方法都比較簡單,這里簡單分析其中的一個。如下:
| 1 2 3 4 5 6 7 8 9 | public void createEphemeral(String path) {try {// 通過 Curator 框架創建節點client.create().withMode(CreateMode.EPHEMERAL).forPath(path);} catch (NodeExistsException e) {} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);} } |
好了,到此關于服務注冊的過程就分析完了。整個過程可簡單總結為:先創建注冊中心實例,之后再通過注冊中心實例注冊服務。本節先到這,接下來分析數據訂閱過程。
?2.2.5 訂閱 override 數據
訂閱 override 數據對應的代碼我粗略看了一遍,這部分代碼的主要目的是為了在服務配置發生變化時,重新導出服務。具體的使用場景應該當我們通過 Dubbo 管理后臺修改了服務配置后,Dubbo 得到服務配置被修改的通知,然后重新導出服務。這個使用場景只是猜測,我并未進行過驗證。如果大家有興趣可以自行驗證。
override 數據訂閱相關代碼也不是很少,考慮到文章篇幅問題以及重要性,遂決定不對此邏輯進行詳細的分析。如果大家有興趣,可自行分析。
?3.總結
本篇文章詳細分析了 Dubbo 服務導出過程,包括配置檢測,URL 組裝,Invoker 創建過程、導出服務以及注冊服務等等。篇幅比較大,需要大家耐心閱讀。對于這篇文章,我建議大家當成一個工具書使用。需要的時候跳到指定章節看一下,通讀可能會有點累。由于文章篇幅比較大,因此可能會隱藏一些我沒意識到的錯誤。若大家在閱讀的過程中發現了錯誤,還請指出。如果能夠不吝賜教,那就更好了,先在這里說聲謝謝。
好了,本篇文章就到這了。謝謝閱讀。
- 本文鏈接:?https://www.tianxiaobo.com/2018/10/31/Dubbo-源碼分析-服務導出/
http://www.tianxiaobo.com/2018/10/31/Dubbo-%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90-%E6%9C%8D%E5%8A%A1%E5%AF%BC%E5%87%BA/?
總結
以上是生活随笔為你收集整理的Dubbo 源码分析 - 服务导出的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Dubbo 源码分析 - 自适应拓展原理
- 下一篇: Dubbo 源码分析 - 服务引用