dubbo源码解析(二)
大家好,我是烤鴨:
? ? ? dubbo 源碼解析:
1.服務導出
介紹:
Dubbo 服務導出過程始于 Spring 容器發布刷新事件,Dubbo 在接收到事件后,會立即執行服務導出邏輯。整個邏輯大致可分為三個部分,第一部分是前置工作,主要用于檢查參數,組裝 URL。第二部分是導出服務,包含導出服務到本地 (JVM),和導出服務到遠程兩個過程。第三部分是向注冊中心注冊服務,用于服務發現。
源碼:
com.alibaba.dubbo.config.spring.ServiceBean?
public void onApplicationEvent(ApplicationEvent event) {if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {//如果不延遲,導出if (isDelay() && ! isExported() && ! isUnexported()) {if (logger.isInfoEnabled()) {logger.info("The service ready on spring started. service: " + getInterface());}export();}} }服務導出,查看 export 和 isDelay,需要 導出或者延遲導出。保留主要代碼。
com.alibaba.dubbo.config.ServiceConfig
public synchronized void export() {//如果延遲,另起一條線程,睡眠延遲時間再導出if (delay != null && delay > 0) {Thread thread = new Thread(new Runnable() {public void run() {try {Thread.sleep(delay);} catch (Throwable e) {}doExport();}});thread.setDaemon(true);thread.setName("DelayExportServiceThread");thread.start();} else {doExport();} }com.alibaba.dubbo.config.ServiceConfig
doExport 方法較長,就不貼源碼了。主要是一堆校驗,為的是后續的服務導出。
簡單總結:
1. 檢測 <dubbo:service> 標簽的 interface 屬性合法性,不合法則拋出異常
2. 檢測 ProviderConfig、ApplicationConfig 等核心配置類對象是否為空,若為空,則嘗試從其他配置類對象中獲取相應的實例。
3. 檢測并處理泛化服務和普通服務類
4. 檢測本地存根配置,并進行相應的處理
5. 對 ApplicationConfig、RegistryConfig 等配置類進行檢測,為空則嘗試創建,若無法創建則拋出異常
?
多協議多注冊中心導出服務 ,這個也不貼源碼了。
通過 loadRegistries 加載注冊中心鏈接,然后再遍歷 ProtocolConfig 集合導出每個服務。并在導出服務的過程中,將服務注冊到注冊中心。
com.alibaba.dubbo.config.ServiceConfig.loadRegistries
1. 檢測是否存在注冊中心配置類,不存在則拋出異常
2. 構建參數映射集合,也就是 map
3. 構建注冊中心鏈接列表
4. 遍歷鏈接列表,并根據條件決定是否將其添加到 registryList 中
組裝 URL:
URL 是 Dubbo 配置的載體,通過 URL 可讓 Dubbo 的各種配置在各個模塊之間傳遞。
com.alibaba.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol?前半部分
這段代碼用于檢測 <dubbo:method> 標簽中的配置信息,并將相關配置添加到 map 中。
導出 Dubbo 服務:
服務導出分為導出到本地 (JVM),和導出到遠程。
com.alibaba.dubbo.config.ServiceConfig.doExportUrlsFor1Protocol? 后半部分
根據 url 中的 scope 參數決定服務導出方式,分別如下:
scope = none,不導出服務
scope != remote,導出到本地
scope != local,導出到遠程
這里貼一下第三種情況的源碼:
//如果配置不是local則暴露為遠程服務.(配置為local,則表示只暴露遠程服務) if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){if (logger.isInfoEnabled()) {logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);}if (registryURLs != null && registryURLs.size() > 0&& url.getParameter("register", true)) {for (URL registryURL : registryURLs) {url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));URL monitorUrl = loadMonitor(registryURL);if (monitorUrl != null) {url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());}if (logger.isInfoEnabled()) {logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);}Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));Exporter<?> exporter = protocol.export(invoker);exporters.add(exporter);}} else {Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);Exporter<?> exporter = protocol.export(invoker);exporters.add(exporter);} }Invoker 創建過程
Invoker 是實體域,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起 invoke 調用,它有可能是一個本地的實現,也可能是一個遠程的實現,也可能一個集群實現
com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory.getInvoker
代碼生成完畢后,通過 Javassist 生成 Class 對象,最后再通過反射創建 Wrapper 實例。
com.alibaba.dubbo.common.bytecode.Wrapper.makeWrapper
(http://dubbo.apache.org/zh-cn/docs/source_code_guide/export-service.html? 搜 2.2.1)
動態創建類的過程就不分析了。
通過 ClassGenerator 為剛剛生成的代碼構建 Class 類,并通過反射創建對象。ClassGenerator 是 Dubbo 自己封裝的,該類的核心是 toClass() 的重載方法 toClass(ClassLoader, ProtectionDomain),該方法通過 javassist 構建 Class。
服務導出到本地:
com.alibaba.dubbo.config.ServiceConfig.exportLocal
導出服務到遠程:
com.alibaba.dubbo.registry.integration.RegistryProtocol
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {//export invokerfinal ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);//registry providerfinal Registry registry = getRegistry(originInvoker);final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);registry.register(registedProviderUrl);// 訂閱override數據// FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,因為subscribed以服務名為緩存的key,導致訂閱信息覆蓋。final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);//保證每次export都返回一個新的exporter實例return new Exporter<T>() {public Invoker<T> getInvoker() {return exporter.getInvoker();}public void unexport() {try {exporter.unexport();} catch (Throwable t) {logger.warn(t.getMessage(), t);}try {registry.unregister(registedProviderUrl);} catch (Throwable t) {logger.warn(t.getMessage(), t);}try {overrideListeners.remove(overrideSubscribeUrl);registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);} catch (Throwable t) {logger.warn(t.getMessage(), t);}}}; }1. 調用 doLocalExport 導出服務
2. 向注冊中心注冊服務
3. 向注冊中心進行訂閱 override 數據
4. 創建并返回 DestroyableExporter
主要分析下服務導出:
com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol.export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {URL url = invoker.getUrl();// 獲取服務標識,理解成服務坐標也行。由服務組名,服務名,服務版本號以及端口組成。比如:// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880String key = serviceKey(url);// 創建 DubboExporterDubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);// 將 <key, exporter> 鍵值對放入緩存中exporterMap.put(key, exporter);// 本地存根相關代碼Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);if (isStubSupportEvent && !isCallbackservice) {String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);if (stubServiceMethods == null || stubServiceMethods.length() == 0) {// 省略日志打印代碼} else {stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);}}// 啟動服務器openServer(url);// 優化序列化optimizeSerialization(url);return exporter; } private ExchangeServer createServer(URL url) {url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,// 添加心跳檢測配置到 url 中url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));// 獲取 server 參數,默認為 nettyString str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);// 通過 SPI 檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則拋出異常if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))throw new RpcException("Unsupported server type: " + str + ", url: " + url);// 添加編碼解碼器參數url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);ExchangeServer server;try {// 創建 ExchangeServerserver = Exchangers.bind(url, requestHandler);} catch (RemotingException e) {throw new RpcException("Fail to start server...");}// 獲取 client 參數,可指定 netty,minastr = url.getParameter(Constants.CLIENT_KEY);if (str != null && str.length() > 0) {// 獲取所有的 Transporter 實現類名稱集合,比如 supportedTypes = [netty, mina]Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();// 檢測當前 Dubbo 所支持的 Transporter 實現類名稱列表中,// 是否包含 client 所表示的 Transporter,若不包含,則拋出異常if (!supportedTypes.contains(str)) {throw new RpcException("Unsupported client type...");}}return server; }createServer 包含三個核心的邏輯。
第一是檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則拋出異常。
第二是創建服務器實例。
第三是檢測是否支持 client 參數所表示的 Transporter 拓展,不存在也是拋出異常。
兩次檢測操作所對應的代碼比較直白了,無需多說。
看下 bind 方法,最終構建netty容器,開放鏈接。
com.alibaba.dubbo.remoting.exchange.Exchangers
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handler == null) {throw new IllegalArgumentException("handler == null");}url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");// 獲取 Exchanger,默認為 HeaderExchanger。// 緊接著調用 HeaderExchanger 的 bind 方法創建 ExchangeServer 實例return getExchanger(url).bind(url, handler); }com.alibaba.dubbo.remoting.Transporters
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {if (url == null) {throw new IllegalArgumentException("url == null");}if (handlers == null || handlers.length == 0) {throw new IllegalArgumentException("handlers == null");}ChannelHandler handler;if (handlers.length == 1) {handler = handlers[0];} else {// 如果 handlers 元素數量大于1,則創建 ChannelHandler 分發器handler = new ChannelHandlerDispatcher(handlers);}// 獲取自適應 Transporter 實例,并調用實例方法return getTransporter().bind(url, handler); }com.alibaba.dubbo.remoting.transport.netty.NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {// 調用父類構造方法super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } @Override protected void doOpen() throws Throwable {NettyHelper.setNettyLoggerFactory();// 創建 boss 和 worker 線程池ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));// 創建 ServerBootstrapbootstrap = new ServerBootstrap(channelFactory);final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);channels = nettyHandler.getChannels();bootstrap.setOption("child.tcpNoDelay", true);// 設置 PipelineFactorybootstrap.setPipelineFactory(new ChannelPipelineFactory() {@Overridepublic ChannelPipeline getPipeline() {NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);ChannelPipeline pipeline = Channels.pipeline();pipeline.addLast("decoder", adapter.getDecoder());pipeline.addLast("encoder", adapter.getEncoder());pipeline.addLast("handler", nettyHandler);return pipeline;}});// 綁定到指定的 ip 和端口上channel = bootstrap.bind(getBindAddress()); }dubbo 默認使用的 NettyServer 是基于 netty 3.x 版本實現的,比較老了。
因此 Dubbo 另外提供了 netty 4.x 版本的 NettyServer,大家可在使用 Dubbo 的過程中按需進行配置。
再分析下服務注冊:
服務注冊操作對于 Dubbo 來說不是必需的,通過服務直連的方式就可以繞過注冊中心。
但通常我們不會這么做,直連方式不利于服務治理,僅推薦在測試服務時使用。對于 Dubbo 來說,注冊中心雖不是必需,但卻是必要的。
獲取注冊中心
com.alibaba.dubbo.registry.support.AbstractRegistryFactory.getRegistry
先創建注冊中心實例,之后再通過注冊中心實例注冊服務。
com.alibaba.dubbo.registry.zookeeper.ZookeeperRegistry.doRegister
@Override protected void doRegister(URL url) {try {zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));} catch (Throwable e) {throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);} }zkClient.create()方法是根據 CuratorFrameWork 與zk連接的。
?
?
?
總結
以上是生活随笔為你收集整理的dubbo源码解析(二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解决 org.apache.hadoop
- 下一篇: 谭浩强《c语言程序设计》第五章第十六题