RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)
在閱讀本文前,若您對RocketMQ技術感興趣,請加入?RocketMQ技術交流群
根據上文的描述,發送事務消息的入口為:
TransactionMQProducer#sendMessageInTransaction: public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { // @1 throw new MQClientException("TransactionListener is null", null); } return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg); // @2 }代碼@1:如果transactionListener為空,則直接拋出異常。
代碼@2:調用defaultMQProducerImpl的sendMessageInTransaction方法。
接下來重點分享sendMessageInTransaction方法
DefaultMQProducerImpl#sendMessageInTransaction public TransactionSendResult sendMessageInTransaction(final Message msg, final TransactionListener tranExecuter, final Object arg) throws MQClientException {Step1:首先先闡述一下參數含義。
- final Message msg:消息
- TransactionListener tranExecuter:事務監聽器
- Object arg:其他附加參數
Step2:在消息屬性中增加兩個屬性:TRAN_MSG,其值為true,表示為事務消息;PGROUP:消息所屬發送者組,然后以同步方式發送消息。在消息發送之前,會先檢查消息的屬性TRAN_MSG,如果存在并且值為true,則通過設置消息系統標記的方式,設置消息為MessageSysFlag.TRANSACTION_PREPARED_TYPE。
DefaultMQProducerImpl#sendKernelImpl final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; }SendMessageProcessor#sendMessage String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (traFlag != null && Boolean.parseBoolean(traFlag)) { if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) { response.setCode(ResponseCode.NO_PERMISSION); response.setRemark( "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); }Step3:Broker端收到客戶端發送消息請求后,判斷消息類型。如果是事務消息,則調用TransactionalMessageService#prepareMessage方法,否則走原先的邏輯,調用MessageStore#putMessage方法將消息存入Broker服務端。
本節重點闡述事務消息的實現原理,故接下來將重點關注prepareMessage方法,如想了解RocketMQ消息存儲相關,可以關注作者源碼分析RocketMQ系列。
step4:事務消息,將調用TransactionalMessageServiceImpl#prepareMessage方法,繼而調用TransactionalMessageBridge#prepareMessage方法。
TransactionalMessageBridge#parseHalfMessageInner public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {return store.putMessage(parseHalfMessageInner(messageInner)); } private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }Step5:備份消息的原主題名稱與原隊列ID,然后取消消息的事務消息標簽,重新設置消息的主題為:RMQ_SYS_TRANS_HALF_TOPIC,隊列ID固定為0。然后調用MessageStore#putMessage方法將消息持久化,這里TransactionalMessageBridge橋接類,就是封裝事務消息的相關流程,最終調用MessageStore完成消息的持久化。消息入庫后,會繼續回到DefaultMQProducerImpl#sendMessageInTransaction,上文的Step2后面,也就是通過同步將消息發送到消息服務端。
注:這是事務消息Prepare狀態的處理邏輯,消息是存儲在消息服務器了,但存儲的并不是原主題,而是RMQ_SYS_TRANS_HALF_TOPIC,故此時消費端是無法消費shen
生產者發送的消息的??吹竭@里,如果對RocketMQ比較熟悉的話,肯定會有一個“定時任務”去取這個主題下的消息,然后則“合適”的時機將消息的主題恢復。
Step6:如果消息發送成功,會回調TransactionListener#executeLocalTransaction方法,執行本地事務,并且返回本地事務狀態為LocalTransactionState,枚舉值如下:
- COMMIT_MESSAGE,
- ROLLBACK_MESSAGE,
- UNKNOW
注意:TransactionListener#executeLocalTransaction是在發送者成功發送PREPARED消息后,會執行本地事務方法,然后返回本地事務狀態;如果PREPARED消息發送失敗,則不會調用TransactionListener#executeLocalTransaction,并且本地事務消息,設置為LocalTransactionState.ROLLBACK_MESSAGE,表示消息需要被回滾。
DefaultMQProducerImpl#sendMessageInTransaction try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); }step7:調用endTransaction方法結束事務(提交或回滾)。
DefaultMQProducerImpl#endTransaction EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) {case COMMIT_MESSAGE:requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);break;case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId());step8:組裝結束事務請求,主要參數為:事務ID、事務操作(commitOrRollback)、消費組、消息隊列偏移量、消息ID,fromTransactionCheck,從這里發出的請求,默認為false。Broker端的請求處理器為:EndTransactionProcessor。
step9:EndTransactionProcessor根據事務提交類型:TRANSACTION_COMMIT_TYPE(提交事務)、TRANSACTION_ROLLBACK_TYPE(回滾事務)、TRANSACTION_NOT_TYPE(忽略該請求)。
到目前為止,已詳細梳理了RocketMQ事務消息的發送流程,更加準確的說是Prepare狀態的消息發送流程。具體流程如圖所示:
本文到這里,初步展示了事務消息的發送流程,總的說來,RocketMQ的事務消息發送使用二階段提交思路,首先,在消息發送時,先發送消息類型為Prepread類型的消息,然后在將該消息成功存入到消息服務器后,會回調TransactionListener#executeLocalTransaction,執行本地事務狀態回調函數,然后根據該方法的返回值,結束事務:
1、COMMIT_MESSAGE :提交事務。
2、ROLLBACK_MESSAGE:回滾事務。
3、UNKNOW:未知事務狀態,此時消息服務器(Broker)收到EndTransaction命令時,將不對這種消息做處理,消息還處于Prepared類型,存儲在主題為:RMQ_SYS_TRANS_HALF_TOPIC的隊列中,然后消息發送流程將結束,那這些消息如何提交或回滾呢?
為了實現避免客戶端需要再次發送提交、回滾命令,RocketMQ會采取定時任務將RMQ_SYS_TRANS_HALF_TOPIC中的消息取出,然后回到客戶端,判斷該消息是否需要提交或回滾,來完成事務消息的聲明周期,該部分內容將在下節重點探討。
?
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
轉載于:https://www.cnblogs.com/yunqishequ/p/10239047.html
總結
以上是生活随笔為你收集整理的RocketMQ源码分析之RocketMQ事务消息实现原理上篇(二阶段提交)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Boot 项目的这些文件都
- 下一篇: 张小龙:做 PC 版微信是一种破坏,本来