當(dāng)前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
Spring Cloud Feign - 内部实现细节
生活随笔
收集整理的這篇文章主要介紹了
Spring Cloud Feign - 内部实现细节
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
1. 概述
Feign用于服務(wù)間調(diào)用,它的內(nèi)部實(shí)現(xiàn)是一個包含Ribbon(負(fù)載均衡)的JDK-HttpURLConnection(Http)調(diào)用。雖然調(diào)用形式是類似于RPC,但是實(shí)際調(diào)用是Http,這也是為什么Feign被稱為偽RPC調(diào)用的原因。
內(nèi)部調(diào)用過程如下:
?
2. 代碼細(xì)節(jié)
1) BaseLoadBalancer.java配置初始化
重點(diǎn)功能: 1. 初始化負(fù)載均衡策略 2. 初始化取服務(wù)注冊列表調(diào)度策略
?
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {...// 每隔30s Ping一次int pingIntervalTime = Integer.parseInt(""+ clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingInterval,Integer.parseInt("30")));// 每次最多Ping 2sint maxTotalPingTime = Integer.parseInt(""+ clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,Integer.parseInt("2")));setPingInterval(pingIntervalTime);setMaxTotalPingTime(maxTotalPingTime);// cross associate with each other// i.e. Rule,Ping meet your container LB// LB, these are your Ping and Rule guys ...// 設(shè)置負(fù)載均衡規(guī)則setRule(rule);// 初始化取服務(wù)注冊列表調(diào)度策略setPing(ping);setLoadBalancerStats(stats);rule.setLoadBalancer(this);... }2) 負(fù)載均衡策略初始化
重點(diǎn)功能: 1. 默認(rèn)使用輪詢策略
- BaseLoadBalancer.java
?
public void setRule(IRule rule) {if (rule != null) {this.rule = rule;} else {/* default rule */// 默認(rèn)使用輪詢策略this.rule = new RoundRobinRule();}if (this.rule.getLoadBalancer() != this) {this.rule.setLoadBalancer(this);} }- RoundRobinRule.java
?
private AtomicInteger nextServerCyclicCounter;public Server choose(ILoadBalancer lb, Object key) {if (lb == null) {log.warn("no load balancer");return null;}Server server = null;int count = 0;while (server == null && count++ < 10) {List<Server> reachableServers = lb.getReachableServers();List<Server> allServers = lb.getAllServers();int upCount = reachableServers.size();int serverCount = allServers.size();if ((upCount == 0) || (serverCount == 0)) {log.warn("No up servers available from load balancer: " + lb);return null;}// 輪詢重點(diǎn)算法int nextServerIndex = incrementAndGetModulo(serverCount);server = allServers.get(nextServerIndex);if (server == null) {/* Transient. */Thread.yield();continue;}if (server.isAlive() && (server.isReadyToServe())) {return (server);}// Next.server = null;}if (count >= 10) {log.warn("No available alive servers after 10 tries from load balancer: "+ lb);}return server; }private int incrementAndGetModulo(int modulo) {for (;;) {int current = nextServerCyclicCounter.get();int next = (current + 1) % modulo;if (nextServerCyclicCounter.compareAndSet(current, next))return next;} }3) 初始化取服務(wù)注冊列表調(diào)度策略
重點(diǎn)功能: 1. 設(shè)置輪詢間隔為30s 一次
注意: 這里沒有做實(shí)際的Ping,只是獲取緩存的注冊列表的alive服務(wù),原因是為了提高性能
- BaseLoadBalancer.java
?
public void setPing(IPing ping) {if (ping != null) {if (!ping.equals(this.ping)) {this.ping = ping;setupPingTask(); // since ping data changed}} else {this.ping = null;// cancel the timer tasklbTimer.cancel();} }void setupPingTask() {if (canSkipPing()) {return;}if (lbTimer != null) {lbTimer.cancel();}lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,true);// 這里雖然默認(rèn)設(shè)置是10s一次,但是在初始化的時候,設(shè)置了30s一次lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);forceQuickPing(); }class Pinger {private final IPingStrategy pingerStrategy;public Pinger(IPingStrategy pingerStrategy) {this.pingerStrategy = pingerStrategy;}public void runPinger() throws Exception {if (!pingInProgress.compareAndSet(false, true)) { return; // Ping in progress - nothing to do}// we are "in" - we get to PingServer[] allServers = null;boolean[] results = null;Lock allLock = null;Lock upLock = null;try {/** The readLock should be free unless an addServer operation is* going on...*/allLock = allServerLock.readLock();allLock.lock();allServers = allServerList.toArray(new Server[allServerList.size()]);allLock.unlock();int numCandidates = allServers.length;results = pingerStrategy.pingServers(ping, allServers);final List<Server> newUpList = new ArrayList<Server>();final List<Server> changedServers = new ArrayList<Server>();for (int i = 0; i < numCandidates; i++) {boolean isAlive = results[i];Server svr = allServers[i];boolean oldIsAlive = svr.isAlive();svr.setAlive(isAlive);if (oldIsAlive != isAlive) {changedServers.add(svr);logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));}if (isAlive) {newUpList.add(svr);}}upLock = upServerLock.writeLock();upLock.lock();upServerList = newUpList;upLock.unlock();notifyServerStatusChangeListener(changedServers);} finally {pingInProgress.set(false);}} }private static class SerialPingStrategy implements IPingStrategy {@Overridepublic boolean[] pingServers(IPing ping, Server[] servers) {int numCandidates = servers.length;boolean[] results = new boolean[numCandidates];logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);for (int i = 0; i < numCandidates; i++) {results[i] = false; /* Default answer is DEAD. */try {// NOTE: IFF we were doing a real ping// assuming we had a large set of servers (say 15)// the logic below will run them serially// hence taking 15 times the amount of time it takes// to ping each server// A better method would be to put this in an executor// pool// But, at the time of this writing, we dont REALLY// use a Real Ping (its mostly in memory eureka call)// hence we can afford to simplify this design and run// this// seriallyif (ping != null) {results[i] = ping.isAlive(servers[i]);}} catch (Exception e) {logger.error("Exception while pinging Server: '{}'", servers[i], e);}}return results;} }4) 最后拼接完整URL使用JDK-HttpURLConnection進(jìn)行調(diào)用
- SynchronousMethodHandler.java(io.github.openfeign:feign-core:10.10.1/feign.SynchronousMethodHandler.java)
?
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {Request request = this.targetRequest(template);if (this.logLevel != Level.NONE) {this.logger.logRequest(this.metadata.configKey(), this.logLevel, request);}long start = System.nanoTime();Response response;try {response = this.client.execute(request, options);response = response.toBuilder().request(request).requestTemplate(template).build();} catch (IOException var13) {if (this.logLevel != Level.NONE) {this.logger.logIOException(this.metadata.configKey(), this.logLevel, var13, this.elapsedTime(start));}throw FeignException.errorExecuting(request, var13);}... }- LoadBalancerFeignClient.java
?
@Override public Response execute(Request request, Request.Options options) throws IOException {try {URI asUri = URI.create(request.url());String clientName = asUri.getHost();URI uriWithoutHost = cleanUrl(request.url(), clientName);FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(this.delegate, request, uriWithoutHost);IClientConfig requestConfig = getClientConfig(options, clientName);return lbClient(clientName).executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();}catch (ClientException e) {IOException io = findIOException(e);if (io != null) {throw io;}throw new RuntimeException(e);} }- AbstractLoadBalancerAwareClient.java
?
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);try {return command.submit(new ServerOperation<T>() {@Overridepublic Observable<T> call(Server server) {// 獲取真實(shí)訪問URLURI finalUri = reconstructURIWithServer(server, request.getUri());S requestForServer = (S) request.replaceUri(finalUri);try {return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));} catch (Exception e) {return Observable.error(e);}}}).toBlocking().single();} catch (Exception e) {Throwable t = e.getCause();if (t instanceof ClientException) {throw (ClientException) t;} else {throw new ClientException(e);}}}- FeignLoadBalancer.java
?
@Override public RibbonResponse execute(RibbonRequest request, IClientConfig configOverride)throws IOException {Request.Options options;if (configOverride != null) {RibbonProperties override = RibbonProperties.from(configOverride);options = new Request.Options(override.connectTimeout(this.connectTimeout),override.readTimeout(this.readTimeout));}else {options = new Request.Options(this.connectTimeout, this.readTimeout);}Response response = request.client().execute(request.toRequest(), options);return new RibbonResponse(request.getUri(), response); }- feign.Client.java
?
@Override public Response execute(Request request, Options options) throws IOException {HttpURLConnection connection = convertAndSend(request, options);return convertResponse(connection, request); }Response convertResponse(HttpURLConnection connection, Request request) throws IOException {// 使用HttpURLConnection 真實(shí)進(jìn)行Http調(diào)用int status = connection.getResponseCode();String reason = connection.getResponseMessage();if (status < 0) {throw new IOException(format("Invalid status(%s) executing %s %s", status,connection.getRequestMethod(), connection.getURL()));}Map<String, Collection<String>> headers = new LinkedHashMap<>();for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) {// response messageif (field.getKey() != null) {headers.put(field.getKey(), field.getValue());}}Integer length = connection.getContentLength();if (length == -1) {length = null;}InputStream stream;if (status >= 400) {stream = connection.getErrorStream();} else {stream = connection.getInputStream();}return Response.builder().status(status).reason(reason).headers(headers).request(request).body(stream, length).build(); }參考
?
總結(jié)
以上是生活随笔為你收集整理的Spring Cloud Feign - 内部实现细节的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Windows 上配置Docker De
- 下一篇: 揪出XXL-JOB中的细节