Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计
降級熔斷框架 Hystrix 源碼解析:滑動(dòng)窗口統(tǒng)計(jì)
概述
Hystrix 是一個(gè)開源的降級熔斷框架,用于提高服務(wù)可靠性,適用于依賴大量外部服務(wù)的業(yè)務(wù)系統(tǒng)。什么是降級熔斷呢?
降級
業(yè)務(wù)降級,是指犧牲非核心的業(yè)務(wù)功能,保證核心功能的穩(wěn)定運(yùn)行。簡單來說,要實(shí)現(xiàn)優(yōu)雅的業(yè)務(wù)降級,需要將功能實(shí)現(xiàn)拆分到相對獨(dú)立的不同代碼單元,分優(yōu)先級進(jìn)行隔離。在后臺通過開關(guān)控制,降級部分非主流程的業(yè)務(wù)功能,減輕系統(tǒng)依賴和性能損耗,從而提升集群的整體吞吐率。
降級的重點(diǎn)是:業(yè)務(wù)之間有優(yōu)先級之分。降級的典型應(yīng)用是:電商活動(dòng)期間關(guān)閉非核心服務(wù),保證核心買買買業(yè)務(wù)的正常運(yùn)行。
熔斷
老式電閘都安裝了保險(xiǎn)絲,一旦有人使用超大功率的設(shè)備,保險(xiǎn)絲就會燒斷以保護(hù)各個(gè)電器不被強(qiáng)電流給燒壞。同理我們的接口也需要安裝上“保險(xiǎn)絲”,以防止非預(yù)期的請求對系統(tǒng)壓力過大而引起的系統(tǒng)癱瘓,當(dāng)流量過大時(shí),可以采取拒絕或者引流等機(jī)制。
同樣在分布式系統(tǒng)中,當(dāng)被調(diào)用的遠(yuǎn)程服務(wù)無法使用時(shí),如果沒有過載保護(hù),就會導(dǎo)致請求的資源阻塞在遠(yuǎn)程服務(wù)器上耗盡資源。很多時(shí)候,剛開始可能只是出現(xiàn)了局部小規(guī)模的故障,然而由于種種原因,故障影響范圍越來越大,最終導(dǎo)致全局性的后果。這種過載保護(hù),就是熔斷器。
在 hystrix 中,熔斷相關(guān)的配置有以下幾個(gè):滑動(dòng)窗口長度,單位毫秒
hystrix.command.HystrixCommandKey.circuitBreaker.sleepWindowInMilliseconds
滑動(dòng)窗口滾動(dòng)桶的長度,單位毫秒
hystrix.command.HystrixCommandKey.metrics.rollingPercentile.bucketSize
觸發(fā)熔斷的失敗率閾值
hystrix.command.HystrixCommandKey.circuitBreaker.errorThresholdPercentage
觸發(fā)熔斷的請求量閾值
hystrix.command.HystrixCommandKey.circuitBreaker.requestVolumeThreshold
從配置信息里可以看出來,熔斷邏輯判斷里使用了滑動(dòng)窗口來統(tǒng)計(jì)服務(wù)調(diào)用的成功、失敗量。那么這里的滑動(dòng)窗口是如何實(shí)現(xiàn)的呢?下面我們深入源碼來研究一下。
注:使用的源碼版本是 2017-09-13 GitHub 上 master 分支最新代碼。
滑動(dòng)窗口
在 hystrix 里,大量使用了 RxJava 這個(gè)響應(yīng)式函數(shù)編程框架,滑動(dòng)窗口的實(shí)現(xiàn)也是使用了 RxJava 框架。
源碼分析
一個(gè)滑動(dòng)窗口有兩個(gè)關(guān)鍵要素組成:窗口時(shí)長、窗口滾動(dòng)時(shí)間間隔。通常一個(gè)窗口會劃分為若干個(gè)桶 bucket,每個(gè)桶的大小等于窗口滾動(dòng)時(shí)間間隔。也就是說,滑動(dòng)窗口統(tǒng)計(jì)數(shù)據(jù)時(shí),分兩步:統(tǒng)計(jì)一個(gè) bucket 內(nèi)的數(shù)據(jù);
統(tǒng)計(jì)一個(gè)窗口,即若干個(gè) bucket 的數(shù)據(jù)。
bucket 統(tǒng)計(jì)的代碼位于 BucketedCounterStream 類中,其關(guān)鍵的代碼如下所示://?這里的代碼并非全部,只展示了和?bucket?統(tǒng)計(jì)相關(guān)的關(guān)鍵代碼public?abstract?class?BucketedCounterStream?{????protected?final?int?numBuckets;????protected?final?Observable?bucketedStream;????protected?final?AtomicReference?subscription?=?new?AtomicReference(null);????private?final?Func1,?Observable>?reduceBucketToSummary;????protected?BucketedCounterStream(final?HystrixEventStream?inputEventStream,?final?int?numBuckets,?final?int?bucketSizeInMs,????????????????????????????????????final?Func2?appendRawEventToBucket)?{????????this.numBuckets?=?numBuckets;????????this.reduceBucketToSummary?=?new?Func1,?Observable>()?{????????????@Override
public?Observable?call(Observable?eventBucket)?{????????????????return?eventBucket.reduce(getEmptyBucketSummary(),?appendRawEventToBucket);
}
};????????final?List?emptyEventCountsToStart?=?new?ArrayList();????????for?(int?i?=?0;?i?
emptyEventCountsToStart.add(getEmptyBucketSummary());
}????????this.bucketedStream?=?Observable.defer(new?Func0>()?{????????????@Override
public?Observable?call()?{????????????????return?inputEventStream
.observe()
.window(bucketSizeInMs,?TimeUnit.MILLISECONDS)?//bucket?it?by?the?counter?window?so?we?can?emit?to?the?next?operator?in?time?chunks,?not?on?every?OnNext
.flatMap(reduceBucketToSummary)????????????????//for?a?given?bucket,?turn?it?into?a?long?array?containing?counts?of?event?types
.startWith(emptyEventCountsToStart);???????????//start?it?with?empty?arrays?to?make?consumer?logic?as?generic?as?possible?(windows?are?always?full)
}
});
}????abstract?Bucket?getEmptyBucketSummary();
}
首先我們看這幾行代碼,這幾行代碼功能是:將服務(wù)調(diào)用級別的輸入數(shù)據(jù)流 inputEventStream 以 bucketSizeInMs 毫秒為一個(gè)桶進(jìn)行了匯總,匯總的結(jié)果輸入到桶級別數(shù)據(jù)流 bucketedStream。this.bucketedStream?=?Observable.defer(new?Func0>()?{????????????@Override
public?Observable?call()?{????????????????return?inputEventStream
.observe()
.window(bucketSizeInMs,?TimeUnit.MILLISECONDS)?//?window?窗函數(shù)匯聚?bucketSizeInMs?毫秒內(nèi)的數(shù)據(jù)后,每隔?bucketSizeInMs?毫秒批量發(fā)送出去
.flatMap(reduceBucketToSummary)????????????????//?flatMap?方法接收到?window?窗函數(shù)發(fā)來的數(shù)據(jù),使用?reduceBucketToSummary?函數(shù)進(jìn)行匯總統(tǒng)計(jì)
.startWith(emptyEventCountsToStart);???????????//?給?bucketedStream?發(fā)布源設(shè)定一個(gè)起始值
}
});
RxJava 基于觀察者模式,又叫“發(fā)布-訂閱”模式。inputEventStream 是 HystrixEventStream 對象,其 observe() 方法返回的是一個(gè)被觀察者 Observable 對象,也可以說是一個(gè)發(fā)布源 Publisher。public?interface?HystrixEventStream?{????Observable?observe();
}
在 Hystrix 中有多種數(shù)據(jù)發(fā)布源,與服務(wù)調(diào)用的熔斷相關(guān)的是 HystrixCommandCompletionStream:每一次服務(wù)調(diào)用結(jié)束,調(diào)用 write 方法記錄成功、失敗等信息;
write 方法調(diào)用了 writeOnlySubject.onNext,writeOnlySubject 是一個(gè)線程安全的發(fā)布源 PublishSubject,用于發(fā)布 HystrixCommandCompletion 類型的數(shù)據(jù),onNext 功能是發(fā)布一個(gè)事件或數(shù)據(jù);
observe 方法返回的可訂閱數(shù)據(jù)源 readOnlyStream 是 writeOnlySubject 的只讀版本。public?class?HystrixCommandCompletionStream?implements?HystrixEventStream?{????private?final?HystrixCommandKey?commandKey;?//?服務(wù)調(diào)用標(biāo)記?key
private?final?Subject?writeOnlySubject;????private?final?Observable?readOnlyStream;
HystrixCommandCompletionStream(final?HystrixCommandKey?commandKey)?{????????this.commandKey?=?commandKey;????????this.writeOnlySubject?=?new?SerializedSubject(PublishSubject.create());????????this.readOnlyStream?=?writeOnlySubject.share();
}????public?void?write(HystrixCommandCompletion?event)?{
writeOnlySubject.onNext(event);
}????@Override
public?Observable?observe()?{????????return?readOnlyStream;
}
}
上面分析了 bucket 統(tǒng)計(jì)和事件發(fā)布源相關(guān)的代碼,下面我們再看一下 window 統(tǒng)計(jì)的代碼。滑動(dòng)窗口統(tǒng)計(jì)的代碼在 BucketedRollingCounterStream 類中,window 統(tǒng)計(jì)和 bucket 統(tǒng)計(jì)原理是一樣的,只是維度不同:bucket 統(tǒng)計(jì)的維度是時(shí)間,比如 bucketSizeInMs 毫秒;
window 統(tǒng)計(jì)的維度是若干數(shù)據(jù),在這里是 numBuckets 個(gè) bucket。
注意:numBuckets 的值等于 hystrix.command.HystrixCommandKey.circuitBreaker.sleepWindowInMilliseconds 除以 hystrix.command.HystrixCommandKey.metrics.rollingPercentile.bucketSize,numBuckets 是整數(shù),所以 sleepWindowInMilliseconds 必須是 bucketSize 的整數(shù)倍,否則 Hystrix 就會拋出異常。public?abstract?class?BucketedRollingCounterStream?extends?BucketedCounterStream?{????private?Observable?sourceStream;????private?final?AtomicBoolean?isSourceCurrentlySubscribed?=?new?AtomicBoolean(false);????protected?BucketedRollingCounterStream(HystrixEventStream?stream,?final?int?numBuckets,?int?bucketSizeInMs,???????????????????????????????????????????final?Func2?appendRawEventToBucket,???????????????????????????????????????????final?Func2?reduceBucket)?{????????super(stream,?numBuckets,?bucketSizeInMs,?appendRawEventToBucket);
Func1,?Observable>?reduceWindowToSummary?=?new?Func1,?Observable>()?{????????????@Override
public?Observable?call(Observable?window)?{????????????????return?window.scan(getEmptyOutputValue(),?reduceBucket).skip(numBuckets);
}
};????????this.sourceStream?=?bucketedStream??????//stream?broken?up?into?buckets
.window(numBuckets,?1)??????????//emit?overlapping?windows?of?buckets
.flatMap(reduceWindowToSummary)?//convert?a?window?of?bucket-summaries?into?a?single?summary
.doOnSubscribe(new?Action0()?{????????????????????@Override
public?void?call()?{
isSourceCurrentlySubscribed.set(true);
}
})
.doOnUnsubscribe(new?Action0()?{????????????????????@Override
public?void?call()?{
isSourceCurrentlySubscribed.set(false);
}
})
.share()????????????????????????//?multiple?subscribers?should?get?same?data
.onBackpressureDrop();??????????//?如果消費(fèi)者處理數(shù)據(jù)太慢導(dǎo)致數(shù)據(jù)堆積,就丟棄部分?jǐn)?shù)據(jù)
}????@Override
public?Observable?observe()?{????????return?sourceStream;
}
}
接下來我們介紹一下 BucketedRollingCounterStream 構(gòu)造函數(shù)的主要參數(shù):HystrixEventStream stream:數(shù)據(jù)發(fā)布源;
int numBuckets:每個(gè)窗口內(nèi)部 bucket 個(gè)數(shù);
int bucketSizeInMs:bucket 時(shí)長,也是窗口滾動(dòng)時(shí)間間隔;
appendRawEventToBucket:bucket 內(nèi)部統(tǒng)計(jì)函數(shù),其功能是起始值 Bucket 加上 Event 后,輸出 Bucket 類型值,對對個(gè)數(shù)據(jù)的處理具有累積的效果;
reduceBucket:和 appendRawEventToBucket 類似,用于 window 統(tǒng)計(jì)。
BucketedRollingCounterStream 提供了完整的滑動(dòng)窗口統(tǒng)計(jì)的服務(wù),想要使用滑動(dòng)窗口來統(tǒng)計(jì)數(shù)據(jù)的繼承實(shí)現(xiàn) BucketedRollingCounterStream 即可。 接下來我們看一下用于滑動(dòng)統(tǒng)計(jì)服務(wù)調(diào)用成功、失敗次數(shù)的 RollingCommandEventCounterStream 類:public?class?RollingCommandEventCounterStream?extends?BucketedRollingCounterStream?{????private?static?final?ConcurrentMap?streams?=?new?ConcurrentHashMap();????private?static?final?int?NUM_EVENT_TYPES?=?HystrixEventType.values().length;????public?static?RollingCommandEventCounterStream?getInstance(HystrixCommandKey?commandKey,?int?numBuckets,?int?bucketSizeInMs)?{
RollingCommandEventCounterStream?initialStream?=?streams.get(commandKey.name());????????if?(initialStream?!=?null)?{????????????return?initialStream;
}?else?{????????????synchronized?(RollingCommandEventCounterStream.class)?{
RollingCommandEventCounterStream?existingStream?=?streams.get(commandKey.name());????????????????if?(existingStream?==?null)?{
RollingCommandEventCounterStream?newStream?=?new?RollingCommandEventCounterStream(commandKey,?numBuckets,?bucketSizeInMs,
HystrixCommandMetrics.appendEventToBucket,?HystrixCommandMetrics.bucketAggregator);
streams.putIfAbsent(commandKey.name(),?newStream);????????????????????return?newStream;
}?else?{????????????????????return?existingStream;
}
}
}
}????private?RollingCommandEventCounterStream(HystrixCommandKey?commandKey,?int?numCounterBuckets,?int?counterBucketSizeInMs,
Func2?reduceCommandCompletion,
Func2?reduceBucket)?{????????super(HystrixCommandCompletionStream.getInstance(commandKey),?numCounterBuckets,?counterBucketSizeInMs,?reduceCommandCompletion,?reduceBucket);
}
}
RollingCommandEventCounterStream 構(gòu)造函數(shù)是私有的,需要通過 getInstance 方法來獲取實(shí)例,這么做是為了確保每個(gè)依賴服務(wù) HystrixCommandKey 只生成一個(gè) RollingCommandEventCounterStream 實(shí)例。我們看一下構(gòu)造 BucketedRollingCounterStream 的時(shí)候傳入的參數(shù),appendRawEventToBucket、reduceBucket 的實(shí)現(xiàn)分別是 HystrixCommandMetrics.appendEventToBucket、HystrixCommandMetrics.bucketAggregator,其主要功能就是一個(gè)對各種 HystrixEventType 事件的累加求和。public?class?HystrixCommandMetrics?extends?HystrixMetrics?{????private?static?final?HystrixEventType[]?ALL_EVENT_TYPES?=?HystrixEventType.values();????public?static?final?Func2?appendEventToBucket?=?new?Func2()?{????????@Override
public?long[]?call(long[]?initialCountArray,?HystrixCommandCompletion?execution)?{
ExecutionResult.EventCounts?eventCounts?=?execution.getEventCounts();????????????for?(HystrixEventType?eventType:?ALL_EVENT_TYPES)?{????????????????switch?(eventType)?{????????????????????case?EXCEPTION_THROWN:?break;?//this?is?just?a?sum?of?other?anyway?-?don't?do?the?work?here
default:
initialCountArray[eventType.ordinal()]?+=?eventCounts.getCount(eventType);????????????????????????break;
}
}????????????return?initialCountArray;
}
};????public?static?final?Func2?bucketAggregator?=?new?Func2()?{????????@Override
public?long[]?call(long[]?cumulativeEvents,?long[]?bucketEventCounts)?{????????????for?(HystrixEventType?eventType:?ALL_EVENT_TYPES)?{????????????????switch?(eventType)?{????????????????????case?EXCEPTION_THROWN:????????????????????????for?(HystrixEventType?exceptionEventType:?HystrixEventType.EXCEPTION_PRODUCING_EVENT_TYPES)?{
cumulativeEvents[eventType.ordinal()]?+=?bucketEventCounts[exceptionEventType.ordinal()];
}????????????????????????break;????????????????????default:
cumulativeEvents[eventType.ordinal()]?+=?bucketEventCounts[eventType.ordinal()];????????????????????????break;
}
}????????????return?cumulativeEvents;
}
};
}
這個(gè)滑動(dòng)窗口是在 Hystrix 哪里使用的呢?必然是熔斷邏輯里啊。熔斷邏輯位于 HystrixCircuitBreaker 類中,其使用滑動(dòng)窗口的關(guān)鍵代碼如下。主要是調(diào)用了 BucketedRollingCounterStream 的 observe 方法,對統(tǒng)計(jì)數(shù)據(jù)的發(fā)布源進(jìn)行了訂閱,收到統(tǒng)計(jì)數(shù)據(jù)后,對熔斷器狀態(tài) circuitOpened 進(jìn)行更新。/*?package?*/class?HystrixCircuitBreakerImpl?implements?HystrixCircuitBreaker?{????????private?final?HystrixCommandProperties?properties;????????private?final?HystrixCommandMetrics?metrics;????????enum?Status?{
CLOSED,?OPEN,?HALF_OPEN;
}????????private?final?AtomicReference?status?=?new?AtomicReference(Status.CLOSED);????????private?final?AtomicLong?circuitOpened?=?new?AtomicLong(-1);????????private?final?AtomicReference?activeSubscription?=?new?AtomicReference(null);????????protected?HystrixCircuitBreakerImpl(HystrixCommandKey?key,?HystrixCommandGroupKey?commandGroup,?final?HystrixCommandProperties?properties,?HystrixCommandMetrics?metrics)?{????????????this.properties?=?properties;????????????this.metrics?=?metrics;????????????//On?a?timer,?this?will?set?the?circuit?between?OPEN/CLOSED?as?command?executions?occur
Subscription?s?=?subscribeToStream();
activeSubscription.set(s);
}????????private?Subscription?subscribeToStream()?{????????????return?metrics.getHealthCountsStream()
.observe()
.subscribe(new?Subscriber()?{????????????????????????@Override
public?void?onCompleted()?{
}????????????????????????@Override
public?void?onError(Throwable?e)?{
}????????????????????????@Override
public?void?onNext(HealthCounts?hc)?{????????????????????????????//?判斷請求次數(shù),是否達(dá)到閾值。畢竟請求量太小,熔斷的意義也就不大了
if?(hc.getTotalRequests()?
}?else?{????????????????????????????????//?判斷失敗率是否達(dá)到閾值
if?(hc.getErrorPercentage()?
}?else?{????????????????????????????????????//?失敗率達(dá)到閾值,則修改熔斷狀態(tài)為?OPEN
if?(status.compareAndSet(Status.CLOSED,?Status.OPEN))?{
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
}
手動(dòng)寫一個(gè)示例
前面解析了 Hystrix 中滑動(dòng)窗口的實(shí)現(xiàn),由于考慮了各種細(xì)節(jié)其實(shí)現(xiàn)非常復(fù)雜,所以我們寫了一個(gè)簡易版本的滑動(dòng)窗口統(tǒng)計(jì),方便觀察學(xué)習(xí)。import?org.slf4j.Logger;import?org.slf4j.LoggerFactory;import?rx.Observable;import?rx.functions.Func1;import?rx.functions.Func2;import?rx.subjects.PublishSubject;import?rx.subjects.SerializedSubject;import?java.util.concurrent.TimeUnit;/**
*?模擬滑動(dòng)窗口計(jì)數(shù)
*?Created?by?albon?on?17/6/24.
*/public?class?RollingWindowTest?{????private?static?final?Logger?logger?=?LoggerFactory.getLogger(WindowTest.class);????public?static?final?Func2?INTEGER_SUM?=
(integer,?integer2)?->?integer?+?integer2;????public?static?final?Func1,?Observable>?WINDOW_SUM?=
window?->?window.scan(0,?INTEGER_SUM).skip(3);????public?static?final?Func1,?Observable>?INNER_BUCKET_SUM?=
integerObservable?->?integerObservable.reduce(0,?INTEGER_SUM);????public?static?void?main(String[]?args)?throws?InterruptedException?{
PublishSubject?publishSubject?=?PublishSubject.create();
SerializedSubject?serializedSubject?=?publishSubject.toSerialized();
serializedSubject
.window(5,?TimeUnit.SECONDS)?//?5秒作為一個(gè)基本塊
.flatMap(INNER_BUCKET_SUM)???????????//?基本塊內(nèi)數(shù)據(jù)求和
.window(3,?1)??????????????//?3個(gè)塊作為一個(gè)窗口,滾動(dòng)布數(shù)為1
.flatMap(WINDOW_SUM)?????????????????//?窗口數(shù)據(jù)求和
.subscribe((Integer?integer)?->
logger.info("[{}]?call?......?{}",?//?輸出統(tǒng)計(jì)數(shù)據(jù)到日志
Thread.currentThread().getName(),?integer));????????//?緩慢發(fā)送數(shù)據(jù),觀察效果
for?(int?i=0;?i<100;?++i)?{????????????if?(i?
serializedSubject.onNext(1);
}?else?{
serializedSubject.onNext(2);
}
Thread.sleep(1000);
}
}
}
總結(jié)
一個(gè)滑動(dòng)窗口統(tǒng)計(jì)主要分為兩步:bucket 統(tǒng)計(jì),bucket 的大小決定了滑動(dòng)窗口滾動(dòng)時(shí)間間隔;
window 統(tǒng)計(jì),window 的時(shí)長決定了包含的 bucket 的數(shù)目。
Hystrix 實(shí)現(xiàn)滑動(dòng)窗口利用了 RxJava 這個(gè)響應(yīng)式函數(shù)編程框架,主要是其中的幾個(gè)函數(shù):window:根據(jù)指定時(shí)間或指定數(shù)量對數(shù)據(jù)流進(jìn)行聚集,相當(dāng)于 1 對 N 的轉(zhuǎn)換;
flatMap:將輸入數(shù)據(jù)流,轉(zhuǎn)換成另一種格式的數(shù)據(jù)流,在滑動(dòng)窗口統(tǒng)計(jì)中起到了數(shù)據(jù)求和的功能(當(dāng)然其功能并不限于求和)。
Hystrix 最核心的基礎(chǔ)組件,當(dāng)屬提供觀察者模式(發(fā)布-訂閱模式)的 RxJava。
參考文獻(xiàn)
作者:albon
鏈接:https://www.jianshu.com/p/c1b6497889b4
總結(jié)
以上是生活随笔為你收集整理的Java熔断框架有哪些_降级熔断框架 Hystrix 源码解析:滑动窗口统计的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 人工智能人才月薪10万?脉脉CEO林凡:
- 下一篇: Akamai发布Akamai Conne