RocketMQ:消费端的消息消息队列负载均衡与重新发布机制源码解析
文章目錄
- 前言
- 流程解析
- 總結
前言
在上一篇博客中我們了解到,PullMessageService線程主要是負責從pullRequestQueue中獲得拉取消息請求并進行請求處理的。
PullMessageService#run
//在拉取消息請求隊列中拉取消息請求 PullRequest pullRequest = this.pullRequestQueue.take(); //處理請求 this.pullMessage(pullRequest);但是pullRequestQueue中的PullRequest是從哪來的呢?是什么時候由誰進行填充的呢?
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();流程解析
通過pullRequestQueue中的PullRequest添加操作這個線索一步步跟蹤下去,最后得出了pullRequestQueue的調用鏈:
RebalanceService#run↓ MQClientInstance#doRebalance↓ DefaultMQPullConsumerImpl#doRebalance↓ RebalanceImpl#doRebalance↓ RebalanceImpl#rebalanceByTopic↓ RebalanceImpl#updateProcessQueueTableInRebalance↓ RebalancePushImpl#dispatchPullRequest↓ DefaultMQPushConsumerImpl#executePullRequestImmediately↓ PullMessageService#executePullRequestImmediately由上面的調用鏈我們可以看到,向PullMessageService中的LinkedBlockingQueue<PullRequest>添加拉取消息請求的是RebalanceService#run,接下來我們對這個源頭RebalanceService進行解析。
RebalanceService
public class RebalanceService extends ServiceThread {//等待時間private static long waitInterval =Long.parseLong(System.getProperty("rocketmq.client.rebalance.waitInterval", "20000"));private final InternalLogger log = ClientLogger.getLog();//消息客戶端private final MQClientInstance mqClientFactory;public RebalanceService(MQClientInstance mqClientFactory) {this.mqClientFactory = mqClientFactory;}@Overridepublic void run() {log.info(this.getServiceName() + " service started");while (!this.isStopped()) {this.waitForRunning(waitInterval);//進入mqClientFactorythis.mqClientFactory.doRebalance();}log.info(this.getServiceName() + " service end");} }RebalanceService是一個服務線程,其run方法主要是調用MQClientInstance#doRebalance進行重新負載。
MQClientInstance
private final RebalanceService rebalanceService;public void start() throws MQClientException {synchronized (this) {switch (this.serviceState) {case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;...// Start rebalance service//負載均衡服務啟動this.rebalanceService.start();}} }MQClientInstance持有一個RebalanceService線程,在start方法中開啟該線程。
MQClientInstance#doRebalance
//循環遍歷每個消費者組中的MQConsumerInner(即DefaultMQPush<Pull>ConsumerImpl)并調用其doRebalance for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {MQConsumerInner impl = entry.getValue();if (impl != null) {try {impl.doRebalance();} catch (Throwable e) {log.error("doRebalance exception", e);}} }DefaultMQPushConsumerImpl#doRebalance
@Override public void doRebalance() {if (!this.pause) {this.rebalanceImpl.doRebalance(this.isConsumeOrderly());} }經過多層的對象委托,終于來到實現消息負載分發的核心。
RebalanceImpl
//消息處理隊列 protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<MessageQueue, ProcessQueue>(64); //Topic的隊列信息 protected final ConcurrentMap<String/* topic */, Set<MessageQueue>> topicSubscribeInfoTable =new ConcurrentHashMap<String, Set<MessageQueue>>(); //Topic訂閱信息 protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =new ConcurrentHashMap<String, SubscriptionData>(); //消費者組 protected String consumerGroup; //消費模式 protected MessageModel messageModel; //隊列分配策略 protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; //MQ客戶端 protected MQClientInstance mQClientFactory;RebalanceImpl#doRebalance
/*** 遍歷訂閱消息對每個主題的訂閱的隊列進行重新負載* @param isOrder 是否是順序消息*/ public void doRebalance(final boolean isOrder) {Map<String, SubscriptionData> subTable = this.getSubscriptionInner();if (subTable != null) {for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {final String topic = entry.getKey();try {//根據Topic來對隊列進行重新負載this.rebalanceByTopic(topic, isOrder);} catch (Throwable e) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("rebalanceByTopic Exception", e);}}}}//如果消息隊列的Topic不在訂閱的主題中-刪除該消息隊列this.truncateMessageQueueNotMyTopic(); }RebalanceImpl#rebalanceByTopic
//從主題訂閱消息緩存表中獲取主題的隊列信息 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //查找該主題訂閱組所有的消費者ID List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); //消費模式 switch (messageModel) {case BROADCASTING: {... break;}case CLUSTERING: {if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}//對主題的消息隊列和消費者ID進行排序if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);//獲取當前負載均衡策略AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {//根據策略對消息隊列進行重新分配allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}//重新負載后-對消息消費隊列進行更新-返回消息隊列負載是否發生變化boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break; }Rebalance#updateProcessQueueTableInRebalance
boolean changed = false; //消息隊列負載是否發生變化Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); //遍歷<消息隊列,處理隊列>緩存表 while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();//如果消息隊列不在該Topic處理范圍內if (mq.getTopic().equals(topic)) {//消息隊列已經被分配到其他消費者去消費了-不包含在當前主題的Set<MessageQueue>中if (!mqSet.contains(mq)) {//private volatile boolean dropped;//設置當前處理隊列為被丟棄-及時阻止繼續向該消息處理隊列進行消息拉取pq.setDropped(true);//判斷是否需要移除if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();//發生變化changed = true;log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);}} else if (pq.isPullExpired()) {switch (this.consumeType()) {case CONSUME_ACTIVELY:break;case CONSUME_PASSIVELY:pq.setDropped(true);if (this.removeUnnecessaryMessageQueue(mq, pq)) {it.remove();changed = true;log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it",consumerGroup, mq);}break;default:break;}}} }RebalancePushImpl#removeUnnecessaryMessageQueue
//丟棄消息隊列之前先將消息隊列進行持久化 //保存在本地(LocalFileOffsetStore)/消息服務器Broker(RemoteBrokerOffsetStore) this.defaultMQPushConsumerImpl.getOffsetStore().persist(mq); this.defaultMQPushConsumerImpl.getOffsetStore().removeOffset(mq); //順序消費進入的分支 if (this.defaultMQPushConsumerImpl.isConsumeOrderly()&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {try {if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {try {return this.unlockDelay(mq, pq);} finally {pq.getConsumeLock().unlock();}} else {log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",mq,pq.getTryUnlockTimes());pq.incTryUnlockTimes();}} catch (Exception e) {log.error("removeUnnecessaryMessageQueue Exception", e);}return false; } //暫時只看非順序消息-返回true return true;RebalanceImpl#updateProcessQueueTableInRebalance
List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); //遍歷消息隊列 for (MessageQueue mq : mqSet) {if (!this.processQueueTable.containsKey(mq)) {if (isOrder && !this.lock(mq)) {log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);continue;}this.removeDirtyOffset(mq);ProcessQueue pq = new ProcessQueue();long nextOffset = -1L;try {//根據不同的消息消費策略獲取下一次消費的偏移量//CONSUME_FROM_LAST_OFFSET/CONSUME_FROM_FIRST_OFFSET/CONSUME_FROM_TIMESTAMPnextOffset = this.computePullFromWhereWithException(mq);} catch (Exception e) {log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);continue;}if (nextOffset >= 0) {ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);//消息隊列已經存在if (pre != null) {log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);} else {//消息隊列不存在-新添加log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);//封裝拉取請求PullRequestPullRequest pullRequest = new PullRequest();pullRequest.setConsumerGroup(consumerGroup);pullRequest.setNextOffset(nextOffset);pullRequest.setMessageQueue(mq);pullRequest.setProcessQueue(pq);//放入拉取請求列表pullRequestList.add(pullRequest);changed = true;}} else {log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);}} } //分發消息拉取請求 this.dispatchPullRequest(pullRequestList);return changed;RebalancePushImpl#dispatchPullRequest
@Override public void dispatchPullRequest(List<PullRequest> pullRequestList) {//遍歷請求列表for (PullRequest pullRequest : pullRequestList) {//立刻拉取消息this.defaultMQPushConsumerImpl.executePullRequestImmediately(pullRequest);log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);} }DefaultMQPushConsumerImpl#executePullRequestImmediately
public void executePullRequestImmediately(final PullRequest pullRequest) {//將請求丟入PullMessageService線程中this.mQClientFactory.getPullMessageService().executePullRequestImmediately(pullRequest); }PullMessageService
public void executePullRequestImmediately(final PullRequest pullRequest) {try {//放入消息拉取請求隊列中this.pullRequestQueue.put(pullRequest);} catch (InterruptedException e) {log.error("executePullRequestImmediately pullRequestQueue.put", e);} }總結
本文主要解析了消息消費端的負載機制,首先RebalanceService線程啟動,為消息消費者分發消息隊列,每一個MessageQueue消息隊列都回構建一個PullRequest,通過將這個PullRequest放入PullMessageService中的pullRequestQueue,進而喚醒PullMessageService#run,在pullRequestQueue中獲得拉取消息請求并進行處理。從上一篇的的消息拉取分析中我們可以得知,接下來執行DefaultMQPushConsumerImpl#pullMessage,通過網絡遠程調用從Broker中拉取消息,一次最多拉取消息數量默認為32條,然后Broker將拉取的消息進行過濾并封裝后返回。返回之后再回到消息消費端,將消費任務提交到消費者的ConsumerMessageService執行消息的消費。
本文僅作為個人學習使用,如有不足或錯誤請指正!
總結
以上是生活随笔為你收集整理的RocketMQ:消费端的消息消息队列负载均衡与重新发布机制源码解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: RocketMQ:Consumer概述及
- 下一篇: RocketMQ:消息ACK机制源码解析