javascript
SpringCloud源码:Ribbon负载均衡分析
本文主要分析 SpringCloud 中 Ribbon 負載均衡流程和原理。
SpringCloud版本為:Edgware.RELEASE。
一.時序圖
和以前一樣,先把圖貼出來,直觀一點:
二.源碼分析
我們先從 contoller 里面看如何使用 Ribbon 來負載均衡的:
@GetMapping("/user/{id}")public User findById(@PathVariable Long id) {//return this.restTemplate.getForObject("http://192.168.2.110:8001/" + id, User.class);return this.restTemplate.getForObject("http://microservice-provider-user/" + id, User.class);}復制代碼可以看到,在整合 Ribbon 之前,請求Rest是通過IP端口直接請求。整合 Ribbon 之后,請求的地址改成了 http://applicationName ,官方取名為虛擬主機名(virtual host name),當 Ribbon 和 Eureka 配合使用時,會自動將虛擬主機名轉換為微服務的實際IP地址,我們后面會分析這個過程。
首先從 RestTemplate#getForObject 開始:
public <T> T getForObject(String url, Class<T> responseType, Object... uriVariables) throws RestClientException {// 設置RequestCallback的返回類型responseTypeRequestCallback requestCallback = acceptHeaderRequestCallback(responseType);// 實例化responseExtractorHttpMessageConverterExtractor<T> responseExtractor =new HttpMessageConverterExtractor<T>(responseType, getMessageConverters(), logger);return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);}復制代碼接著執行到 RestTemplate 的 execute,主要是拼裝URI,如果存在baseUrl,則插入baseUrl。拼裝好后,進入實際"執行"請求的地方:
public <T> T execute(String url, HttpMethod method, RequestCallback requestCallback,ResponseExtractor<T> responseExtractor, Object... uriVariables) throws RestClientException {// 組裝 URIURI expanded = getUriTemplateHandler().expand(url, uriVariables);// 實際"執行"的地方return doExecute(expanded, method, requestCallback, responseExtractor);}復制代碼RestTemplate#doExecute,實際“執行”請求的地方,執行超過后,返回 response:
protected <T> T doExecute(URI url, HttpMethod method, RequestCallback requestCallback,ResponseExtractor<T> responseExtractor) throws RestClientException {ClientHttpResponse response = null;try {// 實例化請求,url為請求地址,method為GETClientHttpRequest request = createRequest(url, method);if (requestCallback != null) {// AcceptHeaderRequestCallbackrequestCallback.doWithRequest(request);}// 實際處理請求的地方response = request.execute();// 處理response,記錄日志和調用對應的錯誤處理器handleResponse(url, method, response);if (responseExtractor != null) {// 使用前面的HttpMessageConverterExtractor從Response里面抽取數據return responseExtractor.extractData(response);}else {return null;}}......}復制代碼到了請求被執行的地方,AbstractClientHttpRequest#execute,跳轉到 executeInternal:
public final ClientHttpResponse execute() throws IOException {// 斷言請求還沒被執行過assertNotExecuted();// 跳轉到 executeInternal 處理請求ClientHttpResponse result = executeInternal(this.headers);// 標記請求為已經執行過this.executed = true;return result;}復制代碼AbstractBufferingClientHttpRequest#executeInternal,AbstractBufferingClientHttpRequest是AbstractClientHttpRequest的子抽象類,作用是緩存output,使用了一個字節數組輸出流:
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {// 首次進來,bytes內容為空byte[] bytes = this.bufferedOutput.toByteArray();if (headers.getContentLength() < 0) {// 設置 Content-Length 為 1headers.setContentLength(bytes.length);}// 模板方法,跳轉到了實現類中的方法,InterceptingClientHttpRequest#executeInternalClientHttpResponse result = executeInternal(headers, bytes);// 拿到結果后,清空緩存this.bufferedOutput = null;return result;}復制代碼executeInternal是一個抽象方法,跳轉到了其實現類InterceptingClientHttpRequest#executeInternal:
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();// InterceptingRequestExecution是一個內部類return requestExecution.execute(this, bufferedOutput);}// 內部類,負責執行請求private class InterceptingRequestExecution implements ClientHttpRequestExecution {private final Iterator<ClientHttpRequestInterceptor> iterator;// 所有HttpRequest的攔截器public InterceptingRequestExecution() {this.iterator = interceptors.iterator();}@Overridepublic ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {if (this.iterator.hasNext()) {// 如果還有下一個攔截器,則執行其攔截方法// 這里的攔截器是 MetricsClientHttpRequestInterceptor,對應"metrics"信息,記錄執行時間和結果ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();// 執行攔截方法return nextInterceptor.intercept(request, body, this);}......}}復制代碼跳轉到了攔截器 MetricsClientHttpRequestInterceptor 的攔截方法:
public ClientHttpResponse intercept(HttpRequest request, byte[] body,ClientHttpRequestExecution execution) throws IOException {long startTime = System.nanoTime();// 標記開始執行時間ClientHttpResponse response = null;try {// 傳入請求和Body,處理執行,又跳轉回 InterceptingRequestExecutionresponse = execution.execute(request, body);return response;}finally {// 在執行完方法,返回response之前,記錄一下執行的信息SmallTagMap.Builder builder = SmallTagMap.builder();for (MetricsTagProvider tagProvider : tagProviders) {for (Map.Entry<String, String> tag : tagProvider.clientHttpRequestTags(request, response).entrySet()) {builder.add(Tags.newTag(tag.getKey(), tag.getValue()));}}MonitorConfig.Builder monitorConfigBuilder = MonitorConfig.builder(metricName);monitorConfigBuilder.withTags(builder);// 記錄執行時間servoMonitorCache.getTimer(monitorConfigBuilder.build()).record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);}}復制代碼又跳轉回了 InterceptingRequestExecution,下個攔截器是 - LoadBalancerInterceptor,最后的Boss,調用LoadBalancerClient完成請求的負載。
LoadBalancerInterceptor#intercept,主角登場了,終于等到你,還好沒放棄:
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,final ClientHttpRequestExecution execution) throws IOException {// 獲取原始URIfinal URI originalUri = request.getURI();// 獲取請求中的服務名字,也就是所謂的"虛擬主機名"String serviceName = originalUri.getHost();// 轉由 LoadBalancerClient 處理請求return this.loadBalancer.execute(serviceName, requestFactory.createRequest(request, body, execution));}復制代碼下面空一行先停下來休息一下,然后看看,負載均衡是怎樣實現的。
LoadBalancerInterceptor這里默認的實現是 RibbonLoadBalancerClient,Ribbon是Netflix發布的負載均衡器。
RibbonLoadBalancerClient#execute,負載均衡算法選出實際處理請求的Server:
public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {// serviceId即前面的虛擬主機名 "microservice-provider-user",獲取loadBalancer//這里獲取到的是 DynamicServerListLoadBalancerILoadBalancer loadBalancer = getLoadBalancer(serviceId);// 基于loadBalancer,選擇實際處理請求的服務提供者Server server = getServer(loadBalancer);if (server == null) {throw new IllegalStateException("No instances available for " + serviceId);}RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,serviceId), serverIntrospector(serviceId).getMetadata(server));return execute(serviceId, ribbonServer, request);}復制代碼RibbonLoadBalancerClient#getServer,轉交 loadBalancer 選擇Server:
protected Server getServer(ILoadBalancer loadBalancer) {if (loadBalancer == null) {return null;}// 由 loadBalancer 完成選Server的重任,這里的 key 是默認值 "default"return loadBalancer.chooseServer("default"); // TODO: better handling of key}復制代碼chooseServer也是一個抽象的模板方法,最后的實現是 ZoneAwareLoadBalancer#chooseServer:
public Server chooseServer(Object key) {if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {logger.debug("Zone aware logic disabled or there is only one zone");// 到了 BaseLoadBalancer的chooseServerreturn super.chooseServer(key);}......}復制代碼BaseLoadBalancer#chooseServer,轉交規則來選擇Server:
public Server chooseServer(Object key) {if (counter == null) {counter = createCounter();}// counter是一個計數器,起始值是"0",下面自增一次,變為 "1"counter.increment();if (rule == null) {return null;} else {try {// 默認的挑選規則是 "ZoneAvoidanceRule"return rule.choose(key);} catch (Exception e) {logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);return null;}}}復制代碼PredicateBasedRule是ZoneAvoidanceRule的父類。PredicateBasedRule#choose,可以看到,基礎負載規則采用的是"RoundRobin"即輪詢的方式:
public Server choose(Object key) {ILoadBalancer lb = getLoadBalancer();Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);if (server.isPresent()) {return server.get();} else {return null;} }復制代碼下面分析"輪詢"的過程,AbstractServerPredicate#chooseRoundRobinAfterFiltering,傳入Server列表的長度,自增取模實現:
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {// 首先拿到所有"合格"的ServerList<Server> eligible = getEligibleServers(servers, loadBalancerKey);if (eligible.size() == 0) {return Optional.absent();}// 在 incrementAndGetModulo 中獲取,"自增取模"return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));}復制代碼AbstractServerPredicate#incrementAndGetModulo,維護了一個nextIndex,記錄下次請求的下標:
private int incrementAndGetModulo(int modulo) {for (;;) {int current = nextIndex.get();// 第一次 current是"0"int next = (current + 1) % modulo;// current+1對size取模,作為下次的"current"// "0" == current,則以原子方式將該值設置為 nextif (nextIndex.compareAndSet(current, next))return current;}}復制代碼最后,我們通過控制臺來驗證一下請求是不是"輪詢"分配到服務提供者的,本地啟動了8000和8001兩個Provider:
2018-12-09 18:55:30.794 c.i.c.s.user.controller.MovieController : microservice-provider-user:192.168.2.117:8001 2018-12-09 18:55:33.196 c.i.c.s.user.controller.MovieController : microservice-provider-user:192.168.2.117:8000 2018-12-09 18:55:34.713 c.i.c.s.user.controller.MovieController : microservice-provider-user:192.168.2.117:8001 2018-12-09 18:55:34.975 c.i.c.s.user.controller.MovieController : microservice-provider-user:192.168.2.117:8000 2018-12-09 18:55:35.175 c.i.c.s.user.controller.MovieController : microservice-provider-user:192.168.2.117:8001 2018-12-09 18:55:35.351 c.i.c.s.user.controller.MovieController : microservice-provider-user:192.168.2.117:8000 2018-12-09 18:55:35.534 c.i.c.s.user.controller.MovieController : microservice-provider-user:192.168.2.117:8001復制代碼可以看到,請求確實被輪詢給兩個Provider處理的。
至此,我們完成了 SpringCloud 中 Ribbon 負載均衡的過程,知道了默認采用的是"輪詢"的方式,實現是通過維護一個index,自增后取模來作為下標挑選實際響應請求的Server。除了輪詢的方式,還有隨機等算法。感興趣可以按照類似思路分析測試一下。
轉載于:https://juejin.im/post/5c0e13de5188256d0e5ac030
總結
以上是生活随笔為你收集整理的SpringCloud源码:Ribbon负载均衡分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 三种特质 做领导
- 下一篇: webpack与babel的深奥,渣渣的