探秘RocketMQ源码——Series1:Producer视角看事务消息
1. 前言
Apache RocketMQ作為廣為人知的開源消息中間件,誕生于阿里巴巴,于2016年捐贈給了Apache。從RocketMQ 4.0到如今最新的v4.7.1,不論是在阿里巴巴內部還是外部社區,都贏得了廣泛的關注和好評。
出于興趣和工作的需要,近期本人對RocketMQ 4.7.1的部分代碼進行了研讀,其間產生了很多困惑,也收獲了更多的啟發。
本文將站在發送方視角,通過閱讀RocketMQ Producer源碼,來分析在事務消息發送中RocketMQ是如何工作的。需要說明的是,本文所貼代碼,均來自4.7.1版本的RocketMQ源碼。本文中所討論的發送,僅指從Producer發送到Broker的過程,并不包含Broker將消息投遞到Consumer的過程。
2. 宏觀概覽
RocketMQ事務消息發送流程:
圖1
結合源碼來看,RocketMQ的事務消息TransactionMQProducer的sendMessageInTransaction方法,實際調用了DefaultMQProducerImpl的sendMessageInTransaction方法。我們進入sendMessageInTransaction方法,整個事務消息的發送流程清晰可見:
首先,做發送前檢查,并填入必要參數,包括設prepare事務消息。
源碼清單-1
public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter localTransactionExecuter, final Object arg)throws MQClientException {TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) {throw new MQClientException("tranExecutor is null", null);}// ignore DelayTimeLevel parameterif (msg.getDelayTimeLevel() != 0) {MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);}Validators.checkMessage(msg, this.defaultMQProducer);SendResult sendResult = null;MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());進入發送處理流程:
源碼清單-2
try {sendResult = this.send(msg);} catch (Exception e) {throw new MQClientException("send message Exception", e);}根據broker返回的處理結果決策本地事務是否執行,半消息發送成功則開始本地事務執行:
源碼清單-3
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;Throwable localException = null;switch (sendResult.getSendStatus()) {case SEND_OK: {try {if (sendResult.getTransactionId() != null) {msg.putUserProperty("__transactionId__", sendResult.getTransactionId());}String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);if (null != transactionId && !"".equals(transactionId)) {msg.setTransactionId(transactionId);}if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);} else if (transactionListener != null) { log.debug("Used new transaction API");localTransactionState = transactionListener.executeLocalTransaction(msg, arg); }if (null == localTransactionState) {localTransactionState = LocalTransactionState.UNKNOW;}if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {log.info("executeLocalTransactionBranch return {}", localTransactionState);log.info(msg.toString());}} catch (Throwable e) {log.info("executeLocalTransactionBranch exception", e);log.info(msg.toString());localException = e;}}break;case FLUSH_DISK_TIMEOUT:case FLUSH_SLAVE_TIMEOUT:case SLAVE_NOT_AVAILABLE: // 當備broker狀態不可用時,半消息要回滾,不執行本地事務localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;break;default:break;}本地事務執行結束,根據本地事務狀態進行二階段處理:
源碼清單-4
try {this.endTransaction(sendResult, localTransactionState, localException);} catch (Exception e) {log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);}// 組裝發送結果// ...return transactionSendResult; }接下來,我們深入每個階段代碼分析。
3. 深扒內幕
3.1 一階段發送
重點分析send方法。進入send方法后,我們發現,RocketMQ的事務消息的一階段,使用了SYNC同步模式:
源碼清單-5
public SendResult send(Message msg,long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }這一點很容易理解,畢竟事務消息是要根據一階段發送結果來決定要不要執行本地事務的,所以一定要阻塞等待broker的ack。
我們進入DefaultMQProducerImpl.java中去看sendDefaultImpl方法的實現,通過讀這個方法的代碼,來嘗試了解在事務消息的一階段發送過程中producer的行為。 值得注意的是,這個方法并非為事務消息定制,甚至不是為SYNC同步模式定制的,因此讀懂了這段代碼,基本可以對RocketMQ的消息發送機制有了一個較為全面的認識。
這段代碼邏輯非常通暢,不忍切片。為了節省篇幅,將代碼中較為繁雜但信息量不大的部分以注釋代替,盡可能保留流程的完整性。個人認為較為重要或是容易被忽略的部分,以注釋標出,后文還有部分細節的詳細解讀。
源碼清單-6
private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();// 一、消息有效性校驗。見后文Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;// 獲取當前topic的發送路由信息,主要是要broker,如果沒找到則從namesrv獲取TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;// 二、發送重試機制。見后文int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {// 第一次發送是mq == null, 之后都是有broker信息的String lastBrokerName = null == mq ? null : mq.getBrokerName();// 三、rocketmq發送消息時如何選擇隊列?——broker異常規避機制 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);if (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}// 發送核心代碼sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();// rocketmq 選擇 broker 時的規避機制,開啟 sendLatencyFaultEnable == true 才生效this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {// 四、RocketMQ的三種CommunicationMode。見后文case ASYNC: // 異步模式return null;case ONEWAY: // 單向模式return null;case SYNC: // 同步模式if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {// ...// 自動重試} catch (MQClientException e) {// ...// 自動重試} catch (MQBrokerException e) {// ...// 僅返回碼==NOT_IN_CURRENT_UNIT==205 時自動重試// 其他情況不重試,拋異常} catch (InterruptedException e) {// ...// 不重試,拋異常}} else {break;}}if (sendResult != null) {return sendResult;}// 組裝返回的info信息,最后以MQClientException拋出// ... ...// 超時場景拋RemotingTooMuchRequestExceptionif (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}// 填充MQClientException異常信息// ...}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }3.1.1 消息有效性校驗
源碼清單-7
Validators.checkMessage(msg, this.defaultMQProducer);在此方法中校驗消息的有效性,包括對topic和消息體的校驗。topic的命名必須符合規范,且避免使用內置的系統消息TOPIC。消息體長度 > 0 && 消息體長度 <= 102410244 = 4M 。
源碼清單-8
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)throws MQClientException {if (null == msg) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");}// topicValidators.checkTopic(msg.getTopic());Validators.isNotAllowedSendTopic(msg.getTopic());// bodyif (null == msg.getBody()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");}if (0 == msg.getBody().length) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");}if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());} }3.1.2 發送重試機制
Producer在消息發送不成功時,會自動重試,最多發送次數 = retryTimesWhenSendFailed + 1 = 3次 。
值得注意的是,并非所有異常情況都會重試,從以上源碼中可以提取到的信息告訴我們,在以下三種情況下,會自動重試:
1)發生RemotingException,MQClientException兩種異常之一時。
2)發生MQBrokerException異常,且ResponseCode是NOT_IN_CURRENT_UNIT = 205時。
3)SYNC模式下,未發生異常且發送結果狀態非 SEND_OK。
在每次發送消息之前,會先檢查是否在前面這兩步就已經耗時超長(超時時長默認3000ms),若是,則不再繼續發送并且直接返回超時,不再重試。這里說明了2個問題:
1)producer內部自動重試對業務應用而言是無感知的,應用看到的發送耗時是包含所有重試的耗時在內的;
2)一旦超時意味著本次消息發送已經以失敗告終,原因是超時。這個信息最后會以RemotingTooMuchRequestException的形式拋出。
這里需要指出的是,在RocketMQ官方文檔中指出,發送超時時長是10s,即10000ms,網上許多人對rocketMQ的超時時間解讀也認為是10s。然而代碼中卻明明白白寫著3000ms,最終我debug之后確認,默認超時時間確實是3000ms。這里也建議RocketMQ團隊對文檔進行確認,如確有誤,還是早日更正為好。
圖2
3.1.3 broker的異常規避機制
源碼清單-8
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);這行代碼是發送前選擇queue的過程。
這里涉及RocketMQ消息發送高可用的的一個核心機制,latencyFaultTolerance。這個機制是Producer負載均衡的一部分,通過sendLatencyFaultEnable的值來控制,默認是false關閉狀態,不啟動broker故障延遲機制,值為true時啟用broker故障延遲機制,可由Producer主動打開。
選擇隊列時,開啟異常規避機制,則根據broker的工作狀態避免選擇當前狀態不佳的broker代理,不健康的broker會在一段時間內被規避,不開啟異常規避機制時,則按順序選取下一個隊列,但在重試場景下會盡量選擇不同于上次發送broker的queue。每次消息發送都會通過updateFaultItem方法來維護broker的狀態信息。
源碼清單-9
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {if (this.sendLatencyFaultEnable) {// 計算延遲多久,isolation表示是否需要隔離該broker,若是,則從30s往前找第一個比30s小的延遲值,再按下標判斷規避的周期,若30s,則是10min規避;// 否則,按上一次發送耗時來決定規避時長;long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);} }深入到selectOneMessageQueue方法內部一探究竟:
源碼清單-10
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {// 開啟異常規避try {int index = tpInfo.getSendWhichQueue().getAndIncrement();for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;// 按順序取下一個message queue作為發送的queueMessageQueue mq = tpInfo.getMessageQueueList().get(pos);// 當前queue所在的broker可用,且與上一個queue的broker相同,// 或者第一次發送,則使用這個queueif (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))return mq;}}final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {mq.setBrokerName(notBestBroker);mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);}return mq;} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}// 不開啟異常規避,則隨機自增選擇Queuereturn tpInfo.selectOneMessageQueue(lastBrokerName); }3.1.4 RocketMQ的三種CommunicationMode
源碼清單-11
public enum CommunicationMode {SYNC,ASYNC,ONEWAY, }以上三種模式指的都是消息從發送方到達broker的階段,不包含broker將消息投遞給訂閱方的過程。
三種模式的發送方式的差異:
- 單向模式:ONEWAY。消息發送方只管發送,并不關心broker處理的結果如何。這種模式下,由于處理流程少,發送耗時非常小,吞吐量大,但不能保證消息可靠不丟,常用于流量巨大但不重要的消息場景,例如心跳發送等。
- 異步模式:ASYNC。消息發送方發送消息到broker后,無需等待broker處理,拿到的是null的返回值,而由一個異步的線程來做消息處理,處理完成后以回調的形式告訴發送方發送結果。異步處理時如有異常,返回發送方失敗結果之前,會經過內部重試(默認3次,發送方不感知)。這種模式下,發送方等待時長較小,吞吐量較大,消息可靠,用于流量大但重要的消息場景。
- 同步模式:SYNC。消息發送方需等待broker處理完成并明確返回成功或失敗,在消息發送方拿到消息發送失敗的結果之前,也會經歷過內部重試(默認3次,發送方不感知)。這種模式下,發送方會阻塞等待消息處理結果,等待時長較長,消息可靠,用于流量不大但重要的消息場景。需要強調的是,事務消息的一階段半事務消息的處理是同步模式。
在sendKernelImpl方法中也可以看到具體的實現差異。ONEWAY模式最為簡單,不做任何處理。負責發送的sendMessage方法參數中,相比同步模式,異步模式多了回調方法、包含topic發送路由元信息的topicPublishInfo、包含發送broker信息的instance、包含發送隊列信息的producer、重試次數。另外,異步模式下,會對有壓縮的消息先做copy。
源碼清單-12
switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC:long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}官方文檔中有這樣一張圖,十分清晰的描述了異步通信的詳細過程:
圖3
3.2 二階段發送
源碼清單-3體現了本地事務的執行,localTransactionState將本地事務執行結果與事務消息二階段的發送關聯起來。
值得注意的是,如果一階段的發送結果是SLAVE_NOT_AVAILABLE,即備broker不可用時,也會將localTransactionState置為Rollback,此時將不會執行本地事務。之后由endTransaction方法負責二階段提交,見源碼清單-4。具體到endTransaction的實現:
源碼清單-13
public void endTransaction(final SendResult sendResult,final LocalTransactionState localTransactionState,final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {final MessageId id;if (sendResult.getOffsetMsgId() != null) {id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());} else {id = MessageDecoder.decodeMessageId(sendResult.getMsgId());}String transactionId = sendResult.getTransactionId();final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();requestHeader.setTransactionId(transactionId);requestHeader.setCommitLogOffset(id.getOffset());switch (localTransactionState) {case COMMIT_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);break;case UNKNOW:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);break;default:break;}requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());requestHeader.setMsgId(sendResult.getMsgId());String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;// 采用oneway的方式發送二階段消息this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,this.defaultMQProducer.getSendMsgTimeout()); }在二階段發送時,之所以用oneway的方式發送,個人理解這正是因為事務消息有一個特殊的可靠機制——回查。
3.3 消息回復
當Broker經過了一個特定的時間,發現依然沒有得到事務消息的二階段是否要提交或者回滾的確切信息,Broker不知道Producer發生了什么情況(可能producer掛了,也可能producer發了commit但網絡抖動丟了,也可能...),于是主動發起回查。
事務消息的回查機制,更多的是在broker端的體現。RocketMQ的broker以Half消息、Op消息、真實消息三個不同的topic來將不同發送階段的事務消息進行了隔離,使得Consumer只能看到最終確認commit需要投遞出去的消息。其中詳細的實現邏輯在本文中暫不多贅述,后續可另開一篇專門來從Broker視角來解讀。
回到Producer的視角,當收到了Broker的回查請求,Producer將根據消息檢查本地事務狀態,根據結果決定提交或回滾,這就要求Producer必須指定回查實現,以備不時之需。
當然,正常情況下,并不推薦主動發送UNKNOW狀態,這個狀態毫無疑問會給broker帶來額外回查開銷,只在出現不可預知的異常情況時才啟動回查機制,是一種比較合理的選擇。
另外,4.7.1版本的事務回查并非無限回查,而是最多回查15次:
源碼清單-14
/*** The maximum number of times the message was checked, if exceed this value, this message will be discarded.*/ @ImportantField private int transactionCheckMax = 15;附錄
官方給出Producer的默認參數如下(其中超時時長的參數,在前文中也已經提到,debug的結果是默認3000ms,并非10000ms):
圖4
RocketMQ作為一款優秀的開源消息中間件,有很多開發者基于它做了二次開發,例如螞蟻集團商業化產品SOFAStack MQ消息隊列,就是基于RocketMQ內核進行的再次開發的金融級消息中間件,在消息管控、透明運維等方面做了大量優秀的工作。
愿RocketMQ在社區廣大開發者的共創共建之下,能夠不斷發展壯大,迸發更強的生命力。
我們是阿里云智能全球技術服務-SRE團隊,我們致力成為一個以技術為基礎、面向服務、保障業務系統高可用的工程師團隊;提供專業、體系化的SRE服務,幫助廣大客戶更好地使用云、基于云構建更加穩定可靠的業務系統,提升業務穩定性。我們期望能夠分享更多幫助企業客戶上云、用好云,讓客戶云上業務運行更加穩定可靠的技術,您可用釘釘掃描下方二維碼,加入阿里云SRE技術學院釘釘圈子,和更多云上人交流關于云平臺的那些事。
原文鏈接:https://developer.aliyun.com/article/783843?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的探秘RocketMQ源码——Series1:Producer视角看事务消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于写好文章的3个心法和5点技巧
- 下一篇: MQTT 轻量版实例发布,满足更多移动互