消息中间体activeMQ
一、簡介
1.什么是消息中間體
消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,并基于數據通信來進行分布式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進程間的通信。對于消息中間件,常見的角色有Producer(生產者)、Consumer(消費者)
2.什么是JMS
JMS(Java?Messaging Service)是Java平臺上有關面向消息中間件的技術規范,它便于消息系統中的Java應用程序進行消息交換,并且通過提供標準的產生、發送、接收消息的接口簡化企業應用的開發。
JMS 定義了五種不同的消息正文格式,以及調用的消息類型,允許你發送并接收以一些不同形式的數據,提供現有消息格式的一些級別的兼容性。
TextMessage--一個字符串對象 ? MapMessage--一套名稱-值對 ? ObjectMessage--一個序列化的 Java 對象 ? BytesMessage--一個字節的數據流 ? StreamMessage -- Java 原始值的數據流3.JMS消息傳遞類型
對于消息的傳遞有兩種類型:
一種是點對點的,即一個生產者和一個消費者對應
一種是發布/訂閱模式,即一個生產者產生消息并進行發送后,可以由多個消費者進行接收
4.ActiveMQ下載與安裝
(1)下載
官方網站下載:http://activemq.apache.org/download.html
(2)安裝啟動(Linux)
1.將apache-activemq-版本號-bin.tar.gz上傳至服務器
2.解壓此文件
tar ?zxvf ?apache-activemq-5.12.0-bin.tar.gz3.為apache-activemq目錄賦權
chmod 777 apache-activemq-5.12.04.啟動
bin目錄下運行: ? ./activemq start二、Spring整合JMS
1.點對點模式
(1)消息生產者
①創建工程springjms_product
②導入pom依賴
? ? <properties><spring.version>4.2.4.RELEASE</spring.version></properties> ?<dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>${spring.version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-test</artifactId><version>${spring.version}</version></dependency><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.9</version></dependency><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-client</artifactId><version>5.13.4</version></dependency></dependencies>③創建spring配置文件
applicationContext-jms-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:jms="http://www.springframework.org/schema/jms"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context ? http://www.springframework.org/schema/context/spring-context.xsd"> ? ?<context:component-scan base-package="com.yfy"></context:component-scan> ? ?<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--><bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://192.168.25.132:61616"/></bean> ?<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --><bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"><!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --><property name="targetConnectionFactory" ref="targetConnectionFactory"/></bean> ?<!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --><property name="connectionFactory" ref="connectionFactory"/></bean><!--這個是隊列目的地,點對點的 文本信息--><bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg value="queue_text"/></bean> ? ? </beans>④創建生產者類
QueueProduct.java
@Component public class QueueProduct { ?@Autowiredprivate JmsTemplate jmsTemplate; ?@Autowiredprivate Destination queueTextDestination; ?public void sentTextMessag(final String message) {jmsTemplate.send(queueTextDestination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage(message);}});} }⑤創建測試類
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext-jms-producer.xml") public class Test { ?@Autowiredprivate QueueProduct queueProduct; ?@org.junit.Testpublic void testSendQueue() {queueProduct.sentTextMessag("hello queue");} }(2)消息消費者
①創建工程springjms_consumer
②導入pom依賴,和消息生產者一樣
③創建spring配置文件
applicationContext-jms-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:jms="http://www.springframework.org/schema/jms"xsi:schemaLocation="http://www.springframework.org/schema/beans ? http://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/context ? http://www.springframework.org/schema/context/spring-context.xsd"><!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> ?<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> ?<property name="brokerURL" value="tcp://192.168.25.132:61616"/></bean><!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> ?<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> ?<!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> ?<property name="targetConnectionFactory" ref="targetConnectionFactory"/> ?</bean> ?<!--這個是隊列目的地,點對點的 文本信息--> ?<bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> ?<constructor-arg value="queue_text"/> ?</bean> ?<!--這個是訂閱模式 文本信息--><bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg value="topic_text"/></bean><!-- 我的監聽類 --><bean id="myMessageListener" class="com.yfy.MyMessageListener"></bean><!-- 消息監聽容器:隊列 --><bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="destination" ref="queueTextDestination" /><property name="messageListener" ref="myMessageListener" /></bean> ? </beans>④創建監聽類
MyMessageListener.java
public class MyMessageListener implements MessageListener { ?@Overridepublic void onMessage(Message message) {TextMessage testMessage=(TextMessage)message;try {String text = testMessage.getText();System.out.println(text);} catch (JMSException e) {e.printStackTrace();}} }⑤創建測試類
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext-jms-consumer.xml") public class Test { ?@org.junit.Testpublic void testReceive(){try {System.in.read();} catch (IOException e) {e.printStackTrace();}} }2.發布/訂閱模式
(1)消息生產者
在product配置文件中加入
<!--這個是訂閱模式 文本信息--><bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg value="topic_text"/></bean>TopicProduct.java
@Component public class TopicProduct { ?@Autowiredprivate JmsTemplate jmsTemplate; ?@Autowiredprivate Destination topicTextDestination; ?public void sendMessage(String message) {jmsTemplate.send(topicTextDestination, new MessageCreator() {@Overridepublic Message createMessage(Session session) throws JMSException {return session.createTextMessage(message);}});} }創建測試類
@RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration("classpath:applicationContext-jms-producer.xml") public class Test { ? ?@Autowiredprivate TopicProduct topicProduct; ?@org.junit.Testpublic void TestSendTopic() {topicProduct.sendMessage("hello topic");} }(2)消息消費者
在consumer配置文件中加入
<!-- 我的監聽類 --><bean id="myMessage2Listener" class="com.yfy.MyMessage2Listener"></bean><!-- 消息監聽容器:訂閱模式 --><bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory" /><property name="destination" ref="topicTextDestination" /><property name="messageListener" ref="myMessageListener" /></bean>創建監聽類
MyMessage2Listener.java
public class MyMessage2Listener implements MessageListener { ?@Overridepublic void onMessage(Message message) {TextMessage textMessage=(TextMessage)message;try {String text = textMessage.getText();System.out.println(text);} catch (JMSException e) {e.printStackTrace();}} }測試:同時運行三個消費者工程,在運行生產者工程,查看消費者工程的控制臺輸出
總結
以上是生活随笔為你收集整理的消息中间体activeMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 分布式文件服务器FastDFS
- 下一篇: Spring任务调度之Spring-Ta