Fescar TC-commit流程
生活随笔
收集整理的這篇文章主要介紹了
Fescar TC-commit流程
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
開篇
?這篇文章的目的主要是講解Fescar TC執(zhí)行commit的流程,目的是講解清楚commit流程中的一些步驟。
?遺憾的是因為commit本身Fescar的分支事務注冊上報,如果事先不了解Fescar的分支事務,有些邏輯理解起來會有一些奇怪,對于branchSession本身還未了解,所以只能單獨講解commit流程。
背景
說明:
- 分支事務中數(shù)據(jù)的 本地鎖 由本地事務管理,在分支事務 Phase1 結束時釋放。
同時,隨著本地事務結束,連接 也得以釋放。 - 分支事務中數(shù)據(jù)的 全局鎖 在事務協(xié)調器側管理,在決議 Phase2 全局提交時,全局鎖馬上可以釋放。只有在決議全局回滾的情況下,全局鎖 才被持有至分支的 Phase2 結束。
這個設計,極大地減少了分支事務對資源(數(shù)據(jù)和連接)的鎖定時間,給整體并發(fā)和吞吐的提升提供了基礎。
這里需要重點指出的是:Phase1階段的commit()操作是各個分支事務本地的事務操作。Phase2階段的操作是全局的commit()和rollback()。TC-commit流程指的就是Phase2階段。
TC commit流程介紹
- 1.根據(jù)transactionId查找begin階段生成的GlobalSession對象。
- 2.對GlobalSession對象進行清理操作,刪除分支事務的鎖并清理GlobalSession對象。
- 3.TC通知所有RM(各分支事務的資源管理器)進行全局提交操作(doGlobalCommit)。
TC commit源碼分析
public class DefaultCoordinator extends AbstractTCInboundHandlerimplements TransactionMessageHandler, ResourceManagerInbound {@Overrideprotected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext)throws TransactionException {response.setGlobalStatus(core.commit(XID.generateXID(request.getTransactionId())));} }說明:
- DefaultCoordinator的doGlobalCommit()作為全局回滾入口
- core.commit()根據(jù)XID去執(zhí)行全局commit()操作。
Commit 主流程
public class DefaultCore implements Core {public GlobalStatus commit(String xid) throws TransactionException {// 1.查找GlobalSessionGlobalSession globalSession = SessionHolder.findGlobalSession(XID.getTransactionId(xid));if (globalSession == null) {return GlobalStatus.Finished;}GlobalStatus status = globalSession.getStatus();// 2.關閉全局session并執(zhí)行清理工作globalSession.closeAndClean(); // Highlight: Firstly, close the session, then no more branch can be registered.// 3.執(zhí)行GlobalCommit通知動作if (status == GlobalStatus.Begin) {if (globalSession.canBeCommittedAsync()) {asyncCommit(globalSession);} else {doGlobalCommit(globalSession, false);}}// 返回GlobalCommit后的狀態(tài)return globalSession.getStatus();} }說明:
- DefaultCore是全局回滾的核心邏輯。
- SessionHolder.findGlobalSession查找全局的GlobalSession對象。
- GlobalSession執(zhí)行closeAndClean操作。
- DefaultCore執(zhí)行doGlobalCommit通知TC執(zhí)行全局回滾操作。
查找GlobalSession
public class SessionHolder {public static GlobalSession findGlobalSession(Long transactionId) throws TransactionException {return getRootSessionManager().findGlobalSession(transactionId);} }public class DefaultSessionManager extends AbstractSessionManager {}public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();public GlobalSession findGlobalSession(Long transactionId) throws TransactionException {return sessionMap.get(transactionId);} }說明:
- findGlobalSession()方法從DefaultSessionManager當中獲取GlobalSession。
- DefaultSessionManager的父類AbstractSessionManager的findGlobalSession從sessionMap獲取GlobalSession對象。
GlobalSession的closeAndClean
public class GlobalSession implements SessionLifecycle, SessionStorable {public void closeAndClean() throws TransactionException {close();clean();}public void close() throws TransactionException {if (active) {for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onClose(this);}}}private void clean() throws TransactionException {for (BranchSession branchSession : branchSessions) {branchSession.unlock();}} }public class DefaultSessionManager extends AbstractSessionManager {} public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {public void onClose(GlobalSession globalSession) throws TransactionException {globalSession.setActive(false);} }說明:
- GlobalSession的執(zhí)行closeAndClean操作,先執(zhí)行close再執(zhí)行clean。
- lifecycleListener.onClose()執(zhí)行DefaultSessionManager的onClose()。
- DefaultSessionManager的onClose()把設置active標識為false。
- clean()操作對所有的分支事務branchSession釋放鎖。這部分邏輯比較復雜單獨列出。
BranchSession的unlock
public class BranchSession implements Lockable, Comparable<BranchSession>, SessionStorable {public boolean unlock() throws TransactionException {if (lockHolder.size() == 0) {return true;}Iterator<Map.Entry<Map<String, Long>, Set<String>>> it = lockHolder.entrySet().iterator();while (it.hasNext()) {Map.Entry<Map<String, Long>, Set<String>> entry = it.next();Map<String, Long> bucket = entry.getKey();Set<String> keys = entry.getValue();synchronized (bucket) {for (String key : keys) {Long v = bucket.get(key);if (v == null) {continue;}if (v.longValue() == getTransactionId()) {bucket.remove(key);}}}}lockHolder.clear();return true;} }說明:
- BranchSession的unlock()操作對BranchSession 進行清理。
- BranchSession內(nèi)部的數(shù)據(jù)由于暫未閱讀該部分代碼所以暫時不能解釋清楚。
- 全局清除lockHolder。
TC執(zhí)行GlobalCommit
public class DefaultCore implements Core {public void doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {// 遍歷所有的BranchSession執(zhí)行回滾操作for (BranchSession branchSession : globalSession.getSortedBranches()) {BranchStatus currentStatus = branchSession.getStatus();if (currentStatus == BranchStatus.PhaseOne_Failed) {continue;}try {BranchStatus branchStatus = resourceManagerInbound.branchCommit(XID.generateXID(branchSession.getTransactionId()), branchSession.getBranchId(),branchSession.getResourceId(), branchSession.getApplicationData());switch (branchStatus) {case PhaseTwo_Committed:globalSession.removeBranch(branchSession);continue;case PhaseTwo_CommitFailed_Unretryable:if (globalSession.canBeCommittedAsync()) {LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);continue;} else {globalSession.changeStatus(GlobalStatus.CommitFailed);globalSession.end();LOGGER.error("Finally, failed to commit global[{}] since branch[{}] commit failed",globalSession.getTransactionId(), branchSession.getBranchId());return;}default:if (!retrying) {queueToRetryCommit(globalSession);return;}if (globalSession.canBeCommittedAsync()) {LOGGER.error("By [{}], failed to commit branch {}", branchStatus, branchSession);continue;} else {LOGGER.error("Failed to commit global[{}] since branch[{}] commit failed, will retry later.",globalSession.getTransactionId(), branchSession.getBranchId());return;}}} catch (Exception ex) {LOGGER.info("Exception committing branch {}", branchSession, ex);if (!retrying) {queueToRetryCommit(globalSession);if (ex instanceof TransactionException) {throw (TransactionException) ex;} else {throw new TransactionException(ex);}}}}if (globalSession.hasBranch()) {return;}globalSession.changeStatus(GlobalStatus.Committed);globalSession.end();} }說明:
- 對所有的BranchSession執(zhí)行branchCommit通知。
- 針對branchCommit返回狀態(tài)進行判斷,有一些邏輯在里面,后續(xù)閱讀了Branch相關資料后再補充狀態(tài)轉移圖。
總結
以上是生活随笔為你收集整理的Fescar TC-commit流程的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 欧几里得及扩展欧几里得算法
- 下一篇: 从一个实例,一窥docker进程管理