javascript
Spring中的重试功能!嗯,有点东西
來源:https://albenw.github.io/posts/69a9647f/
概要
Spring實現了一套重試機制,功能簡單實用。Spring Retry是從Spring Batch獨立出來的一個功能,已經廣泛應用于Spring Batch,Spring Integration, Spring for Apache Hadoop等Spring項目。 本文將講述如何使用Spring Retry及其實現原理。
背景
重試,其實我們其實很多時候都需要的,為了保證容錯性,可用性,一致性等。一般用來應對外部系統的一些不可預料的返回、異常等,特別是網絡延遲,中斷等情況。還有在現在流行的微服務治理框架中,通常都有自己的重試與超時配置,比如dubbo可以設置retries=1,timeout=500調用失敗只重試1次,超過500ms調用仍未返回則調用失敗。 如果我們要做重試,要為特定的某個操作做重試功能,則要硬編碼,大概邏輯基本都是寫個循環,根據返回或異常,計數失敗次數,然后設定退出條件。 這樣做,且不說每個操作都要寫這種類似的代碼,而且重試邏輯和業務邏輯混在一起,給維護和擴展帶來了麻煩。 從面向對象的角度來看,我們應該把重試的代碼獨立出來。
使用介紹
基本使用
先舉個例子:
@Configuration @EnableRetry public?class?Application?{@Beanpublic?RetryService?retryService(){return?new?RetryService();}public?static?void?main(String[]?args)?throws?Exception{ApplicationContext?applicationContext?=?new?AnnotationConfigApplicationContext("springretry");RetryService?service1?=?applicationContext.getBean("service",?RetryService.class);service1.service();} }@Service("service") public?class?RetryService?{@Retryable(value?=?IllegalAccessException.class,?maxAttempts?=?5,backoff=?@Backoff(value?=?1500,?maxDelay?=?100000,?multiplier?=?1.2))public?void?service()?throws?IllegalAccessException?{System.out.println("service?method...");throw?new?IllegalAccessException("manual?exception");}@Recoverpublic?void?recover(IllegalAccessException?e){System.out.println("service?retry?after?Recover?=>?"?+?e.getMessage());}}@EnableRetry - 表示開啟重試機制?
@Retryable - 表示這個方法需要重試,它有很豐富的參數,可以滿足你對重試的需求?
@Backoff - 表示重試中的退避策略?
@Recover - 兜底方法,即多次重試后還是失敗就會執行這個方法
Spring-Retry 的功能豐富在于其重試策略和退避策略,還有兜底,監聽器等操作。
然后每個注解里面的參數,都是很簡單的,大家看一下就知道是什么意思,怎么用了,我就不多講了。
重試策略
看一下Spring Retry自帶的一些重試策略,主要是用來判斷當方法調用異常時是否需要重試。(下文原理部分會深入分析實現)
SimpleRetryPolicy 默認最多重試3次
TimeoutRetryPolicy 默認在1秒內失敗都會重試
ExpressionRetryPolicy 符合表達式就會重試
CircuitBreakerRetryPolicy 增加了熔斷的機制,如果不在熔斷狀態,則允許重試
CompositeRetryPolicy 可以組合多個重試策略
NeverRetryPolicy 從不重試(也是一種重試策略哈)
AlwaysRetryPolicy 總是重試
….等等
退避策略
看一下退避策略,退避是指怎么去做下一次的重試,在這里其實就是等待多長時間。(下文原理部分會深入分析實現)
FixedBackOffPolicy 默認固定延遲1秒后執行下一次重試
ExponentialBackOffPolicy 指數遞增延遲執行重試,默認初始0.1秒,系數是2,那么下次延遲0.2秒,再下次就是延遲0.4秒,如此類推,最大30秒。
ExponentialRandomBackOffPolicy 在上面那個策略上增加隨機性
UniformRandomBackOffPolicy 這個跟上面的區別就是,上面的延遲會不停遞增,這個只會在固定的區間隨機
StatelessBackOffPolicy 這個說明是無狀態的,所謂無狀態就是對上次的退避無感知,從它下面的子類也能看出來
原理
原理部分我想分開兩部分來講,一是重試機制的切入點,即它是如何使得你的代碼實現重試功能的;二是重試機制的詳細,包括重試的邏輯以及重試策略和退避策略的實現。
切入點
@EnableRetry
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @EnableAspectJAutoProxy(proxyTargetClass?=?false) @Import(RetryConfiguration.class) @Documented public?@interface?EnableRetry?{/***?Indicate?whether?subclass-based?(CGLIB)?proxies?are?to?be?created?as?opposed*?to?standard?Java?interface-based?proxies.?The?default?is?{@code?false}.**?@return?whether?to?proxy?or?not?to?proxy?the?class*/boolean?proxyTargetClass()?default?false;}我們可以看到
@EnableAspectJAutoProxy(proxyTargetClass = false)
這個并不陌生,就是打開Spring AOP功能。 重點看看
@Import(RetryConfiguration.class)
@Import相當于注冊這個Bean
我們看看這個RetryConfiguration是個什么東西
它是一個AbstractPointcutAdvisor,它有一個pointcut和一個advice。我們知道,在IOC過程中會根據PointcutAdvisor類來對Bean進行Pointcut的過濾,然后生成對應的AOP代理類,用advice來加強處理。 看看RetryConfiguration的初始化:
上面代碼用到了AnnotationClassOrMethodPointcut,其實它最終還是用到了AnnotationMethodMatcher來根據注解進行切入點的過濾。這里就是@Retryable注解了。
//創建advice對象,即攔截器protected?Advice?buildAdvice()?{//下面關注這個對象AnnotationAwareRetryOperationsInterceptor?interceptor?=?new?AnnotationAwareRetryOperationsInterceptor();if?(retryContextCache?!=?null)?{interceptor.setRetryContextCache(retryContextCache);}if?(retryListeners?!=?null)?{interceptor.setListeners(retryListeners);}if?(methodArgumentsKeyGenerator?!=?null)?{interceptor.setKeyGenerator(methodArgumentsKeyGenerator);}if?(newMethodArgumentsIdentifier?!=?null)?{interceptor.setNewItemIdentifier(newMethodArgumentsIdentifier);}if?(sleeper?!=?null)?{interceptor.setSleeper(sleeper);}return?interceptor; }AnnotationAwareRetryOperationsInterceptor
繼承關系
可以看出AnnotationAwareRetryOperationsInterceptor是一個MethodInterceptor,在創建AOP代理過程中如果目標方法符合pointcut的規則,它就會加到interceptor列表中,然后做增強,我們看看invoke方法做了什么增強。
這里用到了委托,主要是需要根據配置委托給具體“有狀態”的interceptor還是“無狀態”的interceptor。
private?MethodInterceptor?getDelegate(Object?target,?Method?method)?{if?(!this.delegates.containsKey(target)?||?!this.delegates.get(target).containsKey(method))?{synchronized?(this.delegates)?{if?(!this.delegates.containsKey(target))?{this.delegates.put(target,?new?HashMap<Method,?MethodInterceptor>());}Map<Method,?MethodInterceptor>?delegatesForTarget?=?this.delegates.get(target);if?(!delegatesForTarget.containsKey(method))?{Retryable?retryable?=?AnnotationUtils.findAnnotation(method,?Retryable.class);if?(retryable?==?null)?{retryable?=?AnnotationUtils.findAnnotation(method.getDeclaringClass(),?Retryable.class);}if?(retryable?==?null)?{retryable?=?findAnnotationOnTarget(target,?method);}if?(retryable?==?null)?{return?delegatesForTarget.put(method,?null);}MethodInterceptor?delegate;//支持自定義MethodInterceptor,而且優先級最高if?(StringUtils.hasText(retryable.interceptor()))?{delegate?=?this.beanFactory.getBean(retryable.interceptor(),?MethodInterceptor.class);}else?if?(retryable.stateful())?{//得到“有狀態”的interceptordelegate?=?getStatefulInterceptor(target,?method,?retryable);}else?{//得到“無狀態”的interceptordelegate?=?getStatelessInterceptor(target,?method,?retryable);}delegatesForTarget.put(method,?delegate);}}}return?this.delegates.get(target).get(method);}getStatefulInterceptor和getStatelessInterceptor都是差不多,我們先看看比較簡單的getStatelessInterceptor。
private?MethodInterceptor?getStatelessInterceptor(Object?target,?Method?method,?Retryable?retryable)?{//生成一個RetryTemplateRetryTemplate?template?=?createTemplate(retryable.listeners());//生成retryPolicytemplate.setRetryPolicy(getRetryPolicy(retryable));//生成backoffPolicytemplate.setBackOffPolicy(getBackoffPolicy(retryable.backoff()));return?RetryInterceptorBuilder.stateless().retryOperations(template).label(retryable.label()).recoverer(getRecoverer(target,?method)).build();}具體生成retryPolicy和backoffPolicy的規則,我們等下再回頭來看。 RetryInterceptorBuilder其實就是為了生成RetryOperationsInterceptor。RetryOperationsInterceptor也是一個MethodInterceptor,我們來看看它的invoke方法。
public?Object?invoke(final?MethodInvocation?invocation)?throws?Throwable?{String?name;if?(StringUtils.hasText(label))?{name?=?label;}?else?{name?=?invocation.getMethod().toGenericString();}final?String?label?=?name;//定義了一個RetryCallback,其實看它的doWithRetry方法,調用了invocation的proceed()方法,是不是有點眼熟,這就是AOP的攔截鏈調用,如果沒有攔截鏈,那就是對原來方法的調用。RetryCallback<Object,?Throwable>?retryCallback?=?new?RetryCallback<Object,?Throwable>()?{public?Object?doWithRetry(RetryContext?context)?throws?Exception?{context.setAttribute(RetryContext.NAME,?label);/**?If?we?don't?copy?the?invocation?carefully?it?won't?keep?a?reference?to*?the?other?interceptors?in?the?chain.?We?don't?have?a?choice?here?but?to*?specialise?to?ReflectiveMethodInvocation?(but?how?often?would?another*?implementation?come?along?).*/if?(invocation?instanceof?ProxyMethodInvocation)?{try?{return?((ProxyMethodInvocation)?invocation).invocableClone().proceed();}catch?(Exception?e)?{throw?e;}catch?(Error?e)?{throw?e;}catch?(Throwable?e)?{throw?new?IllegalStateException(e);}}else?{throw?new?IllegalStateException("MethodInvocation?of?the?wrong?type?detected?-?this?should?not?happen?with?Spring?AOP,?"?+"so?please?raise?an?issue?if?you?see?this?exception");}}};if?(recoverer?!=?null)?{ItemRecovererCallback?recoveryCallback?=?new?ItemRecovererCallback(invocation.getArguments(),?recoverer);return?this.retryOperations.execute(retryCallback,?recoveryCallback);}//最終還是進入到retryOperations的execute方法,這個retryOperations就是在之前的builder?set進來的RetryTemplate。return?this.retryOperations.execute(retryCallback);}無論是RetryOperationsInterceptor還是StatefulRetryOperationsInterceptor,最終的攔截處理邏輯還是調用到RetryTemplate的execute方法,從名字也看出來,RetryTemplate作為一個模板類,里面包含了重試統一邏輯。不過,我看這個RetryTemplate并不是很“模板”,因為它沒有很多可以擴展的地方。
重試邏輯及策略實現
上面介紹了Spring Retry利用了AOP代理使重試機制對業務代碼進行“入侵”。下面我們繼續看看重試的邏輯做了什么。 RetryTemplate的doExecute方法。
protected?<T,?E?extends?Throwable>?T?doExecute(RetryCallback<T,?E>?retryCallback,RecoveryCallback<T>?recoveryCallback,?RetryState?state)throws?E,?ExhaustedRetryException?{RetryPolicy?retryPolicy?=?this.retryPolicy;BackOffPolicy?backOffPolicy?=?this.backOffPolicy;//新建一個RetryContext來保存本輪重試的上下文RetryContext?context?=?open(retryPolicy,?state);if?(this.logger.isTraceEnabled())?{this.logger.trace("RetryContext?retrieved:?"?+?context);}//?Make?sure?the?context?is?available?globally?for?clients?who?need//?it...RetrySynchronizationManager.register(context);Throwable?lastException?=?null;boolean?exhausted?=?false;try?{//如果有注冊RetryListener,則會調用它的open方法,給調用者一個通知。boolean?running?=?doOpenInterceptors(retryCallback,?context);if?(!running)?{throw?new?TerminatedRetryException("Retry?terminated?abnormally?by?interceptor?before?first?attempt");}//?Get?or?Start?the?backoff?context...BackOffContext?backOffContext?=?null;Object?resource?=?context.getAttribute("backOffContext");if?(resource?instanceof?BackOffContext)?{backOffContext?=?(BackOffContext)?resource;}if?(backOffContext?==?null)?{backOffContext?=?backOffPolicy.start(context);if?(backOffContext?!=?null)?{context.setAttribute("backOffContext",?backOffContext);}}//判斷能否重試,就是調用RetryPolicy的canRetry方法來判斷。//這個循環會直到原方法不拋出異常,或不需要再重試while?(canRetry(retryPolicy,?context)?&&?!context.isExhaustedOnly())?{try?{if?(this.logger.isDebugEnabled())?{this.logger.debug("Retry:?count="?+?context.getRetryCount());}//清除上次記錄的異常lastException?=?null;//doWithRetry方法,一般來說就是原方法return?retryCallback.doWithRetry(context);}catch?(Throwable?e)?{//原方法拋出了異常lastException?=?e;try?{//記錄異常信息registerThrowable(retryPolicy,?state,?context,?e);}catch?(Exception?ex)?{throw?new?TerminatedRetryException("Could?not?register?throwable",ex);}finally?{//調用RetryListener的onError方法doOnErrorInterceptors(retryCallback,?context,?e);}//再次判斷能否重試if?(canRetry(retryPolicy,?context)?&&?!context.isExhaustedOnly())?{try?{//如果可以重試則走退避策略backOffPolicy.backOff(backOffContext);}catch?(BackOffInterruptedException?ex)?{lastException?=?e;//?back?off?was?prevented?by?another?thread?-?fail?the?retryif?(this.logger.isDebugEnabled())?{this.logger.debug("Abort?retry?because?interrupted:?count="+?context.getRetryCount());}throw?ex;}}if?(this.logger.isDebugEnabled())?{this.logger.debug("Checking?for?rethrow:?count="?+?context.getRetryCount());}if?(shouldRethrow(retryPolicy,?context,?state))?{if?(this.logger.isDebugEnabled())?{this.logger.debug("Rethrow?in?retry?for?policy:?count="+?context.getRetryCount());}throw?RetryTemplate.<E>wrapIfNecessary(e);}}/**?A?stateful?attempt?that?can?retry?may?rethrow?the?exception?before?now,*?but?if?we?get?this?far?in?a?stateful?retry?there's?a?reason?for?it,*?like?a?circuit?breaker?or?a?rollback?classifier.*/if?(state?!=?null?&&?context.hasAttribute(GLOBAL_STATE))?{break;}}if?(state?==?null?&&?this.logger.isDebugEnabled())?{this.logger.debug("Retry?failed?last?attempt:?count="?+?context.getRetryCount());}exhausted?=?true;//重試結束后如果有兜底Recovery方法則執行,否則拋異常return?handleRetryExhausted(recoveryCallback,?context,?state);}catch?(Throwable?e)?{throw?RetryTemplate.<E>wrapIfNecessary(e);}finally?{//處理一些關閉邏輯close(retryPolicy,?context,?state,?lastException?==?null?||?exhausted);//調用RetryListener的close方法doCloseInterceptors(retryCallback,?context,?lastException);RetrySynchronizationManager.clear();}}主要核心重試邏輯就是上面的代碼了,看上去還是挺簡單的。 在上面,我們漏掉了RetryPolicy的canRetry方法和BackOffPolicy的backOff方法,以及這兩個Policy是怎么來的。 我們回頭看看getStatelessInterceptor方法中的getRetryPolicy和getRetryPolicy方法。
private?RetryPolicy?getRetryPolicy(Annotation?retryable)?{Map<String,?Object>?attrs?=?AnnotationUtils.getAnnotationAttributes(retryable);@SuppressWarnings("unchecked")Class<??extends?Throwable>[]?includes?=?(Class<??extends?Throwable>[])?attrs.get("value");String?exceptionExpression?=?(String)?attrs.get("exceptionExpression");boolean?hasExpression?=?StringUtils.hasText(exceptionExpression);if?(includes.length?==?0)?{@SuppressWarnings("unchecked")Class<??extends?Throwable>[]?value?=?(Class<??extends?Throwable>[])?attrs.get("include");includes?=?value;}@SuppressWarnings("unchecked")Class<??extends?Throwable>[]?excludes?=?(Class<??extends?Throwable>[])?attrs.get("exclude");Integer?maxAttempts?=?(Integer)?attrs.get("maxAttempts");String?maxAttemptsExpression?=?(String)?attrs.get("maxAttemptsExpression");if?(StringUtils.hasText(maxAttemptsExpression))?{maxAttempts?=?PARSER.parseExpression(resolve(maxAttemptsExpression),?PARSER_CONTEXT).getValue(this.evaluationContext,?Integer.class);}if?(includes.length?==?0?&&?excludes.length?==?0)?{SimpleRetryPolicy?simple?=?hasExpression???new?ExpressionRetryPolicy(resolve(exceptionExpression)).withBeanFactory(this.beanFactory):?new?SimpleRetryPolicy();simple.setMaxAttempts(maxAttempts);return?simple;}Map<Class<??extends?Throwable>,?Boolean>?policyMap?=?new?HashMap<Class<??extends?Throwable>,?Boolean>();for?(Class<??extends?Throwable>?type?:?includes)?{policyMap.put(type,?true);}for?(Class<??extends?Throwable>?type?:?excludes)?{policyMap.put(type,?false);}boolean?retryNotExcluded?=?includes.length?==?0;if?(hasExpression)?{return?new?ExpressionRetryPolicy(maxAttempts,?policyMap,?true,?exceptionExpression,?retryNotExcluded).withBeanFactory(this.beanFactory);}else?{return?new?SimpleRetryPolicy(maxAttempts,?policyMap,?true,?retryNotExcluded);}}嗯~,代碼不難,這里簡單做一下總結好了。就是通過@Retryable注解中的參數,來判斷具體使用文章開頭說到的哪個重試策略,是SimpleRetryPolicy還是ExpressionRetryPolicy等。
private?BackOffPolicy?getBackoffPolicy(Backoff?backoff)?{long?min?=?backoff.delay()?==?0???backoff.value()?:?backoff.delay();if?(StringUtils.hasText(backoff.delayExpression()))?{min?=?PARSER.parseExpression(resolve(backoff.delayExpression()),?PARSER_CONTEXT).getValue(this.evaluationContext,?Long.class);}long?max?=?backoff.maxDelay();if?(StringUtils.hasText(backoff.maxDelayExpression()))?{max?=?PARSER.parseExpression(resolve(backoff.maxDelayExpression()),?PARSER_CONTEXT).getValue(this.evaluationContext,?Long.class);}double?multiplier?=?backoff.multiplier();if?(StringUtils.hasText(backoff.multiplierExpression()))?{multiplier?=?PARSER.parseExpression(resolve(backoff.multiplierExpression()),?PARSER_CONTEXT).getValue(this.evaluationContext,?Double.class);}if?(multiplier?>?0)?{ExponentialBackOffPolicy?policy?=?new?ExponentialBackOffPolicy();if?(backoff.random())?{policy?=?new?ExponentialRandomBackOffPolicy();}policy.setInitialInterval(min);policy.setMultiplier(multiplier);policy.setMaxInterval(max?>?min???max?:?ExponentialBackOffPolicy.DEFAULT_MAX_INTERVAL);if?(this.sleeper?!=?null)?{policy.setSleeper(this.sleeper);}return?policy;}if?(max?>?min)?{UniformRandomBackOffPolicy?policy?=?new?UniformRandomBackOffPolicy();policy.setMinBackOffPeriod(min);policy.setMaxBackOffPeriod(max);if?(this.sleeper?!=?null)?{policy.setSleeper(this.sleeper);}return?policy;}FixedBackOffPolicy?policy?=?new?FixedBackOffPolicy();policy.setBackOffPeriod(min);if?(this.sleeper?!=?null)?{policy.setSleeper(this.sleeper);}return?policy;}嗯~,一樣的味道。就是通過@Backoff注解中的參數,來判斷具體使用文章開頭說到的哪個退避策略,是FixedBackOffPolicy還是UniformRandomBackOffPolicy等。
那么每個RetryPolicy都會重寫canRetry方法,然后在RetryTemplate判斷是否需要重試。 我們看看SimpleRetryPolicy的
@Overridepublic?boolean?canRetry(RetryContext?context)?{Throwable?t?=?context.getLastThrowable();//判斷拋出的異常是否符合重試的異常//還有,是否超過了重試的次數return?(t?==?null?||?retryForException(t))?&&?context.getRetryCount()?<?maxAttempts;}同樣,我們看看FixedBackOffPolicy的退避方法。
protected?void?doBackOff()?throws?BackOffInterruptedException?{try?{//就是sleep固定的時間sleeper.sleep(backOffPeriod);}catch?(InterruptedException?e)?{throw?new?BackOffInterruptedException("Thread?interrupted?while?sleeping",?e);}}至此,重試的主要原理以及邏輯大概就是這樣了。
RetryContext
我覺得有必要說說RetryContext,先看看它的繼承關系。
可以看出對每一個策略都有對應的Context。
在Spring Retry里,其實每一個策略都是單例來的。我剛開始直覺是對每一個需要重試的方法都會new一個策略,這樣重試策略之間才不會產生沖突,但是一想就知道這樣就可能多出了很多策略對象出來,增加了使用者的負擔,這不是一個好的設計。Spring Retry采用了一個更加輕量級的做法,就是針對每一個需要重試的方法只new一個上下文Context對象,然后在重試時,把這個Context傳到策略里,策略再根據這個Context做重試,而且Spring Retry還對這個Context做了cache。這樣就相當于對重試的上下文做了優化。
總結
Spring Retry通過AOP機制來實現對業務代碼的重試”入侵“,RetryTemplate中包含了核心的重試邏輯,還提供了豐富的重試策略和退避策略。
參考資料
http://www.10tiao.com/html/164/201705/2652898434/1.html https://www.jianshu.com/p/58e753ca0151 https://paper.tuisec.win/detail/90bd660fad92183
總結
以上是生活随笔為你收集整理的Spring中的重试功能!嗯,有点东西的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 高并发下秒杀商品,必须知道的9个细节
- 下一篇: 离散数学关系的性质_关系和关系的性质|