009_JMS中的事务
1. 本地事務 ????
1.1. 在一個JMS客戶端, 可以使用本地事務來組合消息的發送和接收。JMS Session接口提供了commit和rollback方法。事務提交意味著生產的所有消息被發送, 消費的所有消息被確認; 事務回滾意味著生產的所有消息被銷毀, 消費的所有消息被恢復并重新提交, 除非它們已經過期。 ????
1.2. 事務性的會話總是牽涉到事務處理中, commit或rollback方法一旦被調用, 一個事務就結束了, 而另一個事務被開始。關閉事務性會話將回滾其中的事務。 ????
1.3. 需要注意的還有一個, 消息的生產和消費不能包含在同一個事務中。
2. 創建事務會話
2.1. 創建事務會話的時候transacted參數要設置為true, acknowledgeMode參數要設置為Session.SESSION_TRANSACTED。
// 1. 創建一個連接工廠 ConnectionFactory cf = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); // 2. 創建連接 Connection conn = cf.createConnection(); // 4. 創建會話 Session session = conn.createSession(true, Session.SESSION_TRANSACTED);2.2. Session.SESSION_TRANSACTED值是專門為事務會話使用的。
3. 例子
3.1. 創建一個名為JMSTransacted的Java項目, 同時拷入相關jar包
3.2. 編輯MyProducer.java
package com.jmsapp.tra;import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSender; import javax.jms.QueueSession; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class MyProducer {// 默認連接用戶名private static final String dftUsr = ActiveMQConnection.DEFAULT_USER;// 默認用戶密碼private static final String dftPwd = ActiveMQConnection.DEFAULT_PASSWORD;// 默認連接地址private static final String dftUrl = ActiveMQConnection.DEFAULT_BROKER_URL;// 隊列名稱private static final String queueName = "queueMapMsgTransacted";public static void main(String[] args) {// 1. 創建一個連接工廠QueueConnectionFactory cf = new ActiveMQConnectionFactory(dftUsr, dftPwd, dftUrl);// 連接對象QueueConnection conn = null;// 會話對象QueueSession session = null;try {// 2. 創建連接conn = cf.createQueueConnection();// 3. 啟動連接conn.start();// 4. 創建會話session = conn.createQueueSession(true, Session.SESSION_TRANSACTED);// 5. 創建消息目的地。如果是點對點, 那么它的實現是Queue; 如果是訂閱模式, 那它的實現是Topic。這里我們創建一個名為queueMapMsgTransacted的消息隊列。Queue queue = session.createQueue(queueName);// 6. 消息生產者QueueSender sender = session.createSender(null);// 7. 創建文本消息和發送消息MapMessage user = session.createMapMessage();user.setLong("id", 100000000000L);user.setString("name", "lisi");sender.send(queue, user, DeliveryMode.PERSISTENT, 9, 1000 * 60 * 60);MapMessage userExtend = session.createMapMessage();userExtend.setChar("sex", '男');userExtend.setBoolean("married", false);sender.send(queue, userExtend, DeliveryMode.PERSISTENT, 5, 1000 * 60 * 60);// 8. 提交事務session.commit();} catch (JMSException e) {if(null != session) {try {session.rollback();} catch (JMSException e1) {e1.printStackTrace();}}e.printStackTrace();} finally {try {if (session != null) {session.close();}} catch (JMSException e1) {e1.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}} }3.3. 點對點消息過期, 會把它存放到一個ActiveMQ.DLQ的隊列中
3.4. 編輯MyConsumer.java
package com.jmsapp.tra;import java.util.Enumeration; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueReceiver; import javax.jms.QueueSession; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class MyConsumer {// 默認連接用戶名private static final String dftUsr = ActiveMQConnection.DEFAULT_USER;// 默認用戶密碼private static final String dftPwd = ActiveMQConnection.DEFAULT_PASSWORD;// 默認連接地址private static final String dftUrl = ActiveMQConnection.DEFAULT_BROKER_URL;// 隊列名稱private static final String queueName = "queueMapMsgTransacted";public static void main(String[] args) {// 1. 創建一個連接工廠QueueConnectionFactory cf = new ActiveMQConnectionFactory(dftUsr, dftPwd, dftUrl);// 連接對象QueueConnection conn = null;// 會話對象QueueSession session = null;try {// 2. 創建連接conn = cf.createQueueConnection();// 3. 啟動連接conn.start();// 4. 創建會話session = conn.createQueueSession(true, Session.SESSION_TRANSACTED);// 5. 創建消息目的地。如果是點對點, 那么它的實現是Queue; 如果是訂閱模式, 那它的實現是Topic。這里我們創建一個名為queueMapMsgTransacted的消息隊列。Queue queue = session.createQueue(queueName);// 6. 消息接收者QueueReceiver receiver = session.createReceiver(queue);// 7. 創建文本消息和發送消息while(true) {MapMessage msg = (MapMessage) receiver.receive(3000);if(msg == null) {break;}@SuppressWarnings("unchecked")Enumeration<String> e = msg.getMapNames();while(e.hasMoreElements()) {String name = e.nextElement();System.out.println(name + " = " + msg.getObject(name));}}// 8. 提交事務session.commit();} catch (JMSException e) {if(null != session) {try {session.rollback();} catch (JMSException e1) {e1.printStackTrace();}}e.printStackTrace();} finally {try {if (session != null) {session.close();}} catch (JMSException e1) {e1.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}} }3.5. 再次運行MyProducer.java, 然后運行MyConsumer.java
總結
以上是生活随笔為你收集整理的009_JMS中的事务的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 008_Queue消息模式发送映射消息
- 下一篇: 010_JMS消息选择器