Spring Boot Transaction Source Code Analysis (Part 1)
Contents
PlatformTransactionManager
TransactionStatus
DefaultTransactionStatus
AbstractPlatformTransactionManager
Fields
getTransaction
handleExistingTransaction
Methods to be implemented by subclasses
commit
processCommit
Methods to be implemented by subclasses
rollback
processRollback
Methods to be implemented by subclasses
SuspendedResourcesHolder
Transaction state control
TransactionSynchronization
TransactionSynchronizationManager
TransactionSynchronizationUtils
Spring transactions are normally declared with the @Transactional annotation. Its transactionManager attribute (aliased as value) names the PlatformTransactionManager to use.
public @interface Transactional {

    @AliasFor("transactionManager")
    String value() default "";

    /* PlatformTransactionManager */
    @AliasFor("value")
    String transactionManager() default "";

    Propagation propagation() default Propagation.REQUIRED;

    /* TransactionAttribute */
    Isolation isolation() default Isolation.DEFAULT;

    int timeout() default TransactionDefinition.TIMEOUT_DEFAULT;

    boolean readOnly() default false;

    Class<? extends Throwable>[] rollbackFor() default {};

    String[] rollbackForClassName() default {};

    Class<? extends Throwable>[] noRollbackFor() default {};

    String[] noRollbackForClassName() default {};
}
PlatformTransactionManager
The PlatformTransactionManager interface defines the three core transaction methods: obtaining a transaction, committing, and rolling back.
public interface PlatformTransactionManager extends TransactionManager {

    TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException;

    void commit(TransactionStatus status) throws TransactionException;

    void rollback(TransactionStatus status) throws TransactionException;
}
[Figure: implementation classes of the PlatformTransactionManager interface]
Of the implementations of PlatformTransactionManager, DataSourceTransactionManager is the most commonly used.
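To make the three-method contract concrete, here is a minimal, Spring-free sketch of the calling pattern a client of a transaction manager typically follows: obtain a transaction, run the work, commit on success, roll back on failure. MiniTxManager, RecordingTxManager and TxTemplateSketch are hypothetical stand-ins invented for this illustration; they are not Spring classes and they do not talk to a database.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for the three-method contract (not a Spring type).
interface MiniTxManager {
    Object getTransaction();
    void commit(Object status);
    void rollback(Object status);
}

// Records which methods were called so the flow can be inspected.
class RecordingTxManager implements MiniTxManager {
    final List<String> events = new ArrayList<>();

    @Override public Object getTransaction() { events.add("begin"); return new Object(); }
    @Override public void commit(Object status) { events.add("commit"); }
    @Override public void rollback(Object status) { events.add("rollback"); }
}

class TxTemplateSketch {
    // Obtain a transaction, run the work, commit on success, roll back on failure.
    static <T> T execute(MiniTxManager manager, Supplier<T> work) {
        Object status = manager.getTransaction();
        T result;
        try {
            result = work.get();
        } catch (RuntimeException | Error ex) {
            manager.rollback(status);
            throw ex;
        }
        manager.commit(status);
        return result;
    }

    public static void main(String[] args) {
        RecordingTxManager manager = new RecordingTxManager();
        execute(manager, () -> "done");
        System.out.println(manager.events); // prints [begin, commit]
    }
}
```

Keeping the commit outside the try block means a failure during commit is not followed by a second rollback attempt from this helper; how a real manager reacts to a failed commit is a separate decision.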
TransactionStatus
TransactionStatus is used to inspect and control the state of a transaction.
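One piece of state a status object carries is a rollback-only flag: code can mark the transaction for rollback instead of throwing an exception, and a later commit request then ends in a rollback. The following is a simplified sketch of that idea only. MiniStatus and RollbackOnlySketch are hypothetical names, and the logic is reduced to the flag check and the "already completed" guard.

```java
// Hypothetical, simplified status object (not Spring's DefaultTransactionStatus).
class MiniStatus {
    private boolean rollbackOnly;
    private boolean completed;

    void setRollbackOnly() { rollbackOnly = true; }
    boolean isRollbackOnly() { return rollbackOnly; }
    boolean isCompleted() { return completed; }
    void markCompleted() { completed = true; }
}

class RollbackOnlySketch {
    // Returns the action actually taken when the caller asks for a commit.
    static String commit(MiniStatus status) {
        if (status.isCompleted()) {
            throw new IllegalStateException("Transaction is already completed");
        }
        String action = status.isRollbackOnly() ? "rollback" : "commit";
        status.markCompleted();
        return action;
    }

    public static void main(String[] args) {
        MiniStatus status = new MiniStatus();
        status.setRollbackOnly();           // request a rollback without throwing
        System.out.println(commit(status)); // prints rollback
    }
}
```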
public interface TransactionStatus extends TransactionExecution, SavepointManager, Flushable {

    boolean hasSavepoint();

    @Override
    void flush();
}

public interface TransactionExecution {

    /** Whether this is a new transaction. */
    boolean isNewTransaction();

    // Marks the transaction so that it will always roll back; an alternative to throwing an exception.
    void setRollbackOnly();

    boolean isRollbackOnly();

    boolean isCompleted();
}

public interface SavepointManager {

    Object createSavepoint() throws TransactionException;

    void rollbackToSavepoint(Object savepoint) throws TransactionException;

    void releaseSavepoint(Object savepoint) throws TransactionException;
}
DefaultTransactionStatus
@Nullable
private final Object transaction;
private final boolean newTransaction;
private final boolean newSynchronization;
private final boolean readOnly;
private final boolean debug;
@Nullable
private final Object suspendedResources;
AbstractPlatformTransactionManager
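AbstractPlatformTransactionManager is the abstract base implementation of PlatformTransactionManager. Its main job is to define the overall skeleton for beginning and completing a transaction; subclasses fill in the resource-specific steps through the do* methods listed in the sections below.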
Fields
private int transactionSynchronization = SYNCHRONIZATION_ALWAYS;
private int defaultTimeout = TransactionDefinition.TIMEOUT_DEFAULT;
// Whether nested transactions are allowed.
private boolean nestedTransactionAllowed = false;
// Whether an existing transaction is validated before participating in it.
private boolean validateExistingTransaction = false;
// Whether a failure in a participating transaction marks the whole transaction for rollback.
private boolean globalRollbackOnParticipationFailure = true;
private boolean failEarlyOnGlobalRollbackOnly = false;
// Whether to roll back when the commit fails.
private boolean rollbackOnCommitFailure = false;
getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
        throws TransactionException {

    // Fall back to the default definition if none is given.
    TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());

    // Obtain the transaction object; implemented by subclasses.
    Object transaction = doGetTransaction();
    boolean debugEnabled = logger.isDebugEnabled();

    // Check whether a transaction already exists.
    if (isExistingTransaction(transaction)) {
        // A transaction exists: handle it according to the propagation behavior.
        return handleExistingTransaction(def, transaction, debugEnabled);
    }

    // Validate the timeout for the new transaction.
    if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
    }

    // No transaction exists and the propagation is PROPAGATION_MANDATORY: throw.
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        // No transaction to suspend here; at most, active synchronizations are suspended.
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
        }
        try {
            // Decide whether a new synchronization is needed.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // Status for a new transaction.
            DefaultTransactionStatus status = newTransactionStatus(
                    def, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the transaction.
            doBegin(transaction, def);
            prepareSynchronization(status, def);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + def);
        }
        // Run without an actual transaction.
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
    }
}
handleExistingTransaction
Handles the case where a transaction already exists.
// Handles the case where a transaction already exists.
private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    // PROPAGATION_NEVER: throw if a transaction currently exists.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    // PROPAGATION_NOT_SUPPORTED: run non-transactionally; suspend the current transaction if one exists.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        // Suspend the current transaction.
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // PROPAGATION_REQUIRES_NEW: always create a new transaction; suspend the current one if it exists.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        // Suspend the current transaction.
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Begin the new inner transaction.
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // PROPAGATION_NESTED: a transaction exists, so run within a nested transaction.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    // Validate the existing transaction.
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                        definition.getIsolationLevel() : null);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
}
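The propagation handling in getTransaction and handleExistingTransaction can be condensed into a small decision function. The sketch below is a simplification for illustration, not Spring code: TxPropagation, TxAction and PropagationSketch are hypothetical names, and the checks for nestedTransactionAllowed, savepoint support, timeouts and validation of the existing transaction are deliberately left out.

```java
// Hypothetical enums mirroring the seven propagation behaviors and the resulting actions.
enum TxPropagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

enum TxAction {
    BEGIN_NEW, RUN_WITHOUT_TX, JOIN_EXISTING,
    SUSPEND_AND_RUN_WITHOUT_TX, SUSPEND_AND_BEGIN_NEW, BEGIN_NESTED
}

class PropagationSketch {
    // Maps (propagation, transaction already exists?) to the action taken.
    static TxAction decide(TxPropagation propagation, boolean existing) {
        if (!existing) {
            switch (propagation) {
                case MANDATORY:
                    throw new IllegalStateException("No existing transaction found");
                case REQUIRED:
                case REQUIRES_NEW:
                case NESTED:
                    return TxAction.BEGIN_NEW;
                default: // SUPPORTS, NOT_SUPPORTED, NEVER
                    return TxAction.RUN_WITHOUT_TX;
            }
        }
        switch (propagation) {
            case NEVER:
                throw new IllegalStateException("Existing transaction found");
            case NOT_SUPPORTED:
                return TxAction.SUSPEND_AND_RUN_WITHOUT_TX;
            case REQUIRES_NEW:
                return TxAction.SUSPEND_AND_BEGIN_NEW;
            case NESTED:
                return TxAction.BEGIN_NESTED; // savepoint or nested begin in the real code
            default: // REQUIRED, SUPPORTS, MANDATORY
                return TxAction.JOIN_EXISTING;
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(TxPropagation.REQUIRED, false));     // prints BEGIN_NEW
        System.out.println(decide(TxPropagation.REQUIRED, true));      // prints JOIN_EXISTING
        System.out.println(decide(TxPropagation.REQUIRES_NEW, true));  // prints SUSPEND_AND_BEGIN_NEW
    }
}
```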
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    // Check whether synchronization is active.
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                suspendedResources = doSuspend(transaction);
            }
            // Capture the current thread-bound state and clear it.
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}
Methods to be implemented by subclasses
protected abstract Object doGetTransaction() throws TransactionException;

protected abstract void doBegin(Object transaction, TransactionDefinition definition)
        throws TransactionException;

protected Object doSuspend(Object transaction) throws TransactionException {
    throw new TransactionSuspensionNotSupportedException(
            "Transaction manager [" + getClass().getName() + "] does not support transaction suspension");
}
commit
@Override
public final void commit(TransactionStatus status) throws TransactionException {
    // A transaction cannot be completed twice.
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // If this status has been marked rollback-only locally, do not attempt to commit; roll back directly.
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        // Roll back here without throwing an exception.
        processRollback(defStatus, false);
        return;
    }

    // The transaction is globally marked rollback-only.
    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        // Roll back here; an UnexpectedRollbackException may be thrown.
        processRollback(defStatus, true);
        return;
    }

    // Perform the commit.
    processCommit(defStatus);
}
processCommit
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            // Prepare for commit.
            prepareForCommit(status);
            // Before-commit callbacks.
            triggerBeforeCommit(status);
            // Before-completion callbacks.
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            if (status.hasSavepoint()) {
                // A savepoint exists, so this is a NESTED transaction inside an outer transaction.
                // Nothing is committed here; the savepoint is only released.
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                // This status owns a new transaction. For an inner transaction this branch is reached
                // with PROPAGATION_REQUIRES_NEW (or PROPAGATION_NESTED when savepoints are not used),
                // so its commit is independent of the outer transaction.
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                // The actual commit happens here.
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // The transaction was rolled back instead of committed.
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            // Triggered after a successful commit.
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
Methods to be implemented by subclasses
protected abstract void doCommit(DefaultTransactionStatus status) throws TransactionException;
rollback
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    processRollback(defStatus, false);
}
processRollback
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            // Trigger before-completion callbacks.
            triggerBeforeCompletion(status);

            // If there is a savepoint, roll back only to the savepoint.
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                // This status owns a new transaction, so roll that transaction back.
                doRollback(status);
            }
            else {
                // Participating in larger transaction
                if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        doSetRollbackOnly(status);
                    }
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        }
        catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        // Trigger after-completion callbacks.
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
    }
    finally {
        // Clean up after completion.
        cleanupAfterCompletion(status);
    }
}
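The three-way branch at the heart of processRollback (savepoint, new transaction, participating transaction) can be summarized as a pure decision function. This is a simplified sketch for illustration: RollbackAction and RollbackDecisionSketch are hypothetical names, the boolean parameters stand in for the corresponding status and manager queries, and callbacks, exceptions and cleanup are omitted.

```java
// Hypothetical enum naming the possible outcomes of the rollback branch.
enum RollbackAction {
    ROLLBACK_TO_SAVEPOINT, ROLLBACK_TRANSACTION, MARK_ROLLBACK_ONLY,
    LEAVE_TO_ORIGINATOR, NOTHING_TO_ROLL_BACK
}

class RollbackDecisionSketch {
    // Each parameter stands in for one check made in processRollback.
    static RollbackAction decide(boolean hasSavepoint,
                                 boolean newTransaction,
                                 boolean hasTransaction,
                                 boolean localRollbackOnly,
                                 boolean globalRollbackOnParticipationFailure) {
        if (hasSavepoint) {
            return RollbackAction.ROLLBACK_TO_SAVEPOINT;   // nested: undo only the nested part
        }
        if (newTransaction) {
            return RollbackAction.ROLLBACK_TRANSACTION;    // owner of the transaction rolls it back
        }
        if (!hasTransaction) {
            return RollbackAction.NOTHING_TO_ROLL_BACK;
        }
        // Participant in a larger transaction: it cannot roll back by itself.
        if (localRollbackOnly || globalRollbackOnParticipationFailure) {
            return RollbackAction.MARK_ROLLBACK_ONLY;
        }
        return RollbackAction.LEAVE_TO_ORIGINATOR;
    }

    public static void main(String[] args) {
        // A participant with the default globalRollbackOnParticipationFailure = true.
        System.out.println(decide(false, false, true, false, true)); // prints MARK_ROLLBACK_ONLY
    }
}
```

The participant case is why an inner method failing inside a joined transaction usually dooms the outer transaction as well: the participant only sets a marker, and the owner finds it at commit time.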
Methods to be implemented by subclasses
protected abstract void doRollback(DefaultTransactionStatus status) throws TransactionException;
SuspendedResourcesHolder
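A holder for the state of a suspended transaction. It stores the suspended transaction's information so that it can be restored later.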
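The save-and-restore idea can be sketched with plain ThreadLocal fields: suspending captures the thread-bound state into a snapshot and clears it, and resuming writes the snapshot back. SuspendResumeSketch and its Snapshot class are hypothetical and track only a name and a read-only flag; they are an illustration of the pattern, not the Spring implementation.

```java
class SuspendResumeSketch {
    // Thread-bound state of the "current transaction" (illustrative only).
    private static final ThreadLocal<String> NAME = new ThreadLocal<>();
    private static final ThreadLocal<Boolean> READ_ONLY = ThreadLocal.withInitial(() -> false);

    // Immutable snapshot of the suspended state; plays the role of a holder.
    static final class Snapshot {
        final String name;
        final boolean readOnly;

        Snapshot(String name, boolean readOnly) {
            this.name = name;
            this.readOnly = readOnly;
        }
    }

    static void begin(String name, boolean readOnly) {
        NAME.set(name);
        READ_ONLY.set(readOnly);
    }

    // Capture the current state, then clear it so new work starts from a clean slate.
    static Snapshot suspend() {
        Snapshot snapshot = new Snapshot(NAME.get(), READ_ONLY.get());
        NAME.set(null);
        READ_ONLY.set(false);
        return snapshot;
    }

    // Write the captured state back to the current thread.
    static void resume(Snapshot snapshot) {
        NAME.set(snapshot.name);
        READ_ONLY.set(snapshot.readOnly);
    }

    static String currentName() { return NAME.get(); }
    static boolean isReadOnly() { return READ_ONLY.get(); }

    public static void main(String[] args) {
        begin("outer", true);
        Snapshot outer = suspend();        // outer state is set aside
        begin("inner", false);             // inner work runs with its own state
        resume(outer);                     // outer state is restored
        System.out.println(currentName() + " " + isReadOnly()); // prints outer true
    }
}
```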
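// The suspended resources.
private final Object suspendedResources;
// The suspended transaction synchronizations.
private List<TransactionSynchronization> suspendedSynchronizations;
// The transaction name.
private String name;
// Whether the transaction is read-only.
private boolean readOnly;
// The isolation level.
private Integer isolationLevel;
// Whether an actual transaction was active.
private boolean wasActive;
Transaction state control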
TransactionSynchronization
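The TransactionSynchronization interface defines a set of callback methods, one for each phase of a transaction's execution: suspend, resume, flush, commit (before and after), completion (whether the transaction succeeded or failed), and so on. When the transaction reaches a given phase, the transaction manager takes all the callbacks from the synchronizations maintained by TransactionSynchronizationManager and invokes the corresponding method on each one in turn.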
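For a successful commit, the order in which those phases fire follows the sequence in processCommit: before commit, before completion, the commit itself, after commit, after completion. The sketch below replays that order with a reduced, hypothetical callback interface. MiniSynchronization and CallbackOrderSketch are invented for this example, and error handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced callback interface (not Spring's TransactionSynchronization).
interface MiniSynchronization {
    default void beforeCommit() {}
    default void beforeCompletion() {}
    default void afterCommit() {}
    default void afterCompletion(int status) {}
}

class CallbackOrderSketch {
    static final int STATUS_COMMITTED = 0;

    // Runs a successful commit and invokes each phase on every registered callback.
    static void commit(List<MiniSynchronization> syncs, Runnable doCommit) {
        for (MiniSynchronization s : syncs) s.beforeCommit();
        for (MiniSynchronization s : syncs) s.beforeCompletion();
        doCommit.run();
        for (MiniSynchronization s : syncs) s.afterCommit();
        for (MiniSynchronization s : syncs) s.afterCompletion(STATUS_COMMITTED);
    }

    // Builds a callback that appends each phase name to the given log.
    static MiniSynchronization recording(List<String> log) {
        return new MiniSynchronization() {
            @Override public void beforeCommit() { log.add("beforeCommit"); }
            @Override public void beforeCompletion() { log.add("beforeCompletion"); }
            @Override public void afterCommit() { log.add("afterCommit"); }
            @Override public void afterCompletion(int status) { log.add("afterCompletion(" + status + ")"); }
        };
    }

    // Registers one recording callback, commits, and returns the observed order.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<MiniSynchronization> syncs = new ArrayList<>();
        syncs.add(recording(log));
        commit(syncs, () -> log.add("doCommit"));
        return log;
    }

    public static void main(String[] args) {
        // prints [beforeCommit, beforeCompletion, doCommit, afterCommit, afterCompletion(0)]
        System.out.println(run());
    }
}
```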
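public interface TransactionSynchronization extends Flushable {

    /** Completion status in case of proper commit. */
    int STATUS_COMMITTED = 0;

    /** Completion status in case of rollback. */
    int STATUS_ROLLED_BACK = 1;

    /** Unknown status: mixed completion or system error. */
    int STATUS_UNKNOWN = 2;

    /** Suspend this synchronization: unbind resources from TransactionSynchronizationManager. */
    default void suspend() {
    }

    /** Resume this synchronization: rebind resources to TransactionSynchronizationManager. */
    default void resume() {
    }

    /** Flush the underlying session to the datastore. */
    @Override
    default void flush() {
    }

    /**
     * Invoked before transaction commit (before "beforeCompletion").
     */
    default void beforeCommit(boolean readOnly) {
    }

    /**
     * Invoked before transaction commit/rollback completes. Called after beforeCommit,
     * even if beforeCommit threw an exception, and regardless of the transaction outcome.
     * Implementations should not throw TransactionException.
     */
    default void beforeCompletion() {
    }

    /**
     * Invoked after transaction commit. The transaction has been committed, but the
     * transactional resources may still be active and accessible.
     * Implementations should not throw TransactionException.
     */
    default void afterCommit() {
    }

    /**
     * Invoked after the transaction has committed or rolled back; resources can be cleaned up here.
     * Implementations should not throw TransactionException.
     */
    default void afterCompletion(int status) {
    }
}

public abstract class TransactionSynchronizationAdapter implements TransactionSynchronization, Ordered {

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public void suspend() {
    }

    @Override
    public void resume() {
    }

    @Override
    public void flush() {
    }

    @Override
    public void beforeCommit(boolean readOnly) {
    }

    @Override
    public void beforeCompletion() {
    }

    @Override
    public void afterCommit() {
    }

    @Override
    public void afterCompletion(int status) {
    }
}
TransactionSynchronizationManager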
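TransactionSynchronizationManager manages the TransactionSynchronization instances and is built from a set of ThreadLocal objects. Its ThreadLocal fields correspond closely to the fields of SuspendedResourcesHolder: both hold transaction state.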
public abstract class TransactionSynchronizationManager {

    // Per-thread map of resources, e.g. DataSource (connection pool) -> ConnectionHolder.
    // Through it, code running on the same thread obtains the same Connection.
    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<>("Transactional resources");

    // Transaction synchronizations: an extension point Spring offers to application code.
    // Each thread can register any number of them.
    private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =
            new NamedThreadLocal<>("Transaction synchronizations");

    // Name of the current transaction.
    private static final ThreadLocal<String> currentTransactionName =
            new NamedThreadLocal<>("Current transaction name");

    // Whether the current transaction is read-only.
    private static final ThreadLocal<Boolean> currentTransactionReadOnly =
            new NamedThreadLocal<>("Current transaction read-only status");

    // Isolation level of the current transaction.
    private static final ThreadLocal<Integer> currentTransactionIsolationLevel =
            new NamedThreadLocal<>("Current transaction isolation level");

    // Whether an actual transaction is active.
    private static final ThreadLocal<Boolean> actualTransactionActive =
            new NamedThreadLocal<>("Actual transaction active");
}
TransactionSynchronizationUtils
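TransactionSynchronizationUtils runs the synchronization callbacks. Internally it calls TransactionSynchronizationManager.getSynchronizations() to obtain all registered synchronizations and then invokes the corresponding method on each in a loop.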
public static void triggerBeforeCompletion() {
    for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
        try {
            synchronization.beforeCompletion();
        }
        catch (Throwable tsex) {
            logger.error("TransactionSynchronization.beforeCompletion threw exception", tsex);
        }
    }
}
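The loop above catches every Throwable, so one failing callback does not prevent the remaining callbacks from running. A minimal sketch of that behavior, with plain Runnable objects standing in for synchronizations and a returned list standing in for the logger (TriggerSketch is a hypothetical name):

```java
import java.util.ArrayList;
import java.util.List;

class TriggerSketch {
    // Invokes every callback; a failure is recorded and does not stop the loop.
    static List<Throwable> triggerAll(List<Runnable> callbacks) {
        List<Throwable> errors = new ArrayList<>();
        for (Runnable callback : callbacks) {
            try {
                callback.run();
            } catch (Throwable ex) {
                errors.add(ex); // the real code logs the error instead
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        List<String> ran = new ArrayList<>();
        List<Runnable> callbacks = new ArrayList<>();
        callbacks.add(() -> ran.add("first"));
        callbacks.add(() -> { throw new IllegalStateException("boom"); });
        callbacks.add(() -> ran.add("third"));

        List<Throwable> errors = triggerAll(callbacks);
        System.out.println(ran + " errors=" + errors.size()); // prints [first, third] errors=1
    }
}
```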