异步重试模式
當您有一段經常失敗且必須重試的代碼時,此Java 7/8庫提供了豐富且不引人注目的API,并提供了針對此問題的快速且可擴展的解決方案:
現在,您可以運行任意代碼塊,并且庫將為您重試該代碼塊,以防它拋出SocketException :
final CompletableFuture<Socket> future = executor.getWithRetry(() ->new Socket("localhost", 8080) );future.thenAccept(socket ->System.out.println("Connected! " + socket) );請仔細看! getWithRetry()不會阻止。 它立即返回CompletableFuture并異步調用給定的函數。 您可以一次收聽該Future甚至是多個Future ,并同時進行其他工作。 因此,這段代碼的作用是:嘗試連接到localhost:8080 ,如果由于SocketException而失敗,它將在500毫秒后重試(帶有一些隨機抖動),每次重試后的延遲加倍,但不超過10秒。
等效但更簡潔的語法:
executor.getWithRetry(() -> new Socket("localhost", 8080)).thenAccept(socket -> System.out.println("Connected! " + socket));這是您可能期望的示例輸出:
TRACE | Retry 0 failed after 3ms, scheduled next retry in 508ms (Sun Jul 21 21:01:12 CEST 2013) java.net.ConnectException: Connection refusedat java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]//...TRACE | Retry 1 failed after 0ms, scheduled next retry in 934ms (Sun Jul 21 21:01:13 CEST 2013) java.net.ConnectException: Connection refusedat java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]//...TRACE | Retry 2 failed after 0ms, scheduled next retry in 1919ms (Sun Jul 21 21:01:15 CEST 2013) java.net.ConnectException: Connection refusedat java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:1.8.0-ea]//...TRACE | Successful after 2 retries, took 0ms and returned: Socket[addr=localhost/127.0.0.1,port=8080,localport=46332]Connected! Socket[addr=localhost/127.0.0.1,port=8080,localport=46332]想象一下,您連接到兩個不同的系統,一個系統速度很慢 ,第二個系統不可靠并且經常失敗:
CompletableFuture<String> stringFuture = executor.getWithRetry(ctx -> unreliable()); CompletableFuture<Integer> intFuture = executor.getWithRetry(ctx -> slow());stringFuture.thenAcceptBoth(intFuture, (String s, Integer i) -> {//both done after some retries });當緩慢且不可靠的系統最終都無任何故障地答復時,異步異步執行thenAcceptBoth()回調。 類似地(使用CompletableFuture.acceptEither() ),您可以同時異步調用兩個或多個不可靠的服務器,并在重試幾次后第一個成功時會收到通知。
我對此不夠強調–重試是異步執行的,并且有效地使用了線程池,而不是盲目入睡。
基本原理
通常我們被迫重試給定的代碼段,因為它失敗了,我們必須再次嘗試,通常會稍有延遲以節省CPU。 這項要求非常普遍,并且在Spring Batch中通過RetryTemplate類提供重試支持的現成通用實現很少RetryTemplate 。 但是幾乎沒有其他類似的方法( [1] , [2] )。 所有這些嘗試(我敢打賭,你們中的許多人自己都實現了類似的工具!)遇到了相同的問題-它們正在阻塞,從而浪費了大量資源,并且擴展性不好。
這本身并不壞,因為它使編程模型更加簡單-庫負責重試,而您只需要等待比平常更長的返回值即可。 但是,這不僅會造成泄漏的抽象(由于重試和延遲,通常非常快的方法通常會突然變慢),而且還會浪費寶貴的線程,因為這種工具將在重試之間花費大部分時間。 因此
創建了Async-Retry實用程序,該實用程序針對Java 8 (現有Java 7 backport )并解決了上述問題。
主要的抽象是RetryExecutor ,它提供了簡單的API:
public interface RetryExecutor {CompletableFuture<Void> doWithRetry(RetryRunnable action);<V> CompletableFuture<V> getWithRetry(Callable<V> task);<V> CompletableFuture<V> getWithRetry(RetryCallable<V> task);<V> CompletableFuture<V> getFutureWithRetry(RetryCallable<CompletableFuture<V>> task); }不必擔心RetryRunnable和RetryCallable –為方便起見,它們允許使用已檢查的異常,并且在大多數情況下,我們還是會使用lambda表達式。
請注意,它返回CompletableFuture 。 我們不再假裝調用錯誤方法很快。 如果庫遇到異常,它將使用預先配置的退避延遲重試我們的代碼塊。 調用時間將從幾毫秒飛漲到幾秒鐘。 CompletableFuture清楚地表明了這一點。 而且,它不是一個愚蠢的java.util.concurrent.Future我們都知道– Java 8中的CompletableFuture非常強大 ,最重要的是–默認情況下是非阻塞的。
如果您畢竟需要阻止結果,只需在Future對象上調用.get() 。
基本API
該API非常簡單。 您提供了一個代碼塊,該庫將多次運行它,直到它正常返回為止,而不是引發異常。 它也可能在重試之間設置可配置的延遲:
RetryExecutor executor = //...executor.getWithRetry(() -> new Socket("localhost", 8080));一旦成功連接到localhost:8080將解析返回的CompletableFuture<Socket> localhost:8080 。 (可選)我們可以使用RetryContext來獲取額外的上下文,例如當前正在執行的重試:
executor.getWithRetry(ctx -> new Socket("localhost", 8080 + ctx.getRetryCount())).thenAccept(System.out::println);該代碼比看起來更聰明。 在第一次執行時, ctx.getRetryCount()返回0 ,因此我們嘗試連接到localhost:8080 。 如果失敗,則下一個重試將嘗試localhost:8081 ( 8080 + 1 ),依此類推。 而且,如果您意識到所有這些操作都是異步發生的,則可以掃描多臺計算機的端口,并收到有關每個主機上第一個響應端口的通知:
Arrays.asList("host-one", "host-two", "host-three").stream().forEach(host ->executor.getWithRetry(ctx -> new Socket(host, 8080 + ctx.getRetryCount())).thenAccept(System.out::println));對于每個主機, RetryExecutor將嘗試連接到端口8080,并嘗試使用更高的端口。
getFutureWithRetry()需要特別注意。 我想重試已經返回CompletableFuture<V> :例如異步HTTP調用的結果:
private CompletableFuture<String> asyncHttp(URL url) { /*...*/}//...final CompletableFuture<CompletableFuture<String>> response = executor.getWithRetry(ctx -> asyncHttp(new URL("http://example.com"))); 將asyncHttp()傳遞給getWithRetry()將產生CompletableFuture<CompletableFuture<V>> 。 與它一起工作不僅很尷尬,而且還很麻煩。 該庫將僅調用asyncHttp()并僅在失敗時重試,但在返回時不重試
CompletableFuture<String>失敗。 解決方案很簡單:
在這種情況下, RetryExecutor將理解從asyncHttp()返回的內容實際上只是一個Future并且將(異步)等待結果或失敗。 該庫功能更強大,因此讓我們深入了解:
配置選項
通常,您可以配置兩個重要因素: RetryPolicy ,用于控制是否應進行下一次重試;以及Backoff (可以有選擇地增加后續重試之間的延遲)。
默認情況下, RetryExecutor在每個Throwable上無限地重復用戶任務,并在RetryExecutor重試之間增加1秒的延遲。
創建
RetryExecutor默認實現是AsyncRetryExecutor ,您可以直接創建:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();RetryExecutor executor = new AsyncRetryExecutor(scheduler);//...scheduler.shutdownNow();唯一需要的依賴關系是JDK的標準ScheduledExecutorService 。 在許多情況下,一個線程就足夠了,但是如果您要同時處理數百個或更多任務的重試,請考慮增加池大小。
請注意, AsyncRetryExecutor不會關閉ScheduledExecutorService 。 這是一個有意識的設計決策,將在后面進行解釋。
AsyncRetryExecutor幾乎沒有其他構造函數,但是在大多數情況下,更改類的行為對于with*()方法鏈接的調用最為方便。 您將看到大量以此方式編寫的示例。 稍后,我們將僅使用executor引用而不定義它。 假設它是RetryExecutor類型。
重試政策
例外情況
默認情況下,從用戶任務拋出的每個Throwable (特殊AbortRetryException除外)都會導致重試。 顯然,這是可配置的。 例如,在JPA中,您可能想重試由于OptimisticLockException而失敗的事務-但其他所有異常都應立即失敗:
executor.retryOn(OptimisticLockException.class).withNoDelay().getWithRetry(ctx -> dao.optimistic());其中dao.optimistic()可能會引發OptimisticLockException 。 在這種情況下,您可能不希望重試之間有任何延遲,以后再說。 如果您不喜歡在每個Throwable上重試的默認設置,只需使用retryOn()來限制它:
executor.retryOn(Exception.class)當然,也可能需要相反的操作–中止重試并在拋出某些異常的情況下立即失敗而不是重試。 就這么簡單:
executor.abortOn(NullPointerException.class).abortOn(IllegalArgumentException.class).getWithRetry(ctx -> dao.optimistic());顯然,您不想重試NullPointerException或IllegalArgumentException因為它們指示編程錯誤,而不是瞬時失敗。 最后,您可以結合使用重試和中止策略。 如果出現任何retryOn()異常(或子類),則用戶代碼將重試,除非它應該abortOn()指定的異常。 例如,我們想重試每個IOException或SQLException但是如果遇到FileNotFoundException或java.sql.DataTruncation則中止(順序無關):
executor.retryOn(IOException.class).abortIf(FileNotFoundException.class).retryOn(SQLException.class).abortIf(DataTruncation.class).getWithRetry(ctx -> dao.load(42));如果這還不夠,您可以提供將在每次失敗時調用的自定義謂詞:
executor.abortIf(throwable ->throwable instanceof SQLException &&throwable.getMessage().contains("ORA-00911"));最大重試次數
中斷重試“循環”的另一種方法(請記住此過程是異步的,沒有阻塞循環 )是通過指定最大重試次數:
executor.withMaxRetries(5)在極少數情況下,您可能希望禁用重試,并且幾乎不利用異步執行。 在這種情況下,請嘗試:
executor.dontRetry()重試之間的延遲(退避)
有時需要在失敗后立即重試(請參閱OptimisticLockException示例),但是在大多數情況下,這是一個壞主意。 如果您無法連接到外部系統,請稍等片刻,然后再嘗試下一次嘗試。 您可以節省CPU,帶寬和其他服務器的資源。 但是有很多選擇要考慮:
- 我們應該以固定的間隔重試還是增加每次失敗后的延遲 ?
- 輪候時間是否應該有上限和下限?
- 我們是否應該添加隨機“抖動”來延遲時間以及時分散許多任務的重試?
該庫回答了所有這些問題。
重試間隔固定
默認情況下,每次重試之前都有1秒的等待時間。 因此,如果初始嘗試失敗,則將在1秒后執行第一次重試。 當然,我們可以將默認值更改為200毫秒:
executor.withFixedBackoff(200)如果我們已經在此處,則默認情況下,執行用戶任務后將應用退避。 如果用戶任務本身消耗一些時間,則重試的頻率將降低。 例如,重試延遲為RetryExecutor毫秒,用戶任務失敗所需的平均時間約為50毫秒, RetryExecutor將每秒重試約4次(50毫秒+ RetryExecutor毫秒)。 但是,如果要將重試頻率保持在更可預測的水平,則可以使用fixedRate標志:
executor.withFixedBackoff(200).withFixedRate()這類似于ScheduledExecutorService “固定速率”與“固定延遲”方法。 順便說一句,不要期望RetryExecutor非常精確,這是最好的,但是在很大程度上取決于前面提到的ScheduledExecutorService準確性。
重試之間的間隔呈指數增長
它可能是一個活躍的研究主題,但總的來說,您可能希望隨著時間的推移擴展重試延遲,假設如果用戶任務多次失敗,我們應該減少嘗試次數。 例如,假設我們從100ms延遲開始,直到進行第??一次重試為止,但是如果該嘗試也失敗了,我們應該再等待兩次(200ms)。 再過400毫秒,800毫秒……您就會明白:
executor.withExponentialBackoff(100, 2)這是一個指數函數,可以快速增長。 因此,將最大退避時間設置在某個合理的水平(例如10秒)非常有用:
executor.withExponentialBackoff(100, 2).withMaxDelay(10_000) //10 seconds隨機抖動
在嚴重停機期間經常觀察到的一種現象是系統趨于同步。 想象一下一個繁忙的系統突然停止響應。 成百上千的請求失敗并被重試。 這取決于您的退避,但是默認情況下,所有這些請求都會在一秒鐘后精確重試,從而在某個時間點產生大量流量。 最后,此類故障會傳播到其他系統,這些系統又會進行同步。
為避免此問題,隨著時間的推移擴展重試,使負載平坦化是很有用的。 一個簡單的解決方案是添加隨機抖動來延遲時間,以便并非所有請求都計劃在完全相同的時間重試。 您可以在均勻抖動(隨機值從-100ms到100ms)之間進行選擇:
executor.withUniformJitter(100) //ms…和成比例的抖動,將延遲時間乘以隨機因子,默認情況下為0.9到1.1(10%):
executor.withProportionalJitter(0.1) //10%您還可以對延遲時間設置嚴格的下限,以避免安排較短的重試時間:
executor.withMinDelay(50) //ms實施細節
該庫是在考慮Java 8的情況下構建的,以利用lambda和新的CompletableFuture抽象(但是存在具有Guava依賴項的Java 7 port )。 它在下面使用ScheduledExecutorService來運行任務并計劃將來的重試-這樣可以最大程度地利用線程。
但是真正有趣的是整個庫是完全不變的,根本沒有單個可變字段。 起初這可能是違反直覺的,例如以以下簡單代碼示例為例:
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();AsyncRetryExecutor first = new AsyncRetryExecutor(scheduler).retryOn(Exception.class).withExponentialBackoff(500, 2);AsyncRetryExecutor second = first.abortOn(FileNotFoundException.class);AsyncRetryExecutor third = second.withMaxRetries(10);似乎所有with*()方法或retryOn() / abortOn()方法retryOn()現有的執行程序變異。 但是事實并非如此,每次配置更改都會創建一個新實例 ,而舊實例則保持不變。 因此,例如,當first執行者將重試FileNotFoundException , second和third執行者則不會。 但是它們都共享同一個scheduler 。 這就是AsyncRetryExecutor不關閉ScheduledExecutorService (甚至沒有任何close()方法)的原因。 由于我們不知道有多少個AsyncRetryExecutor副本指向同一調度程序,因此我們甚至不嘗試管理其生命周期。 但是,這通常不是問題(請參見下面的Spring集成 )。
您可能想知道,為什么這么笨拙的設計決定? 有以下三個原因:
- 在編寫并發代碼時,不變性可以大大降低多線程錯誤的風險。 例如, RetryContext保存重試次數。 但是,我們無需更改變量,而只需創建具有遞增但final計數器的新實例(副本)即可。 沒有比賽條件或可見性。
- 如果給您現有的RetryExecutor幾乎完全是您想要的,但是您需要進行一些細微調整,則只需調用executor.with...()并獲取一個新副本。 您不必擔心使用同一執行程序的其他地方(請參閱: Spring集成以獲取更多示例)
- 如今,函數式編程和不變的數據結構非常流行 。
注意: AsyncRetryExecutor 未標記為final ,您可以通過將其子類化并添加可變狀態來打破不變性。 請不要這樣做,子類只允許更改行為。
依存關系
該庫需要Java 8和SLF4J進行記錄。 Java 7端口還取決于Guava 。
Spring整合
如果您即將在Spring中使用RetryExecutor ,請放心,但是配置API可能對您不起作用。 Spring通過大量的設置來促進(或用于促進)可變服務的約定。 在XML中,您可以定義bean并在其上調用setter(通過<property name="..."/> )。 該約定假定存在變異設置器。 但是我發現這種方法在某些情況下容易出錯并且違反直覺。
假設我們全局定義了org.springframework.transaction.support.TransactionTemplate bean并將其注入到多個位置。 大。 現在有一個請求,它的超時要求略有不同:
@Autowired private TransactionTemplate template;后來在同一個班級:
final int oldTimeout = template.getTimeout(); template.setTimeout(10_000); //do the work template.setTimeout(oldTimeout);此代碼在許多級別上都是錯誤的! 首先,如果發生故障,我們將永遠不會恢復oldTimeout 。 好了, finally救了。 但還要注意我們如何更改全局共享的TransactionTemplate實例。 誰知道不知道更改配置的其他幾個bean和線程將要使用它?
即使您確實想全局更改事務超時,也足夠公平,但是這樣做仍然是錯誤的方法。 private timeout字段不是volatile ,因此對其進行的更改可能對其他線程可見或不可見。 真是一團糟! 同樣的問題在許多其他類(如JmsTemplate 。
你知道我要去哪里嗎? 只需創建一個不變的服務類,并在需要時通過創建副本來安全地對其進行調整。 現在,使用此類服務??同樣簡單:
@Configuration class Beans {@Beanpublic RetryExecutor retryExecutor() {return new AsyncRetryExecutor(scheduler()).retryOn(SocketException.class).withExponentialBackoff(500, 2);}@Bean(destroyMethod = "shutdownNow")public ScheduledExecutorService scheduler() {return Executors.newSingleThreadScheduledExecutor();}}嘿! 進入21世紀,我們在Spring不再需要XML。 Bootstrap也很簡單:
final ApplicationContext context = new AnnotationConfigApplicationContext(Beans.class); final RetryExecutor executor = context.getBean(RetryExecutor.class); //... context.close();如您所見,將現代的,不可變的服務與Spring集成非常簡單。 順便說一句,如果您在設計自己的服務時沒有準備好進行如此大的更改,請至少考慮使用構造函數注入 。
到期
該庫包含大量的單元測試。 但是,尚未在任何生產代碼中使用它,并且該API可能會更改。 當然,我們鼓勵您提交錯誤,功能請求和提取請求 。 它是在考慮到Java 8的情況下開發的,但是Java 7 backport存在一些更詳細的API和強制性Guava依賴關系( ListenableFuture而不是
Java 8的CompletableFuture )。
GitHub上的完整源代碼。
翻譯自: https://www.javacodegeeks.com/2013/08/asynchronous-retry-pattern.html
總結
- 上一篇: 1-7 月全球电动汽车电池市场份额:宁德
- 下一篇: Redmi K70 Pro影像细节曝光: