ActiveMQ queue 代码示例
生活随笔
收集整理的這篇文章主要介紹了
ActiveMQ queue 代码示例
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
生產者:
package com.111.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class JMSProducer {//默認連接用戶名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認連接密碼private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認連接地址private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//發送的消息數量private static final int SENDNUM = 10;public static void main(String[] args) {//連接工廠 ConnectionFactory connectionFactory;//連接Connection connection = null;//會話 接受或者發送消息的線程Session session = null;//消息的目的地 Destination destination;//消息生產者 MessageProducer messageProducer;//消息隊列名稱String queueName = "helloWord"; //實例化連接工廠connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);try {//通過連接工廠獲取連接connection = connectionFactory.createConnection();//啟動連接 connection.start();//創建sessionsession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//創建一個連接自定義隊列名稱的消息隊列destination = session.createQueue(queueName);//創建消息生產者messageProducer = session.createProducer(destination);//發送消息 sendMessage(session, messageProducer);session.commit();} catch (Exception e) {e.printStackTrace();}finally{if(connection != null){try {session.close();connection.close();} catch (JMSException e) {e.printStackTrace();}}}}/*** 發送消息* @param session* @param messageProducer 消息生產者* @throws Exception*/public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{for (int i = 0; i < SENDNUM; i++) {//創建一條文本消息 TextMessage message = session.createTextMessage("ActiveMQ 發送消息" +i);System.out.println("發送消息:Activemq 發送消息" + i);//通過消息生產者發出消息 messageProducer.send(message);}} }消費者:
package com.111.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer {//默認連接用戶名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認連接密碼private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認連接地址private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args) {//連接工廠 ConnectionFactory connectionFactory;//連接Connection connection = null;//會話 接受或者發送消息的線程 Session session;//消息的目的地 Destination destination;//消息的消費者 MessageConsumer messageConsumer;//消息隊列名稱String queueName = "helloWord";//實例化連接工廠connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);try {//通過連接工廠獲取連接connection = connectionFactory.createConnection();//啟動連接 connection.start();//創建sessionsession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建一個連接自定義隊列名稱的消息隊列destination = session.createQueue(queueName);//創建消息消費者messageConsumer = session.createConsumer(destination);while (true) {TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);if(textMessage != null){System.out.println("收到的消息:" + textMessage.getText());}else {break;}}} catch (JMSException e) {e.printStackTrace();}} }多線程生產者:
package com.111.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class JMSProducerMultithreading implements Runnable{//默認連接用戶名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認連接密碼private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認連接地址private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//發送的消息數量private static final int SENDNUM = 3;/*** 發送消息* @param session* @param messageProducer 消息生產者* @throws Exception*/public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{for (int i = 0; i < SENDNUM; i++) {//獲取當前線程idString threadId = Thread.currentThread().getId()+"";//創建一條文本消息 TextMessage message = session.createTextMessage("ActiveMQ 發送消息" +i+"生產者線程編號="+threadId);//控制臺打印System.out.println("ActiveMQ 發送消息" +i+"生產者線程編號="+threadId);//通過消息生產者發出消息 messageProducer.send(message);}}@Overridepublic void run() {//連接工廠 ConnectionFactory connectionFactory;//連接Connection connection = null;//會話 接受或者發送消息的線程Session session = null;//消息的目的地 Destination destination;//消息生產者 MessageProducer messageProducer;//消息隊列名稱String queueName = "Multithreading";//實例化連接工廠connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);try {//通過連接工廠獲取連接connection = connectionFactory.createConnection();//啟動連接 connection.start();//創建sessionsession = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);//創建一個名稱為HelloWorld的消息隊列destination = session.createQueue(queueName);//創建消息生產者messageProducer = session.createProducer(destination);//發送消息 sendMessage(session, messageProducer);session.commit();} catch (Exception e) {e.printStackTrace();}finally{if(connection != null){try {session.close();connection.close();} catch (JMSException e) {e.printStackTrace();}}}} }多線程消費者:
package com.111.activemq;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory;public class JMSConsumerMultithreading implements Runnable{//默認連接用戶名private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默認連接密碼private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默認連接地址private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;@Overridepublic void run() {ConnectionFactory connectionFactory;//連接工廠Connection connection = null;//連接 Session session;//會話 接受或者發送消息的線程Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息的消費者//消息隊列名稱String queueName = "Multithreading";//實例化連接工廠connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);try {//通過連接工廠獲取連接connection = connectionFactory.createConnection();//啟動連接 connection.start();//創建sessionsession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//創建一個連接HelloWorld的消息隊列destination = session.createQueue(queueName);//創建消息消費者messageConsumer = session.createConsumer(destination);String threadId = Thread.currentThread().getId()+""; while (true) {TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);if(textMessage != null){System.out.println("收到的消息:" + textMessage.getText()+" 消費者線程編號="+threadId);}else {break;}}} catch (JMSException e) {e.printStackTrace();}} }多線程生產者測試類:
package com.111.activemq;public class JMSProducerMultithreadingTest {public static void main(String[] args) {JMSProducerMultithreading jpm = new JMSProducerMultithreading();//啟動10個生產者線程for(int i = 0 ; i < 10 ; i++){Thread t = new Thread(jpm);t.start();}} }多線程消費者測試類:
package com.111.activemq;public class JMSConsumerMultithreadingTest {public static void main(String[] args) {JMSConsumerMultithreading jcm = new JMSConsumerMultithreading();//啟動3個消費者者線程for(int i = 0 ; i < 3 ; i++){Thread t = new Thread(jcm);t.start();}} }?
轉載于:https://www.cnblogs.com/MIC2016/p/6098647.html
總結
以上是生活随笔為你收集整理的ActiveMQ queue 代码示例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 梦到别人建新房子好不好
- 下一篇: 做梦梦到把蛇打死了好不好