ActiveMQ持久化到mysql
ActiveMQ持久化到mysql
?
配置
1.找到apache-activemq-5.15.2/examples/conf下面的activemq-jdbc-performance.xml
2.打開activemq-jdbc-performance.xml,在persistenceAdapter節點后面添加dataSource="#mysql-ds"
并配置你的數據庫
其實可以直接更改apache-activemq-5.15.2/conf/activemq.xml的persistenceAdapter節點.配置下數據庫也是可以的
用activemq-jdbc-performance.xml 我的理解應該是高性能模式,連都沒有(這句是添加localhost:8161的管理頁面,),并且只能用openwire傳輸協議,默認的配置文件傳輸協議是全開的,如果需要用到其他的傳輸協議可以自己在transportConnectors節點上添加
3.把activemq-jdbc-performance.xml復制到apache-activemq-5.15.2/conf目錄下,從命名為activemq.xml,覆蓋原來的activemq.xml
4.在對應的數據庫創建activemq庫,然后重啟ActiveMQ
我們這里用debug模式啟動,提示沒有mysql的jar包
5.我們在apache-activemq-5.15.2/lib下面添加mysql的jar包,再次啟動,就不會報錯了
6.這時可以看到剛才創建的activemq庫多了三張表,說明配置成功了
點對點測試
生產者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnectionFactory;public class Producer {public static void main(String[] args) { // String user = ActiveMQConnection.DEFAULT_USER; // String password = ActiveMQConnection.DEFAULT_PASSWORD; // String url = ActiveMQConnection.DEFAULT_BROKER_URL;String subject = "test.queue";ConnectionFactory contectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.109:61616");// ConnectionFactory contectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");try{Connection connection = contectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue(subject);MessageProducer producer = session.createProducer(destination);// producer.setDeliveryMode(DeliveryMode.PERSISTENT);//設置為持久化for(int i = 0; i < 20;) {TextMessage createTextMessage = session.createTextMessage("這是要發送的第"+ ++i +"條消息消息");producer.send(createTextMessage);System.out.println("第"+ i +"條消息已發送");}Thread.sleep(2000);session.commit();session.close();connection.close();}catch (JMSException e) {// e.printStackTrace();}catch (InterruptedException e) {// e.printStackTrace();}}}消費者
import java.util.Date;import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory;public class Customer {public static void main(String[] args) {// String user = ActiveMQConnection.DEFAULT_USER; // // String password = ActiveMQConnection.DEFAULT_PASSWORD; // // String url = ActiveMQConnection.DEFAULT_BROKER_URL;String subject = "test.queue";ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.109:61616");// ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");Connection connection;try {connection= connectionFactory.createConnection();connection.start();final Session session =connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue(subject);MessageConsumer message = session.createConsumer(destination);message.setMessageListener(new MessageListener() {public void onMessage(Message msg){TextMessage message = (TextMessage) msg;try {System.out.println("--收到消息:" +new Date()+message.getText());session.commit();}catch(JMSException e) {// e.printStackTrace();}}}); // Thread.sleep(30000); // // session.close(); // // Thread.sleep(30000); // // connection.close(); // // Thread.sleep(30000);}catch(Exception e) {// e.printStackTrace();}}}這時生產者生產數據,消費者一直不在線,數據就會持久化到數據庫的activemq_msgs表,就算ActiveMQ的服務掛了,再次啟動后,等消費者在線了就可以再次獲取生產者生產的數據(消費之后數據庫的數據會自動刪除)
超強干貨來襲 云風專訪:近40年碼齡,通宵達旦的技術人生總結
以上是生活随笔為你收集整理的ActiveMQ持久化到mysql的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: weedfs文件使用记录
- 下一篇: nacos2 Caused by: ja