RocketMQ的负载均衡
在了解了RocketMQ的發送與接收后,也好奇RocketMQ內部是如何處理好生產端、消費端的負載均衡的,下面通過分析源碼、查閱相關文檔資料以及結合自己的理解,做了下歸納總結。
RocketMQ的消息負載均衡都是下放到Client端來實現的,具體可細分為2塊:發送負載(Producer端)、消費負載(Consumer端)。
?
1、發送負載
1.1 路由信息
消息生產者Producer作為客戶端發送消息時候,需要根據消息的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。如果沒有則更新路由信息會從NameServer上重新拉取,同時Producer會默認每隔30s向NameServer拉取一次路由信息。
1.2 選擇隊列
1.2.1 默認方式(sendLatencyFaultEnable 開關關閉)
生產者端發送消息時,會根據Topic信息(每條消息都必須指定有Topic信息),從TopicPublishInfo中的messageQueueList中選擇一個隊列(MessageQueue)進行發送消息。隨機遞增式的輪詢,每個生產者都通過ThreadLocal維護自己的一套下標index,初始化時產生隨機數生成下標,后續每次都遞增加1后對隊列個數取模,從而獲取對應下標的messageQueue。
1.2.2 Broker故障延遲方式(sendLatencyFaultEnable 開關打開)
在隨機遞增取模的基礎上,結合消息失敗延遲策略,過濾掉暫時認為不可用的Broker的消息隊列。
消息失敗延遲策略的算法在MQFaultStrategy上實現(MQFaultStrategy也被稱為失敗延遲策略實現的門面類),其中2個重要的參數 latencyMax、notAvailableDuration(單位都是毫秒)。
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};這2個參數如何結合實現延遲的呢?
latencyMax,在發送消息后,根據本次消息的發送耗時 currentLatency,從latencyMax數組最后一個值往前找,直到第一個比currentLatency小的值,其對應的下標為currentIdx,則可設置Broker的不可用時長為notAvailableDuration[currentIdx],調用門面類updateFaultItem方法進行更新,以此達到退避的效果。
舉個例子,如果請求的latency為3300L,則currentLatency=5,對應的不可用時長為notAvailableDuration[5]=180000L,也即本次記錄broker需要退避的時長180秒。
該延遲機制(latencyFaultTolerance)也是消費者端實現高可用的核心所在。
?
2、消費負載
這里主要講消費端的集群消費模式下的處理(另一種模式是廣播模式)。
2.1 消息獲取模型概述
目前客戶端與服務端(Broker)之間有兩種模式:推模式、拉模式。
這里的推模式是基于拉模式進行了封裝,也即通過長輪詢的方式來達到兼具Pull與Push的優點。在服務端收到客戶端的請求后,會進行查詢,如果隊列里沒有數據,此時服務端先掛起,不著急返回,等待一定時間(默認5s)后,會再進一步繼續查詢,當一直未查詢到結果并超過重試次數后返回空結果(比較適合在客戶端連接數量可控的場景中)。
PS,RocketMQ的前身,第一代的Notify主要使用了推模型,解決了事務消息。第二代的MetaQ則主要使用了拉模型,解決了順序消息和海量堆積的問題。所以一個優秀的項目其實都是在不斷進化演變中的。
2.2 消費者隊列如何負載
消息消費隊列在同一消費組不同消費者之間的負載均衡,其核心設計理念是在一個消息消費隊列在同一時間只允許被同一消費組內的一個消費者消費,一個消息消費者能同時消費多個消息隊列。
在RocketMQ中,消息隊列的負載均衡是由客戶端啟動MQClientInstance實例部分時,觸發負載均衡服務線程(具體由RebalanceService線程實現),默認每20s執行一次。
底層實現均衡的邏輯是在RebalanceImpl類的rebalanceByTopic()方法中。代碼如下:
/*** 消費負載均衡核心方法** @param topic 待重均衡主題* @param isOrder*/ private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}/** 集群模式 */case CLUSTERING: {/** 1、獲取該topic下的所有mq消費隊列 */Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);/** 2、獲取該topic、消費者分組下的所有消費者id */List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);/** 3、獲取消息隊列分配策略 */AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;/** 4、開始給當前消費者分配消費隊列 */List<MessageQueue> allocateResult = null;try {allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}/** 5、重均衡后,更新快照隊列信息 */boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;} }具體過程解釋(針對集群模式):
1、獲取該topic下的所有mq消息隊列;
2、獲取該topic、消費者分組下的所有消費者id;
3、校驗步驟1/2中任意一個結果,如果結果為空則跳過不做處理;否則進入步驟4;
4、獲取消息隊列分配策略;
目前RocketMQ提供了6種分配算法,默認使用消息隊列的平均分配算法(AllocateMessageQueueAveragely),也推薦使用這種。
平均算法舉例說明:假設有8個隊列,q1,q2,……,q8,有3個消費者c1,c2,c3,則在平均分配算法下,各消費者的分配隊列如下:
c1:q1,q2,q3
c2:q4,q5,q6
c3:q7,q8
(也因此可以看出,當消費者數量大于隊列數量時,則會存在消費者無法分配到隊列的情況)
RocketMQ提供的6種分配算法5、重均衡后,更新快照隊列信息(ProcessQueueTable)
此時調用RebalanceImpl#updateProcessQueueTableInRebalance()進行處理
假設本次通過上面幾個步驟分配后得到的隊列集合(mqSet)為mq1,mq2,mq3,mq4,在更新ProcessQueueTable中,會拿已分配到的隊列與當前的消費隊列快照(Queue consumption snapshot)比對。
變量解釋說明:
processQueueTable:當前消費者負載的消息隊列緩存表,結構是 ConcurrentMap<MessageQueue, ProcessQueue>
隊列的比對情況(3種)以及對應執行的操作如下:
1)當前快照隊列集合存在,新分配隊列集合不存在(假設為上圖processQueueTable標注的紅色部分,e1,e2)
執行剔除e1,e2的操作,將狀態標識字段 droped 置為 true,這樣,該 ProcessQueue 中的消息將不會再被消費。
2)當前快照隊列集合存在,新分配隊列集合也存在(假設為上圖processQueueTable標注的綠色部分,e3,e4)
Pull模式直接忽略不做調整;Push模式下判斷processQueueTable中的該2個ProcessQueue是否已過期,已過期則移除。
3)當前快照隊列集合不存在,新分配隊列集合存在(假設為上圖processQueueTable標注的白色部分,e5,e6);
本次新增的消息隊列,添加入processQueueTable中。
至此,完成了消費端的負載均衡。
?
?
?
?
?
?
?
?
?
?
?
?
?
總結
以上是生活随笔為你收集整理的RocketMQ的负载均衡的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 众辰变频器nz200t参数_【变频器 上
- 下一篇: 几款常用的OCR技术软件 新3