springcloud ribbon @LoadBalance负载均衡源码流程分析
一、編寫示例
? ?1.服務端
? ? pom.xml
<properties><java.version>1.8</java.version><spring-cloud.version>Finchley.SR2</spring-cloud.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>0.2.1.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>? ? 2.提供者代碼,就是一個普通的HTTP REST接口。
@SpringBootApplication @EnableDiscoveryClient @RestController @Slf4j public class ProviderApplication {public static void main(String[] args) {SpringApplication.run(ProviderApplication.class, args);}@GetMapping("provider")public String provider(){log.info("..................................");return "hello,provider";} }3.消費端
POM
<properties><java.version>1.8</java.version><spring-cloud.version>Finchley.SR2</spring-cloud.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId><exclusions><exclusion><groupId>org.springframework.retry</groupId><artifactId>spring-retry</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency> <!-- <dependency>--> <!-- <groupId>org.springframework.retry</groupId>--> <!-- <artifactId>spring-retry</artifactId>--> <!-- </dependency>--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--<dependency>--><!--<groupId>org.springframework.retry</groupId>--><!--<artifactId>spring-retry</artifactId>--><!--</dependency>--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>0.2.1.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement>這里注意,如果不想要RetryTemplate,就去掉Retry的依賴,否則默認會引入的。
消費端代碼
@SpringBootApplication @Slf4j @EnableDiscoveryClient @RestController public class ConsumeApplication {@AutowiredRestTemplate restTemplate;@Bean@LoadBalancedpublic RestTemplate getRestTemplate(){HttpComponentsClientHttpRequestFactory clientHttpRequestFactory = new HttpComponentsClientHttpRequestFactory();clientHttpRequestFactory.setConnectTimeout(5 * 1000);clientHttpRequestFactory.setReadTimeout(5 * 1000);return new RestTemplate(clientHttpRequestFactory);}public static void main(String[] args) {SpringApplication.run(ConsumeApplication.class, args);}@GetMapping("test")public String getContent(){log.info("發起請求");return restTemplate.getForObject("http://provider/provider",String.class);}}二、初始化自動配置代碼分析
1.org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration,這個為RIBBON私有的 LoadBalancerClient,LoadBalancer,IRule對象的初始化。 @Configuration @ConditionalOnClass({ IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class}) @RibbonClients @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration") @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class}) @EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class}) public class RibbonAutoConfiguration {@Beanpublic SpringClientFactory springClientFactory() {SpringClientFactory factory = new SpringClientFactory();factory.setConfigurations(this.configurations);return factory;}@Bean@ConditionalOnMissingBean(LoadBalancerClient.class)public LoadBalancerClient loadBalancerClient() {return new RibbonLoadBalancerClient(springClientFactory());}@Bean@ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")@ConditionalOnMissingBeanpublic LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(final SpringClientFactory clientFactory) {return new RibbonLoadBalancedRetryFactory(clientFactory);}2.org.springframework.cloud.client.loadbalancer.LoadBalancerAutoConfiguration為springcloud-common的對象自動初始化。?
@Configuration @ConditionalOnClass(RestTemplate.class) @ConditionalOnBean(LoadBalancerClient.class) @EnableConfigurationProperties(LoadBalancerRetryProperties.class) public class LoadBalancerAutoConfiguration {@LoadBalanced@Autowired(required = false)private List<RestTemplate> restTemplates = Collections.emptyList();@Beanpublic SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {return () -> restTemplateCustomizers.ifAvailable(customizers -> {for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {for (RestTemplateCustomizer customizer : customizers) {customizer.customize(restTemplate);}}});}@Autowired(required = false)private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();@Bean@ConditionalOnMissingBeanpublic LoadBalancerRequestFactory loadBalancerRequestFactory(LoadBalancerClient loadBalancerClient) {return new LoadBalancerRequestFactory(loadBalancerClient, transformers);}@Configuration@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")static class LoadBalancerInterceptorConfig {@Beanpublic LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient,LoadBalancerRequestFactory requestFactory) {return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);}@Bean@ConditionalOnMissingBeanpublic RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {return restTemplate -> {List<ClientHttpRequestInterceptor> list = new ArrayList<>(restTemplate.getInterceptors());list.add(loadBalancerInterceptor);restTemplate.setInterceptors(list);};}}?這里的 restTemplate只會取加了@LoadBalance注解的,因為@LoadBalance加上了@Qualifer,這個注解會有篩選過濾作用。
去除了RetryTemplate就會走默認,否則走重試balance.
1.首先創建LoadBalancerRequestFactory工廠對象。
?2.創建LoadBalancerInterceptor對象
3.對RestTemplate列表對象,設置攔截器屬性。
三、調用分析
? 1.RestTemplate.getObject方法開始。這個方法會調用doExecute
public class RestTemplate extends InterceptingHttpAccessor implements RestOperations { @Nullableprotected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,@Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {Assert.notNull(url, "URI is required");Assert.notNull(method, "HttpMethod is required");ClientHttpResponse response = null;try {ClientHttpRequest request = createRequest(url, method);if (requestCallback != null) {requestCallback.doWithRequest(request);}response = request.execute();handleResponse(url, method, response);return (responseExtractor != null ? responseExtractor.extractData(response) : null);}catch (IOException ex) {String resource = url.toString();String query = url.getRawQuery();resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);throw new ResourceAccessException("I/O error on " + method.name() +" request for \"" + resource + "\": " + ex.getMessage(), ex);}finally {if (response != null) {response.close();}}}?
2.createRequest方法,RestTemplate是繼承于InterceptingHttpAccessor和HttpAccessor
HttpAccessor protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {ClientHttpRequest request = getRequestFactory().createRequest(url, method);if (logger.isDebugEnabled()) {logger.debug("Created " + method.name() + " request for \"" + url + "\"");}return request;}?3.getRequestFactory方法又是來源于InterceptingHttpAccessor,這里會對原有httpRequestFactory工廠包裝。
public ClientHttpRequestFactory getRequestFactory() {List<ClientHttpRequestInterceptor> interceptors = getInterceptors();if (!CollectionUtils.isEmpty(interceptors)) {ClientHttpRequestFactory factory = this.interceptingRequestFactory;if (factory == null) {factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);this.interceptingRequestFactory = factory;}return factory;}else {return super.getRequestFactory();}} public class InterceptingClientHttpRequestFactory extends AbstractClientHttpRequestFactoryWrapper {private final List<ClientHttpRequestInterceptor> interceptors;/*** Create a new instance of the {@code InterceptingClientHttpRequestFactory} with the given parameters.* @param requestFactory the request factory to wrap* @param interceptors the interceptors that are to be applied (can be {@code null})*/public InterceptingClientHttpRequestFactory(ClientHttpRequestFactory requestFactory,@Nullable List<ClientHttpRequestInterceptor> interceptors) {super(requestFactory);this.interceptors = (interceptors != null ? interceptors : Collections.emptyList());}@Overrideprotected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);}}?所以最后的ClientHttpRequest為InterceptingClientHttpRequest
?3.接著我們回到第一步,response = request.execute();因為已經創建好request,現在開始執行。
class InterceptingClientHttpRequest extends AbstractBufferingClientHttpRequest {private final ClientHttpRequestFactory requestFactory;private final List<ClientHttpRequestInterceptor> interceptors;private HttpMethod method;private URI uri;protected InterceptingClientHttpRequest(ClientHttpRequestFactory requestFactory,List<ClientHttpRequestInterceptor> interceptors, URI uri, HttpMethod method) {this.requestFactory = requestFactory;this.interceptors = interceptors;this.method = method;this.uri = uri;}@Overridepublic HttpMethod getMethod() {return this.method;}@Overridepublic String getMethodValue() {return this.method.name();}@Overridepublic URI getURI() {return this.uri;}@Overrideprotected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();return requestExecution.execute(this, bufferedOutput);}private class InterceptingRequestExecution implements ClientHttpRequestExecution {private final Iterator<ClientHttpRequestInterceptor> iterator;public InterceptingRequestExecution() {this.iterator = interceptors.iterator();}@Overridepublic ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {if (this.iterator.hasNext()) {ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();return nextInterceptor.intercept(request, body, this);}else {HttpMethod method = request.getMethod();Assert.state(method != null, "No standard HTTP method");ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));if (body.length > 0) {if (delegate instanceof StreamingHttpOutputMessage) {StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));}else {StreamUtils.copy(body, delegate.getBody());}}return delegate.execute();}}}}?InterceptingClientHttpRequest的execute會執行executeInternel方法,最后調用
InterceptingRequestExecution.execute?InterceptingRequestExecution.execute這個方法先會調用各種攔截器進行處理,處理完后,再會由攔截器重新調用這個對象的execute方法,做成責任鏈模式。最后調用缺省的HTTP代理請求工廠生成代理對象真正發起HTTP請求。
目前的攔截器就是從注冊中心獲取實例地址,替換URL.
4.接著進行LoadBalancerInterceptor.interropt方法。
public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {private LoadBalancerClient loadBalancer;private LoadBalancerRequestFactory requestFactory;public LoadBalancerInterceptor(LoadBalancerClient loadBalancer, LoadBalancerRequestFactory requestFactory) {this.loadBalancer = loadBalancer;this.requestFactory = requestFactory;}public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {// for backwards compatibilitythis(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));}@Overridepublic ClientHttpResponse intercept(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) throws IOException {final URI originalUri = request.getURI();String serviceName = originalUri.getHost();return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));} }?5.這里的loadBalancer就是RibbonLoadBalancerClient,所以調用這個進行URL替換。
public class RibbonLoadBalancerClient implements LoadBalancerClient {private SpringClientFactory clientFactory;public RibbonLoadBalancerClient(SpringClientFactory clientFactory) {this.clientFactory = clientFactory;}@Overridepublic URI reconstructURI(ServiceInstance instance, URI original) {Assert.notNull(instance, "instance can not be null");String serviceId = instance.getServiceId();RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);URI uri;Server server;if (instance instanceof RibbonServer) {RibbonServer ribbonServer = (RibbonServer) instance;server = ribbonServer.getServer();uri = updateToSecureConnectionIfNeeded(original, ribbonServer);} else {server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);ServerIntrospector serverIntrospector = serverIntrospector(serviceId);uri = updateToSecureConnectionIfNeeded(original, clientConfig,serverIntrospector, server);}return context.reconstructURIWithServer(server, uri);}@Overridepublic ServiceInstance choose(String serviceId) {Server server = getServer(serviceId);if (server == null) {return null;}return new RibbonServer(serviceId, server, isSecure(server, serviceId),serverIntrospector(serviceId).getMetadata(server));}@Overridepublic <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {ILoadBalancer loadBalancer = getLoadBalancer(serviceId);Server server = getServer(loadBalancer);if (server == null) {throw new IllegalStateException("No instances available for " + serviceId);}RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,serviceId), serverIntrospector(serviceId).getMetadata(server));return execute(serviceId, ribbonServer, request);}6.接著調用RibbonLoadBalancerClient.getLoadBalancer獲取負載均衡器。
?7.SpringClientFactory.getLoadBalancer
return getInstance(name, ILoadBalancer.class);這個會去容器工廠自動創建獲取ILoadBalancer類。?8.ribbonLoadBalancingHttpClient為實現類。
package org.springframework.cloud.netflix.ribbon.apache;/*** @author Spencer Gibb*/ @Configuration @ConditionalOnClass(name = "org.apache.http.client.HttpClient") @ConditionalOnProperty(name = "ribbon.httpclient.enabled", matchIfMissing = true) public class HttpClientRibbonConfiguration { @Bean@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)@ConditionalOnMissingClass(value = "org.springframework.retry.support.RetryTemplate")public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(IClientConfig config, ServerIntrospector serverIntrospector,ILoadBalancer loadBalancer, RetryHandler retryHandler, CloseableHttpClient httpClient) {RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(httpClient, config, serverIntrospector);client.setLoadBalancer(loadBalancer);client.setRetryHandler(retryHandler);Monitors.registerObject("Client_" + this.name, client);return client;}9.這個依賴于ribbonLoadBalancer,實際為這個類ZoneAwareLoadBalancer
package org.springframework.cloud.netflix.ribbon;/*** @author Dave Syer* @author Tim Ysewyn*/ @SuppressWarnings("deprecation") @Configuration @EnableConfigurationProperties //Order is important here, last should be the default, first should be optional // see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653 @Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class}) public class RibbonClientConfiguration { @Bean@ConditionalOnMissingBeanpublic ILoadBalancer ribbonLoadBalancer(IClientConfig config,ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,IRule rule, IPing ping, ServerListUpdater serverListUpdater) {if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {return this.propertiesFactory.get(ILoadBalancer.class, config, name);}return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,serverListFilter, serverListUpdater);}?10.上面那個類又依賴于IRULE,所以最后創建?ribbonRule,實現類為
ZoneAvoidanceRule?
?11.創建完RULE后,創建loadBalance
12.創建ribbonLoadBalancer
?13.我們再回到第5步,?? ?@Override
?? ?public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
?? ??? ?ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
?? ??? ?Server server = getServer(loadBalancer);,開始獲取server
?
?
?
?14.選擇好server后開始執行。
?
?15.回到LoadBalancerRequestFactory.apply
?16.又回到InterceptingRequestExecution.execute
?17.接下來開始替換URL
?18。RibbonLoadBalancerClient.reconstructURI
?
?19.
LoadBalancerContext.reconstructURIWithServer?20.url替換完畢,調用默認的HTTP代理對象發起請求,返回
?
21 請求結束,處理消息體轉換,返回數據。?
21.請求完畢。
四、服務實例獲取和選擇分析
? 1.我們先回到第三單,第9步,這個依賴于ribbonLoadBalancer,實際為這個類ZoneAwareLoadBalancer,這個為負載均衡器的選擇服務實例邏輯。
2.ZoneAwareLoadBalancer主要為感知分區的實例選擇,我們一般用不到。目前使用DynamicServerListLoadBalancer
AbstractLoadBalancer
AbstractLoadBalancer類的定義如下:
public abstract class AbstractLoadBalancer implements ILoadBalancer {public enum ServerGroup{ALL,STATUS_UP,STATUS_NOT_UP }public Server chooseServer() {return chooseServer(null);}public abstract List<Server> getServerList(ServerGroup serverGroup);public abstract LoadBalancerStats getLoadBalancerStats(); }關于這個類我說以下幾點:
1.?AbstractLoadBalancer實現了ILoadBalancer接口,但它是一個抽象類,它里邊定義了一個關于服務實例的分組枚舉類,包含了三種類型的服務:ALL表示所有服務,STATUS_UP表示正常運行的服務,STATUS_NOT_UP表示下線的服務。
2.?chooseServer方法毫無疑問是用來選取一個服務實例,但是要怎么選這里并沒有說,我們以后在它的實現類里邊尋找選取策略。
3.?getServerList方法用來獲取某一個分組中所有的的服務實例。
4.?getLoadBalancerStats方法用來獲取LoadBalancerStats對象,LoadBalancerStats對象中保存了每一個服務的所有細節信息。
BaseLoadBalancer
BaseLoadBalancer是AbstractLoadBalancer的一個實現類,源碼比較長我就不貼出來的,我們在這里主要來說說BaseLoadBalancer提供了哪些功能:
1.?首先這個類中有兩個List集合中放的Server對象,一個List集合用來保存所有的服務實例,還有一個List集合用來保存當前有效的服務實例。
2.?BaseLoadBalancer中定義了一個IPingStrategy,用來描述服務檢查策略,IPingStrategy默認實現采用了SerialPingStrategy實現,SerialPingStrategy中的pingServers方法就是遍歷所有的服務實例,一個一個發送請求,查看這些服務實例是否還有效,如果網絡環境不好的話,這種檢查策略效率會很低,如果我們想自定義檢查策略的話,可以重寫SerialPingStrategy的pingServers方法。
3.?在BaseLoadBalancer的chooseServer方法中(負載均衡的核心方法),我們發現最終調用了IRule中的choose方法來找到一個具體的服務實例,IRule是一個接口,在BaseLoadBalancer它的默認實現是RoundRobinRule類,RoundRobinRule類中采用了最常用的線性負載均衡規則,也就是所有有效的服務端輪流調用。
4.?在BaseLoadBalancer的構造方法中會啟動一個PingTask,這個PingTask用來檢查Server是否有效,PingTask的默認執行時間間隔為10秒。
5.?markServerDown方法用來標記一個服務是否有效,標記方式為調用Server對象的setAlive方法設置isAliveFlag屬性為false。
6.?getReachableServers方法用來獲取所有有效的服務實例列表。
7.?getAllServers方法用來獲取所有服務的實例列表。
8.?addServers方法表示向負載均衡器中添加一個新的服務實例列表。
BaseLoadBalancer的功能大概就這么多。
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer是BaseLoadBalancer的一個子類,在DynamicServerListLoadBalancer中對基礎負載均衡器的功能做了進一步的擴展,我們來看看。
1.?首先DynamicServerListLoadBalancer類一開始就聲明了一個變量serverListImpl,serverListImpl變量的類型是一個ServerList<T extends Server>,這里的泛型得是Server的子類,ServerList是一個接口,里邊定義了兩個方法:一個getInitialListOfServers用來獲取初始化的服務實例清單;另一個getUpdatedListOfServers用于獲取更新的服務實例清單。
2.?ServerList接口有很多實現類,DynamicServerListLoadBalancer默認使用了DomainExtractingServerList類作為ServerList的實現,但是在DomainExtractingServerList的構造方法中又傳入了DiscoveryEnabledNIWSServerList對象,查看源碼發現最終兩個清單的獲取方式是由DiscoveryEnabledNIWSServerList類來提供的。
3.?DomainExtractingServerList類中的obtainServersViaDiscovery方法是用來發現服務實例并獲取的,obtainServersViaDiscovery方法的主要邏輯是這樣:首先依靠EurekaClient從服務注冊中心獲取到具體的服務實例InstanceInfo列表,然后對這個列表進行遍歷,將狀態為UP的實例轉換成DiscoveryEnabledServer對象并放到一個集合中,最后將這個集合返回。
4.?DynamicServerListLoadBalancer中還定義了一個ServerListUpdater.UpdateAction類型的服務更新器,Spring Cloud提供了兩種服務更新策略:一種是PollingServerListUpdater,表示定時更新;另一種是EurekaNotificationServerListUpdater表示由Eureka的事件監聽來驅動服務列表的更新操作,默認的實現策略是第一種,即定時更新,定時的方式很簡單,創建Runnable,調用DynamicServerListLoadBalancer中updateAction對象的doUpdate方法,Runnable延遲啟動時間為1秒,重復周期為30秒。
5.?在更新服務清單的時候,調用了我們在第一點提到的getUpdatedListOfServers方法,拿到實例清單之后,又調用了一個過濾器中的方法進行過濾。過濾器的類型有好幾種,默認是DefaultNIWSServerListFilter,這是一個繼承自ZoneAffinityServerListFilter的過濾器,具有區域感知功能。即它會對服務提供者所處的Zone和服務消費者所處的Zone進行比較,過濾掉哪些不是同一個區域的實例。
綜上,DynamicServerListLoadBalancer主要是實現了服務實例清單在運行期間的動態更新能力,同時提供了對服務實例清單的過濾功能。
ZoneAwareLoadBalancer
ZoneAwareLoadBalancer是DynamicServerListLoadBalancer的子類,ZoneAwareLoadBalancer的出現主要是為了彌補DynamicServerListLoadBalancer的不足。由于DynamicServerListLoadBalancer中并沒有重寫chooseServer方法,所以DynamicServerListLoadBalancer中負責均衡的策略依然是我們在BaseLoadBalancer中分析出來的線性輪詢策略,這種策略不具備區域感知功能,這樣當需要跨區域調用時,可能會產生高延遲。ZoneAwareLoadBalancer重寫了setServerListForZones方法,該方法在其父類中的功能主要是根據區域Zone分組的實例列表,為負載均衡器中的LoadBalancerStats對象創建ZoneStats并存入集合中,ZoneStats是一個用來存儲每個Zone的狀態和統計信息。重寫之后的setServerListForZones方法主要做了兩件事:一件是調用getLoadBalancer方法來創建負載均衡器,同時創建服務選擇策略;另一件是對Zone區域中的實例清單進行檢查,如果對應的Zone下已經沒有實例了,則將Zone區域的實例列表清空,防止節點選擇時出現異常。
2.我們來看調用堆棧分析
首先回到第9步,自動配置生成
package org.springframework.cloud.netflix.ribbon;/*** @author Dave Syer* @author Tim Ysewyn*/ @SuppressWarnings("deprecation") @Configuration @EnableConfigurationProperties //Order is important here, last should be the default, first should be optional // see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653 @Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class}) public class RibbonClientConfiguration { @Bean@ConditionalOnMissingBeanpublic ILoadBalancer ribbonLoadBalancer(IClientConfig config,ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,IRule rule, IPing ping, ServerListUpdater serverListUpdater) {if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {return this.propertiesFactory.get(ILoadBalancer.class, config, name);}return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,serverListFilter, serverListUpdater);}從這里可以看到,會依賴自動注入ServerList,IRule等。
package org.springframework.cloud.alibaba.nacos.ribbon;import com.netflix.client.config.IClientConfig; import com.netflix.loadbalancer.ServerList; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** integrated Ribbon by default* @author xiaojing*/ @Configuration @ConditionalOnRibbonNacos public class NacosRibbonClientConfiguration {@Bean@ConditionalOnMissingBeanpublic ServerList<?> ribbonServerList(IClientConfig config) {NacosServerList serverList = new NacosServerList();serverList.initWithNiwsConfig(config);return serverList;} }因為我們使用的是NACOS,所以會注入NacosServerList.
IRule使用的是RoundRobinRule.
最后生成ZoneAwareLoadBalancer
3.接著進入到ZoneAwareLoadBalancer的構造,這里主要看DynamicServerListLoadBalancer
?
重點在restOfInit方法。
package com.netflix.loadbalancer;/*** A LoadBalancer that has the capabilities to obtain the candidate list of* servers using a dynamic source. i.e. The list of servers can potentially be* changed at Runtime. It also contains facilities wherein the list of servers* can be passed through a Filter criteria to filter out servers that do not* meet the desired criteria.* * @author stonse* */ public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);boolean isSecure = false;boolean useTunnel = false;// to keep track of modification of server listsprotected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);volatile ServerList<T> serverListImpl;volatile ServerListFilter<T> filter;protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {@Overridepublic void doUpdate() {updateListOfServers();}};protected volatile ServerListUpdater serverListUpdater;@Deprecatedpublic DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping, ServerList<T> serverList, ServerListFilter<T> filter) {this(clientConfig,rule,ping,serverList,filter,new PollingServerListUpdater());}public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,ServerList<T> serverList, ServerListFilter<T> filter,ServerListUpdater serverListUpdater) {super(clientConfig, rule, ping);this.serverListImpl = serverList;this.filter = filter;this.serverListUpdater = serverListUpdater;if (filter instanceof AbstractServerListFilter) {((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());}restOfInit(clientConfig);}void restOfInit(IClientConfig clientConfig) {boolean primeConnection = this.isEnablePrimingConnections();// turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()this.setEnablePrimingConnections(false);enableAndInitLearnNewServersFeature();updateListOfServers();if (primeConnection && this.getPrimeConnections() != null) {this.getPrimeConnections().primeConnections(getReachableServers());}this.setEnablePrimingConnections(primeConnection);LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());}?
?這個ServerListUpdater就是一個定時任務更新服務列表。
?具體的動作為updateAction,即為DynamicServerListLoadBalancer的updateAction
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {@Overridepublic void doUpdate() {updateListOfServers();} }; @VisibleForTesting public void updateListOfServers() {List<T> servers = new ArrayList<T>();if (serverListImpl != null) {servers = serverListImpl.getUpdatedListOfServers();LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);if (filter != null) {servers = filter.getFilteredListOfServers(servers);LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);}}updateAllServerList(servers); }最后也是updateAllServerList去獲取服務列表。
?serverListImpl.getUpdatedListOfServers();就是從NacosServerList服務注冊中心獲取實例列表。
?獲取完后setServersList去設置服務列表。
?
至此,服務列表就獲取到allServer,upServer了。
4.然后使用IRULE策略來選擇服務,IRULE就會從綁定的LOADBALANCER取出所有的服務列表,然后去選擇一個。
?
?
到這里選擇完畢。?
總結
以上是生活随笔為你收集整理的springcloud ribbon @LoadBalance负载均衡源码流程分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Redisson 管道批量发送命令流程分
- 下一篇: springcloud ribbon 配