arouter跨module传递消息_消息队列中间件(二)使用 ActiveMQ
ActiveMQ 介紹
Active MQ 是由 Apache 出品的一款流行的功能強大的開源消息中間件,它速度快,支持跨語言的客戶端,具有易于使用的企業集成模式和許多的高級功能,同時完全支持?JSM1.1?和 J2EE1.4 。
官方下載地址:
?http://activemq.apache.org/download.html??
官方安裝教程:
?http://activemq.apache.org/getting-started.html??
默認管理頁面:
http://127.0.0.1:8161/admin/??
默認用戶名和密碼為
admin / admin。
conf / jetty-real.properties 您可以在此文件中進行配置。??
默認服務端口:61616??
ActiveMQ 特點
支持Java,C,C ++,C#,Ruby,Perl,Python,PHP等各種跨語言客戶端和協議,如 OpenWire , Stomp , AMQP , MQTT.
完全支持JMS 1.1和 J2EE 1.4,支持瞬態,持久,事務和XA消息傳遞。
對 Spring 框架的支持以便ActiveMQ可以輕松嵌入到Spring應用程序中。
通過了常見的 J2EE 服務器測試,如 TomEE,Geronimo,JBoss,GlassFish 和 WebLogic 。
連接方式的多樣化,ActiveMQ 提供了多種連接模式,例如 in-VM、TCP、SSL、NIO、UDP、多播、JGroups、JXTA。
可以通過使用 JDBC 和 journal 實現消息的快速持久化。
專為高性能群集,客戶端 - 服務器,點對點通信而設計。
提供與語言無關的 REST API。
支持 Ajax 方式調用 ActiveMQ。
ActiveMQ 可以輕松地與 CXF、Axis 等 Web Service 技術整合,以提供可靠的消息傳遞。
可用作為內存中的 JMS 提供者,非常適合 JMS 單元測試。
ActiveMQ 消息
點對點隊列模式
消息到達消息系統,被保留在消息隊列中,然后由一個或者多個消費者消費隊列中的消息,一個消息只能被一個消費者消費,然后就會被移除。例如訂單處理系統。
發布-訂閱模式
消息發送時指定主題(或者說通道),消息被保留在指定的主題中,消費者可以訂閱多個主題,并使用主題中的所有的消息,例如現實中的電視與電視頻道。所有客戶端包括發布者和訂閱者,主題中的消息可以被所有的訂閱者消費,消費者只能消費訂閱之后發送到主題中的消息。
ActiveMQ 概念
Broker,消息代理,表示消息隊列服務器實體,接受客戶端連接,提供消息通信的核心服務。
Producer,消息生產者,業務的發起方,負責生產消息并傳輸給 Broker 。
Consumer,消息消費者,業務的處理方,負責從 Broker 獲取消息并進行業務邏輯處理。
Topic,主題,發布訂閱模式下的消息統一匯集地,不同生產者向 Topic 發送消息,由 Broker 分發到不同的訂閱者,實現消息的廣播。
Queue,隊列,點對點模式下特定生產者向特定隊列發送消息,消費者訂閱特定隊列接收消息并進行業務邏輯處理。
Message,消息體,根據不同通信協議定義的固定格式進行編碼的數據包,來封裝業務 數據,實現消息的傳輸。
ActiveMQ 工程實例
下面是使用 ActiveMQ 的隊列模式和發布-訂閱模式的 Java 代碼示例。
POM 依賴
? ? ? ? ? ? ? ?<dependency> ? ? ? ? ? ?<groupId>org.apache.activemqgroupId> ? ? ? ? ? ?<artifactId>activemq-allartifactId> ? ? ? ? ? ?<version>5.15.5version> ? ? ? ?dependency>隊列模式消費者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?消息消費者,用于消費消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:45
?*/
public?class?AppConsumer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?queueName?=?"queue-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createQueue(queueName);
????????//?6.創建消費者
????????MessageConsumer?consumer?=?session.createConsumer(destination);
????????//?7.創建一個監聽器
????????consumer.setMessageListener(new?MessageListener()?{
????????????public?void?onMessage(Message?message)?{
????????????????TextMessage?textMessage?=?(TextMessage)?message;
????????????????try?{
????????????????????System.out.println("接收消息:"?+?textMessage.getText());
????????????????}?catch?(JMSException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????});
????????//?8.關閉連接
????????//connection.close();
????}
}
隊列模式生產者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?消息提供者,用于向消息中間件發送消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:28
?*/
public?class?AppProducer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?queueName?=?"queue-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createQueue(queueName);
????????//?6.創建生產者
????????MessageProducer?producer?=?session.createProducer(destination);
????????//?7.創建消息并發送
????????for?(int?i?=?0;?i?10;?i++)?{
????????????//?創建消息
????????????TextMessage?textMessage?=?session.createTextMessage("textMessage"?+?i);
????????????//?發布消息
????????????producer.send(textMessage);
????????????System.out.println("發送消息:"?+?textMessage.getText());
????????}
????????//?8.關閉連接
????????connection.close();
????}
}
隊列模式生產者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?消息提供者,用于向消息中間件發送消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:28
?*/
public?class?AppProducer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?queueName?=?"queue-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createQueue(queueName);
????????//?6.創建生產者
????????MessageProducer?producer?=?session.createProducer(destination);
????????//?7.創建消息并發送
????????for?(int?i?=?0;?i?10;?i++)?{
????????????//?創建消息
????????????TextMessage?textMessage?=?session.createTextMessage("textMessage"?+?i);
????????????//?發布消息
????????????producer.send(textMessage);
????????????System.out.println("發送消息:"?+?textMessage.getText());
????????}
????????//?8.關閉連接
????????connection.close();
????}
}
發布訂閱模式生產者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?主題模式
?*?消息消費者,用于消費消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:45
?*/
public?class?AppConsumer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?topicName?=?"topic-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createTopic(topicName);
????????//?6.創建消費者
????????MessageConsumer?consumer?=?session.createConsumer(destination);
????????//?7.創建一個監聽器
????????consumer.setMessageListener(new?MessageListener()?{
????????????public?void?onMessage(Message?message)?{
????????????????TextMessage?textMessage?=?(TextMessage)?message;
????????????????try?{
????????????????????System.out.println("接收消息:"?+?textMessage.getText());
????????????????}?catch?(JMSException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????});
????????//?8.關閉連接
????????//connection.close();
????}
}
發布訂閱模式生產者
import?org.apache.activemq.ActiveMQConnectionFactory;import?javax.jms.*;
/**
?*?
?*?主題模式
?*?消息提供者,用于向消息中間件發送消息
?*
?*?@Author?niujinpeng
?*?@Date?2018/9/4?23:28
?*/
public?class?AppProducer?{
????private?static?final?String?url?=?"tcp://127.0.0.1:61616";
????private?static?final?String?topicName?=?"topic-test";
????public?static?void?main(String[]?args)?throws?JMSException?{
????????//?1.創建ConnectionFactory
????????ActiveMQConnectionFactory?connectionFactory?=?new?ActiveMQConnectionFactory();
????????//?2.創建Connection
????????Connection?connection?=?connectionFactory.createConnection();
????????//?3.啟動連接
????????connection.start();
????????//?4.創建會話,false,不使用事務,自動應答模式
????????Session?session?=?connection.createSession(false,?Session.AUTO_ACKNOWLEDGE);
????????//?5.創建一個目標
????????Destination?destination?=?session.createTopic(topicName);
????????//?6.創建生產者
????????MessageProducer?producer?=?session.createProducer(destination);
????????//?7.創建消息并發送
????????for?(int?i?=?0;?i?10;?i++)?{
????????????//?創建消息
????????????TextMessage?textMessage?=?session.createTextMessage("textMessage"?+?i);
????????????//?發布消息
????????????producer.send(textMessage);
????????????System.out.println("發送消息:"?+?textMessage.getText());
????????}
????????//?8.關閉連接
????????connection.close();
????}
}
GitHub源碼:
https://github.com/niumoo/message-queue
Spring 整合 ActiveMQ
在 Spring 中配置 Active MQ 就像Spring 整合其他功能一樣,我們需要在 XML 配置中配置幾個關鍵的實例即可。在 Active MQ 中有幾個對象的實例是至關重要的,如 Active MQ jms 連接工廠,為了減少連接斷開性能時間消耗的 jms 連接池以及生產者消費者等。
下面是一些詳細說明。
ConnectionFactory 用于管理連接的連接工廠(Spring提供)。
一個 Spring 為我們提供的連接池。
JmsTemplate 每次發送都會重新創建連接,會話和 Productor。
Spring 中提供了SingleConnectionFactory 和CachingConnectionFactory(增加了緩存功能)。
JmsTemplate 是用于發送和接收消息的模板類。
是spring提供的,只需要向Spring 容器內注冊這個類就可以使用 JmsTemplate 方便的操作jms。
JmsTemplate 類是線程安全的,可以在整個應用范圍使用。
MessageListerner 消息監聽器
使用一個onMessage方法,該方法只接收一個Message參數。
POM 依賴
<properties>????????<spring.version>5.0.4.RELEASEspring.version>
????properties>
????<dependencies>
????????<dependency>
????????????<groupId>junitgroupId>
????????????<artifactId>junitartifactId>
????????????<version>4.11version>
????????????<scope>testscope>
????????dependency>
????????<dependency>
????????????<groupId>org.springframeworkgroupId>
????????????<artifactId>spring-contextartifactId>
????????????<version>${spring.version}version>
????????dependency>
????????<dependency>
????????????<groupId>org.springframeworkgroupId>
????????????<artifactId>spring-jmsartifactId>
????????????<version>5.1.1.RELEASEversion>
????????dependency>
????????<dependency>
????????????<groupId>org.springframeworkgroupId>
????????????<artifactId>spring-testartifactId>
????????????<version>${spring.version}version>
????????dependency>
????????
????????<dependency>
????????????<groupId>javax.jmsgroupId>
????????????<artifactId>javax.jms-apiartifactId>
????????????<version>2.0.1version>
????????dependency>
????????<dependency>
????????????<groupId>org.apache.activemqgroupId>
????????????<artifactId>activemq-coreartifactId>
????????????<version>5.7.0version>
????????????<exclusions>
????????????????<exclusion>
????????????????????<artifactId>spring-contextartifactId>
????????????????????<groupId>org.springframeworkgroupId>
????????????????exclusion>
????????????????<exclusion>
????????????????????<groupId>org.apache.geronimo.specsgroupId>
????????????????????<artifactId>geronimo-jms_1.1_specartifactId>
????????????????exclusion>
????????????exclusions>
????????dependency>
????dependencies>
XML 配置
XML 公共配置
為了份文件配置方便管理,下面是提取出來的公共配置,為了在獨立配置生產者和消費者 XML文件時引入,當然也可以直接把生產者和消費者以及所有的 XML bean 配置在一個文件里。
<?xml ?version="1.0"?encoding="UTF-8"?><beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:content="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsd?http://www.springframework.org/schema/context?http://www.springframework.org/schema/context/spring-context.xsd">
????<content:annotation-config/>
????
????<bean?id="targerConnectionFactory"?class="org.apache.activemq.ActiveMQConnectionFactory">
????????<property?name="brokerURL"?value="tcp://127.0.0.1:61616"/>
????bean>
????
????<bean?id="connectionFactory"?class="org.springframework.jms.connection.SingleConnectionFactory">
????????<property?name="targetConnectionFactory"?ref="targerConnectionFactory"/>
????bean>
????
????<bean?id="queueDestination"?class="org.apache.activemq.command.ActiveMQQueue">
????????<constructor-arg?value="queue-spring"/>
????bean>
????
????<bean?id="topicDestination"?class="org.apache.activemq.command.ActiveMQTopic">
????????<constructor-arg?value="topic-spring"/>
????bean>
beans>
XML 消費者
消費者主要是一個消息監聽器,監聽指定的隊列或者主題的消息信息,來有消息時調用回調監聽處理方法。這里我注釋掉了監聽的隊列模式,指定了主題模式。
<?xml ?version="1.0"?encoding="UTF-8"?><beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:content="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsd?http://www.springframework.org/schema/context?http://www.springframework.org/schema/context/spring-context.xsd">
????
????<import?resource="common.xml"/>
????
????<bean?id="consumerMessageListener"?class="net.codingme.jms.consumer.ConsumerMessageListener"/>
????
????<bean?id="jmsContainer"?class="org.springframework.jms.listener.DefaultMessageListenerContainer">
????????<property?name="connectionFactory"?ref="connectionFactory"/>
????????
????????
????????
????????<property?name="destination"?ref="topicDestination"/>
????????<property?name="messageListener"?ref="consumerMessageListener"/>
????bean>
beans>
XML 生產者
生成者的配置主要是使用 spring jms 模版對象,創建生產者實例用于生產消息。
<?xml ?version="1.0"?encoding="UTF-8"?><beans?xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:content="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsd?http://www.springframework.org/schema/context?http://www.springframework.org/schema/context/spring-context.xsd">
????
????<import?resource="common.xml"/>
????
????<bean?id="jmsTemplate"?class="org.springframework.jms.core.JmsTemplate">
????????<property?name="connectionFactory"?ref="connectionFactory"/>
????bean>
????<bean?class="net.codingme.jms.producer.ProducerServiceImpl">bean>
beans>
生產者編寫
1. 定義接口
package?net.codingme.jms.producer;/**
?*?
?*
?*?@Author?niujinpeng
?*?@Date?2018/11/2518:19
?*/
public?interface?ProducerService?{
????public?void?sendMessage(String?message);
}
2. 主題模式生產者
package?net.codingme.jms.producer;import?org.springframework.beans.factory.annotation.Autowired;
import?org.springframework.jms.core.JmsTemplate;
import?org.springframework.jms.core.MessageCreator;
import?javax.annotation.Resource;
import?javax.jms.*;
/**
?*?
?*
?*?@Author?niujinpeng
?*?@Date?2018/11/25?19:24
?*/
public?class?ProducerServiceImpl?implements?ProducerService?{
????@Autowired
????JmsTemplate?jmsTemplate;
????/**
?????*?主題模式
?????*/
????@Resource(name?=?"topicDestination")
????Destination?destination;
????@Override
????public?void?sendMessage(String?message)?{
????????//?使用jmsTemplate發送消息
????????jmsTemplate.send(destination,?new?MessageCreator()?{
????????????//?創建消息
????????????@Override
????????????public?Message?createMessage(Session?session)?throws?JMSException?{
????????????????TextMessage?textMessage?=?session.createTextMessage(message);
????????????????return?textMessage;
????????????}
????????});
????????System.out.println("發送消息:"?+?message);
????}
}
3. Spring 啟動 生產者
package?net.codingme.jms.producer;import?org.springframework.context.support.ClassPathXmlApplicationContext;
/**
?*?
?*?啟動器
?*
?*?@Author?niujinpeng
?*?@Date?2018/11/25?21:48
?*/
public?class?AppProducer?{
????public?static?void?main(String[]?args)?{
????????//?裝載配置文件
????????ClassPathXmlApplicationContext?context?=?new?ClassPathXmlApplicationContext("classpath:producer.xml");
????????ProducerService?service?=?context.getBean(ProducerService.class);
????????for?(int?i?=?0;?i?10;?i++)?{
????????????service.sendMessage("test"?+?i);
????????}
????????context.close();
????}
}
消費者編寫
Spring啟動和生產者類似。下面是消費者監聽器的實現。
package?net.codingme.jms.consumer;import?javax.jms.JMSException;
import?javax.jms.Message;
import?javax.jms.MessageListener;
import?javax.jms.TextMessage;
/**
?*?
?*?消息監聽器
?*
?*?@Author?niujinpeng
?*?@Date?2018/11/25?22:28
?*/
public?class?ConsumerMessageListener?implements?MessageListener?{
????@Override
????public?void?onMessage(Message?message)?{
????????TextMessage?textMessage?=?(TextMessage)?message;
????????try?{
????????????System.out.println("接收消息:"?+?textMessage.getText());
????????}?catch?(JMSException?e)?{
????????????e.printStackTrace();
????????}
????}
}
運行測試
首先主題模式下啟動兩個消費者,使用生產者推送10條消息。
在每個消費者下面都可以看到推送的完整消息。
文中代碼已經上傳到GitHub:
https://github.com/niumoo/message-queue
推薦閱讀
(點擊標題可跳轉閱讀)
夯實Java基礎系列16:一文讀懂Java IO流和常見面試題
夯實Java基礎系列15:Java注解簡介和最佳實踐
夯實Java基礎系列14:深入理解Java枚舉類
夯實Java基礎系列11:深入理解Java中的回調機制
夯實Java基礎系列10:深入理解Java中的異常體系
夯實Java基礎系列9:深入理解Class類和Object類
夯實Java基礎系列8:深入理解Java內部類及其實現原理
夯實Java基礎系列7:一文讀懂Java 代碼塊和代碼執行順序
一文搞懂抽象類和接口,從基礎到面試題,揭秘其本質區別!
一文讀懂 Java 文件和包結構,解讀開發中常用的 jar 包
一文了解 final 關鍵字的特性、使用方法以及實現原理
點個“在看”,轉發朋友圈,都是對我最好的支持!總結
以上是生活随笔為你收集整理的arouter跨module传递消息_消息队列中间件(二)使用 ActiveMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: window服务器cpu过高的排查_高频
- 下一篇: 梳妆镜对卧室门好不好