javascript
spring jms 事务_Spring JMS:处理事务中的消息
spring jms 事務
1.引言
這篇文章將向您展示使用JMS異步接收消息期間使用者執行過程中的錯誤如何導致消息丟失。 然后,我將解釋如何使用本地事務解決此問題。
您還將看到這種解決方案在某些情況下可能導致消息重復(例如,當它將消息保存到數據庫中,然后偵聽器執行失敗時)。 發生這種情況的原因是因為JMS事務獨立于其他事務資源(如DB)。 如果您的處理不是冪等的,或者您的應用程序不支持重復消息檢測,那么您將不得不使用分布式事務。
分布式事務超出了本文的范圍。 如果您對處理分布式事務感興趣,可以閱讀這篇有趣的文章。
我已經實現了一個再現以下情況的測試應用程序:
生產者將消息發送到隊列:
使用者從隊列中檢索消息并進行處理:
- 該應用程序的源代碼可以在github上找到。
2.測試應用
測試應用程序執行兩個測試類TestNotTransactedMessaging和TestTransactedMessaging 。 這些類都將執行上述三種情況。
讓我們看看在沒有事務的情況下執行應用程序時的配置。
app-config.xml
應用程序配置。 基本上,它會在指定的包中進行檢查以自動檢測應用Bean:生產者和使用者。 它還配置了將在其中存儲處理后的通知的內存數據庫。
<context:component-scan base-package="xpadro.spring.jms.producer, xpadro.spring.jms.receiver"/><bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"><constructor-arg ref="dataSource"/> </bean><jdbc:embedded-database id="dataSource"><jdbc:script location="classpath:db/schema.sql" /> </jdbc:embedded-database>notx-jms-config.xml
配置JMS基礎結構,該基礎結構是:
- 經紀人聯系
- JmsTemplate
- 將通知發送到的隊列
- 偵聽器容器,它將發送通知給偵聽器以處理它們
生產者僅使用jmsTemplate發送通知。
@Component("producer") public class Producer {private static Logger logger = LoggerFactory.getLogger(Producer.class);@Autowiredprivate JmsTemplate jmsTemplate;public void convertAndSendMessage(String destination, Notification notification) {jmsTemplate.convertAndSend(destination, notification);logger.info("Sending notification | Id: "+notification.getId());} }偵聽器負責從隊列中檢索通知,并將其存儲到數據庫中。
@Component("notificationProcessor") public class NotificationProcessor implements MessageListener {private static Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);@Autowiredprivate JdbcTemplate jdbcTemplate;@Overridepublic void onMessage(Message message) {try {Notification notification = (Notification) ((ObjectMessage) message).getObject();logger.info("Received notification | Id: "+notification.getId()+" | Redelivery: "+getDeliveryNumber(message));checkPreprocessException(notification);saveToBD(notification);checkPostprocessException(message, notification);} catch (JMSException e) {throw JmsUtils.convertJmsAccessException(e);}} ... }當id = 1的通知到達時, checkPreprocessException方法將拋出運行時異常。 這樣,在將消息存儲到數據庫之前,我們將導致錯誤。
如果到達id = 2的通知, checkPostprocessException方法將引發異常,從而在將其存儲到數據庫后立即引起錯誤。
getDeliveryNumber方法返回消息已發送的次數。 這僅適用于事務,因為在偵聽器處理失敗導致回滾之后,代理將嘗試重新發送消息。
最后, saveToDB方法非常明顯。 它將通知存儲到數據庫。
您始終可以通過單擊本文開頭的鏈接來檢查此應用程序的源代碼。
3.測試沒有交易的消息接收
我將啟動兩個測試類,一個不包含事務,另一個在本地事務中。 這兩個類都擴展了一個基類,該基類加載了公共應用程序上下文并包含一些實用程序方法:
@ContextConfiguration(locations = {"/xpadro/spring/jms/config/app-config.xml"}) @DirtiesContext public class TestBaseMessaging {protected static final String QUEUE_INCOMING = "incoming.queue";protected static final String QUEUE_DLQ = "ActiveMQ.DLQ";@Autowiredprotected JdbcTemplate jdbcTemplate;@Autowiredprotected JmsTemplate jmsTemplate;@Autowiredprotected Producer producer;@Beforepublic void prepareTest() {jdbcTemplate.update("delete from Notifications");}protected int getSavedNotifications() {return jdbcTemplate.queryForObject("select count(*) from Notifications", Integer.class);}protected int getMessagesInQueue(String queueName) {return jmsTemplate.browse(queueName, new BrowserCallback<Integer>() {@Overridepublic Integer doInJms(Session session, QueueBrowser browser) throws JMSException {Enumeration<?> messages = browser.getEnumeration();int total = 0;while (messages.hasMoreElements()) {messages.nextElement();total++;}return total;}});} }實用方法說明如下:
- getSavedNotifications :返回存儲到數據庫的通知數。 我使用了queryForObject方法,因為自版本3.2.2開始建議使用該方法。 queryForInt方法已被棄用。
- getMessagesInQueue :允許您檢查指定隊列中哪些消息仍在等待處理。 對于此測試,我們有興趣知道仍有多少通知等待處理。
現在,讓我向您展示第一個測試的代碼( TestNotTransactedMessaging )。 此測試啟動本文開頭指示的3種情況。
@Test public void testCorrectMessage() throws InterruptedException {Notification notification = new Notification(0, "notification to deliver correctly");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING)); }@Test public void testFailedAfterReceiveMessage() throws InterruptedException {Notification notification = new Notification(1, "notification to fail after receiving");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(0, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING)); }@Test public void testFailedAfterProcessingMessage() throws InterruptedException {Notification notification = new Notification(2, "notification to fail after processing");producer.convertAndSendMessage(QUEUE_INCOMING, notification);Thread.sleep(6000);printResults();assertEquals(1, getSavedNotifications());assertEquals(0, getMessagesInQueue(QUEUE_INCOMING)); }private void printResults() {logger.info("Total items in \"incoming\" queue: "+getMessagesInQueue(QUEUE_INCOMING));logger.info("Total items in DB: "+getSavedNotifications()); }4,執行測試
好的,讓我們執行測試,看看結果是什么:
testCorrectMessage輸出:
Producer|Sending notification | Id: 0 NotificationProcessor|Received notification | Id: 0 | Redelivery: 1 TestNotTransactedMessaging|Total items in "incoming" queue: 0 TestNotTransactedMessaging|Total items in DB: 1此處沒有問題,因為消息已正確接收并存儲到數據庫,所以隊列為空。
testFailedAfterReceiveMessage輸出:
Producer|Sending notification | Id: 1 NotificationProcessor|Received notification | Id: 1 | Redelivery: 1 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after receiving message TestNotTransactedMessaging|Total items in "incoming" queue: 0 TestNotTransactedMessaging|Total items in DB: 0由于它在事務外部執行,因此使用確認模式(默認為自動)。 這意味著一旦調用onMessage方法并因此將其從隊列中刪除,就認為該消息已成功傳遞。 因為偵聽器在將消息存儲到DB之前失敗,所以我們丟失了消息!
testFailedAfterProcessingMessage輸出:
2013-08-22 18:39:09,906|Producer|Sending notification | Id: 2 2013-08-22 18:39:09,906|NotificationProcessor|Received notification | Id: 2 | Redelivery: 1 2013-08-22 18:39:09,906|AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after processing message 2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in "incoming" queue: 0 2013-08-22 18:39:15,921|TestNotTransactedMessaging|Total items in DB: 1在這種情況下,在執行失敗之前,已從隊列(AUTO_ACKNOWLEDGE)中刪除了該消息并將其存儲到DB。
5,添加本地交易
通常,我們不允許像測試的第二種情況那樣丟失消息,因此我們要做的是在本地事務中調用偵聽器。 所做的更改非常簡單,并不意味著從我們的應用程序中修改一行代碼。 我們只需要更改配置文件。
為了測試這3種事務的情況,我將以下配置文件notx-jms-config.xml替換為:
tx-jms-config.xml
首先,我添加了在發生回滾的情況下進行的重新傳遞的數量(由于偵聽器執行中的錯誤導致):
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="vm://embedded?broker.persistent=false"/><property name="redeliveryPolicy"><bean class="org.apache.activemq.RedeliveryPolicy"><property name="maximumRedeliveries" value="4"/></bean></property> </bean>接下來,我指示偵聽器將在事務內執行。 這可以通過修改偵聽器容器定義來完成:
<jms:listener-container connection-factory="connectionFactory" acknowledge="transacted"><jms:listener ref="notificationProcessor" destination="incoming.queue"/> </jms:listener-container>這將導致在本地JMS事務中執行對偵聽器的每次調用。 收到消息后,事務將開始。 如果偵聽器執行失敗,則消息接收將回滾。
這就是我們要做的一切。 讓我們使用此配置啟動測試。
6,測試交易中的消息接收
來自TestTransactedMessaging類的代碼實際上與先前的測試相同。 唯一的區別是,它向DLQ(死信隊列)添加了查詢。 在事務內執行時,如果回退消息接收,則代理會將消息發送到此隊列(在所有重新傳遞失敗之后)。
我跳過了成功接收的輸出,因為它不會帶來任何新的變化。
testFailedAfterReceiveMessage輸出:
Producer|Sending notification | Id: 1 NotificationProcessor|Received notification | Id: 1 | Redelivery: 1 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after receiving message NotificationProcessor|Received notification | Id: 1 | Redelivery: 2 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. ... java.lang.RuntimeException: error after receiving message NotificationProcessor|Received notification | Id: 1 | Redelivery: 5 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after receiving message TestTransactedMessaging|Total items in "incoming" queue: 0 TestTransactedMessaging|Total items in "dead letter" queue: 1 TestTransactedMessaging|Total items in DB: 0如您所見,第一次接收失敗,并且代理嘗試將其重新發送四次(如maximumRedeliveries屬性中所示)。 由于情況持續存在,因此消息已發送到特殊DLQ隊列。 這樣,我們不會丟失消息。
testFailedAfterProcessingMessage輸出:
Producer|Sending notification | Id: 2 NotificationProcessor|Received notification | Id: 2 | Redelivery: 1 AbstractMessageListenerContainer|Execution of JMS message listener failed, and no ErrorHandler has been set. java.lang.RuntimeException: error after processing message NotificationProcessor|Received notification | Id: 2 | Redelivery: 2 TestTransactedMessaging|Total items in "incoming" queue: 0 TestTransactedMessaging|Total items in "dead letter" queue: 0 TestTransactedMessaging|Total items in DB: 2在這種情況下,這是發生的情況:
7,結論
將本地事務添加到消息接收中可避免丟失消息。 我們必須考慮的是,可能會出現重復的消息,因此我們的偵聽器將必須檢測到它,否則我們的處理必須是冪等的才能再次進行處理。 如果這不可能,我們將不得不進行分布式事務,因為它們支持涉及不同資源的事務。
翻譯自: https://www.javacodegeeks.com/2014/02/spring-jms-processing-messages-within-transactions.html
spring jms 事務
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的spring jms 事务_Spring JMS:处理事务中的消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 绝世小攻歌词 绝世小攻介绍
- 下一篇: 使用AWS Lambda,S3和AWS