ActiveMQ菜鸟入门教程
本篇文章介紹一個簡單的ActiveMQdemo搭建,本人試過了代碼可以運行成功。
原文鏈接:https://www.songliguo.com/activemq-getting-started.html
ActiveMQ官網(wǎng)下載地址:?http://activemq.apache.org/download.html??
ActiveMQ安裝可參考:?https://blog.csdn.net/mr_haixin/article/details/80418204
ActiveMQ是Apache的一個開源項目,它是一個能力強勁的開源消息總線,也是一個中間件產(chǎn)品。它是JMS的一個實現(xiàn)。
在介紹ActiveMQ之前,先來復(fù)習(xí)一下J2EE中的JMS規(guī)范。
JMS是Java Message Service的簡稱,用來發(fā)送異步消息,在不同系統(tǒng)和不同的模塊之間我們可以利用它實現(xiàn)集成。
JMS有兩個好處,第一個就是讓模塊之間或者系統(tǒng)之間的耦合度降低,第二個是異步通信。
JMS的消息機制有2種模型,一種是Point to Point,表現(xiàn)為隊列的形式。發(fā)送的消息,只能被一個接收者取走;另一種是Topic,可以被多個訂閱者訂閱,類似于群發(fā)。
在JMS中有這樣幾個重要的核心接口和類:
- ConnectionFactory,用于jms client獲取與jms provider的連接。不同的jms產(chǎn)品,對這個接口有不同的實現(xiàn),比如說ActiveMQ,這個接口的實現(xiàn)類是ActiveMQConnectionFactory
- Connection,是由ConnectionFactory產(chǎn)生的,表示jms client與jms provider的連接
- ?Session,是由Connection產(chǎn)生的,表示一個會話。Session是關(guān)鍵組件,Message、Producer/Consumer、Destination都是在Session上創(chuàng)建的
- ?Message,這個組件很好理解,就是傳輸?shù)南?#xff0c;里面包括head、properties、body,其中head是必選的
- ?Destination,是消息源,對發(fā)送者來說,就是消息發(fā)到哪里;對接收者來說,就是從哪里取消息。Destination有2個子接口,Queue和Topic,分別對應(yīng)上面提到的2種模型
- MessageProducer,是消息發(fā)送者,創(chuàng)建這個組件的代碼類似:
?
?
| 1 2 3 4 5 6 | ? //創(chuàng)建一個Queue,名稱為SongLiGuo_FirstQueue destination = session.createQueue("SongLiGuo_FirstQueue"); //得到消息生產(chǎn)者【發(fā)送者】 messageProducer = session.createProducer(destination); ? |
可以注意到,這里需要把Destination作為參數(shù),傳入createProducer()方法,這說明消息發(fā)送者是綁定到Destination上的,這個發(fā)送者發(fā)送的消息,會發(fā)送到這個綁定的Destination上
- ?MessageConsumer,是消息接收者,和Message Producer是相反的一種組件
?
對JMS有所了解之后,我們來看ActiveMQ。
1.下載ActiveMQ
去官方網(wǎng)站下載:http://activemq.apache.org/
2.運行ActiveMQ
解壓縮apache-activemq-5.5.1-bin.zip,然后雙擊apache-activemq-5.5.1binactivemq.bat運行ActiveMQ程序。
啟動ActiveMQ以后,登陸:http://localhost:8161/admin/,創(chuàng)建一個Queue,命名為SongLiGuo_FirstQueue(如果這里創(chuàng)建了程序就不用創(chuàng)建)
3.創(chuàng)建Eclipse項目并運行
創(chuàng)建project:ActiveMQ-5.5,并導(dǎo)入apache-activemq-5.5.1lib目錄下需要用到的jar文件,項目結(jié)構(gòu)如下圖所示:
3.1.Sender.java
?
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 | ? package com.songliguo.activemq; ? import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; ? import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; ? /** * *???? * 項目名稱:ActiveMQ-5.5?? * 類名稱:Sender?? * 類描述:?? ActiveMQ發(fā)送者 * 創(chuàng)建人:Songliguo?? * 創(chuàng)建時間:2017年3月14日 上午10:01:02?? * 修改人:?? * 修改時間: * 修改備注:?? * @version???? * */ public class Sender { ? private static final int SEND_NUMBER = 10; public static void main(String[] args) { //ConnectionFactory是連接工廠,JMS用它創(chuàng)建連接 ConnectionFactory connectionFactory; //Connection JMS客戶端到JMS provider的連接 Connection connection = null; //Session 一個發(fā)送或者接收消息的線程 Session session; //Destination 消息發(fā)送目的地,消息發(fā)送給誰接收 Destination destination; //MessageProducer 消息發(fā)送者 MessageProducer messageProducer; //構(gòu)造ConnectionFactory 實例對象,此處采用ActiveMQ的實現(xiàn)jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //構(gòu)造工廠得到連接對象 connection = connectionFactory.createConnection(); //啟動 connection.start(); //獲取操作連接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); //創(chuàng)建一個Queue,名稱為SongLiGuo_FirstQueue ????????????????????????destination = session.createQueue("SongLiGuo_FirstQueue"); //得到消息生產(chǎn)者【發(fā)送者】 messageProducer = session.createProducer(destination); //設(shè)置不持久化,根據(jù)實際情況而定 messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //構(gòu)造消息,此處寫死,項目就是參數(shù)或者方法獲取 sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); }finally { try { if(null != connection){ connection.close(); } } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer)throws Exception { ????????for (int i = 1; i <= SEND_NUMBER; i++) { ????????????TextMessage message = session.createTextMessage("ActiveMq 發(fā)送的消息" + i); ????????????// 發(fā)送消息到目的地方 ????????????System.out.println("發(fā)送消息:" + "ActiveMq 發(fā)送的消息" + i); ????????????producer.send(message); ????????} ????} } ? ? |
?
3.2.Receiver.java
?
?
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | ? package com.songliguo.activemq; ? import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; ? import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; ? /** * *???? * 項目名稱:ActiveMQ-5.5?? * 類名稱:Receiver?? * 類描述:??activeMQ接收類 * 創(chuàng)建人:Songliguo?? * 創(chuàng)建時間:2017年3月14日 上午10:31:35?? * 修改人:?? * 修改時間: * 修改備注:?? * @version???? * */ public class Receiver { ? public static void main(String[] args) { //connectionFactory 連接工廠,JMS用它創(chuàng)建連接 ConnectionFactory connectionFactory; //connection JMS客戶端到JMS provider 的連接 Connection connection = null; //session一個發(fā)送或者接收的線程 Session session; //destination 消息目的地,發(fā)送給誰接收 Destination destination; //消費者消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); try { //構(gòu)造工廠得到連接對象 connection = connectionFactory.createConnection(); //啟動 connection.start(); //獲取操作連接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("SongLiGuo_FirstQueue"); ????????????consumer = session.createConsumer(destination); ????????????while(true){ ???????????? //設(shè)置接收者收消息的時間,為了方便測試,這里暫定設(shè)置為100s ???????????? TextMessage message = (TextMessage)consumer.receive(100); ???????????? if(null != message){ ???????????? System.out.println("收到消息==="+message.getText()); ???????????? }else{ ???????????? break; ???????????? } ????????????} } catch (Exception e) { e.printStackTrace(); }finally{ try { if(null != connection){ connection.close(); } } catch (Throwable ignore) { } } } } ? ? |
?
4.注意事項
項目所引用的jar最后在ActiveMQ下的lib中找
5.測試結(jié)果
運行sender,在運行完sender以后,我們可以看到如下console
我們再切換到receiver運行后的console,如下圖所示:
?
6.我們可以在http://localhost:8161/admin/queues.jsp看到消息發(fā)送和接收情況
轉(zhuǎn)載請注明原文鏈接:首頁?->?技術(shù)交流?->?JAVA開發(fā)?->?ActiveMQ菜鳥入門教程
總結(jié)
以上是生活随笔為你收集整理的ActiveMQ菜鸟入门教程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 恩施机器人编程_恩施武汉机器人激光切割机
- 下一篇: limbo模拟器镜像Android,li