java activeMQ消息的發送與接收
activemq是我們經常用到的消息隊列之一,比如說速度快,對spring的很好的支持,支持多種協議等等,今天我們就來看一下activeMQ消息的發送與接收的源代碼。
我這里使用了兩個配置文件,其實在一個配置文件里面就可以完成發送與接收功能,但是為了方便觀察日志,我就使用了兩個配置文件。在使用代碼之前需要搭建好activeMQ消息隊列環境。
一、代碼目錄結構
所建立的工程是maven工程,代碼結構如圖所示:
一、maven配置pom.xml
4.0.0
Test
Test
0.0.1-SNAPSHOT
<dependencies><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.16</version></dependency><dependency><groupId>org.json</groupId><artifactId>org.json</artifactId><version>chargebee-1.0</version>
</dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><dependency><groupId>com.cloudhopper.proxool</groupId><artifactId>proxool</artifactId><version>0.9.1</version><exclusions><exclusion><artifactId>log4j</artifactId><groupId>log4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.11.0</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>4.1.4.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-jdbc</artifactId><version>4.1.4.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>4.1.4.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>4.1.4.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>4.1.4.RELEASE</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context-support</artifactId><version>4.1.4.RELEASE</version></dependency><dependency><groupId>org.apache.cxf</groupId><artifactId>cxf-rt-frontend-jaxws</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.cxf</groupId><artifactId>cxf-rt-transports-http</artifactId><version>2.7.3</version></dependency><dependency><groupId>org.apache.cxf</groupId><artifactId>cxf-rt-transports-http-jetty</artifactId><version>2.4.5</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.5</version></dependency><dependency><groupId>com.h2database</groupId><artifactId>h2</artifactId><version>1.3.152</version></dependency><dependency><groupId>org.apache.httpcomponents</groupId><artifactId>httpclient</artifactId><version>4.1.1</version></dependency><dependency><groupId>dom4j</groupId><artifactId>dom4j</artifactId><version>1.6.1</version></dependency><dependency><groupId>commons-dbutils</groupId><artifactId>commons-dbutils</artifactId><version>1.3</version></dependency><dependency><groupId>org.freemarker</groupId><artifactId>freemarker</artifactId><version>2.3.16</version></dependency><dependency><groupId>jaxen</groupId><artifactId>jaxen</artifactId><version>1.1.1</version></dependency><dependency><groupId>net.sourceforge.saxon</groupId><artifactId>saxon</artifactId><version>9.1.0.8</version></dependency><dependency><groupId>net.sourceforge.saxon</groupId><artifactId>saxon</artifactId><version>9.1.0.8</version><classifier>xqj</classifier></dependency><dependency><groupId>net.sourceforge.saxon</groupId><artifactId>saxon</artifactId><version>9.1.0.8</version><classifier>xpath</classifier></dependency><dependency><groupId>net.sourceforge.saxon</groupId><artifactId>saxon</artifactId><version>9.1.0.8</version><classifier>xom</classifier></dependency><dependency><groupId>net.sourceforge.saxon</groupId><artifactId>saxon</artifactId><version>9.1.0.8</version><classifier>sql</classifier></dependency><dependency><groupId>net.sourceforge.saxon</groupId><artifactId>saxon</artifactId><version>9.1.0.8</version><classifier>s9api</classifier></dependency><dependency><groupId>net.sourceforge.saxon</groupId><artifactId>saxon</artifactId><version>9.1.0.8</version><classifier>jdom</classifier></dependency><dependency><groupId>net.sourceforge.saxon</groupId><artifactId>saxon</artifactId><version>9.1.0.8</version><classifier>dom4j</classifier></dependency><dependency><groupId>net.sourceforge.saxon</groupId><artifactId>saxon</artifactId><version>9.1.0.8</version><classifier>dom</classifier></dependency><dependency><groupId>xom</groupId><artifactId>xom</artifactId><version>1.1</version></dependency><dependency><groupId>stax</groupId><artifactId>stax</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.drools</groupId><artifactId>drools-core</artifactId><version>5.3.0.Final</version></dependency><dependency><groupId>org.drools</groupId><artifactId>drools-compiler</artifactId><version>5.3.0.Final</version></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.2</version></dependency><dependency><groupId>javax.xml.bind</groupId><artifactId>jsr173_api</artifactId><version>1.0</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>4.1.4.RELEASE</version></dependency><dependency><groupId>corba</groupId><artifactId>corba-connect</artifactId><version>0.0.2</version></dependency><dependency><groupId>com.ustcinfo.inm.data</groupId><artifactId>inm-data-spi</artifactId><version>0.0.1-SNAPSHOT</version></dependency><dependency><groupId>com.taobao.metamorphosis</groupId><artifactId>metamorphosis-client</artifactId><version>1.4.4</version></dependency><dependency><groupId>com.googlecode.aviator</groupId><artifactId>aviator</artifactId><version>2.2.1</version></dependency><dependency><groupId>com.jcraft</groupId><artifactId>jsch</artifactId><version>0.1.50</version></dependency>
</dependencies>
<build><sourceDirectory>src</sourceDirectory><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.3</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin></plugins>
</build>
————————————————
二、LoadUtil.java加載類
package aaa.activemq.load;
import javax.jms.Destination;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jms.core.JmsTemplate;
/**
- 加載配置文件工具類
- @author Administrator
*/
public class LoadUtil {
ApplicationContext applicationContext;
JmsTemplate template;//jsm對象,可以發送和消費消息
Destination destination;//隊列名稱/*** 加載方法* @param path 文件路徑名稱*/
public void load(String path){applicationContext = new ClassPathXmlApplicationContext(path);template = (JmsTemplate) applicationContext.getBean("jmsTemplate");destination = (Destination) applicationContext.getBean("queueDestination");this.setApplicationContext(applicationContext);this.setDestination(destination);this.setDestination(destination);
}public ApplicationContext getApplicationContext() {return applicationContext;
}public void setApplicationContext(ApplicationContext applicationContext) {this.applicationContext = applicationContext;
}public JmsTemplate getTemplate() {return template;
}public void setTemplate(JmsTemplate template) {this.template = template;
}public Destination getDestination() {return destination;
}public void setDestination(Destination destination) {this.destination = destination;
}
}
三、QueueProducer.java發送消息到activeMQ類
package aaa.avtivemq.sendmessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import aaa.activemq.load.LoadUtil;
public class QueueProducer{
/**
* 發送消息到activemq的實現方法
* @param msg //發送消息的內容,為字符串類型
*/
public void sendMessage(String msg) {
LoadUtil lu = new LoadUtil();
lu.load(“activemq-config.xml”);
JmsTemplate template = lu.getTemplate();
String destination = template.getDefaultDestination().toString();
System.out.println(“向隊列” + destination + “發送了消息:” + msg);
template.send(new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}
/*** 主方法* @param args*/
public static void main(String[] args) {QueueProducer producer = new QueueProducer();producer.sendMessage("zcinfo_test");
}
}
四、activemq-config.xml配置文件
?xml version=“1.0” encoding=“UTF-8”?>
<!--配置連接工廠地址-->
<bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.0.81.80:8765"></property><!-- <property name="brokerURL" value="failover:(tcp://tcp://192.0.81.83:8765,tcp://tcp://192.0.81.84:8765,tcp://tcp://192.0.81.85:8765)"></property> -->
</bean>
<!--配置隊列名稱 -->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>UEAP_TO_GZZC_QUEUE_TEST</value></constructor-arg>
</bean>
<!-- 配置JMS模版,這是spring提供的 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="activeMQConnectionFactory"></property><property name="defaultDestination" ref="queueDestination"></property><property name="receiveTimeout" value="10000"></property>
</bean>
五、ActivemqConsumer.java消費類 package www.activemq.receivemessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import www.activemq.load.LoadUtil;
public class ActivemqConsumer implements MessageListener{
@Override
public void onMessage(Message message) {TextMessage tm = (TextMessage)message;System.out.println("監聽到MQ中有數據......");try {System.out.println("獲取MQ中數據信息>>>>>>>>>>" + tm.getText());}catch (Exception e) {e.printStackTrace();}
}/*** 加載配置文件之后監聽器會自動調用onMessage方法,并且保持服務一直開啟很實用* @param args*/
public static void main(String[] args) {new LoadUtil().load("activemq-context.xml");
}
}
六、activemq-context.xml配置文件
<?xml version="1.0" encoding="UTF-8"?>
<context:component-scan base-package="com.starit.analyse" /><!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --><property name="connectionFactory" ref="connectionFactory"/>
</bean><!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><!-- <property name="brokerURL" value="failover:(tcp://192.0.81.83:8765:8400,tcp://192.0.81.84:8765,tcp://192.0.81.85:8765)"/> --><property name="brokerURL" value="failover:(tcp://192.0.81.80:8765)?jms.prefetchPolicy.all=1"/>
</bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --><property name="targetConnectionFactory" ref="targetConnectionFactory"/>
</bean><!--這個是隊列目的地-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg><value>UEAP_TO_GZZC_QUEUE_TEST</value></constructor-arg>
</bean><!-- 消息監聽器 加載此處會自動調用監聽方法并且一直保持服務開啟,很實用 -->
<bean id="consumerMessageListener" class="www.activemq.receivemessage.ActivemqConsumer"/>
<!-- 消息監聽容器 -->
<bean id="jmsContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="destination" ref="queueDestination" /><property name="messageListener" ref="consumerMessageListener" /><property name="concurrentConsumers" value="1"/>
</bean>
七、測試結果 (1)發送消息
(2)activemq顯示收到消息
(3)接收消息,服務一直開啟接收發送者消息
總結
以上是生活随笔為你收集整理的java activeMQ消息的发送与接收的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。