RocketMQ的核心概念,一一梳理清楚
生活随笔
收集整理的這篇文章主要介紹了
RocketMQ的核心概念,一一梳理清楚
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
目錄
一、MQ概述
1、MQ簡介
2、MQ用途
限流削峰
異步解耦
數(shù)據(jù)收集
二、RocketMQ的基本概念
1 消息(Message)
2 主題(Topic)
3 標(biāo)簽(Tag)
4 隊(duì)列(Queue)
5 消息標(biāo)識(MessageId/Key)
三、系統(tǒng)架構(gòu)
1 Producer
2 Consumer
3 Name Server
功能介紹
路由注冊
路由剔除
路由發(fā)現(xiàn)
客戶端NameServer選擇策略
4 Broker
功能介紹
模塊構(gòu)成
集群部署
5 工作流程
具體流程
Topic的創(chuàng)建模式
讀/寫隊(duì)列
一、MQ概述
1、MQ簡介
????????MQ,Message Queue,是一種提供消息隊(duì)列服務(wù)的中間件,也稱為消息中間件,是一套提供了消息生產(chǎn)、存儲、消費(fèi)全過程API的軟件系統(tǒng)。消息即數(shù)據(jù)。一般消息的體量不會很大。2、MQ用途
從網(wǎng)上可以查看到很多的關(guān)于MQ用途的敘述,但總結(jié)起來其實(shí)就以下三點(diǎn)。限流削峰
MQ可以將系統(tǒng)的超量請求暫存其中,以便系統(tǒng)后期可以慢慢進(jìn)行處理,從而避免了請求的丟失或系統(tǒng)被壓垮。?
異步解耦
????????上游系統(tǒng)對下游系統(tǒng)的調(diào)用若為同步調(diào)用,則會大大降低系統(tǒng)的吞吐量與并發(fā)度,且系統(tǒng)耦合度太高。 ????????而異步調(diào)用則會解決這些問題。所以兩層之間若要實(shí)現(xiàn)由同步到異步的轉(zhuǎn)化,一般性做法就是,在這兩層間添加一個MQ層。?
數(shù)據(jù)收集
????????分布式系統(tǒng)會產(chǎn)生海量級數(shù)據(jù)流,如:業(yè)務(wù)日志、監(jiān)控數(shù)據(jù)、用戶行為等。針對這些數(shù)據(jù)流進(jìn)行實(shí)時或批量采集匯總,然后對這些數(shù)據(jù)流進(jìn)行大數(shù)據(jù)分析,這是當(dāng)前互聯(lián)網(wǎng)平臺的必備技術(shù)。通過MQ完成此類數(shù)據(jù)收集是最好的選擇。二、RocketMQ的基本概念
1 消息(Message)
消息是指,消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費(fèi)數(shù)據(jù)的最小單位,每條消息必須屬于一個主 題。2 主題(Topic)
????????Topic表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進(jìn)行消息訂閱的基本單位。 topic:message 1:n message:topic 1:1 ????????一個生產(chǎn)者可以同時發(fā)送多種Topic的消息;而一個消費(fèi)者只對某種特定的Topic感興趣,即只可以訂閱和消費(fèi)一種Topic的消息。 producer:topic 1:n consumer:topic 1:1?
3 標(biāo)簽(Tag)
為消息設(shè)置的標(biāo)簽,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息,可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標(biāo)簽。標(biāo)簽?zāi)軌蛴行У乇3执a的清晰度和連貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費(fèi)者可以根據(jù)Tag實(shí)現(xiàn)對不同子主題的不同消費(fèi)邏輯,實(shí)現(xiàn)更好的擴(kuò)展性。 Topic是消息的一級分類,Tag是消息的二級分類。 例如: ------ 生產(chǎn)者生產(chǎn)------- Topic:貨物 tag=上海 tag=江蘇 tag=浙江 ------- 消費(fèi)者消費(fèi)?----- topic=貨物 tag = 上海 topic=貨物 tag = 上海|浙江 topic=貨物 tag = *4 隊(duì)列(Queue)
????????存儲消息的物理實(shí)體。一個Topic中可以包含多個Queue,每個Queue中存放的就是該Topic的消息。一 個Topic的Queue也被稱為一個Topic中消息的分區(qū)(Partition)。 ????????一個Topic的Queue中的消息只能被一個消費(fèi)者組中的一個消費(fèi)者消費(fèi)。一個Queue中的消息不允許同 一個消費(fèi)者組中的多個消費(fèi)者同時消費(fèi)。?
????????在學(xué)習(xí)參考其它相關(guān)資料時,還會看到一個概念:分片(Sharding)。分片不同于分區(qū)。在RocketMQ 中,分片指的是存放相應(yīng)Topic的Broker。每個分片中會創(chuàng)建出相應(yīng)數(shù)量的分區(qū),即Queue,每個 Queue的大小都是相同的。?
5 消息標(biāo)識(MessageId/Key)
????????RocketMQ中每個消息擁有唯一的MessageId,且可以攜帶具有業(yè)務(wù)標(biāo)識的Key,以方便對消息的查詢。 不過需要注意的是,MessageId有兩個:在生產(chǎn)者send()消息時會自動生成一個MessageId(msgId), 當(dāng)消息到達(dá)Broker后,Broker也會自動生成一個MessageId(offsetMsgId)。msgId、offsetMsgId與key都 稱為消息標(biāo)識。 msgId:由producer端生成,其生成規(guī)則為: ????????producerIp + 進(jìn)程pid + MessageClientIDSetter類的ClassLoader的hashCode + 當(dāng)前時間 + AutomicInteger自增計數(shù)器 offsetMsgId:由broker端生成,其生成規(guī)則為:brokerIp + 物理分區(qū)的offset(Queue中的偏移量) key:由用戶指定的業(yè)務(wù)相關(guān)的唯一標(biāo)識三、系統(tǒng)架構(gòu)
RocketMQ架構(gòu)上主要分為四部分構(gòu)成:
1 Producer
????????消息生產(chǎn)者,負(fù)責(zé)生產(chǎn)消息。Producer通過MQ的負(fù)載均衡模塊選擇相應(yīng)的Broker集群隊(duì)列進(jìn)行消息投 遞,投遞的過程支持快速失敗并且低延遲。?
例如,業(yè)務(wù)系統(tǒng)產(chǎn)生的日志寫入到MQ的過程,就是消息生產(chǎn)的過程 再如,電商平臺中用戶提交的秒殺請求寫入到MQ的過程,就是消息生產(chǎn)的過程 ????????RocketMQ中的消息生產(chǎn)者都是以生產(chǎn)者組(Producer Group)的形式出現(xiàn)的。生產(chǎn)者組是同一類生產(chǎn) 者的集合,這類Producer發(fā)送相同Topic類型的消息。一個生產(chǎn)者組可以同時發(fā)送多個主題的消息。2 Consumer
????????消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息。一個消息消費(fèi)者會從Broker服務(wù)器中獲取到消息,并對消息進(jìn)行相關(guān)業(yè)務(wù)處理。? 例如,QoS系統(tǒng)從MQ中讀取日志,并對日志進(jìn)行解析處理的過程就是消息消費(fèi)的過程。 再如,電商平臺的業(yè)務(wù)系統(tǒng)從MQ中讀取到秒殺請求,并對請求進(jìn)行處理的過程就是消息消費(fèi)的過程。 ????????RocketMQ中的消息消費(fèi)者都是以消費(fèi)者組(Consumer Group)的形式出現(xiàn)的。消費(fèi)者組是同一類消 費(fèi)者的集合,這類Consumer消費(fèi)的是同一個Topic類型的消息。消費(fèi)者組使得在消息消費(fèi)方面,實(shí)現(xiàn)負(fù)載均衡(將一個Topic中的不同的Queue平均分配給同一個Consumer Group的不同的Consumer,注意,并不是將消息負(fù)載均衡)和容錯(一個Consmer掛了,該Consumer Group中的其它Consumer可以接著消費(fèi)原Consumer消費(fèi)的Queue)的目標(biāo)變得非常容易。
?
????????消費(fèi)者組中Consumer的數(shù)量應(yīng)該小于等于訂閱Topic的Queue數(shù)量。如果超出Queue數(shù)量,則多出的Consumer將不能消費(fèi)消息。?
不過,一個Topic類型的消息可以被多個消費(fèi)者組同時消費(fèi)。 注意, 1)消費(fèi)者組只能消費(fèi)一個Topic的消息,不能同時消費(fèi)多個Topic消息 2)一個消費(fèi)者組中的消費(fèi)者必須訂閱完全相同的Topic3 Name Server
功能介紹
????????NameServer是一個Broker與Topic路由的注冊中心,支持Broker的動態(tài)注冊與發(fā)現(xiàn)。 ????????RocketMQ的思想來自于Kafka,而Kafka是依賴了Zookeeper的。所以,在RocketMQ的早期版本,即在 MetaQ v1.0與v2.0版本中,也是依賴于Zookeeper的。從MetaQ v3.0,即RocketMQ開始去掉了Zookeeper依賴,使用了自己的NameServer。 主要包括兩個功能: ????????Broker管理:接受Broker集群的注冊信息并且保存下來作為路由信息的基本數(shù)據(jù);提供心跳檢測機(jī)制,檢查Broker是否還存活。 ????????路由信息管理:每個NameServer中都保存著Broker集群的整個路由信息和用于客戶端查詢的隊(duì)列信息。Producer和Conumser通過NameServer可以獲取整個Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。路由注冊
????????NameServer通常也是以集群的方式部署,不過,NameServer是無狀態(tài)的,即NameServer集群中的各個節(jié)點(diǎn)間是無差異的,各節(jié)點(diǎn)間相互不進(jìn)行信息通訊。那各節(jié)點(diǎn)中的數(shù)據(jù)是如何進(jìn)行數(shù)據(jù)同步的呢?在Broker節(jié)點(diǎn)啟動時,輪詢NameServer列表,與每個NameServer節(jié)點(diǎn)建立長連接,發(fā)起注冊請求。在NameServer內(nèi)部維護(hù)著?個Broker列表,用來動態(tài)存儲Broker的信息。 注意,這是與其它像zk、Eureka、Nacos等注冊中心不同的地方。 這種NameServer的無狀態(tài)方式,有什么優(yōu)缺點(diǎn): 優(yōu)點(diǎn):NameServer集群搭建簡單,擴(kuò)容簡單。 缺點(diǎn):對于Broker,必須明確指出所有NameServer地址。否則未指出的將不會去注冊。也正因?yàn)槿绱?#xff0c;NameServer并不能隨便擴(kuò)容。因?yàn)?#xff0c;若Broker不重新配置,新增的NameServer對于Broker來說是不可見的,其不會向這個NameServer進(jìn)行注冊。 ????????Broker節(jié)點(diǎn)為了證明自己是活著的,為了維護(hù)與NameServer間的長連接,會將最新的信息以心跳包的方式上報給NameServer,每30秒發(fā)送一次心跳。心跳包中包含 BrokerId、Broker地址(IP+Port)、Broker名稱、Broker所屬集群名稱等等。NameServer在接收到心跳包后,會更新心跳時間戳,記錄這個Broker的最新存活時間。路由剔除
????????由于Broker關(guān)機(jī)、宕機(jī)或網(wǎng)絡(luò)抖動等原因,NameServer沒有收到Broker的心跳,NameServer可能會將其從Broker列表中剔除。 ????????NameServer中有?個定時任務(wù),每隔10秒就會掃描?次Broker表,查看每一個Broker的最新心跳時間戳距離當(dāng)前時間是否超過120秒,如果超過,則會判定Broker失效,然后將其從Broker列表中剔除。 擴(kuò)展:對于RocketMQ日常運(yùn)維工作,例如Broker升級,需要停掉Broker的工作。OP需要怎么 做? OP需要將Broker的讀寫權(quán)限禁掉。一旦client(Consumer或Producer)向broker發(fā)送請求,都會收到broker的NO_PERMISSION響應(yīng),然后client會進(jìn)行對其它Broker的重試。 當(dāng)OP觀察到這個Broker沒有流量后,再關(guān)閉它,實(shí)現(xiàn)Broker從NameServer的移除。 OP:運(yùn)維工程師 SRE:Site Reliability Engineer,現(xiàn)場可靠性工程師路由發(fā)現(xiàn)
RocketMQ的路由發(fā)現(xiàn)采用的是Pull模型。當(dāng)Topic路由信息出現(xiàn)變化時,NameServer不會主動推送給 客戶端,而是客戶端定時拉取主題最新的路由。默認(rèn)客戶端每30秒會拉取一次最新的路由。 擴(kuò)展: 1)Push模型:推送模型。其實(shí)時性較好,是一個“發(fā)布-訂閱”模型,需要維護(hù)一個長連接。而長連接的維護(hù)是需要資源成本的。該模型適合于的場景: 實(shí)時性要求較高,Client數(shù)量不多,Server數(shù)據(jù)變化較頻繁 2)Pull模型:拉取模型。存在的問題是,實(shí)時性較差。 3)Long Polling模型:長輪詢模型。其是對Push與Pull模型的整合,充分利用了這兩種模型的優(yōu)勢,屏蔽了它們的劣勢。客戶端NameServer選擇策略
這里的客戶端指的是Producer與Consumer 客戶端在配置時必須要寫上NameServer集群的地址,那么客戶端到底連接的是哪個NameServer節(jié)點(diǎn)呢?客戶端首先會生產(chǎn)一個隨機(jī)數(shù),然后再與NameServer節(jié)點(diǎn)數(shù)量取模,此時得到的就是所要連接的節(jié)點(diǎn)索引,然后就會進(jìn)行連接。如果連接失敗,則會采用round-robin策略,逐個嘗試著去連接其它節(jié)點(diǎn)。 首先采用的是隨機(jī)策略進(jìn)行的選擇,失敗后采用的是輪詢策略。 擴(kuò)展:Zookeeper Client是如何選擇Zookeeper Server的? 簡單來說就是,經(jīng)過兩次Shuf? e,然后選擇第一臺Zookeeper Server。 詳細(xì)說就是,將配置文件中的zk server地址進(jìn)行第一次shuf? e,然后隨機(jī)選擇一個。這個選擇出 的一般都是一個hostname。然后獲取到該hostname對應(yīng)的所有ip,再對這些ip進(jìn)行第二次shuf? e,從shuf? e過的結(jié)果中取第一個server地址進(jìn)行連接。4 Broker
功能介紹
????????Broker充當(dāng)著消息中轉(zhuǎn)角色,負(fù)責(zé)存儲消息、轉(zhuǎn)發(fā)消息。Broker在RocketMQ系統(tǒng)中負(fù)責(zé)接收并存儲從生產(chǎn)者發(fā)送來的消息,同時為消費(fèi)者的拉取請求作準(zhǔn)備。Broker同時也存儲著消息相關(guān)的元數(shù)據(jù),包括消費(fèi)者組消費(fèi)進(jìn)度偏移offset、主題、隊(duì)列等。 Kafka 0.8版本之后,offset是存放在Broker中的,之前版本是存放在Zookeeper中的。模塊構(gòu)成
下圖為Broker Server的功能模塊示意圖。?
Remoting Module:整個Broker的實(shí)體,負(fù)責(zé)處理來自clients端的請求。而這個Broker實(shí)體則由以下模塊構(gòu)成。 Client Manager:客戶端管理器。負(fù)責(zé)接收、解析客戶端(Producer/Consumer)請求,管理客戶端。例如,維護(hù)Consumer的Topic訂閱信息 Store Service:存儲服務(wù)。提供方便簡單的API接口,處理消息存儲到物理硬盤和消息查詢功能。 HA Service:高可用服務(wù),提供Master Broker 和 Slave Broker之間的數(shù)據(jù)同步功能。 Index Service:索引服務(wù)。根據(jù)特定的Message key,對投遞到Broker的消息進(jìn)行索引服務(wù),同時也提供根據(jù)Message Key對消息進(jìn)行快速查詢的功能。集群部署
????????為了增強(qiáng)Broker性能與吞吐量,Broker一般都是以集群形式出現(xiàn)的。各集群節(jié)點(diǎn)中可能存放著相同 Topic的不同Queue。不過,這里有個問題,如果某Broker節(jié)點(diǎn)宕機(jī),如何保證數(shù)據(jù)不丟失呢?其解決方案是,將每個Broker集群節(jié)點(diǎn)進(jìn)行橫向擴(kuò)展,即將Broker節(jié)點(diǎn)再建為一個HA集群,解決單點(diǎn)問題。 ????????Broker節(jié)點(diǎn)集群是一個主從集群,即集群中具有Master與Slave兩種角色。Master負(fù)責(zé)處理讀寫操作請求,Slave負(fù)責(zé)對Master中的數(shù)據(jù)進(jìn)行備份。當(dāng)Master掛掉了,Slave則會自動切換為Master去工作。所以這個Broker集群是主備集群。一個Master可以包含多個Slave,但一個Slave只能隸屬于一個Master。Master與Slave 的對應(yīng)關(guān)系是通過指定相同的BrokerName、不同的BrokerId 來確定的。BrokerId為0表示Master,非0表示Slave。每個Broker與NameServer集群中的所有節(jié)點(diǎn)建立長連接,定時注冊Topic信息到所有NameServer。?
5 工作流程
具體流程
1)啟動NameServer,NameServer啟動后開始監(jiān)聽端口,等待Broker、Producer、Consumer連接。 2)啟動Broker時,Broker會與所有的NameServer建立并保持長連接,然后每30秒向NameServer定時發(fā)送心跳包。 3)發(fā)送消息前,可以先創(chuàng)建Topic,創(chuàng)建Topic時需要指定該Topic要存儲在哪些Broker上,當(dāng)然,在創(chuàng)建Topic時也會將Topic與Broker的關(guān)系寫入到NameServer中。不過,這步是可選的,也可以在發(fā)送消息時自動創(chuàng)建Topic。 4)Producer發(fā)送消息,啟動時先跟NameServer集群中的其中一臺建立長連接,并從NameServer中獲取路由信息,即當(dāng)前發(fā)送的Topic消息的Queue與Broker的地址(IP+Port)的映射關(guān)系。然后根據(jù)算法策略從隊(duì)選擇一個Queue,與隊(duì)列所在的Broker建立長連接從而向Broker發(fā)消息。當(dāng)然,在獲取到路由信息后,Producer會首先將路由信息緩存到本地,再每30秒從NameServer更新一次路由信息。 5)Consumer跟Producer類似,跟其中一臺NameServer建立長連接,獲取其所訂閱Topic的路由信息,然后根據(jù)算法策略從路由信息中獲取到其所要消費(fèi)的Queue,然后直接跟Broker建立長連接,開始消費(fèi)其中的消息。Consumer在獲取到路由信息后,同樣也會每30秒從NameServer更新一次路由信息。不過不同于Producer的是,Consumer還會向Broker發(fā)送心跳,以確保Broker的存活狀態(tài)。Topic的創(chuàng)建模式
手動創(chuàng)建Topic時,有兩種模式: ????????集群模式:該模式下創(chuàng)建的Topic在該集群中,所有Broker中的Queue數(shù)量是相同的。 ????????Broker模式:該模式下創(chuàng)建的Topic在該集群中,每個Broker中的Queue數(shù)量可以不同。 自動創(chuàng)建Topic時,默認(rèn)采用的是Broker模式,會為每個Broker默認(rèn)創(chuàng)建4個Queue。讀/寫隊(duì)列
????????從物理上來講,讀/寫隊(duì)列是同一個隊(duì)列。所以,不存在讀/寫隊(duì)列數(shù)據(jù)同步問題。讀/寫隊(duì)列是邏輯上進(jìn)行區(qū)分的概念。一般情況下,讀/寫隊(duì)列數(shù)量是相同的。 ????????例如,創(chuàng)建Topic時設(shè)置的寫隊(duì)列數(shù)量為8,讀隊(duì)列數(shù)量為4,此時系統(tǒng)會創(chuàng)建8個Queue,分別是0 1 2 3 4 5 6 7。Producer會將消息寫入到這8個隊(duì)列,但Consumer只會消費(fèi)0 1 2 3這4個隊(duì)列中的消息,4 5 6 7中的消息是不會被消費(fèi)到的。 ????????再如,創(chuàng)建Topic時設(shè)置的寫隊(duì)列數(shù)量為4,讀隊(duì)列數(shù)量為8,此時系統(tǒng)會創(chuàng)建8個Queue,分別是0 1 2 3 4 5 6 7。Producer會將消息寫入到0 1 2 3 這4個隊(duì)列,但Consumer只會消費(fèi)0 1 2 3 4 5 6 7這8個隊(duì)列中 的消息,但是4 5 6 7中是沒有消息的。此時假設(shè)Consumer Group中包含兩個Consuer,Consumer1消 費(fèi)0 1 2 3,而Consumer2消費(fèi)4 5 6 7。但實(shí)際情況是,Consumer2是沒有消息可消費(fèi)的。 ????????也就是說,當(dāng)讀/寫隊(duì)列數(shù)量設(shè)置不同時,總是有問題的。那么,為什么要這樣設(shè)計呢? ????????其這樣設(shè)計的目的是為了,方便Topic的Queue的縮容。 ????????例如,原來創(chuàng)建的Topic中包含16個Queue,如何能夠使其Queue縮容為8個,還不會丟失消息?可以動態(tài)修改寫隊(duì)列數(shù)量為8,讀隊(duì)列數(shù)量不變。此時新的消息只能寫入到前8個隊(duì)列,而消費(fèi)都消費(fèi)的卻是16個隊(duì)列中的數(shù)據(jù)。當(dāng)發(fā)現(xiàn)后8個Queue中的消息消費(fèi)完畢后,就可以再將讀隊(duì)列數(shù)量動態(tài)設(shè)置為8。整個縮容過程,沒有丟失任何消息。 ????????perm用于設(shè)置對當(dāng)前創(chuàng)建Topic的操作權(quán)限:2表示只寫,4表示只讀,6表示讀寫。總結(jié)
以上是生活随笔為你收集整理的RocketMQ的核心概念,一一梳理清楚的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ThreadLocal应用-使用Thre
- 下一篇: RocketMQ-单机版安装与启动详细步