JMS学习九 ActiveMQ的消息持久化到Mysql数据库
1、將連接Mysql數(shù)據(jù)庫(kù)驅(qū)動(dòng)包,放到ActiveMQ的lib目錄下
2,修改ActiveMQ的conf目錄下的active.xml文件,修改數(shù)據(jù)持久化的方式
? ? ? ? ?2.1 ?修改原來(lái)的kshadb的持久化數(shù)據(jù)的方式
?
? ? ? ? ? 2.2 ?連接Mysql的配置
3、將數(shù)據(jù)持久化Mysql的運(yùn)行截圖
? ? ? 3.1 ?重新啟動(dòng)ActiveMQ,并運(yùn)行程序,放入持久化數(shù)據(jù),查看Mysql的active數(shù)據(jù)庫(kù)
?
4,數(shù)據(jù)持久化代碼
package test.mq.helloworld;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
?? ?
?? ? //默認(rèn)連接用戶名
? ? private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
? ? //默認(rèn)連接密碼
? ? private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
? ? //默認(rèn)連接地址
? ? private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
?? ?
?? ?//發(fā)送的消息數(shù)量
? ? private static final int SENDNUM = 10;
?? ?public static void main(String[] args) throws Exception {
?? ??? ?/*ActiveMQConnectionFactory activeMQConnectionFactory =?
?? ??? ??? ??? ?new ActiveMQConnectionFactory(
?? ??? ??? ??? ?ActiveMQConnection.DEFAULT_USER,?
?? ??? ??? ??? ?ActiveMQConnection.DEFAULT_PASSWORD,?
?? ??? ??? ??? ?"tcp://localhost:61616");*/
?? ??? ?/**
?? ??? ? * activemq.xml ?配置密碼
?? ??? ? */
?? ??? ?ActiveMQConnectionFactory activeMQConnectionFactory =?
?? ??? ??? ??? ?new ActiveMQConnectionFactory(
?? ??? ??? ??? ?"bhz",?
?? ??? ??? ??? ?"bhz",?
?? ??? ??? ??? ?"tcp://localhost:61616");
?? ??? ?//連接
? ? ? ? Connection connection = null;
?? ??? ?
?? ??? ?try {
?? ??? ??? ? connection = activeMQConnectionFactory.createConnection();
?? ??? ??? ? connection.start();
?? ??? ??? ?
?? ??? ??? ?//創(chuàng)建session
?? ??? ?//?? ?Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
?? ??? ??? ? //開(kāi)啟事物
?? ??? ?//?? ?Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
?? ??? ??? ? //開(kāi)啟事物 并且使用Client的方式
?? ??? ??? ? /**
?? ??? ??? ? ?* ?3、通過(guò)Connection對(duì)象創(chuàng)建Session會(huì)話(上下文環(huán)境對(duì)象), ?
? ? ? ? ??? ??? ? ? ? 參數(shù)一,表示是否開(kāi)啟事務(wù) ?
? ? ? ? ??? ??? ? ? ? 參數(shù)二,表示的是簽收模式,一般使用的有自動(dòng)簽收和客戶端自己確認(rèn)簽收 ?
??
? ? ? ? ??? ??? ? ? ? 第一個(gè)參數(shù)設(shè)置為true,表示開(kāi)啟事務(wù) ?
? ? ? ? ??? ??? ? ? ? 開(kāi)啟事務(wù)后,記得要手動(dòng)提交事務(wù)?
?? ??? ??? ? ?*/
?? ??? ??? ?Session session = connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
?? ??? ??? ?
?? ??? ??? ?// 4、通過(guò)Session創(chuàng)建Destination對(duì)象,指的是一個(gè)客戶端用來(lái)指定生產(chǎn)消息目標(biāo)和消費(fèi)消息來(lái)源的對(duì)象。 ?
?? ? ? ? ? ?// 在PTP模式中,Destination指的是Queue ?
?? ? ? ? ? ?// 在發(fā)布訂閱模式中,Destination指的是Topic ?
?? ??? ??? ?//消息的目的地
?? ??? ??? ?Destination destination = session.createQueue("queue1");
?? ??? ??? ?//創(chuàng)建消息生產(chǎn)者
?? ??? ??? ?// 5、使用Session來(lái)創(chuàng)建消息對(duì)象的生產(chǎn)者或者消費(fèi)者 ?
?? ??? ??? ?MessageProducer messageProducer = session.createProducer(destination);
?? ??? ??? ?//PERSISTENT 用來(lái)指定JMS Provider對(duì)消息進(jìn)行持久化操作,以免Provider fail的時(shí)候,丟失Message
?? ??? ??? ?//NON_Persistent 方式下的JMS Provider不會(huì)對(duì)消進(jìn)行持久化
?? ??? ??? ?messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
?? ??? ??? ?//發(fā)送消息
?? ??? ?//?? ?sendMessage(session, messageProducer);
?? ??? ??? ?for (int j = 0; j < 10; j++) {
?? ??? ??? ??? ?TextMessage textMessage = session.createTextMessage();
?? ? ? ? ? ??? ?textMessage.setText("我的消息內(nèi)容,id為"+j);
?? ? ? ? ? ??? ?System.out.println("生產(chǎn)者: "+textMessage.getText());
?? ??? ??? ??? ?messageProducer.send(destination,textMessage,DeliveryMode.PERSISTENT,j,60*1000);?? ?
?? ??? ??? ?//?? ?System.out.println("生產(chǎn)者: "+textMessage.getText());
?? ??? ??? ?}
?? ??? ??? ?
?? ??? ??? ?//使用事物?? ?Boolean.TRUE?? ?
?? ??? ?//?? ?session.commit();
?? ??? ?} catch (Exception e) {
?? ??? ??? ?e.printStackTrace();
?? ??? ?}finally{
? ? ? ? ? ? if(connection != null){
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? connection.close();
? ? ? ? ? ? ? ? } catch (JMSException e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
?? ?}
?? ?
?? ?
?? ? /**
? ? ?* 發(fā)送消息
? ? ?* @param session
? ? ?* @param messageProducer ?消息生產(chǎn)者
? ? ?* @throws Exception
? ? ?*/
? ? public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
? ? ? ? for (int i = 0; i < Sender.SENDNUM; i++) {
? ? ? ? ? ? //創(chuàng)建一條文本消息?
? ? ? ? ?? ?TextMessage textMessage = session.createTextMessage();
? ? ? ? ?? ?textMessage.setText("我的消息內(nèi)容,id為"+i);
? ? ? ? ?? ?messageProducer.send(textMessage);
? ? ? ? ?? ?System.out.println("生產(chǎn)者: "+textMessage.getText());
? ? ? ? ?// ? TextMessage message = session.createTextMessage("ActiveMQ 發(fā)送消息" +i);
? ? ? ? ?// ? System.out.println("生產(chǎn)者發(fā)送消息:Activemq 發(fā)送消息" + i);
? ? ? ? ? ? //通過(guò)消息生產(chǎn)者發(fā)出消息?
? ? ? ? ?// ? messageProducer.send(message);
? ? ? ? }
? ? }
}
?
總結(jié)
以上是生活随笔為你收集整理的JMS学习九 ActiveMQ的消息持久化到Mysql数据库的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 研发工作中芯片选型需要考虑的问题
- 下一篇: linux下出现ping:unknown