javascript
Spring TX源码分析
一、先思考一下
- 什么是事務?
事務是一系列數(shù)據(jù)庫操作的集合,在一個事務里,所有有關的數(shù)據(jù)庫操作一起提交或一起回滾 - 事務用在什么地方?
如果多個數(shù)據(jù)庫操作需要一起生效或一起失效,那么這些操作需要放在一個事務里面 - 事務如何創(chuàng)建?
用戶創(chuàng)建了針對數(shù)據(jù)庫操作的連接(java.sql.Connection)之后,就可以針對Connection進行事務的操作,事務依賴于連接 - 事務的基本操作?
1. 開啟事務:Connection.setAutoCommit(false);關閉自動提交則就開啟了事務
2. 提交事務:Connection.commit();
3. 回滾事務:Connection.rollback();
關于事務的各種概念:
更詳細的參考
看源碼之前,如果有看AOP源碼的經歷,會很有幫助,因為spring的事務就是基于AOP的
二、開啟事務
我們在beans.xml中開啟事務的應用,需要添加
<tx:annotation-driven transaction-manager="transactionManager"/>- tx:annotation-driven/注解的分析
但凡這種注解,都有對應的解析器,跟AOP功能的源碼一樣,解析器都實現(xiàn)了NamespaceHandlerSupport類,我們來獲取下NamespaceHandlerSupport的實現(xiàn)類都有哪些
看名字就是TxNamespaceHandler類,我們來看下這個類有哪些內容
- TxNamespaceHandler
Spring會默認調用其init()方法,annotation-driven對應的是AnnotationDrivenBeanDefinitionParser解析器,我們來看下這個解析器的作用
- AnnotationDrivenBeanDefinitionParser的作用分析
- AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
總結:通過以上的分析可知,tx:annotation-driven/的主要功能就是將以下四個類注冊到Spring容器中
- AnnotationTransactionAttributeSource
- TransactionInterceptor(主要的攔截功能都在這里實現(xiàn))
- BeanFactoryTransactionAttributeSourceAdvisor(創(chuàng)建bean的代理類的時候該Advisor會被用上)
- InfrastructureAdvisorAutoProxyCreator:用于創(chuàng)建事務代理
- InfrastructureAdvisorAutoProxyCreator功能分析
其實現(xiàn)了BeanPostProcessor接口,則Spring在創(chuàng)建bean的時候,會默認調用InfrastructureAdvisorAutoProxyCreator的postProcessAfterInitialization()方法,然后就是wrapIfNecessary,跟AOP的流程完全一致
可以看到,注冊了AutoProxyRegistrar(是一個BeanPostProcessor)和ProxyTransactionManagementConfiguration(用于解析事務屬性)
- ProxyTransactionManagementConfiguration
- 該類是一個配置Bean
- 目的:創(chuàng)建事務的切面,跟AOP的區(qū)別就在這里,AOP的切面是我們自己定義的,而事務的切面是Spring給我們生成的
- AutoProxyRegistrar
可以看到也注冊了一個InfrastructureAdvisorAutoProxyCreator
跟xml的方式殊途同歸:也實現(xiàn)了BeanPostProcessor接口,則Spring在創(chuàng)建bean的時候,會默認調用InfrastructureAdvisorAutoProxyCreator的postProcessAfterInitialization()方法,然后就是wrapIfNecessary,跟AOP的流程完全一致
那么后置處理bean有了,切面也有了,就可以創(chuàng)建事務代理了
三、執(zhí)行事務
跟AOP一樣,以JdkDynamicProxy一樣,調用代理,觸發(fā)ReflectiveMethodInvocation的執(zhí)行方法
retVal = invocation.proceed();進而執(zhí)行到
代碼跟到這里,跟AOP又有一個不同的點,就是這個invoke的實現(xiàn)是TransactionInterceptor類,該類專門處理事務切面的執(zhí)行
//TransactionInterceptorpublic Object invoke(MethodInvocation invocation) throws Throwable {//計算出目標類:可能是 {@code null}。 TransactionAttributeSource 應該傳遞目標類以及方法,該方法可能來自接口。Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);//適配TransactionAspectSupport的invokeWithinTransaction...return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);}進而進入invokeWithinTransaction
@Nullable//TransactionAspectSupportprotected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,final InvocationCallback invocation) throws Throwable {// If the transaction attribute is null, the method is non-transactional.// 如果事務屬性為空,則該方法是非事務性的。TransactionAttributeSource tas = getTransactionAttributeSource();//拿到事務5個屬性的值,@Transactional(propagation = Propagation.REQUIRED,isolation = Isolation.DEFAULT )final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);//獲取事務管理器,由我們通過配置指定。事務管理的底層一定會與數(shù)據(jù)庫有關,所以會注入數(shù)據(jù)源等屬性final PlatformTransactionManager tm = determineTransactionManager(txAttr);//切入點,也就是需要控制事務的目的方法(update...)final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {// Standard transaction demarcation with getTransaction and commit/rollback calls.//使用 getTransaction 和 commitRollback 調用進行標準事務劃分,這一句是最難理解的//ifNecessary?其實就是根據(jù)事務屬性決定是否開啟事務TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);Object retVal;try {// This is an around advice: Invoke the next interceptor in the chain.// This will normally result in a target object being invoked.// 源方法運行retVal = invocation.proceedWithInvocation();}catch (Throwable ex) {// target invocation exception//出現(xiàn)異常時則回滾事務,注意:如果是Exception不會回滾,只有RunTimeException或者Error來及其子類才會回滾completeTransactionAfterThrowing(txInfo, ex);throw ex;}finally {//重置 TransactionInfo ThreadLocal。 在所有情況下都調用它:異常或正常返回cleanupTransactionInfo(txInfo);}//提交事務//TransactionManager調用Connection.commit()commitTransactionAfterReturning(txInfo);return retVal;}進入createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {// If no name specified, apply method identification as transaction name.// 如果未指定名稱,則應用方法標識作為事務名稱if (txAttr != null && txAttr.getName() == null) {txAttr = new DelegatingTransactionAttribute(txAttr) {@Overridepublic String getName() {return joinpointIdentification;}};}//記錄當前事務的狀態(tài),是否新事務,是否只讀事務,是否開啟同步?TransactionStatus status = null;if (txAttr != null) {if (tm != null) {//獲取事務的狀態(tài)status = tm.getTransaction(txAttr);}else {if (logger.isDebugEnabled()) {logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +"] because no transaction manager has been configured");}}}return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);}當TransactionAttribute為null,則創(chuàng)建一個TransactionInfo
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,@Nullable TransactionAttribute txAttr, String joinpointIdentification,@Nullable TransactionStatus status) {/** 根據(jù)給定的事務屬性、事務管理器、方法連接點描述字符串(全限定方法名)信息創(chuàng)建一個事務信息對象TransactionInfo*/TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);// 如果事務屬性不為nullif (txAttr != null) {// We need a transaction for this method...if (logger.isTraceEnabled()) {logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");}//那么設置事務狀態(tài),這里就表示為當前方法創(chuàng)建了一個事務txInfo.newTransactionStatus(status);}// 如果事務屬性為null,那么表示當前方法必須要創(chuàng)建事務else {if (logger.isTraceEnabled()) {logger.trace("No need to create transaction for [" + joinpointIdentification +"]: This method is not transactional.");}}//始終將最新的TransactionInfo綁定到當前線程,即使我們沒有在此處創(chuàng)建新的事務也是如此。//也就是將當前線程的最新事務棧設置為當前對象存入transactionInfoHolder中//這保證即使此方面未創(chuàng)建任何事務,也將正確管理TransactionInfo堆棧。txInfo.bindToThread();return txInfo; }- 這里要先介紹一下TransactionInfo,這是SpringTX的核心,TransactionInfo是TransactionAspectSupport的內部類,用來保存線程的執(zhí)行方法時的事務信息。內部保存了事務管理器transactionManager、事務屬性transactionAttribute、全路徑方法名joinpointIdentification。還保存了當前方法的事務transactionStatus,以及前一個方法的事務信息對象oldTransactionInfo。
如果事務屬性不為空,那么走getTransaction獲取/開啟事務分支。在createTransactionIfNecessary方法中,如果存在事務屬性TransactionAttribute,并且存在事務管理器PlatformTransactionManager,那么將調用事務管理器的getTransaction方法根據(jù)為當前方法配置的事務定義的屬性嘗試獲取事務,將返回一個TransactionStatus對象。該方法是事務管理的核心方法,其骨干實現(xiàn)位于抽象實現(xiàn)類AbstractPlatformTransactionManager中,該方法根據(jù)配置的各種事務傳播行為做出不同的處理,方法執(zhí)行完畢將可能開啟了新事物,也可能沒有開啟,甚至拋出異常。AbstractPlatformTransactionManager的設計基于模版方法模式,他提供了處理流程,并且提供了一系列的do……模版方法供子類來實現(xiàn)自己的邏輯。
getTransaction方法的大概邏輯為:
-
- 如果條件都滿足,那么表示此前已經開啟過了事務,即存在外層事務,隨后調用handleExistingTransaction方法統(tǒng)一處理這種情況,比如加入當前事務、新建事務、拋出異常等等邏輯。
-
- 校驗為當前事務方法設置的事務超時時間,如果小于默認超時時間(-1),將會拋出異常。
-
- 如果為當前事務方法設置的傳播行為PROPAGATION_MANDATORY,該傳播行為的含義是:如果當前存在事務,則當前方法加入到該事務中去,如果當前不存在事務,則當前方法直接拋出異常。這里由于不存在外層事務,那么這里就直接拋出異常:“No existing transaction found for transaction marked with propagation ‘mandatory’”。
-
- 如果為當前事務方法設置的傳播行為是PROPAGATION_REQUIRED或者PROPAGATION_REQUIRES_NEW或者PROPAGATION_NESTED,這些傳播行為的含義的共同點之一就是:如果當前不存在事務,就創(chuàng)建一個新事務運行。這里由于不存在外層事務,那么這里就直接創(chuàng)建一個新事物。
-
-
- 首先調用suspend方法首先掛起事務同步,然后再委派給doSuspend模板方法掛起事務,將返回被掛起的資源,用于后續(xù)恢復,如果沒有事務同步也沒有事務,那么將返回null。這里由于沒有已存在的事務,那么參數(shù)傳遞null,一般也會將返回null。
-
-
-
- 調用startTransaction方法,內部依次調用newTransactionStatus方法創(chuàng)建TransactionStatus、doBegin方法真正的開啟新的事務、prepareSynchronization方法準備事務同步,最終返回TransactionStatus,該對象包含了創(chuàng)建的內部事務對象,以及其他事務信息。
-
-
- 調用prepareTransactionStatus方法返回一個TransactionStatus,和上面的startTransaction方法相比,其內部會調用newTransactionStatus和prepareSynchronization,但不會調用doBegin方法,因此不會真正的開啟事物。這里它的newTransaction參數(shù)為false,suspendedResources參數(shù)為null。
- doGetTransaction獲取事務連接
返回當前已存在的事務對象,返回的對象應包含有關任何現(xiàn)有事務的信息,即,在事務管理器上的當前getTransaction方法調用之前已經啟動的事務。因此,doGetTransaction的實現(xiàn)通常是將查找現(xiàn)有事務并將相應的狀態(tài)存儲在返回的事務對象中。返回的對象通常特定于具體的事務管理器子類自己實現(xiàn)。一般都使用DataSourceTransactionManager這個事務管理器,它的doGetTransaction方法邏輯如下:
- 這里也要介紹一個類TransactionSynchronizationManager事務同步管理器
這個類比較特別,雖然它的名字帶有Transaction以及Manager,但卻不是TransactionManager體系,它沒有任何繼承樹,因此它可以看作一個很特別的工具類。該類被稱為事務同步管理器。主要用于管理每一個線程當前所使用的數(shù)據(jù)庫事務連接資源和事務同步器(TransactionSynchronization)。
??一個線程在當前只能激活一個連接資源,因此如果需要綁定新的連接資源,那么需要將此前綁定的資源刪除(或者說保存起來,等新資源使用完畢之后再恢復此前的連接資源)。另外還能支持事務同步列表,事務同步必須由事務管理器通過initSynchronization()和clearSynchronization()來進行激活和停用,使用isSynchronizationActive()檢測當前是否具有事務同步。
??TransactionSynchronizationManager內部是通過很多ThreadLocal(線程本地變量)類型的屬性來實現(xiàn)為當前線程維護自己的資源的功能的,實現(xiàn)資源隔離。
部分屬性:
public abstract class TransactionSynchronizationManager {private static final Log logger = LogFactory.getLog(TransactionSynchronizationManager.class);// 事務資源// 一個ThreadLocal屬性,用于存放線程當前使用的數(shù)據(jù)庫資源// value是一個Map<Object, Object>,key為某個數(shù)據(jù)源DataSource ,value實際上就是連接ConnectionHolderprivate static final ThreadLocal<Map<Object, Object>> resources =new NamedThreadLocal<>("Transactional resources");/*** 事務同步* <p>* 一個ThreadLocal屬性,用于存放線程當前激活的事務同步器TransactionSynchronization* 每個線程都可以開啟多個事物同步,用于在處理事務的各個階段進行自定義擴展或者回調* <p>* TransactionSynchronization的同步回調功能類似于此前學習的@TransactionalEventListener*/private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations =new NamedThreadLocal<>("Transaction synchronizations");//一個ThreadLocal屬性,用于存放線程當前的事務的名稱private static final ThreadLocal<String> currentTransactionName =new NamedThreadLocal<>("Current transaction name");//一個ThreadLocal屬性,用于存放線程當前的事務的只讀狀態(tài)private static final ThreadLocal<Boolean> currentTransactionReadOnly =new NamedThreadLocal<>("Current transaction read-only status");//一個ThreadLocal屬性,用于存放線程當前的當前事務的隔離級別private static final ThreadLocal<Integer> currentTransactionIsolationLevel =new NamedThreadLocal<>("Current transaction isolation level");//一個ThreadLocal屬性,用于存放線程當前是否開啟了事務private static final ThreadLocal<Boolean> actualTransactionActive =new NamedThreadLocal<>("Actual transaction active");- obtainDataSource獲取數(shù)據(jù)源
獲取當前DataSourceTransactionManager實際使用的數(shù)據(jù)源,就是我們配置給事務管理器的DataSource
/*** @return 數(shù)據(jù)源(絕不為null)*/ protected DataSource obtainDataSource() {DataSource dataSource = getDataSource();Assert.state(dataSource != null, "No DataSource set");return dataSource; } /*** 就是我們配置的數(shù)據(jù)源*/ @Nullable private DataSource dataSource; /*** 返回此事務管理器內部的JDBC DataSource。*/ @Nullable public DataSource getDataSource() {return this.dataSource; }- getResource獲取已存在的連接
TransactionSynchronizationManager的方法,該方法檢索給定key綁定到當前線程的資源,實際上就是嘗試從resources屬性中獲綁定當前線程的從給定數(shù)據(jù)源獲已取到的連接資源ConnectionHolder。如果此前沒有獲取過此數(shù)據(jù)源的連接,那么將會得到一個null值,即如果是第一次進入事務方法,那么將返回null。這里resources屬性的value是一個Map<Object, Object>,這說明一個線程可以從不同的數(shù)據(jù)源中獲取資源,但是對于同一個數(shù)據(jù)源,只能保存一個的數(shù)據(jù)源。
public static Object getResource(Object key) {//如有必要,解開給定的資源句柄,否則按原樣返回給定的句柄,常用于從各種代理對象中獲取原始對象Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);//真正獲取當前綁定的資源Object value = doGetResource(actualKey);if (value != null && logger.isTraceEnabled()) {logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +Thread.currentThread().getName() + "]");}return value;}- isExistingTransaction是否已存在事務
檢查是否已經開啟過事務,默認返回false,被具體的事務管理器子類重寫。
DataSourceTransactionManager的邏輯很簡單,判斷通過doGetTransaction方法獲取的DataSourceTransactionObject內部的數(shù)據(jù)庫連接connectionHolder屬性是否不為null,并且是否已經開啟了事務。我們說過如果當前線程是第一次進來,那么connectionHolder就是null。
- suspend掛起事務
掛起給定的事務。首先掛起當前線程的事務同步回調,然后再委派給doSuspend模板方法由子類來實現(xiàn)掛起當前事務,并且還會清空TransactionSynchronizationManager中保存的當前線程的事務信息。最終將會返回會被掛起的資源只有者SuspendedResourcesHolder。如果沒有事務同步也沒有事務,那么將會返回null。當前線程第一次進入事務方法時,默認將會返回null。
//AbstractPlatformTransactionManagerprotected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {//如果當前線程的事務同步處于活動狀態(tài),即存在綁定的TransactionSynchronization,則返回true。//如果是第一次因為進來,那么自然為falseif (TransactionSynchronizationManager.isSynchronizationActive()) {//掛起當前線程的所有事務同步回調,這類似于@TransactionalEventListener,并返回"被掛起"的回調List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();try {Object suspendedResources = null;if (transaction != null) {//掛起事務,由具體的子類實現(xiàn)suspendedResources = doSuspend(transaction);}//獲取當前事務的信息,并且清空各個ThreadLocal緩存中的當前線程的當前事務信息(恢復為默認值)//獲取并清空(設置為null)事物名稱String name = TransactionSynchronizationManager.getCurrentTransactionName();TransactionSynchronizationManager.setCurrentTransactionName(null);//獲取并清空(設置為false)事物只讀狀態(tài)boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);//獲取并清空(設置為null)事物隔離級別Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);//獲取并清空(設置為false)事物是否激活boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();TransactionSynchronizationManager.setActualTransactionActive(false);//將獲取的當前事物的信息存入一個SuspendedResourcesHolder對象中返回return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);}catch (RuntimeException | Error ex) {// doSuspend failed - original transaction is still active...doResumeSynchronization(suspendedSynchronizations);throw ex;}}//如果沒有事務同步但是開啟了事務,那么掛起事務else if (transaction != null) {// Transaction active but no synchronization active.//掛起事務,由具體的子類實現(xiàn)Object suspendedResources = doSuspend(transaction);//將掛起的資源存入一個SuspendedResourcesHolder對象中返回return new SuspendedResourcesHolder(suspendedResources);}else {// Neither transaction nor synchronization active.//事務或者事務同步均未激活,返回null,什么也不干return null;}}- isSynchronizationActive是否激活事務同步
?
TransactionSynchronizationManager的方法,用來判斷線程在當前是否已激活事務同步TransactionSynchronization。實際上就是synchronizations屬性中是否有綁定到當前線程的Set集合,如果有(不為null),那就說明存在事務同步。
- doSuspendSynchronization掛起事務同步
??
??該方法掛起當前線程在TransactionSynchronizationManager的synchronize并且將屬性中為當前線程保持的事務同步列表引用移除,最后返回被掛起的事務同步列表!
- doSuspend掛起事務
其核心就是doSuspend方法,該方法默認拋出異常,由子類自己實現(xiàn)!DataSourceTransactionManager重寫的方法很簡單,就是將DataSourceTransactionObject中的connectionHolder設置為null,并且將給定數(shù)據(jù)源綁定到當前線程的連接資源從TransactionSynchronizationManager的resources屬性中移除并返回,這就是DataSourceTransactionManager被掛起的連接資源,就是此前獲取的連接。從這里能夠看出,所謂的“掛起”,就是將當前的連接從綁定的線程本地變量中移除!
/*** DataSourceTransactionManager的方法* <p>* 掛起當前事務,返回當前的連接資源** @param transaction 掛起事務* @return 被掛起的資源*/ @Override protected Object doSuspend(Object transaction) {DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;//將當前的事務對象的connectionHolder設置為nulltxObject.setConnectionHolder(null);//將當前線程的綁定的當前數(shù)據(jù)源對應的連接同樣移除,并且返回被移除的連接資源return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }/*** TransactionSynchronizationManager的方法* <p>* 移除當前線程中給定key綁定的資源的值** @param key 就是當前數(shù)據(jù)源* @return 被移除的資源,就是連接*/ public static Object unbindResource(Object key) throws IllegalStateException {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);//真正的移除指定的key對應的連接Object value = doUnbindResource(actualKey);if (value == null) {throw new IllegalStateException("No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");}return value; }/*** 事務資源*/ private static final ThreadLocal<Map<Object, Object>> resources =new NamedThreadLocal<>("Transactional resources");/*** TransactionSynchronizationManager的方法* <p>* 移除當前線程中給定key綁定的資源的值。** @param actualKey 就是當前數(shù)據(jù)源* @return 被移除的資源,就是連接*/ @Nullable private static Object doUnbindResource(Object actualKey) {//獲取和當前線程綁定的數(shù)據(jù)庫資源mapMap<Object, Object> map = resources.get();if (map == null) {return null;}//從map中移除從當前數(shù)據(jù)源對應的連接緩存Object value = map.remove(actualKey);//如果map為空,則刪除整個ThreadLocal。if (map.isEmpty()) {resources.remove();}// Transparently suppress a ResourceHolder that was marked as void...if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {value = null;}if (value != null && logger.isTraceEnabled()) {logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +Thread.currentThread().getName() + "]");}//返回被移除的資源return value; }- newTransactionStatus開啟新事物
新建一個DefaultTransactionStatus實現(xiàn)并返回,內部持有我們?yōu)楫斍胺椒ㄅ渲玫氖聞諏傩曰蛘吣J屬性,以及保存著此前掛起的其他資源。新建一個DefaultTransactionStatus實現(xiàn)并返回,內部持有我們?yōu)楫斍胺椒ㄅ渲玫氖聞諏傩曰蛘吣J屬性,以及保存著此前掛起的其他資源。
/*** AbstractPlatformTransactionManager的方法* <p>* 為給定參數(shù)新創(chuàng)建一個TransactionStatus實例,實際類型為DefaultTransactionStatus** @param definition 為當前方法配置的事務定義* @param transaction 獲取的事務對象* @param newTransaction 是否是新事物* @param newSynchronization 是否開啟事務同步* @param debug 是否支持debug級別的日志* @param suspendedResources 被掛起的資源,比如此前的事務同步* @return DefaultTransactionStatus對象*/ protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {//如果newSynchronization為true并且當前線程沒有綁定的事務同步,那么確定開啟新事物同步//由于此前調用了suspend方法清理了此前的事務同步,因此一般都是需要開啟新事務同步,即為trueboolean actualNewSynchronization = newSynchronization &&!TransactionSynchronizationManager.isSynchronizationActive();//返回一個新建的DefaultTransactionStatus對象,該對象被用來表示新開啟的事務,是TransactionStatus的默認實現(xiàn)//內部包括了各種新開啟的事務狀態(tài),當然包括此前掛起的事務的資源return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization,definition.isReadOnly(), debug, suspendedResources); } /*DefaultTransactionStatus的屬性*/ /*** 從事務管理器獲取的內部事務* 對于DataSourceTransactionManager來說就是DataSourceTransactionObject*/ @Nullable private final Object transaction; /*** 是否是新事物*/ private final boolean newTransaction; /*** 是否開啟新事務同步*/ private final boolean newSynchronization; /*** 是否開啟新事務同步*/ private final boolean readOnly; /*** 是否支持debug日志級別*/ private final boolean debug; /*** 此前被掛起的事務資源*/ @Nullable private final Object suspendedResources; public DefaultTransactionStatus(@Nullable Object transaction, boolean newTransaction, boolean newSynchronization,boolean readOnly, boolean debug, @Nullable Object suspendedResources) {this.transaction = transaction;this.newTransaction = newTransaction;this.newSynchronization = newSynchronization;this.readOnly = readOnly;this.debug = debug;this.suspendedResources = suspendedResources; }- doBegin真正開啟事務
該方法是核心方法,當事務管理器決定實際開始新事務時,將調用此方法。此時之前可能沒有任何事務,或者先前的事務已被暫停。根據(jù)給定的事務定義TransactionDefinition,以及此前通過doGetTransaction方法返回的事務對象(也就是DataSourceTransactionObject),使用給定的語義開始一個新事務。不必關心應用傳播行為,因為抽象事務管理器已經處理了該行為。
??
大概步驟為:
- prepareConnectionForTransaction準備事務連接
使用給定的事務語義準備給定的Connection,就是將我們設置的隔離級別isolationLevel,只讀標志readOnly屬性賦給當前事務連接。如果我們配置的隔離級別屬性是ISOLATION_DEFAULT,即采用默認隔離級別,或者不是默認的隔離級別但是與連接的隔離級別一致,那么將返回null,否則將設置連接的隔離級別為指定的級別,并且返回從連接中獲取的隔離級別(如果有)。
/*** DataSourceUtils的方法* <p>* 使用給定的事務語義準備給定的Connection。** @param con 需要準備的連接* @param definition 適用的事務定義* @return 連接先前的隔離級別,可能為null*/ @Nullable public static Integer prepareConnectionForTransaction(Connection con, @Nullable TransactionDefinition definition)throws SQLException {Assert.notNull(con, "No Connection specified");boolean debugEnabled = logger.isDebugEnabled();// 設置只讀標志。if (definition != null && definition.isReadOnly()) {try {if (debugEnabled) {logger.debug("Setting JDBC Connection [" + con + "] read-only");}//設置連接只讀屬性con.setReadOnly(true);} catch (SQLException | RuntimeException ex) {Throwable exToCheck = ex;while (exToCheck != null) {if (exToCheck.getClass().getSimpleName().contains("Timeout")) {// Assume it's a connection timeout that would otherwise get lost: e.g. from JDBC 4.0throw ex;}exToCheck = exToCheck.getCause();}// "read-only not supported" SQLException -> ignore, it's just a hint anywaylogger.debug("Could not set JDBC Connection read-only", ex);}}// 應用特定的隔離級別(如果有)。Integer previousIsolationLevel = null;//如果存在隔離級別并且不等于默認配置,即不等于ISOLATION_DEFAULT(該級別的意思是使用數(shù)據(jù)庫的默認級別)if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {if (debugEnabled) {logger.debug("Changing isolation level of JDBC Connection [" + con + "] to " +definition.getIsolationLevel());}//獲取當前連接的隔離級別int currentIsolation = con.getTransactionIsolation();//如果手動設置的隔離級別不等于連接的隔離級別if (currentIsolation != definition.getIsolationLevel()) {//記錄連接的隔離級別previousIsolationLevel = currentIsolation;//連接的隔離級別手動設置為我們配置的隔離級別con.setTransactionIsolation(definition.getIsolationLevel());}}//返回此前的連接的隔離級別,可能為nullreturn previousIsolationLevel; }- prepareTransactionalConnection優(yōu)化只讀事務
??事務開始后立即準備事務連接,主要是對于只讀事務的優(yōu)化操作(需要手動開啟)。如果將事務管理器的"enforceReadOnly"標志設置為true(默認為false),并且事務定義指示只讀事務,則默認實現(xiàn)將執(zhí)行"SET TRANSACTION READ ONLY"這一個sql語句。
??"SET TRANSACTION READ ONLY"這個sql的意思就是告訴數(shù)據(jù)庫,此事務中的后續(xù)sql語句將只有查詢操作,不能進行DML操作。在"SET TRANSACTION READ ONLY"之后的查詢語句將不會查詢到該事物期間提交的內容,只能查詢到事務開始之前提交的內容,相當于查詢一個快照。進行只讀事務設置之后,將有效減輕數(shù)據(jù)庫壓力。對于同一個表進行更新操作時,只讀事務不會被阻塞,可以正常的執(zhí)行查詢操作,在只讀事務操作期間也不會影響其他事務!
上面說了這么多好處,很遺憾的是DataSourceTransactionManager的enforceReadOnly屬性默認為false,并且大部分開發(fā)者也不知道這個優(yōu)化,因此大多數(shù)情況下并不會執(zhí)行該sql語句,即不會進行優(yōu)化。如果要開啟,那么可以這么設置:
/*** 配置DataSourceTransactionManager* 用于管理某一個數(shù)據(jù)庫的事務*/ @Bean public DataSourceTransactionManager transactionManager() {DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager(druidDataSource());//設置只讀事務優(yōu)化dataSourceTransactionManager.setEnforceReadOnly(true);//傳入一個數(shù)據(jù)源return dataSourceTransactionManager; }當然如果你真的這么開啟了,并且你通過@Transactional注解設置了某個方法的事務的readOnly屬性為true,那么確實會執(zhí)行該方法,但是你講將會收到一個異常:“Connection is read-only. Queries leading to data modification are not allowed”。原因是什么呢?很簡單,在前面的prepareConnectionForTransaction方法中,連接被設置為只讀,然而在隨后的prepareTransactionalConnection方法中,執(zhí)行該sql語句的卻是executeUpdate方法,自然會拋出異常!所以說,這個優(yōu)化還不能隨便開。或者說,是因為不同的數(shù)據(jù)庫對于Spring的readOnly屬性的支持是不一樣的,mysql支持Spring的readOnly參數(shù),即支持JDBC的con.setReadOnly(true),因此就沒必要再設置enforceReadOnly為true,而oracle則僅支持在Oracle server的設置而非JDBC驅動的配置,因此不支持con.setReadOnly(true),所以實際上Spring的readOnly配置對于Oracle無效,所以Oracle數(shù)據(jù)庫可以開啟此優(yōu)化,mysql則不必要。
- determineTimeout確定超時時間
確定給定事務定義的實際超時時間。如果事務定義未指定非默認值,則將使用默認超時。
/*** AbstractPlatformTransactionManager的方法* <p>* 確定給定事務定義的實際超時時間。* 如果事務定義未指定非默認值,則將使用默認超時。** @param definition 事務定義* @return 實際使用的超時時間*/ protected int determineTimeout(TransactionDefinition definition) {//如果不是默認超時時間,那么使用指定的事件if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {return definition.getTimeout();}//否則使用默認超時return getDefaultTimeout(); }/*** AbstractPlatformTransactionManager的屬性* <p>* 默認超時時間(秒),默認值為-1,表示使用基礎事務系統(tǒng)的默認超時;*/ private int defaultTimeout = TransactionDefinition.TIMEOUT_DEFAULT;public final int getDefaultTimeout() {return this.defaultTimeout; }- setTimeoutInSeconds設置超時deadline
這里實際上就是根據(jù)設置的值和當前時間轉換為未來的毫秒值并創(chuàng)建新Date配置給deadline屬性,在其他數(shù)據(jù)庫操作框架具體操作時將會獲取并應用該參數(shù)。比如mybatis,在執(zhí)行sql之前會獲取到超時時間,計算之后會通過Statement.setQueryTimeout方法來設置,也就是說這個超時時間是執(zhí)行sql之前的代碼執(zhí)行時間+sql執(zhí)行時間,如果執(zhí)行時間超過了設置時間就會拋出異常,這個異常就會被spring事務切面捕獲到最終導致事務回滾,而如果在sql執(zhí)行完畢之后的方法處理時間超過了這個超時時間,那么是不會進行回滾的,事務將會正常提交。
/*** ConnectionHolder的父類ResourceHolderSupport的方法* <p>* 設置此對象的超時(以秒為單位)。** @param seconds 到期前的秒數(shù)*/ public void setTimeoutInSeconds(int seconds) {setTimeoutInMillis(seconds * 1000L); }/*** ConnectionHolder的父類ResourceHolderSupport的屬性*/ @Nullable private Date deadline;/*** 設置此對象的超時(以毫秒為單位)。** @param millis 到期前的毫秒數(shù)*/ public void setTimeoutInMillis(long millis) {//根據(jù)當前時間和超時時間計算出到期的Datethis.deadline = new Date(System.currentTimeMillis() + millis); }- bindResource綁定資源到resources
對于新獲取的連接資源會被綁定到TransactionSynchronizationManager的resources線程本地變量屬性中(resources我們在此前就見過了)。key就是當前的屬性源DataSource,value就是ConnectionHolder。
/*** 事務資源*/ private static final ThreadLocal<Map<Object, Object>> resources =new NamedThreadLocal<>("Transactional resources");/*** TransactionSynchronizationManager的方法* <p>* 將給定key的給定資源value綁定到當前線程。* 對于DataSourceTransactionManager,key就是DataSource實例,value就是ConnectionHolder** @param key 將值綁定到的鍵(通常是資源工廠,比如dataSource)* @param value 要綁定的值(通常是活動資源對象,比如數(shù)據(jù)庫連接)* @throws IllegalStateException 如果已經有綁定到線程的值*/ public static void bindResource(Object key, Object value) throws IllegalStateException {Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);Assert.notNull(value, "Value must not be null");//獲取當前線程的本地資源mapMap<Object, Object> map = resources.get();//如果找不到,則設置一個Mapif (map == null) {map = new HashMap<>();resources.set(map);}//將actualKey和value存入map中,返回舊的valueObject oldValue = map.put(actualKey, value);// Transparently suppress a ResourceHolder that was marked as void...if (oldValue instanceof ResourceHolder && ((ResourceHolder) oldValue).isVoid()) {oldValue = null;}//如果已經有綁定到線程的當前key的值,則拋出異常if (oldValue != null) {throw new IllegalStateException("Already value [" + oldValue + "] for key [" +actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");}if (logger.isTraceEnabled()) {logger.trace("Bound value [" + value + "] for key [" + actualKey + "] to thread [" +Thread.currentThread().getName() + "]");} }- prepareSynchronization準備事務同步
??
該方法通常用在doBegin開啟新事物之后,用于準備事務同步,就是將當前事務的一系列屬性綁定到TransactionSynchronizationManager的對應的線程本地變量中。
- prepareTransactionStatus準備事務狀態(tài)
- prepareTransactionStatus方法和上面的startTransaction方法相比,其內部會調用newTransactionStatus和prepareSynchronization,但不會調用doBegin方法,因此不會真正的開啟事物。返回的TransactionStatus,其內部保存了其他的資源,比如被掛起的事務信息。
- 如果是第一次進入事務方法,即當前方法是最外層事務方法,并且傳播行為是PROPAGATION_SUPPORTS或PROPAGATION_NEVER或PROPAGATION_NOT_SUPPORTED,那么會調用該方法,它的newTransaction參數(shù)為false,suspendedResources參數(shù)為null。即這些傳播行為都不會真正的開啟數(shù)據(jù)庫級別的事務(不會獲取新的連接)。
- 如果是已存在外層事務,即當前方法是內層事務方法,并且傳播行為是PROPAGATION_NOT_SUPPORTED或者PROPAGATION_NESTED或者PROPAGATION_SUPPORTS或者PROPAGATION_REQUIRED或者PROPAGATION_MANDATORY,那么也有可能調用這個方法,它的newTransaction參數(shù)為false,suspendedResources參數(shù)為被掛起的外層事務資源。即這些傳播行為都不會真正的開啟數(shù)據(jù)庫級別的事務(不會獲取新的連接,對于普通事物來說)。
}
- 如果當前已存在事務,則handleExistingTransaction處理已存在事務
??
如果進入事務切面之后,獲取到了已存在的連接并且開啟了事務(通過isExistingTransaction方法判斷),那么將會執(zhí)行handleExistingTransaction方法執(zhí)行已存在事務時的邏輯,并返回一個TransactionStatus。該方法同樣將會根據(jù)此事務切面設置的事務傳播行為走不同的執(zhí)行流程,比如加入當前事務、新建事務、拋出異常等等邏輯。
??大概邏輯是:
transaction found for transaction marked with propagation ‘never’”。
-
- 首先調用suspend方法掛起外層事務,返回被掛起的資源。由于存在外層事務,所以這里的參數(shù)就是獲取的外層事務參數(shù)。
-
- 隨后調用prepareTransactionStatus方法返回一個新的TransactionStatus,并在適當時初始化事務同步。同樣,該方法和上面的startTransaction方法相比,其內部會調用newTransactionStatus和prepareSynchronization,但不會調用doBegin方法,因此不會真正的開啟事物。這里它的newTransaction參數(shù)為false,transaction參數(shù)為null,suspendedResources參數(shù)為被掛起的外層事務資源。
-
- 首先調用suspend方法掛起外層事務,返回被掛起的資源。由于存在外層事務,所以這里的參數(shù)就是獲取的外層事務參數(shù)。
-
- 隨后調用startTransaction方法真正的開啟一個數(shù)據(jù)庫級別的事務(將會獲取新的連接開啟一個新事物,舊的連接和事務則在上面的suspend方法中被掛起保存)。
如果當前存在事務,則創(chuàng)建一個新“事務”作為當前事務的嵌套事務來運行;如果當前沒有事務,則等價于PROPAGATION_REQUIRED,即會新建一個事務運行。
-
- 調用isNestedTransactionAllowed方法判斷是否允許PROPAGATION_NESTED行為,默認不允許,但是DataSourceTransactionManager重寫為允許。不允許就拋出異常:“Transaction manager does not allow nested transactions……”。
-
- 調用useSavepointForNestedTransaction方法判斷是否對“嵌套事務”使用保存點Savepoint來實現(xiàn):
-
-
- 如果允許,那么首先調用prepareTransactionStatus方法返回一個新的TransactionStatus,并在適當時初始化事務同步。這里它的newTransaction參數(shù)為false,transaction參數(shù)為外層事務,suspendedResources參數(shù)為null,newSynchronization參數(shù)為false。隨后調用createAndHoldSavepoint創(chuàng)建保存點,將使用數(shù)據(jù)庫的保存點的特性來實現(xiàn)“嵌套事務”,這是用語大部分普通事務。
-
-
-
- 否則將調用startTransaction方法通過在外層事務中嵌套的begin和commit/rollback調用來開啟真正的嵌套事務,不過通常僅用于JTA:如果存在預先存在的JTA事務,則可以在此處激活Spring同步。
-
-
- 那么這里同樣調用prepareTransactionStatus方法,這里它的newTransaction參數(shù)為false,transaction參數(shù)為外層事務,suspendedResources參數(shù)為null。
從源碼中我們能夠看到,如果傳播行為是PROPAGATION_NESTED:
- createAndHoldSavepoint創(chuàng)建保存點
該方法很簡單,最終會調用當前的JDBC連接Connection的setSavepoint方法創(chuàng)建一個保存點,并且被設置給當前TransactionStatus對象的savepoint屬性。
/*** AbstractTransactionStatus的方法* <p>* 創(chuàng)建一個保存點并將其保存在事務中。** @throws NestedTransactionNotSupportedException 如果基礎事務不支持保存點*/ public void createAndHoldSavepoint() throws TransactionException {setSavepoint(getSavepointManager().createSavepoint()); } /*** AbstractTransactionStatus的屬性*/ @Nullable private Object savepoint;/*** AbstractTransactionStatus的方法* <p>* 設置此事務的保存點,對PROPAGATION_NESTED有用。*/ protected void setSavepoint(@Nullable Object savepoint) {this.savepoint = savepoint; }getSavepointManager獲取獲取保存點管理器,實際上創(chuàng)建的內部事務對象都是SavepointManager接口的實現(xiàn),具有獲取保存點的方法!因此返回的實際上就是內部的事務對象,對于DataSourceTransactionManager來說創(chuàng)建的內部事務就是DataSourceTransactionObject。
/*** DefaultTransactionStatus的屬性* <p>* 獲取基礎事務對象的SavepointManager,實際上就是獲取的內部事務對象*/ @Nullable private final Object transaction;/*** DefaultTransactionStatus的方法* <p>* 獲取基礎事務對象的SavepointManager,實際上就是獲取的內部事務對象*/ @Override protected SavepointManager getSavepointManager() {//獲取內部事務,對于DataSourceTransactionManager來說創(chuàng)建的內部事務就是DataSourceTransactionObjectObject transaction = this.transaction;if (!(transaction instanceof SavepointManager)) {throw new NestedTransactionNotSupportedException("Transaction object [" + this.transaction + "] does not support savepoints");}return (SavepointManager) transaction; }createSavepoint用于從當前保存點管理器(事務對象)中創(chuàng)建一個保存點,實際上就是獲取事務對象里面的ConnectionHolder,然后在獲取ConnectionHolder里面的Connection,最后調用Connection.setSavepoint方法創(chuàng)建并獲取保存點。
/*** DataSourceTransactionObject的父類JdbcTransactionObjectSupport的方法* <p>* 創(chuàng)建一個JDBC 3.0保存點并返回它。*/ @Override public Object createSavepoint() throws TransactionException {//校驗規(guī)則并獲取,此前創(chuàng)建的ConnectionHolder,其內部保存了獲取的連接ConnectionConnectionHolder conHolder = getConnectionHolderForSavepoint();try {//如果不允許保存點,那么拋出異常,默認允許if (!conHolder.supportsSavepoints()) {throw new NestedTransactionNotSupportedException("Cannot create a nested transaction because savepoints are not supported by your JDBC driver");}//如果被設置為僅回滾,那么拋出異常if (conHolder.isRollbackOnly()) {throw new CannotCreateTransactionException("Cannot create savepoint for transaction which is already marked as rollback-only");}return conHolder.createSavepoint();} catch (SQLException ex) {throw new CannotCreateTransactionException("Could not create JDBC savepoint", ex);} }/*** DataSourceTransactionObject的父類JdbcTransactionObjectSupport的方法* <p>* 為了創(chuàng)建一個JDBC 3.0保存而獲取ConnectionHolder。*/ protected ConnectionHolder getConnectionHolderForSavepoint() throws TransactionException {//如果不允許保存點,那么拋出異常,默認允許if (!isSavepointAllowed()) {throw new NestedTransactionNotSupportedException("Transaction manager does not allow nested transactions");}//如果沒有ConnectionHolder,那么拋出異常,默認允許if (!hasConnectionHolder()) {throw new TransactionUsageException("Cannot create nested transaction when not exposing a JDBC transaction");}//返回此前創(chuàng)建的ConnectionHolder,其內部保存了獲取的連接return getConnectionHolder(); }//ConnectionHolder的方法的屬性/*** 保存點名稱的前綴。*/ public static final String SAVEPOINT_NAME_PREFIX = "SAVEPOINT_";/*** 從此連接中獲取的保存點的數(shù)量*/ private int savepointCounter = 0;/*** ConnectionHolder的方法* 為當前連接創(chuàng)建一個新的JDBC 3.0保存點,只用SAVEPOINT_+savepointCounter作為保存點的名稱** @return 新的保存點* @throws SQLException if thrown by the JDBC driver*/ public Savepoint createSavepoint() throws SQLException {//獲取數(shù)量自增1this.savepointCounter++;//獲取內部的JDBC連接,并通過連接設置一個保存點,返回創(chuàng)建的Savepointreturn getConnection().setSavepoint(SAVEPOINT_NAME_PREFIX + this.savepointCounter); }四、小結
createTransactionIfNecessary方法創(chuàng)建并返回一個TransactionInfo對象,并且在此過程中,將會調用getTransaction方法獲取事務TransactionStatus。
getTransaction方法就是Spring事務處理的核心方法之一,該方法根據(jù)配置的各種事務傳播行為以及是否存在外層事務做出不同的處理,方法執(zhí)行完畢將可能開啟了新事物,也可能沒有開啟,甚至拋出異常。
TransactionInfo內部保存了事務管理器transactionManager、事務屬性transactionAttribute、全路徑方法名joinpointIdentification。還保存了當前方法的事務transactionStatus,以及前一個方法的事務信息對象oldTransactionInfo。
TransactionStatus實際類型為DefaultTransactionStatus,它持有一個transaction內部事務對象、被掛起的事務資源以及一些事務的屬性,這個內部事務對象由事務管理器的實現(xiàn)各自創(chuàng)建,不同的事務管理器將會創(chuàng)建不同的類型,因此使用Object來表示,對于DataSourceTransactionManager來說,它創(chuàng)建的事務對象就是DataSourceTransactionObject。
DataSourceTransactionObject內部持有一個ConnectionHolder對象以及一些事務的屬性,ConnectionHolder對象內部持有一個為了配置數(shù)據(jù)庫事務而獲取的JDBC連接Connection,以及是否開啟了事務的標志transactionActive,以及其他屬性,比如保存點計數(shù)。
最終,我們?yōu)槟硞€方法定義的事務屬性,除了傳播行為之外(Spring提供的特性并自行處理),都會反應到Connection的對應操作上,比如隔離級別、超時時間,是否只讀等等,關鍵方法就是doBegin,該方法用于真正的開啟數(shù)據(jù)庫層面的事務。
我們用了很長的文章講解了createTransactionIfNecessary方法的邏輯和源碼,這是Spring 事務開啟的核心處理方法(可能并未真正的開啟事務),剩下的方法比如proceedWithInvocation、completeTransactionAfterThrowing、cleanupTransactionInfo、commitTransactionAfterReturning就是在開啟事物之后的處理方法,比如回滾、提交、恢復事務等等。
參考文章
總結
以上是生活随笔為你收集整理的Spring TX源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2020年中国互联网租车报告
- 下一篇: 2021年货节消费趋势报告