【RocketMQ】MQ消息发送总结
RocketMQ是通過DefaultMQProducer進行消息發送的,它實現了MQProducer接口,MQProducer接口中定義了消息發送的方法,方法主要分為三大類:
send同步進行消息發送,向Broker發送消息之后等待響應結果;send異步進行消息發送,向Broker發送消息之后立刻返回,當消息發送成功/失敗之后觸發回調函數;sendOneway單向發送,也是異步消息發送,向Broker發送消息之后立刻返回,但是沒有回調函數;
public interface MQProducer extends MQAdmin {
// 同步發送消息
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
// 異步發送消息,SendCallback為回調函數
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
// 異步發送消息,沒有回調函數
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;
}
接下來以同步發送為例,看下生產者發送消息的過程。
生產者發送消息
Topic主題
一般會為同一業務類型設定一個Topic,將不同的業務類型的數據放到不同的Topic中管理,不過主題只是一個邏輯概念,并不是實際的消息容器。
主題內部由多個消息隊列(MessageQueue)組成,消息隊列是消息存儲的實際容器,消息隊列與Kafka的分區(Partition)類似。
獲取主題的發布信息(TopicPublishInfo)
Broker在啟動時會向NameServer發送注冊信息并定時向NameServer發送心跳包,Broker向NameServer注冊信息中包括了該Broker的IP、Name以及Topic的配置信息,
生產者和消費者默認每30s從NameServer更新一次路由信息,可以知道消息所在的Topic分布在哪些Broker上。
生產者有一個主題路由信息表topicPublishInfoTable,緩存從NameServer拉取到的路由信息,它是ConcurrentMap類型的,KEY為topic主題名稱, value為該Topic的發布信息,是TopicPublishInfo類型。
當生產者向Broker發送消息之前,首先需要知道消息所屬的Topic的路由信息,有了Topic的路由信息才能知道Topic分布在哪個Broker上,生產者往哪個Broker上發,而topicPublishInfoTable中記錄了每個主題的相關信息,可以從topicPublishInfoTable中查找Topic的路由信息。
如果從topicPublishInfoTable中查找成功,就可以繼續后續的步驟,如果查找失敗,此時生產者需要從NameServer中查詢該Topic的路由信息:
- 如果查詢成功,會判斷路由信息是否發生了變化,如果發生變化,生產者會更新本地緩存的該Topic的路由信息;
- 如果依舊未查詢到,它會有一個默認的主題,會使用這個默認的主題進行消息發送;
選取消息隊列
前面知道,一個Topic一般由多個消息隊列組成,所以主題的發布信息數據TopicPublishInfo獲取到之后,需要從中選取一個消息隊列,然后獲取此消息隊列所屬的Broker,與Broker通信將消息投遞到對應的消息隊列中。
未啟用故障延遲機制
在每個Topic內部,設置了一個計數器sendWhichQueue用于輪詢從消息隊列集合中選取隊列。
在未啟用故障延遲機制的時候,如果上一次選擇的BrokerName為空,也就是首次發送消息時,處理邏輯如下:
- 對計數器增一;
- 根據計數器的值對消息隊列列表的長度取余得到下標值
pos,從隊列列表中獲取pos位置的元素,以此達到輪詢從消息隊列列表中選擇消息隊列的目的; - 返回第2步中獲取到的消息隊列;
- 在調用獲取消息隊列的地方,會記錄本次選擇消息隊列所在的BrokerName;
如果上一次選擇的BrokerName不為空,表示上次發送消息時就發送給了此Broker,此時的處理邏輯與上面的不同點在第3步,通過從隊列列表中獲取pos位置的元素之后,并沒有直接把選取到的消息隊列返回,而是再增加一個判斷,判斷當前選取到的Broker是否與上次選擇的Broker名稱一致,如果一致會繼續循環,輪詢選擇下一個消息隊列,如果不一致則直接返回:
- 對計數器增一;
- 根據計數器的值對消息隊列列表的長度取余得到下標值
pos,從隊列列表中獲取pos位置的元素; - 對第2步獲取到的消息隊列進行判斷:
- 如果本次選取到的隊列與上次發送消息的Broker一致,回到第1步繼續選擇下一個隊列,如果一直未選出滿足要求的消息隊列,則不作判斷,使用上面的方式輪詢選擇一個隊列返回;
- 如果本次選取到的隊列與上次發送消息的Broker不一致,返回當前的隊列;
- 在調用獲取消息隊列的地方,會記錄本次選擇消息隊列所在的BrokerName;
總結
在未啟用故障延遲機制時,從該消息所屬的Topic下的所有消息隊列集合中,輪詢選擇消息隊列進行發送,如果上一次選擇了某個Broker發送消息,本次將不會再選擇這個Broker,當然如果最后仍未找到滿足要求的消息隊列,將會跳過這個判斷,直接從隊列中輪詢獲取消息隊列返回。
開啟故障延遲機制
在生產者進行發送消息的時候,無論消息是否發送成功與否都會記錄向每個Broker的發送消息的條目信息FaultItem,有一個失敗條目表faultItemTable,faultItemTable記錄了每個Broker對應的失敗條目FaultItem,FaultItem中主要有以下信息:
- name:Broker的名稱;
- currentLatency:延遲時間,可以理解為是本次向該Broker發送消息耗時時間:發送消息結束時間 - 消息發送開始時間;
- startTimestamp:規避故障時間,一般為
當前時間 + 不可用的持續時間,不可用的持續時間有兩種情況,分別為30000ms或者使用currentLatency延遲時間(也就是上次發送消息所用的時間),一般在出現異常的時候,會將不可用的持續時間設置為30000ms,消息正常發送的時候使用currentLatency延遲時間。
設置規避故障時間主要是為了在某個時間段內規避某個Broker,假設向某個Broker發送失敗/或者向此Broker發生消息的耗時比較長,生產者認為此Broker可能暫時處于異常狀態/或者該時間段內此Broker的性能不高,在下次發送消息時盡量規避這個Broker,避免向此Broker上投遞消息。
每次消息發送之后會更新該Broker的失敗條目的處理邏輯如下:
- 根據Broker名稱從
faultItemTable獲取對應的FaultItem對象; - 如果上一步獲取為空,說明之前沒有記錄過該Broker的信息,需要新建對應
FaultItem對象,此時需要設置name、currentLatency延遲時間、startTimestamp規避故障時間; - 如果第1步中獲取到該Broker對應的
FaultItem對象,直接更新里面的currentLatency延遲時間、startTimestamp規避故障時間即可;
接下來看如何使用FaultItem中記錄的信息,來實現故障規避。
使用故障規避,需要啟用故障延遲機制,此時從隊列集合中選擇消息隊列的處理邏輯如下:
對計數器增一;
根據計數器的值對消息隊列列表的長度取余得到下標值
pos,從隊列列表中獲取pos位置的元素,依舊輪詢從消息隊列列表中選擇消息隊列,這兩步與未開啟故障時邏輯一致;選擇出消息隊列之后,會獲取該隊列所在的Broker名稱,上面說到,生產者每次與Broker通信發送消息時,會記錄消息發發送情況,此時可以根據Broker的名稱,從失敗條目表
faultItemTable中獲取該Broker的FaultItem,用來判斷當前選擇的消息隊列是否可用,FaultItem中有一個規避故障時間,來看兩種情況:- 情況一:上次向此Broker發送消息失敗,那么這個時間的值為
發送消息失敗時的時間 + 30000ms,判斷當前時間有沒有超過故障規避設置的時間,如果超過了當前選擇的消息隊列可用,那么就會返回當前選擇的這個消息隊列,如果未超過表示該Broker暫時不可用所以不能使用當前選擇的消息隊列,需要回到第1步繼續選擇下一個隊列; - 情況二:上次向此Broker發送消息成功,那么這個時間的值為
發送消息失敗時的時間 + 上次發送消息的耗時時間,判斷當前時間有沒有超過故障規避設置的時間,這個依賴于上次發送消息的耗時時間的長短,如果耗時比較長,可能還未超過規避時間,本次就不能選擇向此Broker發送消息同樣需要回到第1步選擇下一個隊列,如果耗時比較短,可能現在已經過了規避時間,那么就可以選擇當前的消息隊列返回;
- 情況一:上次向此Broker發送消息失敗,那么這個時間的值為
如果進行到這一步,以上步驟沒有選擇到可用的消息隊列,此時需要通過以下方式再次選擇消息隊列:
(1)遍歷faultItemTable失敗條目表,將每一個Broker對應的FaultItem加入一個LinkedList鏈表;
(2)對鏈表進行排序,FaultItem實現Comparable就是為了在這里進行排序,值小的排在鏈表前面,值的大小判斷規則如下:- 對比是否有超過規避時間的Broker(調用
isAvailable可以判斷),如果有表示值比較小,會排在前面,之后被優先選擇,如果所有的Broker都為超過規避時間,進入下一個對比條件; - 對比
currentLatency的值,值越小排序的時候越靠前,也就是盡量選擇發送消息耗時短的那個Broker,如果值相等進入下一個對比條件; - 對比
startTimestamp的值,同樣值越小排序的時候越靠前,盡量選擇規避時間較短的那個Broker;
(3)經過以上的規則進行排序后,會根據鏈表的總大小,計算一個中間值:
- 如果half值小于等于0,取鏈表中的第一個元素;
- 如果half值大于0,從前half個元素中輪詢選擇元素;
(4)在鏈表中越靠前的元素,表示發送消息的延遲越低,在選擇時優先級就越高,如果half值小于等于0的時候,取鏈表中的第一個元素,half值大于0的時候,處于鏈表前half個的Broker,延遲都是相對較低的,此時輪詢從前haft個Broker中選擇一個Broker,總之經過這么多處理就是為了選擇一個延遲相對較低的Broker;
(5)獲取上一步選取到的那個Broker,獲取Broker可寫的隊列數量:
- 如果數量小于0表示該Broker不可用,需要移除然后進入下一步;
- 如果數量大于0,表示該Broker可用,然后重新輪詢從消息隊列列表中選取一個隊列,將本次選取到的消息隊列所屬的Broker設置為第(4)步中選取到的那個Broker,也就是將這個消息隊列及Topic重置到新的Broker中(認為原本所屬的Broker不可用,需要設置一個新的Broker),然后返回當前選取的消息隊列;
- 對比是否有超過規避時間的Broker(調用
如果經過第4步依舊未選出可用的消息隊列,那么就跳過故障延遲機制,直接從該Topic的所有隊列中輪詢選擇一個返回;
總結
故障延遲機制指的是在發送消息時記錄每個Broker的耗時時間,如果某個Broker發生故障,但是生產者還未感知(NameServer 30s檢測一次心跳,有可能Broker已經發生故障但未到檢測時間,所以會有一定的延遲),用耗時時間做為一個故障規避時間(也可以是30000ms),此時消息會發送失敗,在重試或者下次選擇消息隊列的時候,如果在規避時間內,可以避免再次選擇到此Broker,以此達到故障規避的目的。
如果某個Topic所在的所有Broker都處于不可用狀態,此時盡量選擇延遲時間最短、規避時間最短(排序后的失敗條目中靠前的元素)的Broker作為此次發送消息消息的Broker。
對應的相關源碼可參考:
參考
總結
以上是生活随笔為你收集整理的【RocketMQ】MQ消息发送总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: AT_agc064_a题解
- 下一篇: 一幅图像为f=[1 4 7;2 5 8;