事务消息应用场景、实现原理与项目实战
作者:丁威
活動中心場景介紹
在電商系統上線初期,往往會進行一些“拉新”活動,例如活動部門提出新用戶注冊送積分、送優惠券活動。
基于分布式、微服務的設計理念,通常的架構設計(子系統交互)如下圖所示:
其核心系統介紹如下:
- 賬戶中心
提供用戶登錄、用戶注冊等服務,一個新用戶注冊時,向 MQ 服務器中的 USER_REGISTER 主題發送一條消息,主流程結束,與送積分,送優惠券等過程解耦。 - 優惠券(券系統)
提供發放優惠券、使用優惠券等與券相關的基礎服務。 - 積分中心
提供積分相關的服務,例如積分贈送、積分消費、積分查詢等基礎服務。 - 送積分服務(消費者)
訂閱 MQ,按照規則決定是否需要贈送積分,如果需要則調用積分相關的基礎接口,完成積分的發放。 - 送優惠券(消費者)
訂閱 MQ,按照規則決定是否需要贈送優惠券,如果需要則調用券系統相關的基礎接口,完成優惠券的發放。
上面的架構設計非常優雅,但并不是無懈可擊,如果新用戶注冊成功,但消息發送到 MQ 失敗,或者消息成功發送到 MQ,但發送完 MQ 后系統出現異常導致用戶注冊失敗又該如何呢?
上面的問題其實就是典型的分布式事務問題:即如何保證用戶注冊(數據庫操作)與 MQ 消息發送這兩個分布式操作的一致性。
RocketMQ 事務消息閃亮登場。
事務消息實現原理
一言以蔽之:RocketMQ 事務消息要解決的問題是消息發送與業務的一致性,其解決思路:二階段提交與事務狀態回查,其具體實現流程如下圖所示:
其核心設計理念:
- 應用程序開啟一個數據庫事務,進行數據庫操作,并且在事務中發送一條 PREPARE 消息,PREPARE 消息發送成功后通知應用程序記錄本地事務狀態,然后提交本地事務。
- RocketMQ 在收到類型為 PREPARE 的消息時,首先備份消息的原主題與原消息消費隊列,然后將消息存儲在主題為 RMQ_SYS_TRANS_HALF_TOPIC 的消息隊列中,故 PREPARE 的消息是不會被客戶端消費的。
- Broker 消息服務器開啟一個定時任務處理 RMQ_SYS_TRANS_HALF_TOPIC 中的消息,會每隔指定時間向消息發送者發起事務狀態查詢請求 ,詢問消息發送者客戶端本地事務是否成功,然后根據回查狀態決定是提交還是回滾,即對處于 PREPARE 狀態進行提交或回滾操作。
- 發送者如果明確得知事務成功,則可以返回 COMMIT,服務端會提交該條消息,具體操作是恢復原消息的主題與隊列,重新發送到 Broker,消費端感知后消費。
- 發送者如果無法明確得知事務狀態,則返回 UNOWN,此時服務端會等待一定時間后再次向發送者詢問,默認詢問 15 次。
- 發送者如果非常明確得知事務失敗,則可以返回 ROLLBACK。
在具體實踐中,消息發送者在無法獲取事務狀態時不要武斷的返回 ROLLBACK,而是要返回 UNOWN,讓服務端定時重試回查,說明如下:
在將 PREPARE 消息發送到 Broker 后,服務端發起事務查詢時本地事務可能還未提交,為了避免無效的事務回查機制,RocketMQ 通常至少在收到 PREPARE 消息 6s 后才會發起第一次事務回查,可通過 transactionTimeOut 配置。故客戶端在實現事務回查時無法證明事務狀態時不應該返回 ROLLBACK,而是返回 UNOWN。
事務消息實戰
光說不練假把式,接下來以一個新用戶注冊送優惠券的場景來詳細介紹如何使用事務消息。
項目模塊職責說明如下:
事務消息的核心代碼組裝在 transaction-service,其核心類圖如下:
其中核心要點如下:
- UserServiceImpl
Dubbo 接口業務實現類,類似 MVC 的控制層,在這里做一些參數驗證,但不執行具體的業務邏輯,只是發送一條事務消息到 MQ。 - UserRegTransactionListener
事務監聽器,在 executeLocalTransaction 方法中執行業務邏輯,數據庫本地事務加在該方法。
接下來展示其核心代碼,全部源碼已上傳到 github 倉庫。
倉庫地址:https://github.com/dingwpmz/rocketmq-learning。
UserServiceImpl 核心實現
UserServiceImpl 的核心要點如下:
- 首先應該對參數進行校驗、業務邏輯進行校驗,如果不滿足業務條件,會發送一些無效消息到 MQ,雖然不會造成業務異常,但會消耗性能。
- 發送事務消息,建議對消息設置 Key,Key 的值可以用業務處理流水號(可唯一表示該業務操作)或者核心業務字段(例如訂單編號)。
- 業務入口類可通過事務消息發送狀態來判斷業務是否失敗。
UserRegTransactionListener 核心實現
事務監聽器需要實現執行本地事務與事務回查兩個接口。
1、實現 executeLocalTransaction
首先需要實現 executeLocalTransaction 方法,執行本地事務,其代碼如下圖所示:
其中幾個關鍵點說明如下:
- 在該方法上添加數據庫事務標簽。
- 執行業務邏輯,示例 Demo 只是將用戶數據存儲到數據庫。
- 如果業務執行失敗,可明確告知需要回滾,上層調用方也可根據 ROLLBACK_MESSAGE 進行相應的處理。
- 如果業務成功,不建議直接返回 COMMIT,而是建議返回 UNKNOW,因為該方法盡管在方法最后一行,但可能發生斷電等異常情況,數據庫并沒有成功。
2、實現 checkLocalTransaction
其次需要實現事務狀態回查,用來 RocketMQ 服務端感知事務是否成功,其實現原理如下圖所示:
其實現關鍵點如下:
- 如果能明確得知本地事務成功,則返回 COMMIT_MESSAGE
- 如該不能明確得知本地事務成功,不能返回 ROLLBACK_MESSAGE,而是返回 UNKNOW,等待服務端下一次事務回查(不會立即觸發),服務端默認回查 15 次,如果 15 次都得到 UNKNOW,則會回滾該消息。
代碼獲取
上文只是將事務消息的核心代碼加以解讀,并重點闡述每個步驟的實現關鍵點,筆者基于 SpringBoot,嘗試結合場景學習 RocketMQ 的使用技巧,其代碼上傳到了 github 倉庫:https://github.com/dingwpmz/rocketmq-learning。
點擊跳轉到代碼倉庫。
原文鏈接:https://developer.aliyun.com/article/783534?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的事务消息应用场景、实现原理与项目实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 函数计算助力闲鱼构建云端一体化变成模式
- 下一篇: 算法专家解读 | 开放搜索教育搜题能力和