javascript
Spring事务源码分析
首先看例子,這例子摘抄自開濤的跟我學spring3。
@Test public void testPlatformTransactionManager() { DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setIsolationLevel(TransactionDefinition.ISOLATION_READ_COMMITTED); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED); TransactionStatus status = txManager.getTransaction(def); try { jdbcTemplate.update(INSERT_SQL, "test"); txManager.commit(status); } catch (RuntimeException e) { txManager.rollback(status); } } |
重要的代碼在上面高亮處。
在執行jdbcTemplate.update的時候使用的是datasource.getConection獲取連接。
實際上,
- 在執行txManager.getTransaction(def);的時候,應該會設置:conection.setAutoConmmit(false)。
- 在執行txManager.commit(status);的時候,應該是執行conection.commit();
- 在執行txManager. rollback (status);的時候,應該是執行conection. rollback ();
但是,Spring是如何保證,txManager中的conn就是jdbcTemplate中的conn的呢。從這點出發,開始看源代碼。
因為是執行的jdbc操作,這里的txManager是DataSourceTransactionManager。我們來看代碼:
getTransaction方法:
getTransaction方法在DataSourceTransactionManager的超類中,也就是AbstractPlatformTransactionManager,我們來看方法:
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { ????????Object transaction = doGetTransaction(); ? ????????// Cache debug flag to avoid repeated checks. ????????boolean debugEnabled = logger.isDebugEnabled(); ? ????????if (definition == null) { ????????????// Use defaults if no transaction definition given. ????????????definition = new DefaultTransactionDefinition(); ????????} ? ????????if (isExistingTransaction(transaction)) { ????????????// Existing transaction found -> check propagation behavior to find out how to behave. ????????????return handleExistingTransaction(definition, transaction, debugEnabled); ????????} ? ????????// Check definition settings for new transaction. ????????if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { ????????????throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); ????????} ? ????????// No existing transaction found -> check propagation behavior to find out how to proceed. ????????if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { ????????????throw new IllegalTransactionStateException( ????????????????????"No existing transaction found for transaction marked with propagation 'mandatory'"); ????????} ????????else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || ????????????????definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || ????????????definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { ????????????SuspendedResourcesHolder suspendedResources = suspend(null); ????????????if (debugEnabled) { ????????????????logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); ????????????} ????????????try { ????????????????boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); ????????????????DefaultTransactionStatus status = newTransactionStatus( ????????????????????????definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); ????????????????doBegin(transaction, definition); ????????????????prepareSynchronization(status, definition); ????????????????return status; ????????????} ????????????catch (RuntimeException ex) { ????????????????resume(null, suspendedResources); ????????????????throw ex; ????????????} ????????????catch (Error err) { ????????????????resume(null, suspendedResources); ????????????????throw err; ????????????} ????????} ????????else { ????????????// Create "empty" transaction: no actual transaction, but potentially synchronization. ????????????boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); ????????????return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); ????????} ????} |
?
先看第一句,
Object transaction = doGetTransaction();
方法在AbstractPlatformTransactionManager中,方法為:
protected abstract Object doGetTransaction() throws TransactionException;
這是典型的模板方法設計模式,AbstractPlatformTransactionManager作為抽象類,定義了getTransaction方法,并且設置為final,然后方法內部調用的部分方法是protected abstract的,交給子類去實現。
我們來看在DataSourceTransactionManager類中的doGetTransaction方法的定義:
@Override ????protected Object doGetTransaction() { ????????DataSourceTransactionObject txObject = new DataSourceTransactionObject(); ????????txObject.setSavepointAllowed(isNestedTransactionAllowed()); ????????ConnectionHolder conHolder = ????????????????(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource); ????????txObject.setConnectionHolder(conHolder, false); ????????return txObject; ????} |
?
注意這里,是new了一個DataSourceTransactionObject對象,重要的是高亮的兩句。txObject中有一個ConnectionHolder對象,這么說來,在這一步的時候有可能已經在事務對象(DataSourceTransactionObject)中,保存了一個ConnectionHolder對象,顧名思義,ConnectionHolder中必然有Connection。如果是這樣,我們只要確定,在執行jdbc操作的時候使用的Connection和這個ConnectionHolder中的是同一個就可以了。我們先看ConnectionHolder的結構。
確實如我們所想。
我們再看TransactionSynchronizationManager.getResource(this.dataSource);代碼如何獲取ConnectionHolder的。
TransactionSynchronizationManager這個名字,應該是支持多線程并發讀取的。我們看代碼。
public static Object getResource(Object key) { ????????Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key); ????????Object value = doGetResource(actualKey); ????????if (value != null && logger.isTraceEnabled()) { ????????????logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + ????????????????????Thread.currentThread().getName() + "]"); ????????} ????????return value; ????} |
看Object value = doGetResource(actualKey);代碼:
private static Object doGetResource(Object actualKey) { ????????Map<Object, Object> map = resources.get(); ????????if (map == null) { ????????????return null; ????????} ????????Object value = map.get(actualKey); ????????// Transparently remove ResourceHolder that was marked as void... ????????if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { ????????????map.remove(actualKey); ????????????// Remove entire ThreadLocal if empty... ????????????if (map.isEmpty()) { ????????????????resources.remove(); ????????????} ????????????value = null; ????????} ????????return value; ????} |
高亮代碼,看起來就是從一個map中獲取了返回的結果,獲取的時候使用的key是上一個方法傳入的datasource。
看看這個map是什么。
private static final ThreadLocal<Map<Object, Object>> resources = ????????????new NamedThreadLocal<Map<Object, Object>>("Transactional resources"); |
看來是ThreadLocal對象。
那么這個對象是在什么時候初始化的呢。
經過查看是在這個方法:
public static void bindResource(Object key, Object value) throws IllegalStateException { |
那么那個地方調了這個方法呢?
經過查看,又回到了DataSourceTransactionManager類:
@Override ????protected void doResume(Object transaction, Object suspendedResources) { ????????ConnectionHolder conHolder = (ConnectionHolder) suspendedResources; ????????TransactionSynchronizationManager.bindResource(this.dataSource, conHolder); ????} |
但是這個是在事務執行完畢的時候執行的,所以如果我們是第一次在當前線程執行事務,那么回到最初的代碼:
public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { ????????Object transaction = doGetTransaction(); ? ????????// Cache debug flag to avoid repeated checks. ????????boolean debugEnabled = logger.isDebugEnabled(); ? ????????if (definition == null) { ????????????// Use defaults if no transaction definition given. ????????????definition = new DefaultTransactionDefinition(); ????????} ? ????????if (isExistingTransaction(transaction)) { ????????????// Existing transaction found -> check propagation behavior to find out how to behave. ????????????return handleExistingTransaction(definition, transaction, debugEnabled); ????????} ? ????????// Check definition settings for new transaction. ????????if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { ????????????throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); ????????} ? ????????// No existing transaction found -> check propagation behavior to find out how to proceed. ????????if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { ????????????throw new IllegalTransactionStateException( ????????????????????"No existing transaction found for transaction marked with propagation 'mandatory'"); ????????} ????????else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || ????????????????definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || ????????????definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { ????????????SuspendedResourcesHolder suspendedResources = suspend(null); ????????????if (debugEnabled) { ????????????????logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); ????????????} ????????????try { ????????????????boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); ????????????????DefaultTransactionStatus status = newTransactionStatus( ????????????????????????definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); ????????????????doBegin(transaction, definition); ????????????????prepareSynchronization(status, definition); ????????????????return status; ????????????} ????????????catch (RuntimeException ex) { ????????????????resume(null, suspendedResources); ????????????????throw ex; ????????????} ????????????catch (Error err) { ????????????????resume(null, suspendedResources); ????????????????throw err; ????????????} ????????} ????????else { ????????????// Create "empty" transaction: no actual transaction, but potentially synchronization. ????????????boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); ????????????return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); ????????} ????} |
Object transaction = doGetTransaction();
這里的transaction中應該是沒有connection的。
繼續往下看:
if (isExistingTransaction(transaction)) { ????????????// Existing transaction found -> check propagation behavior to find out how to behave. ????????????return handleExistingTransaction(definition, transaction, debugEnabled); ????????} |
其中,isExistingTransaction:
@Override ????protected boolean isExistingTransaction(Object transaction) { ????????DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; ????????return (txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive()); ????} |
?
這是是判斷txObject種有沒有ConnectionHolder,也就是當前線程是否已經執行過事務。
我們忽略有的情況,主要看沒有的情況,也就是說當前線程第一次處理事務的情況。
繼續看最初的代碼,主要看這段:
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || ????????????????definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || ????????????definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { ????????????SuspendedResourcesHolder suspendedResources = suspend(null); ????????????if (debugEnabled) { ????????????????logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); ????????????} ????????????try { ????????????????boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); ???????????????? ????????????????????????definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); DefaultTransactionStatus status = newTransactionStatus( ????????????????doBegin(transaction, definition); ????????????????prepareSynchronization(status, definition); ????????????????return status; ????????????} ????????????catch (RuntimeException ex) { ????????????????resume(null, suspendedResources); ????????????????throw ex; ????????????} ????????????catch (Error err) { ????????????????resume(null, suspendedResources); ????????????????throw err; ????????????} ????????} |
看doBegin(transaction, definition);
@Override ????protected void doBegin(Object transaction, TransactionDefinition definition) { ????????DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; ????????Connection con = null; ? ????????try { ????????????if (txObject.getConnectionHolder() == null || ????????????????????txObject.getConnectionHolder().isSynchronizedWithTransaction()) { ????????????????Connection newCon = this.dataSource.getConnection(); ????????????????if (logger.isDebugEnabled()) { ????????????????????logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); ????????????????} ????????????????txObject.setConnectionHolder(new ConnectionHolder(newCon), true); ????????????} ? ????????????txObject.getConnectionHolder().setSynchronizedWithTransaction(true); ????????????con = txObject.getConnectionHolder().getConnection(); ? ????????????Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); ????????????txObject.setPreviousIsolationLevel(previousIsolationLevel); ? ????????????// Switch to manual commit if necessary. This is very expensive in some JDBC drivers, ????????????// so we don't want to do it unnecessarily (for example if we've explicitly ????????????// configured the connection pool to set it already). ????????????if (con.getAutoCommit()) { ????????????????txObject.setMustRestoreAutoCommit(true); ????????????????if (logger.isDebugEnabled()) { ????????????????????logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); ????????????????} ????????????????con.setAutoCommit(false); ????????????} ????????????txObject.getConnectionHolder().setTransactionActive(true); ? ????????????int timeout = determineTimeout(definition); ????????????if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { ????????????????txObject.getConnectionHolder().setTimeoutInSeconds(timeout); ????????????} ? ????????????// Bind the session holder to the thread. ????????????if (txObject.isNewConnectionHolder()) { ????????????????TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); ????????????} ????????} ? ????????catch (Throwable ex) { ????????????if (txObject.isNewConnectionHolder()) { ????????????????DataSourceUtils.releaseConnection(con, this.dataSource); ????????????????txObject.setConnectionHolder(null, false); ????????????} ????????????throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); ????????} ????} |
這里新建了一個Connection,并且將這個Connection綁定到了TransactionSynchronizationManager中,也就是上面的:
private static final ThreadLocal<Map<Object, Object>> resources = ????????????new NamedThreadLocal<Map<Object, Object>>("Transactional resources"); |
?
至此,我們只需要確定,我們使用jdbcTemplate.update的時候,connection也是從TransactionSynchronizationManager獲取的就好。
在JdbcTemplate中,我們找到它使用獲得Connection的方式是:
Connection con = DataSourceUtils.getConnection(getDataSource()); |
也就是:
public static Connection doGetConnection(DataSource dataSource) throws SQLException { ????????Assert.notNull(dataSource, "No DataSource specified"); ? ????????ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource); ????????if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) { ????????????conHolder.requested(); ????????????if (!conHolder.hasConnection()) { ????????????????logger.debug("Fetching resumed JDBC Connection from DataSource"); ????????????????conHolder.setConnection(dataSource.getConnection()); ????????????} ????????????return conHolder.getConnection(); ????????} ????????// Else we either got no holder or an empty thread-bound holder here. ? ????????logger.debug("Fetching JDBC Connection from DataSource"); ????????Connection con = dataSource.getConnection(); ? ????????if (TransactionSynchronizationManager.isSynchronizationActive()) { ????????????logger.debug("Registering transaction synchronization for JDBC Connection"); ????????????// Use same Connection for further JDBC actions within the transaction. ????????????// Thread-bound object will get removed by synchronization at transaction completion. ????????????ConnectionHolder holderToUse = conHolder; ????????????if (holderToUse == null) { ????????????????holderToUse = new ConnectionHolder(con); ????????????} ????????????else { ????????????????holderToUse.setConnection(con); ????????????} ????????????holderToUse.requested(); ????????????TransactionSynchronizationManager.registerSynchronization( ????????????????????new ConnectionSynchronization(holderToUse, dataSource)); ????????????holderToUse.setSynchronizedWithTransaction(true); ????????????if (holderToUse != conHolder) { ????????????????TransactionSynchronizationManager.bindResource(dataSource, holderToUse); ????????????} ????????} ? ????????return con; ????} |
?
至此,可以發現:JdbcTemplate在執行sql的時候獲取的Conncetion和Transaction的doBegin獲取的Conncetion都是從TransactionSynchronizationManager獲取的。也就是一個線程對一個Datasource只保持了一個Conn。
?
這里才發現我的理解錯誤了。我原以為只要是使用DataSource的getConnection執行的sql都可以被Spring事務管理,還以為Spring對DataSource使用了裝飾器模式添加了邏輯,原來是我想錯了,只有使用Spirng的JdbcTemplate或者DataSourceUtils.getConnection類獲得的連接才會被Spring事務管理。
如下代碼:
?
@Transactional ????public void transactionTest() throws SQLException { ???????? ????????Connection conn = DataSourceUtils.getConnection(ds); ????????try { ????????????PreparedStatement st = conn.prepareStatement("update t_person t set t.age = ? where t.id = 1"); ????????????st.setInt(1, 1000); ????????????st.execute(); ????????????throw new RuntimeException(); ????????} ????????finally{ ????????????//conn.close(); ????????} ???????? } |
?
因為最后拋出了RuntimeException,測試結果顯示,最終Spring會將這個事務回滾。
注意注釋的那句代碼,常理來說我們應該執行關閉,但是關閉之后Spring怎么執行rollback呢,如果放開這句代碼,其實Spring仍然可以執行rollback,因為close只是將conn還給連接池,并沒有真正的釋放鏈接。但是如果遇到連接真的被關閉,那么在關閉的時候會觸發自動提交。所以這里還是不要關閉。交給Spring事務去關閉。
這種寫法很難理解,所以盡量不要使用吧。
?
如果改為:
Connection conn = ds.getConnection();
經過測試,不能回滾。
使用jdbcTemp的方式很簡潔,而且能正常回滾:
jdbcTemplate.execute("update t_person t set t.age = 800 where t.id = 1");
hrow new RuntimeException();
轉載于:https://www.cnblogs.com/xiaolang8762400/p/7407283.html
總結
以上是生活随笔為你收集整理的Spring事务源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka消息模拟器
- 下一篇: maven jetty 插件 允许修改