Hasor JDBC 的难关,嵌套事务处理思路
為什么80%的碼農都做不了架構師?>>> ??
? ? 本文存屬提醒我自己不要忘記的事情。也是向大家展示 Hasor 對于 JDBC 方面即將的又一個重大的進步。目前該方案還在實施中。
? ? 前段時間閑著沒事分析了下 Spring JDBC ,覺得 Spring JDBC 的設計實在是太絕了,于是就拷貝了 Spring JDBC 的關鍵接口,然后開始了遷移工作,最后 Hasor - JDBC 問世。
? ? 可是 Hasor JDBC 至今仍有一個重大問題沒有搞定,那就是事務控制。
? ? 雖然可以通過暴露 Connection 簡單的加裝一個 Aop 攔截器在配合 @Tar... 注解可以完成任務。但是我覺得我有點完美主義了。最近腦袋里一直都是 Spring 那套事務控制體系,我有種沖動在 Hasor 中重新實現這一套事務控制體系。
? ? 簡介一下 Spring 事務方面的內容,Spring 對于事務方面支持 7種事務傳播屬性。我用這個接口表示它們:
/*** 事務傳播屬性* @version : 2013-10-30* @author 趙永春(zyc@hasor.net)*/ public enum TransactionBehavior {/*** 加入已有事務* <p><i><b>釋意</b></i>:嘗試加入已經存在的事務中,如果沒有則開啟一個新的事務。*/PROPAGATION_REQUIRED,/*** 獨立事務* <p><i><b>釋意</b></i>:將掛起當前存在的事務,然后開啟一個獨立的事務進行處理(如果存在的話)。* 并且開啟一個全新的事務,新事務與已存在的事務之間彼此沒有關系。*/RROPAGATION_REQUIRES_NEW,/*** 嵌套事務* <p><i><b>釋意</b></i>:在當前事務中開啟一個子事務。* 如果事務回滾將連同上一級事務一同回滾(當主事務提交或回滾,子事務也會提交或回滾)* <p><i><b>注意</b></i>:需要驅動支持保存點。*/PROPAGATION_NESTED,/*** 跟隨環境* <p><i><b>釋意</b></i>:如果當前沒有事務存在,就以非事務方式執行;如果有,就使用當前事務。*/PROPAGATION_SUPPORTS,/*** 非事務方式* <p><i><b>釋意</b></i>:如果當前沒有事務存在,就以非事務方式執行;如果有,就將當前事務掛起。* */PROPAGATION_NOT_SUPPORTED,/*** 排除事務* <p><i><b>釋意</b></i>:如果當前沒有事務存在,就以非事務方式執行;如果有,就拋出異常。*/PROPAGATION_NEVER,/*** 強制要求事務* <p><i><b>釋意</b></i>:如果當前沒有事務存在,就拋出異常;如果有,就使用當前事務。*/PROPAGATION_MANDATORY, }? ? 由于分析過 Spring 有關事務控制部分的代碼,因此著手實現起來細節問題倒不是很難。Hasor 目前遇到的問題是結構設計上的難題。
? ? 首先 Hasor-JDBC 是一個幾乎是完全獨立的項目,甚至它都不需要 Hasor-Core 的支持。這就意味著,Hasor-JDBC 是獨立的。
? ? 其次我在設計 Hasor 時候一直保持著,穩定依賴原則,包與包之間的依賴完全隔離。這也為設計 Hasor-JDBC 的結構提出了要求。
? ? 之所以這樣的緣由是這樣的,首先我在設計 Hasor-JDBC 時候并不想像 Spring JDBC 那樣,讓?JdbcTemplate 部分和事務控制部分產生代碼依賴。因此需要拆解它們。
? ? 正因為如此 Hasor-JDBC 在 v0.0.1 版本時可以率先發布?JdbcTemplate 部分功能。而事務控制則可以交給插件體系完成。
? ? 這樣一來 Hasor 的松散設計會讓 Hasor 穩定很多,萬一事務控制過于復雜,開發者可以有選擇的關閉這個插件,從而避免相關邏輯代碼判斷提高運行效率。而這一切在 Spring JDBC 中是不可能的,Spring JDBC 在兩者之間有著一些代碼依賴。
? ? 為了達到這樣的目的,我為 Hasor-JDBC 建立了一個?DataSourceUtils。通過它的靜態方法 申請/釋放?Connection 對象。這樣一來 JDBC 數據庫操作部分就和事務完全隔離開了。
? ? 事務控制部分和JDBC 操作部分之間只需要通過?DataSourceUtils 上注冊的?DataSourceHelper 進行耦合。
? ? 默認情況下提供一個基于線程綁定的 DataSourceHelper 工具類,事務控制可以擴展這個類重新注冊它。
------------------------------
? ? 我先把負責實現上面 7 個事務傳播屬性的關鍵代碼貼上來分享給大家,由于代碼約有300行,這部分代碼在本文最后奉獻上,它這個類在 Hasor 中算是比較龐大的了,大家可以先看一下后面要介紹的實現原理然后在看關鍵代碼。
? ?首先為了支持多數據源下的嵌套事務管理,事務管理器是只針對一個數據源的。
? ?其次,由于事務可以嵌套,因此需要一個“事務?!毕冗M后出的原則處理每一個事務請求。這是由于考慮到事務“原子性”的問題才這樣設計的。
? ? 比方說:如果連續開啟了 3個事務。當遞交第一個事務時,無論后面兩個事務是否已經遞交都需要遞交?;貪L也是如此。至于為什么一定要使用“事務?!钡南冗M后出去實現,其主要原因是事務可能位于多個 Connection 中的緣故(詳見事務傳播屬性)。
? ? 此外還有掛起和恢復事務,這需要與線程綁定。
? ? 中和起來設計這個事務控制方案還是比較棘手的,不過可以借助下面這張表述事務鏈的圖來解釋。
? ??借助 AOP 思想,如果發生嵌套事務就為每一層事務創建一個事務狀態,然后將事務狀態放入“事務棧“。
? ? 由于事務是和線程綁定的,這就可以保證事務在多線程下的調用安全,不會發生跨線程問題。
? ? 位于事務棧中非頂端事務如果出現?commit/rollback 時,可以借助事務棧完成原子操作。
? ? 事務狀態中需要保存具體操作數據庫的那個 JDBC Connection接口。
? ? 每次創建事務狀態時,如果是新申請的數據庫連接,那么就設置其一個 NewConn 標志。這個標志可以用于處理嵌套事務中遞交時不是將整個事務遞交而是遞交一個事務保存點。
? ? 如果傳播屬性要求的是獨立事務,那么可以將當前事務的Connection 保存起來,然后重新申請一個再次綁定到線程上。已完成傳播屬性要求,當這個獨立事務處理完成之后,在將保存的 Connection 重新與當前線程綁定。
? ? 如果是跟隨環境的事務傳播屬性,則整個事務控制可以什么都不做,如果是不需要事務則可以通過判斷當前連接是否為 autoCommit 來進行后續處理。
? ? 上面是分析 Spring 事務控制時關鍵點的實現策略,下面是 我在 Hasor 中依照這個思想設計的事務管理器關鍵代碼,由于是半成品。下面這段代碼只能用于展示具體處理每一個不同傳播屬性時的細節。它還需要和整個 Hasor-JDBC 事務控制體系串起來才可以運行,現在和大家分享它們:
/*** 某一個數據源的事務管理器* * <p><b><i>事務棧:</i></b>* <p>事務管理器允許使用不同的傳播屬性反復開啟新的事務。所有被開啟的事務在正確處置(commit,rollback)* 它們之前都會按照先后順序依次壓入事務管理器的“事務?!敝?。一旦有事務被處理(commit,rollback)這個事務才會被從事務棧中彈出。* <p>倘若被彈出的事務(A)并不是棧頂的事務,那么在事務(A)被處理(commit,rollback)時會優先處理自事務(A)以后開啟的其它事務。* * @version : 2013-10-30* @author 趙永春(zyc@hasor.net)*/ public abstract class AbstractPlatformTransactionManager implements TransactionManager {private int defaultTimeout = -1;private LinkedList<TransactionStatus> tStatusStack = new LinkedList<TransactionStatus>();public boolean hasTransaction() {return !tStatusStack.isEmpty();}public boolean isTopTransaction(TransactionStatus status) {if (tStatusStack.isEmpty())return false;return this.tStatusStack.peek() == status;}/**開啟事務*/public final TransactionStatus getTransaction(TransactionBehavior behavior) throws TransactionDataAccessException {Hasor.assertIsNotNull(behavior);return getTransaction(behavior, TransactionLevel.ISOLATION_DEFAULT);};public final TransactionStatus getTransaction(TransactionBehavior behavior, TransactionLevel level) throws TransactionDataAccessException {Hasor.assertIsNotNull(behavior);Hasor.assertIsNotNull(level);Object transaction = doGetTransaction();獲取目前事務對象AbstractTransactionStatus defStatus = null;TODO new AbstractTransactionStatus(behavior, level, transaction);/*-------------------------------------------------------------| 環境已經存在事務|| PROPAGATION_REQUIRED :加入已有事務(不處理)| RROPAGATION_REQUIRES_NEW :獨立事務(掛起當前事務,開啟新事務)| PROPAGATION_NESTED :嵌套事務(設置保存點)| PROPAGATION_SUPPORTS :跟隨環境(不處理)| PROPAGATION_NOT_SUPPORTED:非事務方式(僅掛起當前事務)| PROPAGATION_NEVER :排除事務(異常)| PROPAGATION_MANDATORY :強制要求事務(不處理)===============================================================*/if (this.isExistingTransaction(transaction) == true) {/*RROPAGATION_REQUIRES_NEW:獨立事務*/if (behavior == RROPAGATION_REQUIRES_NEW) {this.suspend(transaction, defStatus);/*掛起當前事務*/this.processBegin(transaction, defStatus);/*開啟一個新的事務*/}/*PROPAGATION_NESTED:嵌套事務*/if (behavior == PROPAGATION_NESTED) {defStatus.markHeldSavepoint();/*設置保存點*/}/*PROPAGATION_NOT_SUPPORTED:非事務方式*/if (behavior == PROPAGATION_NOT_SUPPORTED) {this.suspend(transaction, defStatus);/*掛起當前事務*/}/*PROPAGATION_NEVER:排除事務*/if (behavior == PROPAGATION_NEVER)throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");return defStatus;}/*-------------------------------------------------------------| 環境不經存在事務|| PROPAGATION_REQUIRED :加入已有事務(開啟新事務)| RROPAGATION_REQUIRES_NEW :獨立事務(開啟新事務)| PROPAGATION_NESTED :嵌套事務(開啟新事務)| PROPAGATION_SUPPORTS :跟隨環境(不處理)| PROPAGATION_NOT_SUPPORTED:非事務方式(不處理)| PROPAGATION_NEVER :排除事務(不處理)| PROPAGATION_MANDATORY :強制要求事務(異常)===============================================================*//*PROPAGATION_REQUIRED:加入已有事務*/if (behavior == PROPAGATION_REQUIRED ||/*RROPAGATION_REQUIRES_NEW:獨立事務*/behavior == RROPAGATION_REQUIRES_NEW ||/*PROPAGATION_NESTED:嵌套事務*/behavior == PROPAGATION_NESTED) {this.processBegin(transaction, defStatus);/*開啟事務*/}/*PROPAGATION_MANDATORY:強制要求事務*/if (behavior == PROPAGATION_MANDATORY)throw new IllegalTransactionStateException("No existing transaction found for transaction marked with propagation 'mandatory'");return defStatus;}/**使用一個新的連接開啟一個新的事務作為當前事務。請確保在調用該方法時候當前不存在事務。*/private void processBegin(Object transaction, AbstractTransactionStatus defStatus) {try {doBegin(transaction, defStatus);this.tStatusStack.push(defStatus);/*入棧*/} catch (SQLException ex) {throw new TransactionDataAccessException("SQL Exception :", ex);}}/**判斷當前事務對象是否已經處于事務中。該方法會用于評估事務傳播屬性的處理方式。*/protected abstract boolean isExistingTransaction(Object transaction);/**在當前連接上開啟一個全新的事務*/protected abstract void doBegin(Object transaction, AbstractTransactionStatus defStatus) throws SQLException;/**遞交事務*/public final void commit(TransactionStatus status) throws TransactionDataAccessException {Object transaction = doGetTransaction();獲取底層維護的當前事務對象AbstractTransactionStatus defStatus = (AbstractTransactionStatus) status;/*已完畢,不需要處理*/if (defStatus.isCompleted())throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");/*回滾情況*/if (defStatus.isRollbackOnly()) {if (Hasor.isDebugLogger())Hasor.logDebug("Transactional code has requested rollback");rollBack(defStatus);return;}/*-------------------------------------------------------------| 1.無論何種傳播形式,遞交事務操作都會將 isCompleted 屬性置為 true。| 2.如果事務狀態中包含一個未處理的保存點。僅遞交保存點,而非遞交整個事務。| 3.事務 isNew 只有為 true 時才真正觸發遞交事務操作。===============================================================*/try {prepareCommit(defStatus);/*如果包含保存點,在遞交事務時只處理保存點*/if (defStatus.hasSavepoint())defStatus.releaseHeldSavepoint();else if (defStatus.isNewConnection())doCommit(transaction, defStatus);} catch (SQLException ex) {rollBack(defStatus);/*遞交失敗,回滾*/throw new TransactionDataAccessException("SQL Exception :", ex);} finally {cleanupAfterCompletion(defStatus);}}/**遞交前的預處理*/private void prepareCommit(AbstractTransactionStatus defStatus) {/*首先預處理的事務必須存在于管理器的事務棧內某一位置中,否則要處理的事務并非來源于該事務管理器。*/if (this.tStatusStack.contains(defStatus) == false)throw new IllegalTransactionStateException("This transaction is not derived from this Manager.");/*-------------------------------------------------------------| 如果預處理的事務并非位于棧頂,則進行彈棧操作。|--------------------------\| T5 ^ <-- pop-up | 假定預處理的事務為 T4,那么:| T4 ^ <-- pop-up | T5 事務會被先遞交,然后是 T4| T3 . <-- defStatus | 接下來就完成了預處理。| T2 || T1 ||--------------------------/|===============================================================*/TransactionStatus inStackStatus = null;while ((inStackStatus = this.tStatusStack.peek()) != defStatus)this.commit(inStackStatus);}/**處理當前底層數據庫連接的事務遞交操作。*/protected abstract void doCommit(Object transaction, AbstractTransactionStatus defStatus) throws SQLException;/**回滾事務*/public final void rollBack(TransactionStatus status) throws TransactionDataAccessException {Object transaction = doGetTransaction();獲取目前事務對象AbstractTransactionStatus defStatus = (AbstractTransactionStatus) status;/*已完畢,不需要處理*/if (defStatus.isCompleted())throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");/*-------------------------------------------------------------| 1.無論何種傳播形式,遞交事務操作都會將 isCompleted 屬性置為 true。| 2.如果事務狀態中包含一個未處理的保存點。僅回滾保存點,而非回滾整個事務。| 3.事務 isNew 只有為 true 時才真正觸發回滾事務操作。===============================================================*/try {prepareRollback(defStatus);/*如果包含保存點,在遞交事務時只處理保存點*/if (defStatus.hasSavepoint())defStatus.rollbackToHeldSavepoint();else if (defStatus.isNewConnection())doRollback(transaction, defStatus);} catch (SQLException ex) {throw new TransactionDataAccessException("SQL Exception :", ex);} finally {cleanupAfterCompletion(defStatus);}}/**回滾前的預處理*/private void prepareRollback(AbstractTransactionStatus defStatus) {/*首先預處理的事務必須存在于管理器的事務棧內某一位置中,否則要處理的事務并非來源于該事務管理器。*/if (this.tStatusStack.contains(defStatus) == false)throw new IllegalTransactionStateException("This transaction is not derived from this Manager.");/*-------------------------------------------------------------| 如果預處理的事務并非位于棧頂,則進行彈棧操作。|--------------------------\| T5 ^ <-- pop-up | 假定預處理的事務為 T4,那么:| T4 ^ <-- pop-up | T5 事務會被先回滾,然后是 T4| T3 . <-- defStatus | 接下來就完成了預處理。| T2 || T1 ||--------------------------/|===============================================================*/TransactionStatus inStackStatus = null;while ((inStackStatus = this.tStatusStack.peek()) != defStatus)this.rollBack(inStackStatus);}/**處理當前底層數據庫連接的事務回滾操作。*/protected abstract void doRollback(Object transaction, AbstractTransactionStatus defStatus) throws SQLException;private static class SuspendedTransactionHolder {public Object transaction = null; /*掛起的底層事務對象*/}/**掛起當前事務*/protected final void suspend(Object transaction, AbstractTransactionStatus defStatus) {try {/*檢查事務是否為棧頂事務*/prepareCheckStack(defStatus);/*創建 SuspendedTransactionHolder 對象,用于保存當前底層數據庫連接以及事務對象*/doSuspend(transaction, defStatus);SuspendedTransactionHolder suspendedHolder = new SuspendedTransactionHolder();suspendedHolder.transaction = transaction;/*掛起的事務對象(來自于底層)*/defStatus.setSuspendHolder(suspendedHolder);} catch (SQLException ex) {throw new TransactionDataAccessException("SQL Exception :", ex);}}/**掛起事務,子類需要重寫該方法掛起transaction事務,并同時清空底層當前數據庫連接,*/protected void doSuspend(Object transaction, AbstractTransactionStatus defStatus) throws SQLException {throw new TransactionSuspensionNotSupportedException("Transaction manager [" + getClass().getName() + "] does not support transaction suspension");}/**恢復被掛起的事務,恢復掛起的事務時必須是當前事務請妥善處理當前事務之后在恢復掛起的事務*/protected final void resume(Object transaction, AbstractTransactionStatus defStatus) {if (defStatus.isCompleted() == false)throw new IllegalTransactionStateException("the Transaction has not completed.");try {/*檢查事務是否為棧頂事務*/prepareCheckStack(defStatus);SuspendedTransactionHolder suspendedHolder = (SuspendedTransactionHolder) defStatus.getSuspendedTransactionHolder();doResume(suspendedHolder.transaction, defStatus);} catch (SQLException ex) {throw new TransactionDataAccessException("SQL Exception :", ex);}}/**恢復事務,恢復原本掛起的事務(第一個參數),并使用掛起的狀態恢復當前數據庫連接。*/protected void doResume(Object resumeTransaction, AbstractTransactionStatus defStatus) throws SQLException {throw new TransactionSuspensionNotSupportedException("Transaction manager [" + getClass().getName() + "] does not support transaction suspension");}/**檢查正在處理的事務狀態是否位于棧頂,否則拋出異常*/private void prepareCheckStack(AbstractTransactionStatus defStatus) {if (!this.isTopTransaction(defStatus))throw new IllegalTransactionStateException("the Transaction Status is not top in stack.");}/**commit,rollback。之后的清理工作,同時也負責恢復事務和操作事務堆棧。*/private void cleanupAfterCompletion(AbstractTransactionStatus defStatus) {/*清理的事務必須是位于棧頂*/prepareCheckStack(defStatus);/*標記完成*/defStatus.setCompleted();/*恢復掛起的事務*/if (defStatus.getSuspendedTransactionHolder() != null) {if (Hasor.isDebugLogger())Hasor.logDebug("Resuming suspended transaction after completion of inner transaction");resume(defStatus.getSuspendedTransactionHolder(), defStatus);}}/**獲取當前事務管理器中存在的事務對象。*/protected abstract Object doGetTransaction(); }在最后連接一下 @黃勇 的Blog,他這里有一篇文章詳細介紹了 Spring 事務傳播屬性:http://my.oschina.net/huangyong/blog/160012
轉載于:https://my.oschina.net/ta8210/blog/188655
總結
以上是生活随笔為你收集整理的Hasor JDBC 的难关,嵌套事务处理思路的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Devexpress Barmanage
- 下一篇: linux rar工具