007_JMS中的持久订阅
1. 持久訂閱時(shí), 客戶端需要首先向JMS提供者注冊(cè)一個(gè)表面自己身份的id(clientId)。這樣當(dāng)咱們這個(gè)客戶端處于離線時(shí), JMS提供者會(huì)為這個(gè)客戶端保存所有發(fā)送到主題的消息。當(dāng)客戶端再次連接到JMS提供者時(shí), JMS提供者根據(jù)這個(gè)客戶端id, 把消息發(fā)送給它。
2. 創(chuàng)建持久訂閱必須設(shè)置一個(gè)客戶端id, 不然會(huì)報(bào)如下錯(cuò)誤
3. 設(shè)置客戶端id
3.1. 設(shè)置客戶端id要緊跟在創(chuàng)建連接之后
// 1. 創(chuàng)建一個(gè)連接工廠 TopicConnectionFactory cf = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL); // 2. 創(chuàng)建連接 TopicConnection conn = cf.createTopicConnection(); // 3. 設(shè)置客戶端id conn.setClientID(clientId);3.2. 如果設(shè)置客戶端id沒(méi)有緊跟在創(chuàng)建連接之后回報(bào)如下錯(cuò)誤
4. 多個(gè)客戶端設(shè)置clientID, clientID不能重復(fù)。如果已有一個(gè)活動(dòng)的被clientID標(biāo)識(shí)的客戶端, 再出現(xiàn)一個(gè)重復(fù)clientID標(biāo)識(shí)的客戶端連接, 會(huì)報(bào)如下錯(cuò)誤
5. 持久訂閱的實(shí)現(xiàn)機(jī)制
5.1. 生產(chǎn)者發(fā)送消息給提供者, 如果此時(shí)提供者發(fā)現(xiàn)沒(méi)有任何的消費(fèi)者(包括在線/離線), 那么就會(huì)認(rèn)為該消息無(wú)用, 不需要存儲(chǔ), 會(huì)直接刪除。
5.2. 如果有在線的消費(fèi)者, 那么提供者會(huì)將消息直接傳送給在線的消費(fèi)者, 因?yàn)檫@個(gè)時(shí)候連接是通的, 消息有傳輸?shù)耐ǖ馈?/p>
5.3. 如果有離線的消費(fèi)者, 那么提供者會(huì)把屬于該消費(fèi)者的消息存儲(chǔ)下來(lái), 等消費(fèi)者在線的時(shí)候, 再將保存的離線消息推送給它。對(duì)于持久訂閱者, 提供者會(huì)在該消費(fèi)者第一次登錄在線的時(shí)候, 將它的身份信息記錄下來(lái)。記錄身份的關(guān)鍵就是clientID和主題名稱。當(dāng)持久訂閱者又重新在線的時(shí)候, 提供者會(huì)根據(jù)當(dāng)前連接的clientID和主題名稱, 去查詢屬于它的離線消息, 并進(jìn)行推送。
6. 例子
6.1. 新建一個(gè)名為JMSDurableSubscriber的Java項(xiàng)目, 同時(shí)拷入相關(guān)jar包
6.2. 編寫MyProducer.java
package com.jmsapp.persistent;import javax.jms.JMSException; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class MyProducer {// 默認(rèn)連接用戶名private static final String dftUsr = ActiveMQConnection.DEFAULT_USER;// 默認(rèn)用戶密碼private static final String dftPwd = ActiveMQConnection.DEFAULT_PASSWORD;// 默認(rèn)連接地址private static final String dftUrl = ActiveMQConnection.DEFAULT_BROKER_URL;// 隊(duì)列名稱private static final String topicName = "persistentSubscriber";public static void main(String[] args) {// 1. 創(chuàng)建一個(gè)連接工廠TopicConnectionFactory cf = new ActiveMQConnectionFactory(dftUsr, dftPwd, dftUrl);// 連接對(duì)象TopicConnection conn = null;// 會(huì)話對(duì)象TopicSession session = null;try {// 2. 創(chuàng)建連接conn = cf.createTopicConnection();// 3. 啟動(dòng)連接conn.start();// 4. 創(chuàng)建會(huì)話session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創(chuàng)建消息目的地。如果是點(diǎn)對(duì)點(diǎn), 那么它的實(shí)現(xiàn)是Queue; 如果是訂閱模式, 那它的實(shí)現(xiàn)是Topic。這里我們創(chuàng)建一個(gè)名為persistentSubscriber的主題。Topic topic = session.createTopic(topicName);// 6. 消息生產(chǎn)者TopicPublisher publisher = session.createPublisher(topic);// 7. 創(chuàng)建文本消息和發(fā)送消息StreamMessage message = session.createStreamMessage();message.writeString("JMS中的持久訂閱");publisher.publish(message);} catch (JMSException e) {e.printStackTrace();} finally {try {if (session != null) {session.close();}} catch (JMSException e1) {e1.printStackTrace();} finally {if (conn != null) {try {conn.close();} catch (JMSException e) {e.printStackTrace();}}}}} }6.3. 編寫MyConsumer.java
package com.jmsapp.persistent;import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class MyConsumer {// 默認(rèn)連接用戶名private static final String dftUsr = ActiveMQConnection.DEFAULT_USER;// 默認(rèn)用戶密碼private static final String dftPwd = ActiveMQConnection.DEFAULT_PASSWORD;// 默認(rèn)連接地址private static final String dftUrl = ActiveMQConnection.DEFAULT_BROKER_URL;// 隊(duì)列名稱private static final String topicName = "persistentSubscriber";// 客戶端idprivate static final String clientId = "rjbd";public static void main(String[] args) {// 1. 創(chuàng)建一個(gè)連接工廠TopicConnectionFactory cf = new ActiveMQConnectionFactory(dftUsr, dftPwd, dftUrl);// 連接對(duì)象TopicConnection conn = null;// 會(huì)話對(duì)象TopicSession session = null;try {// 2. 創(chuàng)建連接conn = cf.createTopicConnection();// 3. 設(shè)置客戶端idconn.setClientID(clientId);// 4. 創(chuàng)建會(huì)話session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);// 5. 創(chuàng)建消息目的地。如果是點(diǎn)對(duì)點(diǎn), 那么它的實(shí)現(xiàn)是Queue; 如果是訂閱模式, 那它的實(shí)現(xiàn)是Topic。這里我們創(chuàng)建一個(gè)名為persistentSubscriber的主題。Topic topic = session.createTopic(topicName);// 6. 消息消費(fèi)者TopicSubscriber subscriber = session.createDurableSubscriber(topic, clientId);// 7. 接收消息subscriber.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message msg) {try {StreamMessage message = (StreamMessage) msg;System.out.println("接收: " + message.readString());} catch (JMSException e) {e.printStackTrace();}}});// 8. 啟動(dòng)連接, 準(zhǔn)備開始接收消息conn.start();} catch (JMSException e) {e.printStackTrace();}} }6.4. 運(yùn)行MyConsumer.java, 接收端是一直處于運(yùn)行狀態(tài)的
6.5. 終止運(yùn)行MyConsumer.java
6.6. 運(yùn)行MyProducer.java
6.7. 再次運(yùn)行MyConsumer.java, 接收到消息(非持久化訂閱, 這樣操作就接收不到消息)
總結(jié)
以上是生活随笔為你收集整理的007_JMS中的持久订阅的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 006_Topic消息模式发送对象消息
- 下一篇: 008_Queue消息模式发送映射消息