【源码系列】Eureka源码分析
??對(duì)于服務(wù)注冊(cè)中心、服務(wù)提供者、服務(wù)消費(fèi)者這個(gè)三個(gè)主要元素來說,服務(wù)提供者和服務(wù)消費(fèi)者(即Eureka客戶端)在整個(gè)運(yùn)行機(jī)制中是大部分通信行為的主動(dòng)發(fā)起者(服務(wù)注冊(cè)、續(xù)約、下線等),而注冊(cè)中心主要是處理請(qǐng)求的接收者。所以,我們從Eureka的客戶端為入口分析它是如何完成這些主動(dòng)通信的。
??一般情況下,我們將一個(gè)SpringBoot應(yīng)用注冊(cè)到 Eureka Server 或者從 Eureka Server 獲取服務(wù)器列表時(shí),就做了兩件事:
我們先看看 @EnableDiscoveryClient 這個(gè)注解的源碼,如下:
/*** Annotation to enable a DiscoveryClient implementation.* @author Spencer Gibb*/ (ElementType.TYPE) (RetentionPolicy.RUNTIME) (EnableDiscoveryClientImportSelector.class) public EnableDiscoveryClient {/*** If true, the ServiceRegistry will automatically register the local server.*/boolean autoRegister() default true; }通過注釋可以知道,該注解可以開啟 DiscoveryClient 實(shí)例,然后我們搜索 DiscoveryClient 會(huì)發(fā)現(xiàn)一個(gè)類和一個(gè)接口,它們的關(guān)系如圖。
?
enter description here
右邊的org.springframework.cloud.client.discovery.DiscoveryClient 是SpringCloud的接口,體現(xiàn)了面向接口編程的思想,定義了用來發(fā)現(xiàn)服務(wù)的常用抽象方法。org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是該接口的實(shí)現(xiàn),是對(duì)Eureka發(fā)現(xiàn)服務(wù)的封裝,內(nèi)部依賴了一個(gè)EurekaClient接口,所以真正實(shí)現(xiàn)發(fā)現(xiàn)服務(wù)的是com.netflix.discovery.DiscoveryClient類。
查看類注釋的內(nèi)容:
?
/*** The class that is instrumental for interactions with <tt>Eureka Server</tt>.** <p>* <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the* instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with* <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from* <tt>Eureka Server</tt> during shutdown* <p>* d) <em>Querying</em> the list of services/instances registered with* <tt>Eureka Server</tt>* <p>** <p>* <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>* {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips* which do not change. All of the functions defined above fail-over to other* {@link java.net.URL}s specified in the list in the case of failure.* </p>** @author Karthik Ranganathan, Greg Kim* @author Spencer Gibb**/ public class DiscoveryClient implements EurekaClient {... }這個(gè)類用于幫助與 Eureka Server 相互協(xié)作
Eureka Client客戶端負(fù)責(zé)以下內(nèi)容:
哪里對(duì)Eureka Server的URL列表配置?
根據(jù)我們配置的屬性名eureka.client.serviceUrl.defaultZone,通過serviceUrl可以找到該屬性相關(guān)的加載屬性,就是DiscoveryClient里有個(gè)getEurekaServiceUrlsFromConfig()方法但是棄用了,改用EndpointUtils這個(gè)工具類,代碼如下:
Region,Zone
getRegion()方法可以看出一個(gè)微服務(wù)應(yīng)用只可以屬于一個(gè)Region,如果沒配置則為default,可以通過eureka.client.region屬性來定義。
public static String getRegion(EurekaClientConfig clientConfig) {String region = clientConfig.getRegion();if (region == null) {region = DEFAULT_REGION;}region = region.trim().toLowerCase();return region;}getAvailabilityZones()方法可以看出Region與Zone的關(guān)系,一個(gè)Region可以有多個(gè)Zone,設(shè)置時(shí)可以用逗號(hào)來分隔。默認(rèn)采用defaultZone。
public String[] getAvailabilityZones(String region) {String value = (String)this.availabilityZones.get(region);if(value == null) {value = "defaultZone";}return value.split(",");}在獲取Region和Zone的信息后,根據(jù)傳入的參數(shù)按一定的算法確定加載位于哪一個(gè)Zone的serviceUrls。
?
enter description here
getEurekaServerServiceUrls方法是EurekaClientConfigBean的實(shí)現(xiàn)類,該方法用來獲取一個(gè)Zone下配置的所以serviceUrl,通過標(biāo)注出來的地方可以知道,eureka.client.serviceUrl.defaultZone屬性可以配置多個(gè),用逗號(hào)來分隔。
?
?
enter description here
注意: Ribbon具有區(qū)域親和特性,Ribbon的默認(rèn)策略會(huì)優(yōu)先訪問同客戶端處于同一個(gè)Zone中的實(shí)例。所以通過Zone屬性的定義,配置實(shí)際部署的物理結(jié)構(gòu),我們就可以有效地設(shè)計(jì)出對(duì)區(qū)域性故障的容錯(cuò)集群。
?
服務(wù)注冊(cè)
前面說了多個(gè)服務(wù)注冊(cè)中心信息的加載,這里再看看 DiscoveryClient 類是如何實(shí)現(xiàn)服務(wù)注冊(cè)的。通過查看該類的構(gòu)造函數(shù),發(fā)現(xiàn)它調(diào)用了以下方法。
/*** Initializes all scheduled tasks.*/private void initScheduledTasks() {...if (clientConfig.shouldRegisterWithEureka()) {...// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);// InstanceInfo replicatorinstanceInfoReplicator = new InstanceInfoReplicator(this,instanceInfo,clientConfig.getInstanceInfoReplicationIntervalSeconds(),2); // burstSize...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}這里先根據(jù)配置判斷是不是要注冊(cè)到 Eureka,然后創(chuàng)建心跳檢測(cè)任務(wù),獲取 instanceInfoReplicator。InstanceInfoReplicator類實(shí)現(xiàn) Runnable接口,instanceInfoReplicator實(shí)例會(huì)執(zhí)行一個(gè)定時(shí)任務(wù),這個(gè)定時(shí)任務(wù)的內(nèi)容可以查看該類的run()方法。
?
enter description here
這里定時(shí)刷新實(shí)例信息,discoveryClient.register()這里觸發(fā)了服務(wù)注冊(cè),register()的內(nèi)容如下:
?
?
服務(wù)注冊(cè)的方法
通過注釋也能看出來,這里是通過發(fā)送REST請(qǐng)求的方式進(jìn)行的,com.netflix.appinfo.InstanceInfo就是注冊(cè)時(shí)客戶端給服務(wù)端的元數(shù)據(jù)。
?
服務(wù)獲取與服務(wù)續(xù)約
上面說到的 initScheduledTasks() 方法還有兩個(gè)定時(shí)任務(wù),分別是服務(wù)獲取和服務(wù)續(xù)約。
private void initScheduledTasks() {if (clientConfig.shouldFetchRegistry()) {// registry cache refresh timerint registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();scheduler.schedule(new TimedSupervisorTask("cacheRefresh",scheduler,cacheRefreshExecutor,registryFetchIntervalSeconds,TimeUnit.SECONDS,expBackOffBound,new CacheRefreshThread()),registryFetchIntervalSeconds, TimeUnit.SECONDS);}if (clientConfig.shouldRegisterWithEureka()) {int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);// Heartbeat timerscheduler.schedule(new TimedSupervisorTask("heartbeat",scheduler,heartbeatExecutor,renewalIntervalInSecs,TimeUnit.SECONDS,expBackOffBound,new HeartbeatThread()),renewalIntervalInSecs, TimeUnit.SECONDS);...instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());} else {logger.info("Not registering with Eureka server per configuration");}}clientConfig.shouldFetchRegistry()這里其實(shí)是通過eureka.client.fetch-registry參數(shù)來判斷的,默認(rèn)為true,它可以定期更新客戶端的服務(wù)清單,從而客戶端能訪問到健康的服務(wù)實(shí)例。
服務(wù)續(xù)約也是發(fā)送REST請(qǐng)求實(shí)現(xiàn)的。
服務(wù)獲取的過程省略。
服務(wù)下線
服務(wù)端根據(jù)實(shí)例Id和appName執(zhí)行remove操作。
void unregister() {// It can be null if shouldRegisterWithEureka == falseif(eurekaTransport != null && eurekaTransport.registrationClient != null) {try {logger.info("Unregistering ...");EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());} catch (Exception e) {logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);}}}注冊(cè)中心處理
前面的分析都是從客戶端出發(fā)的,現(xiàn)在看看 Eureka Server是如何處理各種Rest請(qǐng)求的。這種請(qǐng)求的定義都在com.netflix.eureka.resources包下。
以服務(wù)注冊(cè)為例:
調(diào)用 ApplicationResource 類下的 addInstance()方法。
在對(duì)注冊(cè)信息進(jìn)行校驗(yàn)后,會(huì)調(diào)用org.springframework.cloud.netflix.eureka.server.InstanceRegistry的register(InstanceInfo info, int leaseDuration, boolean isReplication)方法。
?
enter description here
?
?
enter description here
首先會(huì)把新服務(wù)注冊(cè)事件傳播出去,然后調(diào)用父類com.netflix.eureka.registry.AbstractInstanceRegistry中的實(shí)現(xiàn)。
?
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {try {this.read.lock();Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());...} finally {this.read.unlock();}}?
保存實(shí)例信息的雙層Map
InstanceInfo的元數(shù)據(jù)信息保存在一個(gè)ConcurrentHashMap中,它是一個(gè)雙層的Map結(jié)構(gòu),第一層的key是服務(wù)名(即InstanceInfo的appName屬性),第二層的key是實(shí)例名(即InstanceInfo的InstanceId屬性)。
ApplicationResource中的其他方法可以自行研究。
?
轉(zhuǎn)載于:https://www.cnblogs.com/2YSP/p/11072255.html
總結(jié)
以上是生活随笔為你收集整理的【源码系列】Eureka源码分析的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: shell 函数的高级用法
- 下一篇: 穿越火线王者之魄觉醒有啥用