rocketmq新扩容的broker没有tps_深入研究RocketMQ消费者是如何获取消息的
前言
小伙伴們,國(guó)慶都過的開心嗎?國(guó)慶后的第一個(gè)工作日是不是很多小伙伴還沉浸在假期的心情中,沒有工作狀態(tài)呢?
那王子今天和大家聊一聊RocketMQ的消費(fèi)者是如何獲取消息的,通過學(xué)習(xí)知識(shí)來找回狀態(tài)吧。
廢話不多說,我們開始吧。
消費(fèi)者組
首先我們了解一個(gè)概念,什么是消費(fèi)者組。
消費(fèi)者組你就可以把它理解為,給一組消費(fèi)者起一個(gè)名字。
假設(shè)我們有一個(gè)訂單Topic名字是OrderTopic,然后庫(kù)存系統(tǒng)和積分系統(tǒng)都要消費(fèi)這個(gè)Topic中的數(shù)據(jù),我們分別給庫(kù)存系統(tǒng)和積分系統(tǒng)起一個(gè)消費(fèi)組名字:stock_consumer_group、score_consumer_group。
設(shè)置消費(fèi)者組名字是在代碼中實(shí)現(xiàn)的,如下:
?DefaultMQPushConsumer?consumer?=?? ? ? ? ?new?DefaultMQPushConsumer("stock_consumer_group");
比如我們的庫(kù)存系統(tǒng)提供了2臺(tái)機(jī)器,每臺(tái)機(jī)器上的消費(fèi)者組名字都是stock_consumer_group,那么這2臺(tái)機(jī)器就是一個(gè)消費(fèi)者組。
大體結(jié)構(gòu)如上圖所示,那么當(dāng)訂單系統(tǒng)發(fā)送消息到OrderTopic中后,庫(kù)存系統(tǒng)和積分系統(tǒng)是如何進(jìn)行消費(fèi)的呢?
默認(rèn)情況下,這條消息發(fā)送到Broker后,庫(kù)存系統(tǒng)和積分系統(tǒng)都會(huì)拉取這條消息,而且?guī)齑嫦到y(tǒng)的兩臺(tái)機(jī)器中只有一臺(tái)會(huì)消費(fèi)到這條消息,積分系統(tǒng)也一樣。
這就是消費(fèi)組的概念,不同的系統(tǒng)設(shè)置不同的消費(fèi)組,如果不同的消費(fèi)組訂閱了同一個(gè)Topic,那么對(duì)于Topic中的一條消息,每個(gè)消費(fèi)組都會(huì)獲取到這條消息。
集群模式和廣播模式
接下來我們思考一個(gè)問題,對(duì)于消費(fèi)者組而言,當(dāng)它獲取到一條消息后,假設(shè)消費(fèi)者組內(nèi)有多臺(tái)機(jī)器,那么到底是只有一臺(tái)機(jī)器獲取到消息,還是所有機(jī)器都獲取到消息呢?
這其實(shí)是消費(fèi)的兩種模式,集群模式和廣播模式。
默認(rèn)情況下我們都是使用的集群模式,也就是說消費(fèi)者組收到消息后,只有其中的一臺(tái)機(jī)器會(huì)接收到消息。
我們可以手動(dòng)指定為廣播模式。
consumer.setMessageModel(MessageModel.BROADCASTING)指定為廣播模式后,消費(fèi)者組內(nèi)的每臺(tái)機(jī)器都會(huì)收到這條消息。
具體要根據(jù)業(yè)務(wù)場(chǎng)景選擇消費(fèi)模式。
MessageQueue與消費(fèi)者的關(guān)系
接著我們想一下,對(duì)于一個(gè)Topic下的多個(gè)MessageQueue,消費(fèi)者組中的多臺(tái)機(jī)器是如何消費(fèi)的呢?
這部分內(nèi)容底層實(shí)現(xiàn)是很復(fù)雜的,我們可以簡(jiǎn)單的理解為它會(huì)均勻的將多個(gè)MessageQueue分配給消費(fèi)者組中的多臺(tái)機(jī)器消費(fèi)。
舉個(gè)例子,假如我們的OrderTopic有四個(gè)MessageQueue,這4個(gè)MessageQueue分布在兩臺(tái)MasterBroker上,每個(gè)MasterBroker上有兩個(gè)MessageQueue。
然后庫(kù)存系統(tǒng)作為一個(gè)消費(fèi)者組有兩臺(tái)機(jī)器,那么最好的分配方式就是每臺(tái)消費(fèi)者機(jī)器負(fù)責(zé)兩個(gè)MessageQueue,這樣就實(shí)現(xiàn)了機(jī)器的負(fù)載消費(fèi),示意圖如下:
所以我們可以大致的認(rèn)為,一個(gè)Topic中的多個(gè)MessageQueue會(huì)被均勻的分布給一個(gè)消費(fèi)者組中的多臺(tái)機(jī)器進(jìn)行消費(fèi),這里要注意一點(diǎn),一個(gè)MessageQueue只能被一臺(tái)消費(fèi)者機(jī)器消費(fèi),但是一臺(tái)消費(fèi)者機(jī)器可以同時(shí)負(fù)責(zé)處理多個(gè)MessageQueue。
那么當(dāng)消費(fèi)者組中的機(jī)器數(shù)量發(fā)生變化時(shí),是怎么處理的。
機(jī)器數(shù)量發(fā)生變化一般就兩種情況,一種是有機(jī)器宕機(jī)了,另一種是增加機(jī)器進(jìn)行集群擴(kuò)容了。
其實(shí)這種情況下是會(huì)進(jìn)行rebalance環(huán)節(jié)的,也就是會(huì)重新分配每個(gè)消費(fèi)者機(jī)器要處理的MessageQueue。
Push模式和Pull模式
不知道小伙伴們還記不記得,在之前的文章RocketMQ的發(fā)送模式和消費(fèi)模式中,我們已經(jīng)用代碼說明了消費(fèi)者的兩種消費(fèi)模式:Push和Pull,當(dāng)時(shí)只提供了Push消費(fèi)的代碼,而沒有提供Pull消費(fèi)的代碼。
其實(shí)這兩種模式本質(zhì)上是一樣的,都是消費(fèi)者主動(dòng)發(fā)出請(qǐng)求到Broker上拉取消息。
Push模式的底層也是通過消費(fèi)者主動(dòng)拉取的方式來實(shí)現(xiàn)的,只不過它的名字叫Push而已,意思是Broker盡可能實(shí)時(shí)的推送消息給消費(fèi)者。
我們一般在使用RocketMQ的時(shí)候,消費(fèi)模式基本都是使用的Push模式,因?yàn)镻ull模式真的使用起來代碼特別復(fù)雜,而且Push模式的底層還是Pull模式,只是對(duì)時(shí)效性有了更好的支持。
Push模式大體實(shí)現(xiàn)思路是這樣的:當(dāng)消費(fèi)者發(fā)送請(qǐng)求到Broker拉取消息的時(shí)候,如果有新的消息可以消費(fèi),會(huì)立馬返回消息到消費(fèi)者進(jìn)行消費(fèi),消費(fèi)后會(huì)接著發(fā)送請(qǐng)求到Broker拉取消息。
也就說Push模式下,處理完一批消息后會(huì)理解再發(fā)送請(qǐng)求給Broker拉取下一批消息,所以時(shí)效性更好,看起來就像是Broker在實(shí)時(shí)推送消息。
當(dāng)請(qǐng)求發(fā)送到Broker發(fā)現(xiàn)沒有需要消費(fèi)的消息時(shí),就會(huì)讓請(qǐng)求線程掛起,默認(rèn)掛起15秒,然后會(huì)有另一個(gè)后臺(tái)線程每隔一段時(shí)間判斷一下是否有新消息需要消費(fèi),一旦發(fā)現(xiàn)了新的消息,就會(huì)去喚醒掛起的線程,將消息返回給消費(fèi)者進(jìn)行消費(fèi),然后消費(fèi)完畢再次發(fā)送請(qǐng)求拉取消息。
這一部分的源碼實(shí)現(xiàn)是很復(fù)雜的,我們只要了解它的核心思路就可以了。就算是Push模式,本質(zhì)上也是對(duì)Pull模式的一種封裝。
Broker如何讀取消息返回給消費(fèi)者
接下來我們來聊聊Broker是如何讀取消息返回給消費(fèi)者的。之前的文章深入研究Broker是如何持久化的中我們已經(jīng)知道了Broker是如何持久化消息的,小伙伴們可以復(fù)習(xí)一下。
那么當(dāng)消費(fèi)者發(fā)送請(qǐng)求到Broker中拉取消息時(shí),假設(shè)是第一次拉取,就會(huì)從MessageQueue中的第一條消息開始拉取。
如何定位到第一條消息的位置呢,首先Broker會(huì)找到MessageQueue對(duì)應(yīng)的ConsumerQueue,從里面找到這條消息的offset,然后通過offset去CommitLog中讀取消息數(shù)據(jù),把消息返回給消費(fèi)者。
當(dāng)消費(fèi)者消費(fèi)完這條消息后,會(huì)提交一個(gè)消費(fèi)的進(jìn)度給Broker,Broker會(huì)記錄下一個(gè)ConsumerOffset來標(biāo)記我們的消費(fèi)進(jìn)度。
下次消費(fèi)者再去這個(gè)MessageQueue中拉取消息時(shí),就會(huì)從記錄的消費(fèi)位置繼續(xù)拉取消息,而不用從頭獲取了。
總結(jié)
好了,到這里本篇文章就結(jié)束了。
今天主要和大家一起討論了一下RocketMQ消費(fèi)者的拉取和消費(fèi)過程,也是國(guó)慶假期后的第一篇文章。
沒有從國(guó)慶中收回心的小伙伴們(ps:王子也一樣沒有進(jìn)入狀態(tài)(`?ω?′))就與王子一起通過學(xué)習(xí)找回狀態(tài)吧。
往期文章推薦:
什么是消息中間件?主要作用是什么?
常見的消息中間件有哪些?你們是怎么進(jìn)行技術(shù)選型的?
你懂RocketMQ 的架構(gòu)原理嗎?
聊一聊RocketMQ的注冊(cè)中心NameServer
Broker的主從架構(gòu)是怎么實(shí)現(xiàn)的?
RocketMQ生產(chǎn)部署架構(gòu)如何設(shè)計(jì)
RabbitMQ和Kafka的高可用集群原理
RocketMQ的發(fā)送模式和消費(fèi)模式
討論一下秒殺系統(tǒng)的技術(shù)難點(diǎn)與解決方案
秒殺系統(tǒng)中的扣減庫(kù)存和流量削峰
深入研究RocketMQ生產(chǎn)者發(fā)送消息的底層原理
深入研究Broker是如何持久化的
Dledger是如何實(shí)現(xiàn)主從自動(dòng)切換的
長(zhǎng)按識(shí)別二維碼,了解更多
總結(jié)
以上是生活随笔為你收集整理的rocketmq新扩容的broker没有tps_深入研究RocketMQ消费者是如何获取消息的的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解耦与人类行为 (完整)
- 下一篇: 余宏德:Sun所有的核心技术都是开放的