RocketMQ原理解析-producer 4.发送分布式事物消息
2019獨角獸企業重金招聘Python工程師標準>>>
RocketMQ原理解析-producer 4.發送分布式事物消息 博客分類: MQ為什么消息要具備事務能力
還是比較清晰的。簡單的說 就是在你業務邏輯過程中,需要發送一條消息給訂閱消息的人,但是期望是 此邏輯過程完全成功完成之后才能使訂閱者收到消息。
業務邏輯過程 假設是這樣的:
邏輯部分a-->發消息給MQ-->邏輯部分b
假設我們在發送消息給MQ之后執行邏輯部分b時產生了異常,那如果MQ不具備事務消息能力時,訂閱者也收到了消息。這是我們不希望見到的。
分布式事務基礎概念
rocketmq具備事務能力的demo
參見TransactionProducerDemo.java
向producer注冊的TransactionCheckListener實現并沒有用,因為返回LocalTransactionState.UNKNOW狀態時,在3.2.6版本中,是不支持此狀態下回調TransactionCheckListener的,具體參見以下兩個issue。
事務消息 LocalTransactionState.UNKNOW 狀態下不回查 #221
開源版本支持事務消息嗎 #364
測試過程中發現返回UNKNOW狀態是不能正確達到期望的,但是返回ROLLBACK_MESSAGE狀態還是能達到期望的。
實現分析入口
這個實現的入口還是比較容易找的,只要搜尋ROLLBACK_MESSAGE這個變量的引用即可發現。順著搜索查看,其實很容易發現,客戶端在收到業務邏輯返回的事務狀態時會發送一條結束事務的指令給broker。
// com.alibaba.rocketmq.client.impl.MQClientAPIImpl.endTransactionOneway(String, EndTransactionRequestHeader, String, long) 871行 RemotingCommand request =RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, requestHeader);按broker對外部指令的常規做法,一般會有一個Processor與之對應。是EndTransactionProcessor,看BrokerController374行其注冊的地方,沒錯。
EndTransactionProcessor分析(broker側)
如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.ROLLBACK_MESSAGE時,EndTransactionProcessor會清空message的body的置成null,queueOffset也不會更新,那么consumer就收不到消息了。
//--EndTransactionProcessor.processRequest 200行-- if (MessageSysFlag.TransactionRollbackType == requestHeader.getCommitOrRollback()) {msgInner.setBody(null); }如果LocalTransactionExecuter.executeLocalTransactionBranch返回LocalTransactionState.COMMIT_MESSAGE,那么EndTransactionProcessor則會照常put message。
事務消息分為兩個階段,prepare階段與commit階段。prepare階段的消息會寫入store,只是寫完之后的queueOffset(邏輯位置)為0(commit階段寫完消息后的queueOffset就不是0了。);
?
?
// -- com.alibaba.rocketmq.store.CommitLog.DefaultAppendMessageCallback.doAppend(long, ByteBuffer, int, Object) 1002行 -- final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag()); switch (tranType) { // Prepared and Rollback message is not consumed, will not enter the // consumer queue case MessageSysFlag.TransactionPreparedType: case MessageSysFlag.TransactionRollbackType:queueOffset = 0L;break; case MessageSysFlag.TransactionNotType: case MessageSysFlag.TransactionCommitType: default:break;待分析問題列表:
1. prepare階段已經將消息發了過去,commit的時候是否還會再發送一次消息?
2. rollback的時候是否會將prepare的消息刪除?
?
http://www.cnblogs.com/simoncook/p/6478196.html
?
分布式事物是基于二階段提交的
1)??????一階段,向broker發送一條prepared的消息,返回消息的offset即消息地址commitLog中消息偏移量。Prepared狀態消息不被消費
發送消息ok,執行本地事物分支, 本地事物方法需要實現rocketmq的回調接口2)2)2) LocalTransactionExecuter,處理本地事物邏輯返回處理的事物狀態LocalTransactionState
3)? 二階段,處理完本地事物中業務得到事物狀態, 根據offset查找到commitLog中的prepared消息,設置消息狀態commitType或者rollbackType, 讓后將信息添加到commitLog中, 其實二階段生成了兩條消息
?
?
?
事物消息發送
http://blog.csdn.net/quhongwei_zhanqiu/article/details/39142389/
?
轉載于:https://my.oschina.net/xiaominmin/blog/1597808
總結
以上是生活随笔為你收集整理的RocketMQ原理解析-producer 4.发送分布式事物消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑不能安装虚拟机--解决办法
- 下一篇: Java 获得方法调用者名称