javascript
错误处理在Spring Integration中如何工作
1.引言
這篇文章的目標(biāo)是向您展示將消息傳遞系統(tǒng)與Spring Integration結(jié)合使用時(shí)如何處理錯(cuò)誤。 您將看到同步和異步消息傳遞之間的錯(cuò)誤處理有所不同。 和往常一樣,我將跳過聊天并繼續(xù)進(jìn)行一些示例。
- 您可以在github上獲取源代碼。
2,樣品申請(qǐng)
我將使用一個(gè)基本示例,因?yàn)槲蚁雽W⒂诋惓L幚怼?該應(yīng)用程序包含一個(gè)訂單服務(wù),該服務(wù)接收訂單,處理訂單并返回確認(rèn)。
下面我們可以看到消息傳遞系統(tǒng)的配置方式:
int-config.xml
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/><int:channel id="requestChannel"/><int:router input-channel="requestChannel" ref="orderRouter" method="redirectOrder"/><int:channel id="syncChannel"/><int:channel id="asyncChannel"><int:queue capacity="5"/> </int:channel><int:service-activator method="processOrder" input-channel="syncChannel" ref="orderProcessor"/><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/> </int:service-activator>網(wǎng)關(guān)是消息傳遞系統(tǒng)的入口點(diǎn)。 它將接收訂單并將其發(fā)送到直接通道“ requestChannel”,路由器將根據(jù)訂單ID將其重定向到適當(dāng)?shù)耐ǖ?#xff1a;
- syncChannel:一個(gè)直接通道 ,它將訂單發(fā)送到訂閱該通道的訂單處理器。
- asyncChannel:一個(gè)隊(duì)列通道 ,訂單處理器將從中主動(dòng)檢索訂單。
處理訂單后,訂單確認(rèn)將發(fā)送回網(wǎng)關(guān)。 這是代表此的圖形:
好的,讓我們從最簡單的情況開始,使用直接通道進(jìn)行同步發(fā)送。
3.與直接通道同步發(fā)送
訂單處理器已訂閱“ syncChannel”直接渠道。 “ processOrder”方法將在發(fā)送者的線程中調(diào)用。
public OrderConfirmation processOrder(Order order) {logger.info("Processing order {}", order.getId());if (isInvalidOrder(order)) {logger.info("Error while processing order [{}]", ERROR_INVALID_ID);throw new InvalidOrderException(ERROR_INVALID_ID);}return new OrderConfirmation("confirmed"); }現(xiàn)在,我們將執(zhí)行一個(gè)測(cè)試,該測(cè)試將通過發(fā)送無效訂單來引發(fā)異常。 此測(cè)試將向網(wǎng)關(guān)發(fā)送訂單:
public interface OrderService {@Gatewaypublic OrderConfirmation sendOrder(Order order); }考試:
TestSyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestSyncErrorHandling {@Autowiredprivate OrderService service;@Testpublic void testCorrectOrder() {OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order"));Assert.assertNotNull(confirmation);Assert.assertEquals("confirmed", confirmation.getId());}@Testpublic void testSyncErrorHandling() {OrderConfirmation confirmation = null;try {confirmation = service.sendOrder(new Order(1, "an invalid order"));Assert.fail("Should throw a MessageHandlingException");} catch (MessageHandlingException e) {Assert.assertEquals(InvalidOrderException.class, e.getCause().getClass());Assert.assertNull(confirmation);}} }我們運(yùn)行測(cè)試,看看在訂單處理器中如何引發(fā)異常并到達(dá)測(cè)試。 沒關(guān)系; 我們想驗(yàn)證發(fā)送無效訂單是否引發(fā)了異常。 發(fā)生這種情況是因?yàn)闇y(cè)試發(fā)送了訂單,并阻止等待在同一線程中處理訂單。 但是,當(dāng)我們使用異步通道時(shí)會(huì)發(fā)生什么? 讓我們繼續(xù)下一節(jié)。
4,與隊(duì)列通道異步發(fā)送
此部分的測(cè)試發(fā)送一個(gè)命令,該命令將由路由器重定向到隊(duì)列通道。 網(wǎng)關(guān)如下所示:
public interface OrderService {@Gatewaypublic Future<OrderConfirmation> sendFutureOrder(Order order); }請(qǐng)注意,這次網(wǎng)關(guān)正在返回Future 。 如果我們不返回此值,則網(wǎng)關(guān)將阻止測(cè)試線程。 通過返回Future,網(wǎng)關(guān)將變?yōu)楫惒綘顟B(tài),并且不會(huì)阻塞發(fā)送方的線程。
考試:
TestKoAsyncErrorHandling.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestKoAsyncErrorHandling {@Autowiredprivate OrderService service;@Test(expected=MessageHandlingException.class)public void testAsyncErrorHandling() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));} }好的,現(xiàn)在我們將啟動(dòng)測(cè)試并看到引發(fā)異常的信息…
java.lang.AssertionError: Expected exception: org.springframework.integration.MessageHandlingException糟糕,測(cè)試失敗,因?yàn)闆]有異常到達(dá)測(cè)試! 發(fā)生了什么? 好吧,解釋如下:
<int:channel id="asyncChannel"><int:queue capacity="5"/> </int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/> </int:service-activator>由于我們使用的是異步通道(隊(duì)列),因此發(fā)送者發(fā)送訂單并繼續(xù)前進(jìn)。 然后,接收方從另一個(gè)線程輪詢訂單。 因此,不可能將Exception拋回到發(fā)送方。 讓我們表現(xiàn)得好像什么都沒發(fā)生嗎? 好吧,您最好不要,還有其他選擇。
5,異步錯(cuò)誤處理
當(dāng)使用異步消息傳遞時(shí),Spring Integration通過將異常發(fā)布到消息通道來處理它們。 引發(fā)的異常將包裝到MessagingException中,并成為消息的有效負(fù)載。
錯(cuò)誤消息發(fā)送到哪個(gè)通道? 首先,它將檢查請(qǐng)求消息是否包含名為“ errorChannel”的標(biāo)頭。 如果找到,錯(cuò)誤消息將被發(fā)送到那里。 否則,該消息將被發(fā)送到所謂的全局錯(cuò)誤通道。
5.1全局錯(cuò)誤通道
默認(rèn)情況下,Spring Integration創(chuàng)建一個(gè)名為“ errorChannel”的全局錯(cuò)誤通道。 該頻道是發(fā)布-訂閱頻道。 這意味著我們可以為該頻道訂閱多個(gè)端點(diǎn)。 實(shí)際上,已經(jīng)有一個(gè)端點(diǎn)訂閱了它:一個(gè)日志記錄處理程序 。該處理程序?qū)⒂涗浀竭_(dá)通道的消息的有效負(fù)載,盡管可以將其配置為不同的行為。
現(xiàn)在,我們將向該全局通道訂閱一個(gè)新的處理程序,并通過將其存儲(chǔ)到數(shù)據(jù)庫中來測(cè)試它是否接收到異常消息。
首先,我們需要在配置中進(jìn)行一些更改。 我創(chuàng)建了一個(gè)新文件,因此它不會(huì)干擾我們之前的測(cè)試:
int-async-config.xml
<context:component-scan base-package="xpadro.spring.integration"/><int:gateway default-request-channel="asyncChannel" service-interface="xpadro.spring.integration.service.OrderService" error-channel="errorChannel"/><int:channel id="asyncChannel"><int:queue capacity="5"/> </int:channel><int:service-activator method="processOrder" input-channel="asyncChannel" ref="orderProcessor"><int:poller fixed-delay="2000"/> </int:service-activator><int:service-activator input-channel="errorChannel" ref="orderErrorHandler" method="handleFailedOrder"/><bean id="orderErrorHandler" class="xpadro.spring.integration.activator.OrderErrorHandler"/>網(wǎng)關(guān) :我添加了一個(gè)錯(cuò)誤通道。 如果調(diào)用失敗,錯(cuò)誤消息將發(fā)送到該通道。 如果我沒有定義錯(cuò)誤通道,則網(wǎng)關(guān)會(huì)將異常傳播給調(diào)用者,但是在這種情況下,由于這是一個(gè)異步網(wǎng)關(guān),因此無法正常工作。
錯(cuò)誤處理程序 :我已經(jīng)定義了一個(gè)新的端點(diǎn),該端點(diǎn)已訂閱了全局錯(cuò)誤通道。 現(xiàn)在,任何發(fā)送到全局錯(cuò)誤通道的錯(cuò)誤消息都將傳遞給我們的處理程序。
我還添加了一個(gè)配置文件以配置數(shù)據(jù)庫。 我們的錯(cuò)誤處理程序會(huì)將收到的錯(cuò)誤插入此數(shù)據(jù)庫:
db-config.xml
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/> </bean><!-- in-memory database --> <jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schemas/schema.sql" /> </jdbc:embedded-database>錯(cuò)誤處理程序非常簡單。 它收到錯(cuò)誤消息,并將其信息插入數(shù)據(jù)庫:
public class OrderErrorHandler {@Autowiredprivate JdbcTemplate jdbcTemplate;@ServiceActivatorpublic void handleFailedOrder(Message<MessageHandlingException> message) {Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload();saveToBD(requestedOrder.getId(), message.getPayload().getMessage());}private void saveToBD(int orderId, String errorMessage) {String query = "insert into errors(orderid, message) values (?,?)";jdbcTemplate.update(query, orderId, errorMessage);} }好的,現(xiàn)在一切就緒。 讓我們實(shí)施一個(gè)新的測(cè)試:
TestOkAsyncErrorHandlingTest.java
@ContextConfiguration(locations = {"/xpadro/spring/integration/config/int-async-config.xml","/xpadro/spring/integration/config/db-config.xml"}) @RunWith(SpringJUnit4ClassRunner.class) public class TestOkAsyncErrorHandling {@Autowiredprivate JdbcTemplate jdbcTemplate;@Autowiredprivate OrderService service;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from errors");}@Testpublic void testCorrectOrder() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(7, "another correct order"));OrderConfirmation orderConfirmation = confirmation.get();Assert.assertNotNull(orderConfirmation);Assert.assertEquals("confirmed", orderConfirmation.getId());}@Testpublic void testAsyncErrorHandling() throws InterruptedException, ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(6, "another order"));Thread.sleep(2000);Assert.assertEquals(1, getSavedErrors());validateSavedError(6);}private int getSavedErrors() {return jdbcTemplate.queryForObject("select count(*) from errors", Integer.class);}private void validateSavedError(int orderId) {String query = "select * from errors where orderid=?";Map<String, Object> result = jdbcTemplate.queryForMap(query, orderId);Assert.assertEquals(6, result.get("orderid"));assertThat((String)result.get("message"), containsString("Order ID is invalid"));} }這次測(cè)試成功,錯(cuò)誤消息已存儲(chǔ)到數(shù)據(jù)庫。
5.2其他機(jī)制
自定義錯(cuò)誤通道 :您可以定義錯(cuò)誤通道并將其定義為隊(duì)列通道,而不是默認(rèn)的發(fā)布-訂閱通道:
<int:poller id="defaultPoller" default="true" fixed-delay="5000" /><int:channel id="errorChannel"><int:queue capacity="10"/> </int:channel>ErrorMessageExceptionTypeRouter :這個(gè)Spring Integration專用路由器將解析將錯(cuò)誤消息發(fā)送到的通道。 它基于錯(cuò)誤的最具體原因做出決定:
<int:exception-type-router input-channel="errorChannel" default-output-channel="genericErrorChannel"><int:mapping exception-type="xpadro.spring.integration.exception.InvalidOrderException" channel="invalidChannel" /><int:mapping exception-type="xpadro.spring.integration.exception.FooException" channel="fooChannel" /> </int:exception-type-router>六,結(jié)論
我們已經(jīng)了解了使用Spring Integration時(shí)錯(cuò)誤處理的不同機(jī)制是什么。 有了這個(gè)基礎(chǔ),您將能夠通過實(shí)現(xiàn)轉(zhuǎn)換器從錯(cuò)誤消息中提取信息,使用標(biāo)頭擴(kuò)展器設(shè)置錯(cuò)誤通道或?qū)崿F(xiàn)自己的路由器等來擴(kuò)展它并配置錯(cuò)誤處理。
翻譯自: https://www.javacodegeeks.com/2014/02/how-error-handling-works-in-spring-integration.html
總結(jié)
以上是生活随笔為你收集整理的错误处理在Spring Integration中如何工作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java 8 Friday Goodie
- 下一篇: 提高 3D 模型贴图清晰度,眸瑞科技与沐