JMS学习二(简单的ActiveMQ实例)
下載安裝ActiveMQ服務(wù),下載地址當然可以去官網(wǎng)下載?
http://activemq.apache.org/download-archives.html
ActiveMQ安裝很簡單,下載解壓后到bin目錄就有win32 和win64兩個目錄按照自己的系統(tǒng)進入后就有activemq.bat來啟動ActiveMQ服務(wù)
一、點對點消息模型實例
使用queue作為目的之
1、消息發(fā)送端
package mqtest1; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) { int i =0; //鏈接工廠 ActiveMQConnectionFactory connectionFactory = null; //鏈接對象 Connection connection = null; //會話 Session session = null; //隊列(目的地、生產(chǎn)者發(fā)送消息的目的地) Queue queue = null; //消息生產(chǎn)者 MessageProducer producer = null; connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一個參數(shù)是否開啟事務(wù) true開啟 ,false不開啟事務(wù),如果開啟記得手動提交 //參數(shù)二,表示的是簽收模式,一般使用的有自動簽收和客戶端自己確認簽收 session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue("test_queue"); //為隊列創(chuàng)建消息生產(chǎn)者 producer = session.createProducer(queue); //消息是否為持久性的,這個不設(shè)置也是可以的,默認是持久的 //producer.setDeliveryMode(DeliveryMode.PERSISTENT); //消息設(shè)置為持久的發(fā)送后及時服務(wù)關(guān)閉了再次開啟消息也不會丟失。 //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //發(fā)送后如果服務(wù)關(guān)閉再次開啟則消息會丟失。 while (true){ //創(chuàng)建消息 TextMessage message = session.createTextMessage(); message.setText("測試隊列消息"+i); //發(fā)送消息到目的地 producer.send(message); i++; if(i>10) { break; } } session.commit(); System.out.println("呵呵消息發(fā)送結(jié)束"); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { //釋放資源 //producer.close(); //session.close(); //connection.close(); } } }?
2、消息消費端
package mqtest1; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Receive { public static void main(String[] args) { // 鏈接工廠 ActiveMQConnectionFactory connectionFactory = null; // 鏈接對象 Connection connection = null; // 會話 Session session = null; // 隊列(目的地,消費者消費消息的地方) Queue queue = null; // 消息消費者 MessageConsumer consumer = null; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.120:61616"); try { connection = connectionFactory.createConnection(); connection.start(); // 創(chuàng)建session是的true 和false session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); queue = session.createQueue("test_queue"); // 隊列(目的地,消費者消費消息的地方) consumer = session.createConsumer(queue); // 消息消費者 // Message message = consumer.receive(); //同步方式接收 consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { String value = textMessage.getText(); System.out.println("value: " + value); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }?
點對點模型Destination作為目的地
?
1、消息發(fā)送端
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class TestMQ { public static void main(String[] args) { int i =0; //鏈接工廠 ConnectionFactory connectionFactory = null; // 鏈接對象 Connection connection = null; // 會話對象 Session session = null; // 目的地 Destination destination = null; // 消息生產(chǎn)者 MessageProducer producer = null; connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616"); try { connection = connectionFactory.createConnection(); connection.start(); //第一個參數(shù)是否開啟事務(wù) true開啟 ,false不開啟事務(wù),如果開啟記得手動提交 //參數(shù)二,表示的是簽收模式,一般使用的有自動簽收和客戶端自己確認簽收 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("test-queue"); //為目的地創(chuàng)建消息生產(chǎn)者 producer = session.createProducer(destination); //消息是否為持久性的,這個不設(shè)置也是可以的,默認是持久的 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); while(true) { TestBean tbean = new TestBean(); tbean.setAge(25); tbean.setName("hellojava" +i); producer.send(session.createObjectMessage(tbean)); i++; if( i>10) { break; } } System.out.println("呵呵消息已發(fā)送"); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } finally { try { producer.close(); session.close(); connection.close(); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }?
?
2、消息消費端
package mq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class AcceptMq { public static void main(String[] args) { ConnectionFactory connectionFactory; // Connection :JMS 客戶端到JMS Provider 的連接 Connection connection = null; // Session: 一個發(fā)送或接收消息的線程 Session session = null; // Destination :消息的目的地;消息發(fā)送給誰. Destination destination = null; // 消費者,消息接收者 //MessageConsumer consumer = null; connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.1.120:61616"); try { //通過工廠創(chuàng)建鏈接 connection = connectionFactory.createConnection(); //啟動鏈接 connection.start(); //創(chuàng)建會話 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); //消息目的地 destination = session.createQueue("test-queue"); //消息消費者 MessageConsumer consumer = session.createConsumer(destination); //同步方式接受信息,如果還沒有獲取到則會阻塞直到接收到信息 /*Message messages = consumer.receive(); TestBean value =(TestBean)((ObjectMessage)messages).getObject(); String name = value.getName();*/ consumer.setMessageListener(new MessageListener(){ @Override public void onMessage(Message message){ try { TestBean tbean =(TestBean)((ObjectMessage)message).getObject(); System.out.println("tbean: "+tbean); if(null != message) { System.out.println("收到信息1: "+tbean.getName()); } } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }?
3、bean 類
package mq; import java.io.Serializable; public class TestBean implements Serializable{ private int age; private String name; public TestBean() {}; public TestBean(int age, String name) { this.age = age; this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public String getName() { return name; } public void setName(String name) { this.name = name; } }?
二、發(fā)布/訂閱消息模型實例
1、消息發(fā)布端
package mq; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class PSMQ { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.101:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創(chuàng)建話題 Topic topic = session.createTopic("myTopic.messages"); //為話題創(chuàng)建消息生產(chǎn)者 MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); while(true) { TextMessage message = session.createTextMessage(); message.setText("message_" + System.currentTimeMillis()); producer.send(message); System.out.println("Sent message: " + message.getText()); } } }?
2、消息訂閱端
package mq; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.MessageListener; import org.apache.activemq.ActiveMQConnectionFactory; public class PSAccept { public static void main(String[] args) throws JMSException { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.101:61616"); Connection connection = factory.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //創(chuàng)建話題 Topic topic = session.createTopic("myTopic.messages"); //為話題創(chuàng)建消費者 MessageConsumer consumer = session.createConsumer(topic); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage tm = (TextMessage) message; try { System.out.println("Received message: " + tm.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }?
點對點消息模型和發(fā)布/訂閱消息模型兩種方式其實不同的就是使用隊列、還是使用話題創(chuàng)建目的地不同其他的都一樣。
connectionFactory = new ActiveMQConnectionFactory("admin","admin","tcp://192.168.1.120:61616");其中第一個admin是用戶名第二個是密碼而第三個參數(shù)就是協(xié)議+ip+port(端口),這幾個參數(shù)兩個客戶端都是一樣的不然消費端就獲取不到了……
在消息消費者中我們接收消息有兩種方式即同步接收和異步接收,同步接受就是使用receive()方法來接受而異步就是設(shè)置一個監(jiān)聽對象。
?
說到密碼我們順便來看看ActiveMQ訪問密碼的設(shè)置
?
三、ActiveMQ訪問密碼設(shè)置
在ActiveMQ的conf目錄的activemq.xml中添加賬號密碼
<plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="whd" password="123" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins>?
activemq.xml中添加位置:
?
ok這樣我們對這個ActiveMQ設(shè)置了一個用戶名密碼,所以在創(chuàng)建鏈接的時候要修改admin這個默認的用戶名密碼為修改后的用戶名密碼。
這樣我們就能正常的向服務(wù)器發(fā)送消息而消費端也能從服務(wù)商消費消息了……
?
差點忘了,還有一個ActiveMQ管理頁面地址:http://127.0.0.1:8161/admin/ 訪問這個地址登陸管理頁面,默認用戶名密碼都是admin
?
github源碼地址
?
總結(jié)
以上是生活随笔為你收集整理的JMS学习二(简单的ActiveMQ实例)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。