eureka 集群失败的原因_eureka集群中的疑问?
題主的問(wèn)題描述太繞了,我們先把集群中的角色定義下:
Eureka架構(gòu)
比較細(xì)節(jié)的架構(gòu)圖如下所示:
在配置多個(gè)EurekaServer的Service Provider,每次Service Provider啟動(dòng)的時(shí)候會(huì)選擇一個(gè)Eureka Server,之后如果這個(gè)Eureka Server掛了,才會(huì)切換Eureka Server,在當(dāng)前使用的Eureka Server掛掉之前,不會(huì)切換。
被Service Provider選擇用來(lái)發(fā)送請(qǐng)求Eureka Server其實(shí)比其他Server多了一項(xiàng)工作,就是發(fā)客戶端發(fā)來(lái)的請(qǐng)求,轉(zhuǎn)發(fā)到集群中其他的Eureka Server。其實(shí)這個(gè)壓力并沒(méi)有太大,但是如果集群中實(shí)例個(gè)數(shù)比較多,或者心跳間隔比較短的情況下,的確有不小的壓力。可以考慮每個(gè)服務(wù)配置的Eureka Server順序不一樣。
但是其實(shí)仔細(xì)想想,只是個(gè)請(qǐng)求轉(zhuǎn)發(fā),能有多大壓力啊。。。。
最后,我們?cè)敿?xì)分析下服務(wù)注冊(cè)與取消的源代碼(可以直接參考下我的博客關(guān)于Eureka的系列分析張哈希的博客 - CSDN博客?blog.csdn.net
):
關(guān)于服務(wù)注冊(cè)開(kāi)啟/關(guān)閉服務(wù)注冊(cè)配置:eureka.client.register-with-eureka = true (默認(rèn))
什么時(shí)候注冊(cè)?應(yīng)用第一次啟動(dòng)時(shí),初始化EurekaClient時(shí),應(yīng)用狀態(tài)改變:從STARTING變?yōu)閁P會(huì)觸發(fā)這個(gè)Listener,調(diào)用instanceInfoReplicator.onDemandUpdate(); 可以推測(cè)出,實(shí)例狀態(tài)改變時(shí),也會(huì)通過(guò)注冊(cè)接口更新實(shí)例狀態(tài)信息
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};定時(shí)任務(wù),如果InstanceInfo發(fā)生改變,也會(huì)通過(guò)注冊(cè)接口更新信息
public void run() {
try {
discoveryClient.refreshInstanceInfo();
//如果實(shí)例信息發(fā)生改變,則需要調(diào)用register更新InstanceInfo
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();
instanceInfo.unsetIsDirty(dirtyTimestamp);
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}在定時(shí)renew時(shí),如果renew接口返回404(代表這個(gè)實(shí)例在EurekaServer上面找不到),可能是之前注冊(cè)失敗或者注冊(cè)過(guò)期導(dǎo)致的。這時(shí)需要調(diào)用register重新注冊(cè)
boolean renew() {
EurekaHttpResponse httpResponse;
try {
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
//如果renew接口返回404(代表這個(gè)實(shí)例在EurekaServer上面找不到),可能是之前注冊(cè)失敗或者注冊(cè)過(guò)期導(dǎo)致的
if (httpResponse.getStatusCode() == 404) {
REREGISTER_COUNTER.increment();
logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == 200;
} catch (Throwable e) {
logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
return false;
}
}
向Eureka發(fā)送注冊(cè)請(qǐng)求EurekaServer發(fā)生了什么?
主要有兩個(gè)存儲(chǔ),一個(gè)是之前提到過(guò)的registry,還有一個(gè)最近變化隊(duì)列,后面我們會(huì)知道,這個(gè)最近變化隊(duì)列里面就是客戶端獲取增量實(shí)例信息的內(nèi)容:
# 整體注冊(cè)信息緩存
private final ConcurrentHashMap>> registry = new ConcurrentHashMap>>();
# 最近變化隊(duì)列
private ConcurrentLinkedQueue recentlyChangedQueue = new ConcurrentLinkedQueue();
EurekaServer收到實(shí)例注冊(cè)主要分兩步:調(diào)用父類方法注冊(cè)
同步到其他EurekaServer實(shí)例
public void register(InstanceInfo info, boolean isReplication) {
int leaseDuration = 90;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//調(diào)用父類方法注冊(cè)
super.register(info, leaseDuration, isReplication);
//同步到其他EurekaServer實(shí)例
this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
}
我們先看同步到其他EurekaServer實(shí)例
其實(shí)就是,注冊(cè)到的EurekaServer再依次調(diào)用其他集群內(nèi)的EurekaServer的Register方法將實(shí)例信息同步過(guò)去
private void replicateToPeers(Action action, String appName, String id,
InstanceInfo info /* optional */,
InstanceStatus newStatus /* optional */, boolean isReplication) {
Stopwatch tracer = action.getTimer().start();
try {
if (isReplication) {
numberOfReplicationsLastMin.increment();
}
// If it is a replication already, do not replicate again as this will create a poison replication
if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
return;
}
for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
// If the url represents this host, do not replicate to yourself.
if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
continue;
}
replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
}
} finally {
tracer.stop();
}
}
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry = null;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
}
}
然后看看調(diào)用父類方法注冊(cè):
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
//register雖然看上去好像是修改,但是這里用的是讀鎖,后面會(huì)解釋
read.lock();
//從registry中查看這個(gè)app是否存在
Map> gMap = registry.get(registrant.getAppName());
//不存在就創(chuàng)建
if (gMap == null) {
final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
//查看這個(gè)app的這個(gè)實(shí)例是否已存在
Lease existingLease = gMap.get(registrant.getId());
if (existingLease != null && (existingLease.getHolder() != null)) {
//如果已存在,對(duì)比時(shí)間戳,保留比較新的實(shí)例信息......
} else {
// 如果不存在,證明是一個(gè)新的實(shí)例
//更新自我保護(hù)監(jiān)控變量的值的代碼.....
}
Lease lease = new Lease(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
//放入registry
gMap.put(registrant.getId(), lease);
//加入最近修改的記錄隊(duì)列
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
//初始化狀態(tài),記錄時(shí)間等相關(guān)代碼......
//主動(dòng)讓Response緩存失效
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
} finally {
read.unlock();
}
}
總結(jié)起來(lái),就是主要三件事:
1.將實(shí)例注冊(cè)信息放入或者更新registry
2.將實(shí)例注冊(cè)信息加入最近修改的記錄隊(duì)列
3.主動(dòng)讓Response緩存失效
我們來(lái)類比下服務(wù)取消
服務(wù)取消CANCEL
protected boolean internalCancel(String appName, String id, boolean isReplication) {
try {
//cancel雖然看上去好像是修改,但是這里用的是讀鎖,后面會(huì)解釋
read.lock();
//從registry中剔除這個(gè)實(shí)例
Map> gMap = registry.get(appName);
Lease leaseToCancel = null;
if (gMap != null) {
leaseToCancel = gMap.remove(id);
}
if (leaseToCancel == null) {
logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
return false;
} else {
//改變狀態(tài),記錄狀態(tài)修改時(shí)間等相關(guān)代碼......
if (instanceInfo != null) {
instanceInfo.setActionType(ActionType.DELETED);
//加入最近修改的記錄隊(duì)列
recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
}
//主動(dòng)讓Response緩存失效
invalidateCache(appName, vip, svip);
logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
return true;
}
} finally {
read.unlock();
}
}
總結(jié)起來(lái),也是主要三件事:
1.從registry中剔除這個(gè)實(shí)例
2.將實(shí)例注冊(cè)信息加入最近修改的記錄隊(duì)列
3.主動(dòng)讓Response緩存失效
這里我們注意到了這個(gè)最近修改隊(duì)列,我們來(lái)詳細(xì)看看
最近修改隊(duì)列
這個(gè)最近修改隊(duì)列和消費(fèi)者定時(shí)獲取服務(wù)實(shí)例列表有著密切的關(guān)系
private TimerTask getDeltaRetentionTask() {
return new TimerTask() {
@Override
public void run() {
Iterator it = recentlyChangedQueue.iterator();
while (it.hasNext()) {
if (it.next().getLastUpdateTime() <
System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
it.remove();
} else {
break;
}
}
}
};
}
這個(gè)RetentionTimeInMSInDeltaQueue默認(rèn)是180s(配置是eureka.server.retention-time-in-m-s-in-delta-queue,默認(rèn)是180s,官網(wǎng)寫(xiě)錯(cuò)了),可以看出這個(gè)隊(duì)列是一個(gè)長(zhǎng)度為180s的滑動(dòng)窗口,保存最近180s以內(nèi)的應(yīng)用實(shí)例信息修改,后面我們會(huì)看到,客戶端調(diào)用獲取增量信息,實(shí)際上就是從這個(gè)queue中讀取,所以可能一段時(shí)間內(nèi)讀取到的信息都是一樣的。
關(guān)于服務(wù)與實(shí)例列表獲取
EurekaClient端
我們從Ribbon說(shuō)起:EurekaClient也存在緩存,應(yīng)用服務(wù)實(shí)例列表信息在每個(gè)EurekaClient服務(wù)消費(fèi)端都有緩存。一般的,Ribbon的LoadBalancer會(huì)讀取這個(gè)緩存,來(lái)知道當(dāng)前有哪些實(shí)例可以調(diào)用,從而進(jìn)行負(fù)載均衡。這個(gè)loadbalancer同樣也有緩存。
首先看這個(gè)LoadBalancer的緩存更新機(jī)制,相關(guān)類是PollingServerListUpdater:
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
//從EurekaClient緩存中獲取服務(wù)實(shí)例列表,保存在本地緩存
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
//定時(shí)調(diào)度
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
這個(gè)updateAction.doUpdate();就是從EurekaClient緩存中獲取服務(wù)實(shí)例列表,保存在BaseLoadBalancer的本地緩存:
protected volatile List allServerList = Collections.synchronizedList(new ArrayList());
public void setServersList(List lsrv) {
//寫(xiě)入allServerList的代碼,這里略
}
@Override
public List getAllServers() {
return Collections.unmodifiableList(allServerList);
}
這里的getAllServers會(huì)在每個(gè)負(fù)載均衡規(guī)則中被調(diào)用,例如RoundRobinRule:
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer");
return null;
}
Server server = null;
int count = 0;
while (server == null && count++ < 10) {
List reachableServers = lb.getReachableServers();
//獲取服務(wù)實(shí)例列表,調(diào)用的就是剛剛提到的getAllServers
List allServers = lb.getAllServers();
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) {
log.warn("No up servers available from load balancer: " + lb);
return null;
}
int nextServerIndex = incrementAndGetModulo(serverCount);
server = allServers.get(nextServerIndex);
if (server == null) {
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server);
}
// Next.
server = null;
}
if (count >= 10) {
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb);
}
return server;
}
這個(gè)緩存需要注意下,有時(shí)候我們只修改了EurekaClient緩存的更新時(shí)間,但是沒(méi)有修改這個(gè)LoadBalancer的刷新本地緩存時(shí)間,就是ribbon.ServerListRefreshInterval,這個(gè)參數(shù)可以設(shè)置的很小,因?yàn)闆](méi)有從網(wǎng)絡(luò)讀取,就是從一個(gè)本地緩存刷到另一個(gè)本地緩存(如何配置緩存配置來(lái)實(shí)現(xiàn)服務(wù)實(shí)例快速下線快速感知快速刷新,可以參考我的另一篇文章)。
然后我們來(lái)看一下EurekaClient本身的緩存,直接看關(guān)鍵類DiscoveryClient的相關(guān)源碼,我們這里只關(guān)心本地Region的,多Region配置我們先忽略:
//本地緩存,可以理解為是一個(gè)軟鏈接
private final AtomicReference localRegionApps = new AtomicReference();
private void initScheduledTasks() {
//如果配置為需要拉取服務(wù)列表,則設(shè)置定時(shí)拉取任務(wù),這個(gè)配置默認(rèn)是需要拉取服務(wù)列表
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//其他定時(shí)任務(wù)初始化的代碼,忽略
}
//定時(shí)從EurekaServer拉取服務(wù)列表的任務(wù)
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
void refreshRegistry() {
try {
//多Region配置處理代碼,忽略
boolean success = fetchRegistry(remoteRegionsModified);
if (success) {
registrySize = localRegionApps.get().size();
lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
}
//日志代碼,忽略
} catch (Throwable e) {
logger.error("Cannot fetch registry from server", e);
}
}
//定時(shí)從EurekaServer拉取服務(wù)列表的核心方法
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
Applications applications = getApplications();
//判斷,如果是第一次拉取,或者app列表為空,就進(jìn)行全量拉取,否則就會(huì)進(jìn)行增量拉取
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
getAndStoreFullRegistry();
} else {
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
//緩存更新完成,發(fā)送個(gè)event給觀察者,目前沒(méi)啥用
onCacheRefreshed();
// 檢查下遠(yuǎn)端的服務(wù)實(shí)例列表里面包括自己,并且狀態(tài)是否對(duì),這里我們不關(guān)心
updateInstanceRemoteStatus();
// registry was fetched successfully, so return true
return true;
}
//全量拉取代碼
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications apps = null;
//訪問(wèn)/eureka/apps接口,拉取所有服務(wù)實(shí)例信息
EurekaHttpResponse httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());
if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}
//增量拉取代碼
private void getAndUpdateDelta(Applications applications) throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();
Applications delta = null;
//訪問(wèn)/eureka/delta接口,拉取所有服務(wù)實(shí)例增量信息
EurekaHttpResponse httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
delta = httpResponse.getEntity();
}
if (delta == null) {
//如果delta為空,拉取增量失敗,就全量拉取
logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
+ "Hence got the full registry.");
getAndStoreFullRegistry();
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
//這里設(shè)置原子鎖的原因是怕某次調(diào)度網(wǎng)絡(luò)請(qǐng)求時(shí)間過(guò)長(zhǎng),導(dǎo)致同一時(shí)間有多線程拉取到增量信息并發(fā)修改
//拉取增量成功,檢查hashcode是否一樣,不一樣的話也會(huì)全量拉取
logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
String reconcileHashCode = "";
if (fetchRegistryUpdateLock.tryLock()) {
try {
updateDelta(delta);
reconcileHashCode = getReconcileHashCode(applications);
} finally {
fetchRegistryUpdateLock.unlock();
}
} else {
logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
}
// There is a diff in number of instances for some reason
if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
}
} else {
logger.warn("Not updating application delta as another thread is updating it already");
logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
}
}
以上就是對(duì)于EurekaClient拉取服務(wù)實(shí)例信息的源代碼分析,總結(jié)EurekaClient 重要緩存如下:EurekaClient第一次全量拉取,定時(shí)增量拉取應(yīng)用服務(wù)實(shí)例信息,保存在緩存中。
EurekaClient增量拉取失敗,或者增量拉取之后對(duì)比hashcode發(fā)現(xiàn)不一致,就會(huì)執(zhí)行全量拉取,這樣避免了網(wǎng)絡(luò)某時(shí)段分片帶來(lái)的問(wèn)題。
同時(shí)對(duì)于服務(wù)調(diào)用,如果涉及到ribbon負(fù)載均衡,那么ribbon對(duì)于這個(gè)實(shí)例列表也有自己的緩存,這個(gè)緩存定時(shí)從EurekaClient的緩存更新
EurekaServer端
在EurekaServer端,所有的讀取請(qǐng)求都是讀的ReadOnlyMap(這個(gè)可以配置) 有定時(shí)任務(wù)會(huì)定時(shí)從ReadWriteMap同步到ReadOnlyMap這個(gè)時(shí)間配置是:
#eureka server刷新readCacheMap的時(shí)間,注意,client讀取的是readCacheMap,這個(gè)時(shí)間決定了多久會(huì)把readWriteCacheMap的緩存更新到readCacheMap上
#默認(rèn)30s
eureka.server.responseCacheUpdateInvervalMs=3000
相關(guān)代碼:
if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache", th);
}
}
}
};
}
ReadWriteMap是一個(gè)LoadingCache,將Registry中的服務(wù)實(shí)例信息封裝成要返回的http響應(yīng)(分別是經(jīng)過(guò)gzip壓縮和非壓縮的),同時(shí)還有兩個(gè)特殊key,ALL_APPS和ALL_APPS_DELTA ALL_APPS就是所有服務(wù)實(shí)例信息 ALL_APPS_DELTA就是之前講注冊(cè)說(shuō)的RecentlyChangedQueue里面的實(shí)例列表封裝的http響應(yīng)信息
總結(jié)
以上是生活随笔為你收集整理的eureka 集群失败的原因_eureka集群中的疑问?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 8数据提供什么掩膜产品_工业轨式1-8路
- 下一篇: nginx 上传文件漏洞_文件上传及解析