生活随笔
收集整理的這篇文章主要介紹了
OpenHub框架进行的异步通信
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
在本系列的前一部分中,我們介紹了OpenHub框架 。 這部分顯示了框架最強大的功能之一- 異步消息傳遞模型 。
當源系統無法等待目標系統的響應時,將使用系統之間的異步通信。 有以下幾個原因:
- 源系統必須盡可能地響應 ,并且不受外部影響(通信緩慢,目標系統不穩定等)的影響
- 目標系統中的處理需要很多時間
- 異步通信對性能和流量產生積極影響
異步方案
當您決定異步通信時,您必須考慮可能的情況:
- 目標系統必須確認傳入消息已成功保存 ,并準備進行進一步處理。 是否應將異步過程的最終結果通知源系統?
- 如果異步處理失敗怎么辦? 如果存在暫時性技術錯誤 (例如,與另一個系統的通信失敗), 請嘗試幾次,或者由于業務錯誤 (例如,輸入數據無效)而停止進一步處理 。
- 在異步處理期間會調用其他系統- 如果對第一個系統的調用可以,但對第二個系統的調用失敗,該怎么辦? 異步處理必須是冪等的,并跳過第一個成功的呼叫,然后僅重試第二個呼叫。
- 異步過程可能很棘手,因此最好將一個大進程(父進程)劃分為較小的(子進程)進程 。 如果處理了子進程,則父進程也將完成。
- 有時,您必須保證傳入請求的順序 (請求的到達順序與發送的順序不相同)并按照確切的順序進行處理。
- 它是異步處理,您需要對其進行監視或在發生意外情況 (例如異步過程失敗) 時自動得到通知 。
- 有時,您需要保存數據或異步過程的當前狀態以使其成功完成,例如,第一次調用外部系統的結果就是第二次調用的輸入。
當您開始考慮所有這些情況時,您會發現從頭開始實施它并不簡單。 OpenHub框架內置了對異步消息處理的支持。 它易于使用,但同時又強大又靈活。 并且也是可配置的,例如,該過程應再次運行多少次? 在哪個時間間隔?
異步路由實現
使用OpenHub框架的路由實現有兩個子路由:
- 一種用于處理傳入消息( RouteIn )
- 一種用于異步過程實現( RouteOut )
/*** Route definition for asynchronous operation "translate" via web services.*/
@CamelConfiguration(value = AsyncTranslateWsRoute.ROUTE_BEAN)
public class AsyncTranslateWsRoute extends AbstractBasicRoute {static final String ROUTE_BEAN = "asyncTranslateWsRouteBean";private static final String OPERATION_NAME = "asyncTranslateWs";static final String ROUTE_ID_ASYNC_IN = getInRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);static final String ROUTE_ID_ASYNC_OUT = getOutRouteId(ServiceEnum.TRANSLATE, OPERATION_NAME);static final String URI_ASYNC_OUT = "direct:" + ROUTE_ID_ASYNC_OUT;@Overrideprotected void doConfigure() throws Exception {// asyncTranslate - input asynch messagecreateAsyncRouteIn();// asyncTranslate - process delivery (=asynchronous execution)createAsyncRouteOut();}/*** Route for asynchronous <strong>asyncTranslate</strong> input operation.* <p/>* Prerequisite: none* <p/>* Output: {@link AsyncTranslateResponse}*/private void createAsyncRouteIn() {Namespaces ns = new Namespaces("h", TranslateWebServiceConfig.TRANSLATE_SERVICE_NS);// note: mandatory parameters are set already in XSD, this validation is extraXPathValidator validator = new XPathValidator("/h:asyncTranslateRequest", ns, "h:inputText");AsynchRouteBuilder.newInstance(ServiceEnum.TRANSLATE, OPERATION_NAME,getInWsUri(new QName(TranslateWebServiceConfig.TRANSLATE_SERVICE_NS, "asyncTranslateRequest")),new AsynchResponseProcessor() {@Overrideprotected Object setCallbackResponse(CallbackResponse callbackResponse) {AsyncTranslateResponse res = new AsyncTranslateResponse();res.setConfirmAsyncTranslate(callbackResponse);return res;}}, jaxb(AsyncTranslateResponse.class)).withValidator(validator).build(this);}/*** Route for <strong>asyncTranslate</strong> operation - process delivery (=asynchronous execution).* Only input text is logged in this case.* <p/>* Prerequisite: none*/private void createAsyncRouteOut() {final String URI_LOG_INPUT_PARAMS = "direct:logInputParams";from(URI_ASYNC_OUT).routeId(ROUTE_ID_ASYNC_OUT)// xml -> AsyncTranslateRequest.unmarshal(jaxb(AsyncTranslateRequest.class)).to("extcall:message:" + URI_LOG_INPUT_PARAMS);from(URI_LOG_INPUT_PARAMS).validate(body().isInstanceOf(AsyncTranslateRequest.class)).log(LoggingLevel.DEBUG, "Asynchronous execution - input text '${body.inputText}' (lang: ${body.inputLang})");}
}
RouteIn使用AsynchRouteBuilder通過以下功能輕松配置:
- 定義哪個傳入的Web Service請求應開始此路由
- 定義源系統的確認響應。 輸入路由成功執行后,將返回對源系統的同步響應。
- 定義驗證器,該驗證器檢查傳入請求中是否存在元素inputText
RouteOut定義了異步過程本身。 輸入請求( AsyncTranslateRequest )僅在這種情況下記錄。
就這樣。 周圍的一切都由OpenHub框架實現。
外部通話
您的路線實施通常會調用外部系統或其他路線。 如果實現異步流程,則必須遵守冪等規則-流程的每個部分都可以被多次調用,并且必須確保所有調用的行為相同。 有時外部系統/路由本身是冪等的,然后您可以根據需要多次調用它。 如果沒有,那么您必須在實現中對其進行控制。 因此,我們將Camel組件設為extcall 。
上例中的組件excall確保即使整個異步過程運行了多次,使用URI_LOG_INPUT_PARAMS路由也會被精確地調用一次。
外部通話說明
描述:
- 在異步消息處理期間調用了兩個外部系統
- 在處理過程中我們可以返回兩個extcall的停靠點
- 如果在對外部系統1的第一次請求之前發生錯誤,則下一個處理嘗試將從頭開始,與新消息到達相同
漏斗和節流組件
其他強大的組件是funnel和throttling 。
Funnel組件用于過濾特定集成點處的并發消息。 這種過濾可確保在該位置僅處理一個特定類型的消息或同時具有特定信息的消息,即使是以保證的順序(可選選項)進行處理。 對于與外部系統進行通信很有用,該外部系統一次只能接受一個特定實體(例如,訂購系統中的一個特定客戶)的一條輸入消息。
通過第二個組件throttling ,您可以確保特定端點不會過載,或者我們不會超出任何外部服務的商定SLA。 Throttling組件也可以用于同步消息。
所有組件都支持群集。
實施細節
OpenHub需要保存的所有內容都保存在數據庫中–類型沒有限制。 無需調整JMS / MQ系統即可支持異步消息傳遞。 然后,您可以使用自己喜歡的任何工具進行日常工作- 數據模型簡單,清晰并且文檔齊全。 數據庫工具比JMS / MQ系統多得多。
有時我們聽說在這種情況下使用數據庫是一種反模式,從性能的角度來看,在某些情況下它可能是瓶頸。 這取決于實際項目中的集成用例,但在處理數十萬個并發請求的實際項目中,我們仍然沒有嚴格的性能限制。 我們準備添加JMS / MQ實現,但是到目前為止還不需要。
不必僅通過傳入請求啟動異步過程–您還可以使用調度作業在需要的任何時候啟動路由,然后將其留給OpenHub框架。
所有示例都可以在GitHub的參考實現中找到–參見https://github.com/OpenWiseSolutions/openhub-ri
翻譯自: https://www.javacodegeeks.com/2017/10/asynchronous-communication-made-openhub-framework.html
總結
以上是生活随笔為你收集整理的OpenHub框架进行的异步通信的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。