Integrating Spring with ActiveMQ for Message Queue (MQ) Programming
<–start–>
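Step 1: Create a new Maven project named activeMQ_spring and add the required JARs as dependencies in pom.xml.
① JARs for Spring development and testing:
spring-context
spring-test
junit
② The ActiveMQ JAR:
activemq-all
③ The JAR that integrates Spring with ActiveMQ (Spring's JMS support):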
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.1.7.RELEASE</version>
</dependency>
The complete pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.niwotaxuexiba.maven</groupId>
    <artifactId>activeMQ_spring</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>activeMQ_spring</name>
    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.1.7.RELEASE</version>
        </dependency>
    </dependencies>
</project>
Step 2: Configure the producer:
① Configure the ActiveMQ connection factory.
② Configure the Spring-managed connection factory:
<!-- Spring caching connection factory -->
<!-- Spring's wrapper that manages the real ConnectionFactory -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- The target ConnectionFactory is the real one that creates JMS connections -->
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
    <!-- Equivalent alternative: pass the target as a constructor argument -->
    <!-- <constructor-arg ref="amqConnectionFactory" /> -->
    <!-- Number of sessions to cache -->
    <property name="sessionCacheSize" value="100" />
</bean>
③ Configure the JmsTemplate beans:
<!-- Spring JmsTemplate message producers: start -->
<!-- JmsTemplate for queues -->
<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
    <constructor-arg ref="connectionFactory" />
    <!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
    <property name="pubSubDomain" value="false" />
</bean>
<!-- JmsTemplate for topics -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
    <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
    <constructor-arg ref="connectionFactory" />
    <!-- The pub/sub (publish/subscribe) model -->
    <property name="pubSubDomain" value="true" />
</bean>
<!-- Spring JmsTemplate message producers: end -->
The complete applicationContext-mq.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- Component scanning -->
    <context:component-scan base-package="cn.niwotaxuexiba.activemq" />

    <!-- ActiveMQ connection factory -->
    <!-- The real ConnectionFactory that creates connections, supplied by the JMS vendor -->
    <!-- For a broker on another host use tcp://ip:61616; for a local broker use tcp://localhost:61616. The user name and password are set here as well -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61616" userName="admin" password="admin" />

    <!-- Spring caching connection factory -->
    <!-- Spring's wrapper that manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- The target ConnectionFactory is the real one that creates JMS connections -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Equivalent alternative: pass the target as a constructor argument -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Spring JmsTemplate message producers: start -->
    <!-- JmsTemplate for queues -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
        <constructor-arg ref="connectionFactory" />
        <!-- Not the pub/sub (publish/subscribe) model, i.e. queue mode -->
        <property name="pubSubDomain" value="false" />
    </bean>
    <!-- JmsTemplate for topics -->
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- This connectionFactory is the Spring-provided ConnectionFactory bean defined above -->
        <constructor-arg ref="connectionFactory" />
        <!-- The pub/sub (publish/subscribe) model -->
        <property name="pubSubDomain" value="true" />
    </bean>
    <!-- Spring JmsTemplate message producers: end -->
</beans>
Create a QueueSender class that sends messages to a queue:
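import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class QueueSender {

    // Inject the queue-mode JmsTemplate
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate;

    // Send a text message to the named queue
    public void send(String queueName, final String message) {
        jmsTemplate.send(queueName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}
Create a TopicSender class that publishes messages to a topic: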
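import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class TopicSender {

    // Inject the topic-mode (pub/sub) JmsTemplate
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTemplate;

    // Publish a text message to the named topic
    public void send(String topicName, final String message) {
        jmsTemplate.send(topicName, new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}
Test code: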
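import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq.xml")
public class ProducerTest {

    @Autowired
    private QueueSender queueSender;

    @Autowired
    private TopicSender topicSender;

    // Send one message to the queue and one to the topic
    @Test
    public void testSendMessage() {
        queueSender.send("spring_queue", "你好,你我他");
        topicSender.send("spring_topic", "你好,學習吧");
    }
}
Writing the consumer: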
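Before wiring up the listeners, it may help to see how the two destination types deliver messages. The sketch below is a plain-Java, in-memory model, not ActiveMQ or JMS code: `DeliveryModel`, `ModelQueue`, and `ModelTopic` are hypothetical names, and the round-robin dispatch is an assumption made purely for illustration. It shows that two listeners on one queue share the messages between them, whereas two subscribers on one topic each receive every message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// In-memory model only: it imitates JMS delivery semantics and never talks to a broker.
class DeliveryModel {

    // Queue (point-to-point): each message is handed to exactly one listener.
    // Round-robin is an assumption of this sketch; a real broker's dispatch order may differ.
    static class ModelQueue {
        private final List<Consumer<String>> listeners = new ArrayList<>();
        private int next = 0;

        void addListener(Consumer<String> listener) {
            listeners.add(listener);
        }

        void send(String message) {
            if (listeners.isEmpty()) {
                return; // a real queue would keep the message until a consumer arrives; omitted here
            }
            listeners.get(next).accept(message);
            next = (next + 1) % listeners.size();
        }
    }

    // Topic (publish/subscribe): every current subscriber receives its own copy.
    static class ModelTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void addSubscriber(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    // Runs the scenario and returns a log of who received what.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        ModelQueue queue = new ModelQueue();
        queue.addListener(m -> log.add("queueConsumer1 got " + m));
        queue.addListener(m -> log.add("queueConsumer2 got " + m));
        queue.send("q-1");
        queue.send("q-2");

        ModelTopic topic = new ModelTopic();
        topic.addSubscriber(m -> log.add("topicConsumer1 got " + m));
        topic.addSubscriber(m -> log.add("topicConsumer2 got " + m));
        topic.publish("t-1");

        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Applied to this article's setup, queueConsumer1 and queueConsumer2 would split the messages sent to spring_queue, while topicConsumer1 and topicConsumer2 would each see every message published to spring_topic. Note also that a non-durable topic subscriber only receives messages published while it is connected, so the consumer side should be running before the producer test publishes to the topic.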
In applicationContext-mq-consumer.xml, restrict component scanning to the consumer package.
Configure the message listeners, also in applicationContext-mq-consumer.xml:
<!-- Message consumers: start -->
<!-- Queue listeners -->
<jms:listener-container destination-type="queue" container-type="default"
        connection-factory="connectionFactory" acknowledge="auto">
    <!-- By default a bean is registered under its class name with the first letter in lower case -->
    <jms:listener destination="spring_queue" ref="queueConsumer1"/>
    <jms:listener destination="spring_queue" ref="queueConsumer2"/>
</jms:listener-container>
<!-- Topic listeners -->
<jms:listener-container destination-type="topic" container-type="default"
        connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="spring_topic" ref="topicConsumer1"/>
    <jms:listener destination="spring_topic" ref="topicConsumer2"/>
</jms:listener-container>
<!-- Message consumers: end -->
The complete applicationContext-mq-consumer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jpa="http://www.springframework.org/schema/data/jpa"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.1.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
        http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd
        http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.8.0.xsd">

    <!-- Component scanning, limited to the consumer package -->
    <context:component-scan base-package="cn.niwotaxuexiba.activemq.consumer" />

    <!-- ActiveMQ connection factory -->
    <!-- The real ConnectionFactory that creates connections, supplied by the JMS vendor -->
    <!-- For a broker on another host use tcp://ip:61616; for a local broker use tcp://localhost:61616. The user name and password are set here as well -->
    <amq:connectionFactory id="amqConnectionFactory"
        brokerURL="tcp://localhost:61616" userName="admin" password="admin" />

    <!-- Spring caching connection factory -->
    <!-- Spring's ConnectionFactory that manages the real ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- The target ConnectionFactory is the real one that creates JMS connections -->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- Equivalent alternative: pass the target as a constructor argument -->
        <!-- <constructor-arg ref="amqConnectionFactory" /> -->
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- Message consumers: start -->
    <!-- Queue listeners -->
    <jms:listener-container destination-type="queue" container-type="default"
            connection-factory="connectionFactory" acknowledge="auto">
        <!-- By default a bean is registered under its class name with the first letter in lower case -->
        <jms:listener destination="spring_queue" ref="queueConsumer1"/>
        <jms:listener destination="spring_queue" ref="queueConsumer2"/>
    </jms:listener-container>
    <!-- Topic listeners -->
    <jms:listener-container destination-type="topic" container-type="default"
            connection-factory="connectionFactory" acknowledge="auto">
        <jms:listener destination="spring_topic" ref="topicConsumer1"/>
        <jms:listener destination="spring_topic" ref="topicConsumer2"/>
    </jms:listener-container>
    <!-- Message consumers: end -->
</beans>
Write the message consumer class QueueConsumer1, which implements the MessageListener interface:
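import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

@Service
public class QueueConsumer1 implements MessageListener {

    // Called by the listener container for each message taken from spring_queue
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("QueueConsumer1 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Create a second queue consumer, QueueConsumer2, which also implements the MessageListener interface: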
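import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

@Service
public class QueueConsumer2 implements MessageListener {

    // Called by the listener container for each message taken from spring_queue
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("QueueConsumer2 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Create the TopicConsumer1 class, which implements the MessageListener interface: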
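import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

@Service
public class TopicConsumer1 implements MessageListener {

    // Called by the listener container for each message published to spring_topic
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("TopicConsumer1 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Create the TopicConsumer2 class, which implements the MessageListener interface: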
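import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.stereotype.Service;

@Service
public class TopicConsumer2 implements MessageListener {

    // Called by the listener container for each message published to spring_topic
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("TopicConsumer2 received message: " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
Test code: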
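import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext-mq-consumer.xml")
public class ConsumerTest {

    @Test
    public void testConsumerMessage() {
        while (true) {
            // Busy-wait so JUnit does not exit and the listeners stay alive;
            // stop the test manually when finished.
        }
    }
}
<–end–>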
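The empty `while (true)` loop in ConsumerTest keeps the listeners alive, but it also spins a CPU core and never finishes. One possible alternative, sketched here under assumptions, is to block on a `java.util.concurrent.CountDownLatch` until the expected number of messages has arrived or a timeout expires. `MessageLatch` is a hypothetical helper that is not part of the original project, and the plain thread in `main` merely stands in for the listener container's delivery thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: listeners report each message, the test thread waits for all of them.
class MessageLatch {

    private final CountDownLatch latch;

    MessageLatch(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    // Intended to be called from a listener's onMessage(...) after a message is handled.
    void messageReceived() {
        latch.countDown();
    }

    // Blocks without spinning. Returns true if every expected message arrived in time,
    // false on timeout or if the waiting thread was interrupted.
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        MessageLatch twoMessages = new MessageLatch(2);

        // Stand-in for the listener container delivering two messages on its own thread.
        Thread listenerThread = new Thread(() -> {
            twoMessages.messageReceived();
            twoMessages.messageReceived();
        });
        listenerThread.start();

        boolean allArrived = twoMessages.await(5, TimeUnit.SECONDS);
        System.out.println("all messages arrived: " + allArrived);
    }
}
```

In a real test, the latch would have to be shared between the test and the listener beans (for example by injecting it), and the test could then assert on the value returned by `await` instead of being stopped by hand.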