生活随笔
收集整理的這篇文章主要介紹了
activeMq初识 - 2
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
activeMq簡單實例:
package com.gordon; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*
; /** * 生產(chǎn)者* Created by gordon on 2018/9/8. */
public class Sender { public static final String userName = "admin"
; public static final String password = "admin"
; public static final String connect_url = "tcp://192.168.157.4:61616"
; public static final String QUEUE = "queue"
; public static final String TOPIC = "topic"
; public static void main(String[] args)
throws JMSException {
// testSender(QUEUE); testSender(TOPIC);
// 測試廣播模式先啟動客戶端
} private static void testSender(String mode)
throws JMSException {ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(userName,password,connect_url);Connection connection =
connectionFactory.createConnection();connection.start();Session session =
connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);TextMessage massage =
session.createTextMessage();
// Queue queue = session.createQueue(QUEUE);
// Topic topic = session.createTopic(TOPIC); Destination destination =
null ; if (mode.equals(QUEUE)){destination =
session.createQueue(mode);massage.setText( "測試queue隊列模式"
);} else if (mode.equals(TOPIC)){destination =
session.createTopic(mode);massage.setText( "測試topic廣播模式"
);}MessageProducer producer =
session.createProducer(destination); // 使用MessageProducer的serDeliveryMode方法為其設(shè)置持久化特性和非持久化特性(DeliveryMode) // producer.setDeliveryMode(DeliveryMode.PERSISTENT);將消息傳遞特性置為持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// 非持久化
producer.send(massage);System.out.println( "====生產(chǎn)者發(fā)送消息====="
); if (connection !=
null ){connection.close();}System.out.println( "====生產(chǎn)者發(fā)送消息===結(jié)束=="
);}} View Code package com.gordon; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*
;
import javax.jms.Destination;
import javax.jms.Queue; /** * 消費者* Created by gordon on 2018/9/6. */
public class Receiver { public static void main(String[] args)
throws JMSException {
// testReceiver(Sender.QUEUE);
testReceiver(Sender.TOPIC);} public static void testReceiver(String mode)
throws JMSException { /* 1、建立ConnectionFactory工廠對象,需要填入用戶名、密碼、以及要連接的地址,注意不要包的路徑,不要倒成spring里面的 */ ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(Sender.userName,Sender.password,Sender.connect_url); /* 2、通過ConnectionFactory工廠對象創(chuàng)建一個Connection連接,并且調(diào)用Connection.start()方法開啟連接,默認(rèn)是關(guān)閉的 */ Connection connection =
connectionFactory.createConnection();connection.start(); /* 3、通過Connection對象創(chuàng)建Session會話(上下文環(huán)境對象),用于接收消息。Connection可以創(chuàng)建一個或多個Session注意:會話是單線程的,所以消息是連續(xù)的,就是說消息是按照發(fā)送的順序一個一個接收的參數(shù)1:是否啟用事務(wù)(表示不開啟事務(wù)) 參數(shù)2:接收模式(一般設(shè)置為自動接收) */ Session session =
connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); /* 4、通過Session創(chuàng)建Destination對象,即一個客戶端用來指定生產(chǎn)者目標(biāo) 和 消費者來源 的對象Destination destination = session.createQueue("queue");PTP模式中:Destination被稱為Queue,隊列Pub/Sub模式中:Destination被稱為Topic,主題在程序中可以使用多個Queue 和 Topic */ // Queue queue1 = session.createQueue(mode); // 注意要和消息生產(chǎn)者的隊列名一致 Destination destination =
null ; if (mode.equals(Sender.QUEUE)){destination =
session.createQueue(mode);} else if (mode.equals(Sender.TOPIC)){destination =
session.createTopic(mode);} /* 5、通過Session對象創(chuàng)建消息的發(fā)送對接收對象(生產(chǎn)者或消費者,MessageProducer/MessageConsumer) */ MessageConsumer messageConsumer =
session.createConsumer(destination); // 多次運行main方法,測試創(chuàng)建多個消費者接收topic消息 System.out.println("messageConsumer : " +
messageConsumer.hashCode()); /* 6、使用JMS規(guī)范消息類型創(chuàng)建消息數(shù)據(jù)1. StreamMessage : Java原始值的數(shù)據(jù)流2. MapMessage : 一套名稱-值對3.TextMessage : 一個字符串對象4.ObjectMessage : 一個序列化的Java對象5.BytesMessage : 一個未解釋字節(jié)的數(shù)據(jù)流發(fā)送端使用MessageProducer.send()方法發(fā)送數(shù)據(jù)接收端使用MessageConsumer.receive()接收數(shù)據(jù) */ // System.out.println("AAAAAAAAAA"); // TextMessage msg = (TextMessage) messageConsumer.receive(); // receiver()同步方式,執(zhí)行后當(dāng)前一直掛起,等待接收消息,后面那句打印B的語句沒有輸出, // 知道發(fā)送者發(fā)送消息,這里接收到消息才會繼續(xù)往后執(zhí)行 // System.out.println("BBBBBBBBBB"); // System.out.println("===消費者接收===" + msg.getText()); /* 7、最后一定要關(guān)閉connection連接,否則ActiveMQ不能釋放資源,關(guān)閉一個Connection同樣也關(guān)閉了Session、MessageProducer、MessageConsumer。但是,客戶端一般不要關(guān)閉連接,除非特殊情況,這也是手動關(guān)閉(在頁面搞個按鈕點擊關(guān)閉)。為什么??很簡單,如果客戶端關(guān)閉,那如何接收消息?難道每次都要手動開啟連接??這也太麻煩了點。當(dāng)然也有這種操作,畢竟少。我們都希望它是自動接收,所以客戶端一般都一直保持連接 */
// if(connection != null){
// connection.close();
// System.out.println("===消費者接收===結(jié)束===");
// } // 生成環(huán)境一般是用監(jiān)聽器方式,此方式是異步執(zhí)行,同時生產(chǎn)者開啟事務(wù)或者使用持久化(持久化可以設(shè)置文件大小) messageConsumer.setMessageListener(
new CustomMessageListener());}} View Code package com.gordon; import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage; /** * 消息監(jiān)聽器,它會另起一個線程監(jiān)聽消息服務(wù)器* Created by gordon on 2018/9/8. */
public class CustomMessageListener
implements MessageListener {@Override public void onMessage(Message message) {TextMessage textMessage =
(TextMessage) message; try {String text =
textMessage.getText();System.out.println( " ===== 消費者 接收消息 === " +
text);} catch (JMSException e) {e.printStackTrace();}}
} View Code ?
啟動關(guān)閉:activeMq start / activeMq stop
activeMq端口:8161(web管理頁面端口)、61616(activemq服務(wù)監(jiān)控端口)
進(jìn)入web控制臺:http://192.168.157.4:8161
activeMq默認(rèn)登錄賬號密碼:admin admin消息發(fā)布的兩種方式 : topic :廣播形式?
(1)每個消息可以有多個消費者 (2)生產(chǎn)者和消費者之間有時間上的相關(guān)性。訂閱一個主題的消費者只能消費自它 ? ?訂閱之后發(fā)布的消息。JMS 規(guī)范允許客戶創(chuàng)建持久訂閱,這在一定程度上放松了時間 上的相關(guān)性要求。持久訂閱允許消費者消費它在未處于激活狀態(tài)時發(fā)送的消息。
ptp :點對點形式?
(1)每個消息只能有一個消費者 (2)消息的生產(chǎn)者和消費者之間沒有時間上的相關(guān)性。無論消費者在生產(chǎn)者發(fā)送消 ?息的時候是否處于運行狀態(tài),它都可以提取消息。
在點對點消息傳遞域中,目的地被稱為隊列(queue);在發(fā)布/訂閱消息傳遞域中,目的地被稱為主題(topic)
activeMq web Queues控制臺 : Number of Pending Message:等待消費的隊列 Number of Consumers :該隊列被多少個消費者消費 Message Enqueued :進(jìn)入隊列的消息總數(shù)量(只增不減) Message Dequeued :出了隊列的消息總數(shù)量(即被消費了的消息數(shù)量)
?
參考: ?[activeMq基礎(chǔ)系列文章]("https://www.cnblogs.com/HigginCui/category/955998.html") ?[activeMq消費者是如何接收消息的]("http://manzhizhen.iteye.com/category/318342") ?[activeMq性能優(yōu)化系列文章]("https://blog.csdn.net/yinwenjie/article/details/50916518") ?[activeMq消息機制以及ACK機制詳解]("http://shift-alt-ctrl.iteye.com/blog/2020182") ?[activeMq 常見問題解決]("https://blog.csdn.net/niwei3527/article/details/80607550") ?[activeMq面試題]("https://blog.csdn.net/belvine/article/details/79399798")
?
轉(zhuǎn)載于:https://www.cnblogs.com/for-what/p/9688974.html
總結(jié)
以上是生活随笔 為你收集整理的activeMq初识 - 2 的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔 推薦給好友。