ActiveMQ 发送和接收消息
生活随笔
收集整理的這篇文章主要介紹了
ActiveMQ 发送和接收消息
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一、添加 jar 包
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.11.2</version> </dependency>二、消息傳遞的兩種形式
1、點對點:發送的消息只能被一個消費者接收,第一個消費者接收后,消息沒了
2、發布/訂閱:消息可以被多個消費者接收?。發完消息,如果沒有消費者接收,這消息會自動消失。也就是說,消費者服務必須是啟動的狀態。( topic 消息在 ActiveMQ 服務端默認不是持久化的,可以通過配置文件配置持久化 )
三、點對點發送消息
/*** 點到點形式發送消息* @throws Exception*/ @Test public void testQueueProducer() throws Exception{//1、創建一個連接工廠,需要指定服務的 ip 和端口String brokerURL = "tcp://192.168.25.129:61616";ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);//2、使用工廠對象創建一個 Connection 對象Connection connection = connectionFactory.createConnection();//3、開啟連接,調用 Connection 對象的 start 方法 connection.start();//4、創建一個 Session 對象。//第一個參數:是否開啟事務(一般不開啟,如果開啟事務,第二個參數沒意義);//第二個參數:應答模式。自動應答或者手動應答,一般是自動應答Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5、使用 Session 對象創建一個 Destination 對象。兩種形式 queue、topic。Queue queue = session.createQueue("test-queue");//6、使用 Session 對象創建一個 Producer 對象MessageProducer producer = session.createProducer(queue);//7、創建一個 Message 對象,可以使用 TextMessage。下面兩種方式都可以/*TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello ActiveMQ");*/TextMessage textMessage = session.createTextMessage("hello ActiveMQ");//8、發布消息 producer.send(textMessage);//9、關閉資源 producer.close();session.close();connection.close(); }四、點對點接收消息
/*** 點對點接收消息* @throws Exception*/ @Test public void testQueueConsumer() throws Exception{//1、創建一個 ConnectionFactory 對象連接 MQ 服務器String brokerURL = "tcp://192.168.25.129:61616";ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);//2、創建一個連接對象Connection connection = connectionFactory.createConnection();//3、開啟連接 connection.start();//4、使用 Connection 對象 創建一個 Session 對象Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5、創建一個 Destination 對象。queue 對象Queue queue = session.createQueue("test-queue");//6、使用 Session 對象創建一個消費者MessageConsumer consumer = session.createConsumer(queue);//7、接收消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {//8、打印結果TextMessage textMessage = (TextMessage) message;try {String text = textMessage.getText();System.out.println(text);} catch (JMSException e) {// TODO Auto-generated catch block e.printStackTrace();}}});//9、等待接收消息。( 接收到消息后才網下面執行。關閉資源 ) System.in.read();//10、關閉資源 consumer.close();session.close();connection.close();}?五、廣播發送消息
/*** 廣播發送消息* @throws Exception*/ @Test public void testTopicProducer() throws Exception{//1、創建一個連接工廠,需要指定服務的 ip 和端口String brokerURL = "tcp://192.168.25.129:61616";ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);//2、使用工廠對象創建一個 Connection 對象Connection connection = connectionFactory.createConnection();//3、開啟連接,調用 Connection 對象的 start 方法 connection.start();//4、創建一個 Session 對象。//第一個參數:是否開啟事務(一般不開啟,如果開啟事務,第二個參數沒意義);//第二個參數:應答模式。自動應答或者手動應答,一般是自動應答Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5、使用 Session 對象創建一個 Destination 對象。兩種形式 queue、topic。Topic topic = session.createTopic("test-topic");//6、使用 Session 對象創建一個 Producer 對象MessageProducer producer = session.createProducer(topic);//7、創建一個 Message 對象,可以使用 TextMessage。下面兩種方式都可以/*TextMessage textMessage = new ActiveMQTextMessage(); textMessage.setText("hello ActiveMQ");*/TextMessage textMessage = session.createTextMessage("hello ActiveMQ");//8、發布消息 producer.send(textMessage);//9、關閉資源 producer.close();session.close();connection.close(); }六、廣播接收消息
/*** 廣播接收消息* @throws Exception*/ @Test public void testTopicConsumer() throws Exception{//1、創建一個 ConnectionFactory 對象連接 MQ 服務器String brokerURL = "tcp://192.168.25.129:61616";ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);//2、創建一個連接對象Connection connection = connectionFactory.createConnection();//3、開啟連接 connection.start();//4、使用 Connection 對象 創建一個 Session 對象Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//5、創建一個 Destination 對象。Topic 對象Topic topic = session.createTopic("test-topic");//6、使用 Session 對象創建一個消費者MessageConsumer consumer = session.createConsumer(topic);//7、接收消息consumer.setMessageListener(new MessageListener() {@Overridepublic void onMessage(Message message) {//8、打印結果TextMessage textMessage = (TextMessage) message;try {String text = textMessage.getText();System.out.println(text);} catch (JMSException e) {// TODO Auto-generated catch block e.printStackTrace();}}});System.out.println("topic消費者");//9、等待接收消息。( 接收到消息后才網下面執行。關閉資源 ) System.in.read();//10、關閉資源 consumer.close();session.close();connection.close(); }?
轉載于:https://www.cnblogs.com/fangwu/p/8669036.html
總結
以上是生活随笔為你收集整理的ActiveMQ 发送和接收消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网页版blast构建
- 下一篇: python数据图形化—— matplo