javascript
Spring整合mybatis中的sqlSession是如何做到线程隔离的?
轉(zhuǎn)載自??Spring整合mybatis中的sqlSession是如何做到線程隔離的?
項(xiàng)目中常常使用mybatis配合spring進(jìn)行數(shù)據(jù)庫(kù)操作,但是我們知道,數(shù)據(jù)的操作是要求做到線程安全的,而且按照原來的jdbc的使用方式,每次操作完成之后都要將連接關(guān)閉,但是實(shí)際使用中我們并沒有這么干。
更讓人疑惑的點(diǎn)是,spring中默認(rèn)使用單例形式來加載bean,而往往我們也不會(huì)改變這種默認(rèn),所以,是所有線程共享數(shù)據(jù)連接?
讓我們來看看真相!
自然是要個(gè)栗子的:
我們來看下spring中配置mybatis數(shù)據(jù)庫(kù)操作bean(使用 druid 連接池):
????<bean?id="dataSource"?class="com.alibaba.druid.pool.DruidDataSource"><property?name="url"?value="${jdbc.url}"?/><property?name="driverClassName"?value="${jdbc.driver}"?/><property?name="username"?value="${jdbc.username}"?/><property?name="password"?value="${jdbc.password}"?/></bean><bean?id="sqlSessionFactory"?class="org.mybatis.spring.SqlSessionFactoryBean"><property?name="dataSource"?ref="dataSource"?/><property?name="configLocation"?value="classpath:mybatis-config.xml"?/></bean><!--?scope="prototype"?另說,另討論,我們先以mapper形式看一下?--><bean?id="sqlSession"?class="org.mybatis.spring.SqlSessionTemplate"><constructor-arg?index="0"?ref="sqlSessionFactory"?/></bean><!--?事務(wù)?--><bean?name="transactionManager"class="org.springframework.jdbc.datasource.DataSourceTransactionManager"><property?name="dataSource"?ref="dataSource"></property></bean>而在java代碼中使用則是使用依賴注入直接使用 @resource sqlSession, 如下:
????@Resourceprivate?SqlSessionTemplate?sqlSession;@Overridepublic?User?getUser(Map<String,?String>?cond)?{//?此句執(zhí)行db查詢User?result?=?sqlSession.selectOne(NAME_SPACE+?".getUser",?cond);return?result;}這個(gè)sqlSession就是直接去操作數(shù)據(jù)庫(kù)了看起來是這樣,是在bean初始化的時(shí)候依賴注入的!
所以,難道每次進(jìn)入該操作的時(shí)候,sqlSession 的實(shí)例都會(huì)變化嗎?答案是否定的。
那么,肯定就是往下使用的時(shí)候才發(fā)生的變化唄!
再往下走,可以看到,調(diào)用了一個(gè)代理來進(jìn)行具體的查詢!
??//?org/mybatis/spring/SqlSessionTemplate.selectOne()public?<T>?T?selectOne(String?statement,?Object?parameter)?{return?this.sqlSessionProxy.<T>?selectOne(statement,?parameter);}為啥要用代理呢?自己直接查不就行了嗎?其實(shí),用代理是有好處的,那就可以可以進(jìn)行另外的包裝!
代理是怎么生成的呢?其實(shí)只要看一下 SqlSessionTemplate 的構(gòu)造方法就知道了!
/***?Constructs?a?Spring?managed?{@code?SqlSession}?with?the?given*?{@code?SqlSessionFactory}?and?{@code?ExecutorType}.*?A?custom?{@code?SQLExceptionTranslator}?can?be?provided?as?an*?argument?so?any?{@code?PersistenceException}?thrown?by?MyBatis*?can?be?custom?translated?to?a?{@code?RuntimeException}*?The?{@code?SQLExceptionTranslator}?can?also?be?null?and?thus?no*?exception?translation?will?be?done?and?MyBatis?exceptions?will?be*?thrown**?@param?sqlSessionFactory*?@param?executorType*?@param?exceptionTranslator*/public?SqlSessionTemplate(SqlSessionFactory?sqlSessionFactory,?ExecutorType?executorType,PersistenceExceptionTranslator?exceptionTranslator)?{notNull(sqlSessionFactory,?"Property?'sqlSessionFactory'?is?required");notNull(executorType,?"Property?'executorType'?is?required");this.sqlSessionFactory?=?sqlSessionFactory;this.executorType?=?executorType;this.exceptionTranslator?=?exceptionTranslator;//?生成代理?SqlSessionInterceptor?為?InvocationHandlerthis.sqlSessionProxy?=?(SqlSession)?newProxyInstance(SqlSessionFactory.class.getClassLoader(),new?Class[]?{?SqlSession.class?},new?SqlSessionInterceptor());}從上面的代碼,看不到細(xì)節(jié),但是,大致還是知道代理的具體實(shí)現(xiàn)了!即使用 SqlSessionInterceptor 去處理具體查詢邏輯!
我們來看下 SqlSessionInterceptor 的實(shí)現(xiàn)!
/***?Proxy?needed?to?route?MyBatis?method?calls?to?the?proper?SqlSession?got*?from?Spring's?Transaction?Manager*?It?also?unwraps?exceptions?thrown?by?{@code?Method#invoke(Object,?Object...)}?to*?pass?a?{@code?PersistenceException}?to?the?{@code?PersistenceExceptionTranslator}.*/private?class?SqlSessionInterceptor?implements?InvocationHandler?{public?Object?invoke(Object?proxy,?Method?method,?Object[]?args)?throws?Throwable?{SqlSession?sqlSession?=?getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,SqlSessionTemplate.this.executorType,SqlSessionTemplate.this.exceptionTranslator);try?{Object?result?=?method.invoke(sqlSession,?args);if?(!isSqlSessionTransactional(sqlSession,?SqlSessionTemplate.this.sqlSessionFactory))?{//?force?commit?even?on?non-dirty?sessions?because?some?databases?require//?a?commit/rollback?before?calling?close()sqlSession.commit(true);}return?result;}?catch?(Throwable?t)?{Throwable?unwrapped?=?unwrapThrowable(t);if?(SqlSessionTemplate.this.exceptionTranslator?!=?null?&&?unwrapped?instanceof?PersistenceException)?{//?release?the?connection?to?avoid?a?deadlock?if?the?translator?is?no?loaded.?See?issue?#22closeSqlSession(sqlSession,?SqlSessionTemplate.this.sqlSessionFactory);sqlSession?=?null;Throwable?translated?=?SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException)?unwrapped);if?(translated?!=?null)?{unwrapped?=?translated;}}throw?unwrapped;}?finally?{if?(sqlSession?!=?null)?{closeSqlSession(sqlSession,?SqlSessionTemplate.this.sqlSessionFactory);}}}}SqlSessionInterceptor 是 SqlSessionTemplate 的內(nèi)部類,目的只有一個(gè),就是處理多個(gè) session 的db操作!
所有請(qǐng)求都被 invoke() 攔截,從而做相應(yīng)處理:
-
進(jìn)入請(qǐng)求,先生成一個(gè)新的sqlSession,為本次db操作做準(zhǔn)備;
-
通過反射調(diào)用請(qǐng)求進(jìn)來的方法,將 sqlSession 回調(diào),進(jìn)行復(fù)雜查詢及結(jié)果映射;
-
如果需要立即提交事務(wù),do it;
-
如果出現(xiàn)異常,包裝異常信息,重新拋出;
-
操作完成后,關(guān)閉本次session;
到這里,其實(shí)我們好像已經(jīng)明白了,其實(shí)外面的 sqlSession 單例,并不會(huì)影響具體的db操作控制,所以不用擔(dān)心session的線程安全問題!
不過,還有個(gè)點(diǎn)值得考慮下,如果我一次請(qǐng)求里有多次數(shù)據(jù)庫(kù)操作,難道我真的要?jiǎng)?chuàng)建多個(gè)sqlSession或者說數(shù)據(jù)庫(kù)連接?不會(huì)吧!
如果這個(gè)問題得不到解決,可能你并不真正了解session的定義了!
所以我們需要繼續(xù)看一下 session 到底是怎么獲取的?
getSqlSession() 方法是在 SqlSessionUtils 中實(shí)現(xiàn)的!如下:
/***?Gets?an?SqlSession?from?Spring?Transaction?Manager?or?creates?a?new?one?if?needed.*?Tries?to?get?a?SqlSession?out?of?current?transaction.?If?there?is?not?any,?it?creates?a?new?one.*?Then,?it?synchronizes?the?SqlSession?with?the?transaction?if?Spring?TX?is?active?and*?<code>SpringManagedTransactionFactory</code>?is?configured?as?a?transaction?manager.**?@param?sessionFactory?a?MyBatis?{@code?SqlSessionFactory}?to?create?new?sessions*?@param?executorType?The?executor?type?of?the?SqlSession?to?create*?@param?exceptionTranslator?Optional.?Translates?SqlSession.commit()?exceptions?to?Spring?exceptions.*?@throws?TransientDataAccessResourceException?if?a?transaction?is?active?and?the*?????????????{@code?SqlSessionFactory}?is?not?using?a?{@code?SpringManagedTransactionFactory}*?@see?SpringManagedTransactionFactory*/public?static?SqlSession?getSqlSession(SqlSessionFactory?sessionFactory,?ExecutorType?executorType,?PersistenceExceptionTranslator?exceptionTranslator)?{notNull(sessionFactory,?"No?SqlSessionFactory?specified");notNull(executorType,?"No?ExecutorType?specified");SqlSessionHolder?holder?=?(SqlSessionHolder)?TransactionSynchronizationManager.getResource(sessionFactory);//?如果已經(jīng)有holder,則直接返回,復(fù)用連接if?(holder?!=?null?&&?holder.isSynchronizedWithTransaction())?{if?(holder.getExecutorType()?!=?executorType)?{throw?new?TransientDataAccessResourceException("Cannot?change?the?ExecutorType?when?there?is?an?existing?transaction");}holder.requested();if?(logger.isDebugEnabled())?{logger.debug("Fetched?SqlSession?["?+?holder.getSqlSession()?+?"]?from?current?transaction");}return?holder.getSqlSession();}if?(logger.isDebugEnabled())?{logger.debug("Creating?a?new?SqlSession");}SqlSession?session?=?sessionFactory.openSession(executorType);//?Register?session?holder?if?synchronization?is?active?(i.e.?a?Spring?TX?is?active)////?Note:?The?DataSource?used?by?the?Environment?should?be?synchronized?with?the//?transaction?either?through?DataSourceTxMgr?or?another?tx?synchronization.//?Further?assume?that?if?an?exception?is?thrown,?whatever?started?the?transaction?will//?handle?closing?/?rolling?back?the?Connection?associated?with?the?SqlSession.if?(TransactionSynchronizationManager.isSynchronizationActive())?{Environment?environment?=?sessionFactory.getConfiguration().getEnvironment();if?(environment.getTransactionFactory()?instanceof?SpringManagedTransactionFactory)?{if?(logger.isDebugEnabled())?{logger.debug("Registering?transaction?synchronization?for?SqlSession?["?+?session?+?"]");}holder?=?new?SqlSessionHolder(session,?executorType,?exceptionTranslator);TransactionSynchronizationManager.bindResource(sessionFactory,?holder);TransactionSynchronizationManager.registerSynchronization(new?SqlSessionSynchronization(holder,?sessionFactory));holder.setSynchronizedWithTransaction(true);holder.requested();}?else?{if?(TransactionSynchronizationManager.getResource(environment.getDataSource())?==?null)?{if?(logger.isDebugEnabled())?{logger.debug("SqlSession?["?+?session?+?"]?was?not?registered?for?synchronization?because?DataSource?is?not?transactional");}}?else?{throw?new?TransientDataAccessResourceException("SqlSessionFactory?must?be?using?a?SpringManagedTransactionFactory?in?order?to?use?Spring?transaction?synchronization");}}}?else?{if?(logger.isDebugEnabled())?{logger.debug("SqlSession?["?+?session?+?"]?was?not?registered?for?synchronization?because?synchronization?is?not?active");}}return?session;}如上獲取 sqlSession 邏輯,主要分兩種情況!
如果存在holder,則返回原有的sqlSession,到于這個(gè)holder我們稍后再說;
如果沒有,則創(chuàng)建一個(gè)新連接!
所以,看起來情況還不是太糟,至少有復(fù)用的概念了!
那么問題來了,復(fù)用?如何做到線程安全?所以我們要看下 SqlSessionHolder 的實(shí)現(xiàn)了!
獲取holder是通過 TransactionSynchronizationManager.getResource(sessionFactory); 獲取的:
public?static?Object?getResource(Object?key)?{Object?actualKey?=?TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);//?實(shí)際獲取Object?value?=?doGetResource(actualKey);if?(value?!=?null?&&?logger.isTraceEnabled())?{logger.trace("Retrieved?value?["?+?value?+?"]?for?key?["?+?actualKey?+?"]?bound?to?thread?["?+Thread.currentThread().getName()?+?"]");}return?value;}private?static?Object?doGetResource(Object?actualKey)?{Map<Object,?Object>?map?=?resources.get();if?(map?==?null)?{return?null;}Object?value?=?map.get(actualKey);//?Transparently?remove?ResourceHolder?that?was?marked?as?void...if?(value?instanceof?ResourceHolder?&&?((ResourceHolder)?value).isVoid())?{map.remove(actualKey);//?Remove?entire?ThreadLocal?if?empty...if?(map.isEmpty())?{resources.remove();}value?=?null;}return?value;}咱們忽略對(duì) key 的處理,實(shí)際是直接調(diào)用 doGetResource() 獲取holder。而 doGetResource() 中,則使用了 resources 來保存具體的 kv。 resources 明顯是個(gè)共享變量,但是看起來這里沒有任何的加鎖操作!這是為何?
只要看一下 resources 的定義就知道了,其實(shí)現(xiàn)為 ThreadLocal, 所以是線程安全了!
private?static?final?ThreadLocal<Map<Object,?Object>>?resources?=new?NamedThreadLocal<Map<Object,?Object>>("Transactional?resources");在新的請(qǐng)求進(jìn)來時(shí),自然是沒有值的,所以直接返回null.而后續(xù)進(jìn)入,則獲取緩存返回!
而對(duì)于沒有獲取到 holder 的情況,則需要重新創(chuàng)建一個(gè) session 了!
這里獲取session由DefaultSqlSessionFactory 進(jìn)行創(chuàng)建!如下:
//?org.apache.ibatis.session.defaults.DefaultSqlSessionFactory.openSession()public?SqlSession?openSession(ExecutorType?execType)?{return?openSessionFromDataSource(execType,?null,?false);}private?SqlSession?openSessionFromDataSource(ExecutorType?execType,?TransactionIsolationLevel?level,?boolean?autoCommit)?{Transaction?tx?=?null;try?{final?Environment?environment?=?configuration.getEnvironment();//?SpringManagedTransactionFactoryfinal?TransactionFactory?transactionFactory?=?getTransactionFactoryFromEnvironment(environment);tx?=?transactionFactory.newTransaction(environment.getDataSource(),?level,?autoCommit);final?Executor?executor?=?configuration.newExecutor(tx,?execType);return?new?DefaultSqlSession(configuration,?executor,?autoCommit);}?catch?(Exception?e)?{closeTransaction(tx);?//?may?have?fetched?a?connection?so?lets?call?close()throw?ExceptionFactory.wrapException("Error?opening?session.??Cause:?"?+?e,?e);}?finally?{ErrorContext.instance().reset();}}創(chuàng)建 session 幾件事:
-
根據(jù)環(huán)境配置,開啟一個(gè)新事務(wù),該事務(wù)管理器會(huì)負(fù)責(zé)后續(xù)jdbc連接管理工作;
-
根據(jù)事務(wù)創(chuàng)建一個(gè) Executor,備用;
-
用DefaultSqlSession 將 executor 包裝后返回,用于后續(xù)真正的db操作;
至此,真正的 sqlSession 已經(jīng)創(chuàng)建成功!返回后,就可以真正使用了!
等等,創(chuàng)建的session好像并沒有保存,那么還是那個(gè)問題,每個(gè)sql都會(huì)創(chuàng)建一個(gè) sqlSession ? 好吧,是這樣的!前面的holder,只是用于存在事務(wù)操作的連接!(holder的理解出了偏差哦)
但是有一點(diǎn),這里雖然創(chuàng)建了多個(gè) sqlSession 實(shí)例,但是并不意味著有多個(gè)db連接,具體使用db連接時(shí),則一般會(huì)會(huì)使用連接池來進(jìn)行優(yōu)化!如前面提到的 druid 就是個(gè)不錯(cuò)的選擇!
真實(shí)的jdbc連接獲取,是在進(jìn)行真正的 query 時(shí),才進(jìn)行調(diào)用 getConnection() 進(jìn)行接入!
具體則是在 doQuery() 時(shí),進(jìn)行st的組裝時(shí)調(diào)用的 ,如下:
//?SimpleExecutor.prepareStatement()private?Statement?prepareStatement(StatementHandler?handler,?Log?statementLog)?throws?SQLException?{Statement?stmt;//?獲取?jdbc?連接,返回?java.sql.ConnectionConnection?connection?=?getConnection(statementLog);stmt?=?handler.prepare(connection);handler.parameterize(stmt);return?stmt;}//?調(diào)用?BaseExecutor.getConnection()protected?Connection?getConnection(Log?statementLog)?throws?SQLException?{//?SpringManagedTransaction?管理?connectionConnection?connection?=?transaction.getConnection();if?(statementLog.isDebugEnabled())?{return?ConnectionLogger.newInstance(connection,?statementLog,?queryStack);}?else?{return?connection;}}通過前面通過事務(wù)管理工廠創(chuàng)建的 SpringManagedTransaction 進(jìn)行 connection 獲取!一個(gè)事務(wù)管理器只會(huì)存在一次獲取數(shù)據(jù)庫(kù)連接的操作!
public?Connection?getConnection()?throws?SQLException?{if?(this.connection?==?null)?{openConnection();}return?this.connection;}//?而?SpringManagedTransaction?又將connection交由?DataSourceUtils?進(jìn)行管理!//?org/springframework/jdbc/datasource/DataSourceUtilspublic?static?Connection?getConnection(DataSource?dataSource)?throws?CannotGetJdbcConnectionException?{try?{//?真正的連接獲取return?doGetConnection(dataSource);}catch?(SQLException?ex)?{throw?new?CannotGetJdbcConnectionException("Could?not?get?JDBC?Connection",?ex);}}/***?Actually?obtain?a?JDBC?Connection?from?the?given?DataSource.*?Same?as?{@link?#getConnection},?but?throwing?the?original?SQLException.*?<p>Is?aware?of?a?corresponding?Connection?bound?to?the?current?thread,?for?example*?when?using?{@link?DataSourceTransactionManager}.?Will?bind?a?Connection?to?the?thread*?if?transaction?synchronization?is?active?(e.g.?if?in?a?JTA?transaction).*?<p>Directly?accessed?by?{@link?TransactionAwareDataSourceProxy}.*?@param?dataSource?the?DataSource?to?obtain?Connections?from*?@return?a?JDBC?Connection?from?the?given?DataSource*?@throws?SQLException?if?thrown?by?JDBC?methods*?@see?#doReleaseConnection*/public?static?Connection?doGetConnection(DataSource?dataSource)?throws?SQLException?{Assert.notNull(dataSource,?"No?DataSource?specified");ConnectionHolder?conHolder?=?(ConnectionHolder)?TransactionSynchronizationManager.getResource(dataSource);if?(conHolder?!=?null?&&?(conHolder.hasConnection()?||?conHolder.isSynchronizedWithTransaction()))?{conHolder.requested();if?(!conHolder.hasConnection())?{logger.debug("Fetching?resumed?JDBC?Connection?from?DataSource");conHolder.setConnection(dataSource.getConnection());}return?conHolder.getConnection();}//?Else?we?either?got?no?holder?or?an?empty?thread-bound?holder?here.logger.debug("Fetching?JDBC?Connection?from?DataSource");//?通過接入的dataSource進(jìn)行連接獲取,這里將會(huì)是最終的jdbc連接Connection?con?=?dataSource.getConnection();if?(TransactionSynchronizationManager.isSynchronizationActive())?{logger.debug("Registering?transaction?synchronization?for?JDBC?Connection");//?Use?same?Connection?for?further?JDBC?actions?within?the?transaction.//?Thread-bound?object?will?get?removed?by?synchronization?at?transaction?completion.ConnectionHolder?holderToUse?=?conHolder;if?(holderToUse?==?null)?{holderToUse?=?new?ConnectionHolder(con);}else?{holderToUse.setConnection(con);}holderToUse.requested();TransactionSynchronizationManager.registerSynchronization(new?ConnectionSynchronization(holderToUse,?dataSource));holderToUse.setSynchronizedWithTransaction(true);if?(holderToUse?!=?conHolder)?{TransactionSynchronizationManager.bindResource(dataSource,?holderToUse);}}return?con;}上面的實(shí)現(xiàn)主要做三件事:
再次確認(rèn),是否存在事務(wù)處理,holder是否存在,如果有則復(fù)用;
如果沒有,那再?gòu)臄?shù)據(jù)源處獲取連接;
獲取新連接成功后,檢查如果存在事務(wù),則將新獲取的連接放入holder中保存起來,以備下次使用;
獲取jdbc連接后,就可以真正發(fā)起execute()查詢了。
數(shù)據(jù)庫(kù)連接的疑問算是解答了!我們發(fā)現(xiàn),外部的框架并沒有多少為我們節(jié)省db連接的動(dòng)作!而是把最終 getConnection() 交給 datasource 數(shù)據(jù)源!
而真正解決我們連接復(fù)用的問題的,是像 Druid 這樣的連接池組件!所以,咱們可以單獨(dú)來看這些中間件了!
?
總結(jié)
以上是生活随笔為你收集整理的Spring整合mybatis中的sqlSession是如何做到线程隔离的?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 舒兰市属于哪个省市 舒兰市在哪
- 下一篇: 心术宗小满闯祸是几集 了解一下该集的剧情