rocketmq原理_彻底看懂RocketMQ事务实现原理
面試中經(jīng)常會(huì)問(wèn)到比如RocketMQ的事務(wù)是如何實(shí)現(xiàn)的呢?學(xué)習(xí)框架,我們不僅要熟練使用,更要掌握設(shè)計(jì)及原理,才算熟悉一個(gè)框架。
1 RocketMQ 事務(wù)使用案例
public class CreateOrderService { @Autowired private OrderDao orderDao; @Autowired private ExecutorService executorService; private TransactionMQProducer producer; // 初始化transactionListener 和 producer @Init public void init() throws MQClientException { TransactionListener transactionListener = createTransactionListener(); producer = new TransactionMQProducer("myGroup"); producer.setExecutorService(executorService); producer.setTransactionListener(transactionListener); producer.start(); } // 創(chuàng)建訂單服務(wù)的請(qǐng)求入口 @PUT @RequestMapping(...) public boolean createOrder(@RequestBody CreateOrderRequest request) { // 根據(jù)創(chuàng)建訂單請(qǐng)求創(chuàng)建一條消息 Message msg = createMessage(request); // 發(fā)送事務(wù)消息 SendResult sendResult = producer.sendMessageInTransaction(msg, request); // 返回:事務(wù)是否成功 return sendResult.getSendStatus() == SendStatus.SEND_OK; } private TransactionListener createTransactionListener() { return new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { CreateOrderRequest request = (CreateOrderRequest ) arg; try { // 執(zhí)行本地事務(wù)創(chuàng)建訂單 orderDao.createOrderInDB(request); // 如果沒(méi)拋異常說(shuō)明執(zhí)行成功,提交事務(wù)消息 return LocalTransactionState.COMMIT_MESSAGE; } catch (Throwable t) { // 失敗則直接回滾事務(wù)消息 return LocalTransactionState.ROLLBACK_MESSAGE; } } // 反查本地事務(wù) @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 從消息中獲得訂單ID String orderId = msg.getUserProperty("orderId"); // 去db查詢(xún)訂單號(hào)是否存在,若存在則提交事務(wù) // 若不存在,可能是本地事務(wù)失敗了,也可能是本地事務(wù)還在執(zhí)行,所以返回UNKNOW return orderDao.isOrderIdExistsInDB(orderId)? LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW; } }; }}如上案例展示了一個(gè)訂單創(chuàng)建服務(wù),即往db插一條訂單記錄,并發(fā)一條創(chuàng)建訂單的消息,要求寫(xiě)db和發(fā)消息倆個(gè)操作在一個(gè)事務(wù)內(nèi)執(zhí)行。
首先在init()方法中初始化了transactionListener和發(fā)生RocketMQ事務(wù)消息的變量producer。
createOrder()
真正提供創(chuàng)建訂單服務(wù)的方法,根據(jù)請(qǐng)求的參數(shù)創(chuàng)建一條消息,然后調(diào)用 producer發(fā)事務(wù)消息,并返回事務(wù)執(zhí)行結(jié)果。createTransactionListener()
在init()方法中調(diào)用,構(gòu)造實(shí)現(xiàn)RocketMQ的TransactionListener接口的匿名類(lèi),該接口需要實(shí)現(xiàn)如下兩個(gè)方法:executeLocalTransaction:執(zhí)行本地事務(wù),在這里我們直接把訂單數(shù)據(jù)插入到數(shù)據(jù)庫(kù)中,并返回本地事務(wù)的執(zhí)行結(jié)果。
checkLocalTransaction:反查本地事務(wù),上述流程中是在db中查詢(xún)訂單號(hào)是否存在,若存在則提交事務(wù),若不存在,可能本地事務(wù)失敗了,也可能本地事務(wù)還在執(zhí)行,所以返回UNKNOW
這樣便使用RocketMQ的事務(wù)簡(jiǎn)單實(shí)現(xiàn)了一個(gè)創(chuàng)建訂單的分布式事務(wù)。
2 RocketMQ事務(wù)消息實(shí)現(xiàn)原理
2.1 Pro端如何發(fā)事務(wù)消息?
DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; // 給待發(fā)送消息添加屬性,表明是一個(gè)事務(wù)消息(即半消息) MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); // 像發(fā)送普通消息一樣,把這條事務(wù)消息發(fā)往Broker try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { // 事務(wù)消息發(fā)送成功 case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); // 開(kāi)始執(zhí)行本地事務(wù) localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } // 事務(wù)過(guò)程的最后,給Broker發(fā)送提交或回滾事務(wù)的RPC請(qǐng)求。 try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult;}有事務(wù)反查機(jī)制作兜底,該RPC請(qǐng)求即使失敗或丟失,也不會(huì)影響事務(wù)最終的結(jié)果。最后構(gòu)建事務(wù)消息的發(fā)送結(jié)果,并返回。
2.2 Broker端如何處理事務(wù)消息?
SendMessageProcessor#asyncSendMessage
跟進(jìn)去看看真正處理半消息的業(yè)務(wù)邏輯,這段處理邏輯在類(lèi)
TransactionalMessageBridge
putHalfMessage
parseHalfMessageInner
RocketMQ并非將事務(wù)消息保存至消息中 client 指定的 queue,而是記錄了原始的 topic 和 queue 后,把這個(gè)事務(wù)消息保存在
特殊的內(nèi)部 topic:RMQ_SYS_TRANS_HALF_TOPIC
序號(hào)為 0 的 queue
這套 topic 和 queue 對(duì)消費(fèi)者不可見(jiàn),因此里面的消息也永遠(yuǎn)不會(huì)被消費(fèi)。這就保證在事務(wù)提交成功之前,這個(gè)事務(wù)消息對(duì) Consumer 是消費(fèi)不到的。
2.3 Broker端如何事務(wù)反查?
在Broker的TransactionalMessageCheckService服務(wù)中啟動(dòng)了一個(gè)定時(shí)器,定時(shí)從事務(wù)消息queue中讀出所有待反查的事務(wù)消息。
AbstractTransactionalMessageCheckListener#resolveHalfMsg
針對(duì)每個(gè)需要反查的半消息,Broker會(huì)給對(duì)應(yīng)的Producer發(fā)一個(gè)要求執(zhí)行事務(wù)狀態(tài)反查的RPC請(qǐng)求
AbstractTransactionalMessageCheckListener#sendCheckMessage
Broker2Client#checkProducerTransactionState根據(jù)RPC返回響應(yīng)中的反查結(jié)果,來(lái)決定這個(gè)半消息是需要提交還是回滾,或者后續(xù)繼續(xù)來(lái)反查。
最后,提交或者回滾事務(wù)。首先把半消息標(biāo)記為已處理
如果是提交事務(wù),就把半消息從半消息隊(duì)列中復(fù)制到該消息真正的topic和queue中
如果是回滾事務(wù),什么都不做
EndTransactionProcessor#processRequest
最后結(jié)束該事務(wù)。
3 總結(jié)
整體實(shí)現(xiàn)流程
RocketMQ是基于兩階段提交來(lái)實(shí)現(xiàn)的事務(wù),把這些事務(wù)消息暫存在一個(gè)特殊的queue中,待事務(wù)提交后再移動(dòng)到業(yè)務(wù)隊(duì)列中。最后,RocketMQ的事務(wù)適用于解決本地事務(wù)和發(fā)消息的數(shù)據(jù)一致性問(wèn)題。
參考
https://juejin.im/post/6844904193526857742
總結(jié)
以上是生活随笔為你收集整理的rocketmq原理_彻底看懂RocketMQ事务实现原理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: android 16 登陆,那些年我们一
- 下一篇: c++ 指针拼接字符串_字符串拼接+和c