javascript
Spring Cloud —— 消息队列与 RocketMQ
導航
- 一、什么是 MQ
- 二、常見的 MQ 產品
- 三、RocketMQ 概念與架構設計
- 3.1 基本概念
- 1、消息模型(Message Model)
- 2、生產者與消費者(Producer & Consumer)
- 3、主題(Topic)
- 4、代理服務器與名稱服務(Broker Server & Name Server)
- 5、拉取式與推送式消費
- 6、生產者組與消費者組
- 7、集群消費與廣播消費
- 8、消息(Message)
- 9、標簽(Tag)
- 10、順序消息(Ordered Message)
- 3.2 架構設計
- 3.3 部署架構
- 3.4 集群工作流程
- 四、RocketMQ 搭建
- 4.1 基礎環境搭建
- 4.2 控制臺安裝
- 五、Java 實現 MQ 消息發送與接收
- 六、在 Spring 中使用 RocketMQ
- 6.1 消息生產端
- 6.2 消息消費端
一、什么是 MQ
MQ 是 Message Queue 的縮寫,譯為 “消息隊列”。
MQ 的主要職責就是轉發生產者的消息給消費者,并具備一定的消息緩存能力,在分布式系統中,常常用于各個應用進程之間的通訊行為。
在傳統的應用間通訊手段上,往往大多采用直接訪問對方URL等同步的數據傳輸方式,客戶端與服務端的消息耦合,這在某些要求實時性和必要性的業務場景下是必需的,但對于某些業務場景,例如短信通知、郵件通知等,本身并不是主業務流程中必要的關鍵環節,實時性也要求不高,因此,完全可以采用異步的方式來完成,MQ的一個重要作用就是基于這種情況,實現應用間、業務間的異步解耦,是將比較耗時且不需要即時響應的的操作作為消息放入消息隊列,同時,由于使用了MQ,只要保證消息格式不變,消息的發送方和接收方并不需要聯系彼此,也不需要受對方處理速度的影響,即解耦合。
流量削峰也是MQ 的常用場景,一般在秒殺或團購活動中使用廣泛。
二、常見的 MQ 產品
目前業界有很多MQ產品,比較出名的有以下這些:
1、ZeroMQ
號稱最快的消息隊列系統,尤其針對大吞吐量的需求場景。擴展性好,開發比較靈活,采用C語言實現,實際上只是一個socket庫的重新封裝,如果做為消息隊列使用,需要開發大量的代碼。ZeroMQ僅提供非持久性的隊列,也就是說如果down機,數據將會丟失。
2、RabbitMQ
使用 erlang 語言開發,性能較好,適合企業級開發,但不利于做二次開發和維護。
3、ActiveMQ
歷史悠久的 Apache 開源項目。已經在很多產品中得到應用,實現了JMS 1.1 規范,可以和 spring-jms 輕松融合,實現了多種協議,支持持久化,對隊列數較多的情況支持不好。
4、RocketMQ
阿里開源的 MQ 組件,由Java 開發,性能很好,使用簡單。
5、Kafka
Apache 下的一個子項目,是一個高性能跨語言分布式 Publish/Subscribe 消息隊列系統,相對于ActiveMQ 是一個非常輕量級的消息系統,除了性能非常好之外,還是一個工作良好的分布式系統。
三、RocketMQ 概念與架構設計
本節內容取材自官方文檔:基本概念、架構設計
3.1 基本概念
Rocket MQ 中重要角色的比喻:
Producer 寄件人、Consumer 收件人、NameServer 郵局、Broker 郵遞員。
1、消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,Producer 負責生產消息,Consumer 負責消費消息,Broker 負責存儲消息。這三者共同組成 RocketMQ 的消息模型。Broker 在實際部署過程中對應一臺服務器,每個 Broker 可以存儲多個Topic的消息,每個Topic的消息也可以分片存儲于不同的 Broker。Message Queue 用于存儲消息的物理地址,每個Topic中的消息地址存儲于多個 Message Queue 中。ConsumerGroup 由多個Consumer 實例構成。
2、生產者與消費者(Producer & Consumer)
生產者負責生產消息,一般由業務系統負責生產消息。一個消息生產者會把業務應用系統里產生的消息發送到broker服務器。RocketMQ提供多種發送方式,同步發送、異步發送、順序發送、單向發送。同步和異步方式均需要Broker返回確認信息,單向發送不需要。Rocket MQ 支持分布式集群方式部署,Producer 通過MQ的負載均衡模塊選擇相應的 Broker 集群隊列進行消息投遞,投遞過程支持快速失敗并且低延遲。
消費者負責消費消息,一般是后臺系統負責異步消費。一個消息消費者會從Broker服務器拉取消息、并將其提供給應用程序。從用戶應用的角度而言提供了pull 和 push 兩種消費形式。RocketMQ 支持分布式集群方式部署,同時支持實時消息訂閱機制和集群、廣播等消費模式。
3、主題(Topic)
表示一類消息的集合。每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。
4、代理服務器與名稱服務(Broker Server & Name Server)
Broker Server 消息中轉角色,負責存儲消息、轉發消息。代理服務器在RocketMQ系統中負責接收從生產者發送來的消息并存儲、同時為消費者的拉取請求作準備。代理服務器也存儲消息相關的元數據,包括消費者組、消費進度偏移和主題和隊列消息等。
Name Server 充當路由消息的提供者。生產者或消費者能夠通過名字服務查找各主題相應的Broker IP列表。多個Namesrv實例組成集群,但相互獨立,沒有信息交換。
5、拉取式與推送式消費
拉取式:應用通常主動調用Consumer的拉消息方法從Broker服務器拉消息、主動權由應用控制。一旦獲取了批量消息,應用就會啟動消費過程。
推送式:該模式下Broker收到數據后會主動推送給消費端,該消費模式一般實時性較高。
6、生產者組與消費者組
同一類Producer或Consumer的集合,發送或消費同一類消息且發送或消費邏輯一致。
7、集群消費與廣播消費
這是消費者組的兩種消息模式。
集群消費模式,相同Consumer Group的每個Consumer實例平均分攤消息。
廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
8、消息(Message)
消息系統所傳輸信息的物理載體,每條消息必須屬于一個主題。每個消息擁有唯一的Message ID,且可以攜帶具有業務標識的Key。系統提供了通過Message ID和Key查詢消息的功能。
9、標簽(Tag)
為消息設置的標志,用于同一主題下區分不同類型的消息。消費者可以根據Tag實現對不同子主題的不同消費邏輯,實現更好的擴展性。
10、順序消息(Ordered Message)
順序消息分為兩種類型。
普通順序消息,消費者通過同一個消息隊列( Topic 分區,稱作 Message Queue) 收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。
嚴格順序消息,消費者收到的所有消息均是有順序的。
3.2 架構設計
RocketMQ 架構上主要分為四部分:生產者、消費者、Name Server、Broker Server:
-
NameServer:NameServer 是一個非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動態注冊與發現。主要包括兩個功能:Broker管理,NameServer接受Broker集群的注冊信息并且保存下來作為路由信息的基本數據。然后提供心跳檢測機制,檢查Broker是否還存活;路由信息管理,每個NameServer將保存關于Broker集群的整個路由信息和用于客戶端查詢的隊列信息。然后Producer和Conumser通過NameServer就可以知道整個Broker集群的路由信息,從而進行消息的投遞和消費。NameServer通常也是集群的方式部署,各實例間相互不進行信息通訊。Broker是向每一臺NameServer注冊自己的路由信息,所以每一個NameServer實例上面都保存一份完整的路由信息。當某個NameServer因某種原因下線了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以動態感知Broker的路由的信息。
-
BrokerServer:Broker主要負責消息的存儲、投遞和查詢以及服務高可用保證,為了實現這些功能,Broker包含了以下幾個重要子模塊。
– Remoting Module:整個Broker的實體,負責處理來自clients端的請求。
– Client Manager:負責管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱信息
– Store Service:提供方便簡單的API接口處理消息存儲到物理硬盤和查詢功能。
– HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能。
– Index Service:根據特定的Message key對投遞到Broker的消息進行索引服務,以提供消息的快速查詢。
3.3 部署架構
- NameServer是一個幾乎無狀態節點,可集群部署,節點之間無任何信息同步。
- Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關系通過指定相同的BrokerName,不同的BrokerId 來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有NameServer。 注意:當前RocketMQ版本在部署架構上支持一Master多Slave,但只有BrokerId=1的從服務器才會參與消息的讀負載。
- Producer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,并向提供Topic 服務的Master建立長連接,且定時向Master發送心跳。Producer完全無狀態,可集群部署。
- Consumer與NameServer集群中的其中一個節點(隨機選擇)建立長連接,定期從NameServer獲取Topic路由信息,并向提供Topic服務的Master、Slave建立長連接,且定時向Master、Slave發送心跳。Consumer既可以從Master訂閱消息,也可以從Slave訂閱消息,消費者在向Master拉取消息時,Master服務器會根據拉取偏移量與最大偏移量的距離(判斷是否讀老消息,產生讀I/O),以及從服務器是否可讀等因素建議下一次是從Master還是Slave拉取。
3.4 集群工作流程
1、啟動NameServer,NameServer起來后監聽端口,等待Broker、Producer、Consumer連上來,相當于一個路由控制中心。
2、Broker啟動,跟所有的NameServer保持長連接,定時發送心跳包。心跳包中包含當前Broker信息(IP+端口等)以及存儲所有Topic信息。注冊成功后,NameServer集群中就有Topic跟Broker的映射關系。
3、收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic。
4、Producer發送消息,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取當前發送的Topic存在哪些Broker上,輪詢從隊列列表中選擇一個隊列,然后與隊列所在的Broker建立長連接從而向Broker發消息。
5、Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取當前訂閱Topic存在哪些Broker上,然后直接跟Broker建立連接通道,開始消費消息。
四、RocketMQ 搭建
RocketMQ是阿里開源的分布式消息中間件,現在是Apache 的一個頂級項目,在阿里內部使用非常廣泛,已經經過了“雙十一”萬億級場景下的消息流轉。
(待補充)
4.1 基礎環境搭建
準備環境:Linux系統 CentOS6 64位(虛擬機),IP:192.168.1.140,JDK8
1、下載并上傳 RocketMQ
打開官網下載頁面:Release Notes - Apache RocketMQ - Version 4.9.1 下載 Binary 版本 zip 包:
下載后上傳到 Linux /usr/local/src/ 目錄下:
2、解壓縮,并移動到安裝目錄
3、啟動 RocketMQ
切換到 RocketMQ 安裝目錄,啟動 NameServer、BrokerServer,啟動腳本在 bin 目錄下。& 代表后臺啟動
查看 rocketmq 啟動日志,可以看到 The Name Server boot success 字眼,說明NameServer啟動成功:
啟動 Broker 之前,需要修改幾項配置。
然后啟動 Broker
# -n 代表 NameServer 地址 nohup ./bin/mqbroker -n localhost:9876 &查看 Broker 啟動日志
4、測試RocketMQ
官方提供了兩個測試腳本用于驗證 RocketMQ 的可用性。
開啟兩個終端,分別執行以下命令:
Producer 發送消息:
export NAMESRV_ADDR=localhost:9876 ./bin/tools.sh org.apache.rocketmq.example.quickstart.ProducerConsumer 接收消息:
export NAMESRV_ADDR=localhost:9876 ./bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer可以看到發送消息和接收消息都正常完成了:
5、關閉 RocketMQ
4.2 控制臺安裝
1、下載
在 git 上下載工程
https://github.com/apache/rocketmq-externals/releases
2、修改配置文件
修改 rocketmq-console\src\main\resources\application.properties
3、打成 jar 包,并啟動
進入控制臺項目,將工程打成 jar 包
4、訪問控制臺
打開瀏覽器,輸入 http://localhost:7777 ,就可以看到如下界面:
五、Java 實現 MQ 消息發送與接收
本節使用 main 方法實現簡單的 rocketmq 的消息發送和接收,在此之前,需要確認好是否已經完成前面的 RocketMQ 的環境部署,以及控制臺的安裝。
1、引入依賴
在需要使用 rocketmq 的項目中加入maven依賴
2、編寫消息發送端
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message;public class RocketMQSendTest {public static void main(String[] args) throws Exception {// 1. 創建消息生產者,并設置生產組名DefaultMQProducer producer = new DefaultMQProducer("shop-order");// 2. 為生產者設置 NameServer 地址producer.setNamesrvAddr("192.168.1.140:9876");// 3. 啟動生產者producer.start();// 4. 構建消息對象,主要是設置消息的主題、標簽、內容Message message = new Message("topic-order", "morty", "Test RocketMQ Message".getBytes());// 5. 發送消息,第二個參數代表超時時間SendResult result = producer.send(message, 10000);System.out.println("發送結果:" + result);// 6. 關閉生產者producer.shutdown();} }代碼可直接運行,由于 MQ是一種解耦組件,所以,可以直接向MQ 中發送消息而不需要等待消費者。
3、編寫消息接收端
消息接收者基于訂閱監聽機制,需要注冊相應的監聽器完成消息的消費:
啟動消費者代碼,觀察日志:
同時,也可以看到 RocketMQ 控制臺有相關主題信息展示:
六、在 Spring 中使用 RocketMQ
以 shop-order、shop-user 兩個微服務為基礎,實現一個下單的消息通知功能。
下單消息通知功能要求,下單后向用戶發送下單消息,結構如下圖所示:
6.1 消息生產端
1、在 shop-order 中添加 rocketmq 依賴
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version> </dependency> <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version> </dependency>2、添加配置
rocketmq:name-server: 192.168.1.140:9876producer:group: shop-order3、編寫發送消息代碼
在下單成功后,使用 rocketmq 的接口實現消息的發送。
4、測試
訪問下單接口,觀察 RocketMQ 控制臺。
6.2 消息消費端
1、添加必要的依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-nacos-discovery</artifactId> </dependency> <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version> </dependency>2、配置 rocketmq NameServer 地址
rocketmq:name-server: 192.168.1.140:98763、編寫 MQ 監聽器
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener;@Slf4j @Service @RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic") public class OrderListener implements RocketMQListener<Order> {@Overridepublic void onMessage(Order order) {log.info("接收到了下單成功的消息,{}", order);} }4、測試消息接收
啟動 shop-user 模塊,觀察日志,可以看到應用一啟動成功,就收到了來自 MQ 的訂閱消息:
總結
以上是生活随笔為你收集整理的Spring Cloud —— 消息队列与 RocketMQ的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mcp证书有什么用_建造师的行情怎么样呢
- 下一篇: linux 查看登入记录_无时无刻,用