javascript
MyBatis—Spring 动态数据源事务的处理
在一般的 Spring 應用中,如果底層數據庫訪問采用的是 MyBatis,那么在大多數情況下,只使用一個單獨的數據源,Spring 的事務管理在大多數情況下都是有效的。然而,在一些復雜的業務場景下,如需要在某一時刻訪問不同的數據庫,由于 Spring 對于事務管理實現的方式,可能不能達到預期的效果。本文將簡要介紹 Spring 中事務的實現方式,并對以 MyBatis 為底層數據庫訪問的系統為例,提供多數據源事務處理的解決方案
Spring 事務的實現原理
常見地,在 Spring 中添加事務的方式通常都是在對應的方法或類上加上 @Transactional 注解顯式地將這部分處理加上事務,對于 @Transactional 注解,Spring 會在 org.springframework.transaction.annotation.AnnotationTransactionAttributeSource 定義方法攔截的匹配規則(即 AOP 部分中的 PointCut),而具體的處理邏輯(即 AOP 中的 Advice)則是在 org.springframework.transaction.interceptor.TransactionInterceptor 中定義
具體事務執行的調用鏈路如下
Spring 對于事務切面采取的具體行為實現如下:
public class TransactionInterceptor
extends TransactionAspectSupport
implements MethodInterceptor, Serializable {
// 這里的方法定義為 MethodInterceptor,即 AOP 實際調用點
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
// invokeWithinTransaction 為父類 TransactionAspectSupport 定義的方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
}
繼續進入 TransactionAspectSupport 的 invokeWithinTransaction 方法:
public abstract class TransactionAspectSupport
implements BeanFactoryAware, InitializingBean {
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 省略響應式事務和編程式事務的處理邏輯
// 當前事務管理的實際
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
/*
檢查在當前的執行上下文中,是否需要創建新的事務,這是因為當前執行的業務處理可能在上一個已經開始
的事務處理中
*/
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation(); // 實際業務代碼的業務處理
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex); // 出現異常的回滾處理
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 如果沒有出現異常,則提交本次事務
commitTransactionAfterReturning(txInfo);
return retVal;
}
}
}
在獲取事務信息對象時,首先需要獲取到對應的事務狀態對象 TransactionStatus,這個狀態對象決定了 Spring 后續要對當前事務采取的何種行為,具體代碼在 org.springframework.transaction.support.AbstractPlatformTransactionManager#getTransaction
// 這里的 definition 是通過解析 @Transactional 注解中的屬性得到的配置對象
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
/*
這里獲取事務相關的對象(如持有的數據庫連接等),具體由子類來定義相關的實現
*/
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 如果當前已經在一個事務中,那么需要按照定義的屬性采取對應的行為
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 需要重新開啟一個新的事務的情況,具體在 org.springframework.transaction.TransactionDefinition 有相關的定義
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 開啟一個新的事務
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
在 AbstractPlatformTransactionManager 中已經定義了事務處理的大體框架,而實際的事務實現則交由具體的子類實現,在一般情況下,由 org.springframework.jdbc.datasource.DataSourceTransactionManager 采取具體的實現
主要關注的點在于對于事務信息對象的創建,事務的開啟、提交回滾操作,具體對應的代碼如下:
事務信息對象的創建代碼:
protected Object doGetTransaction() {
/*
簡單地理解,DataSourceTransactionObject 就是一個持有數據庫連接的資源對象
*/
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
/*
TransactionSynchronizationManager 是用于管理在事務執行過程相關的信息對象的一個工具類,基本上
這個類持有的事務信息貫穿了整個 Spring 事務管理
*/
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
開啟事務對應的源代碼:
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
/*
如果當前事務對象沒有持有數據庫連接,則需要從對應的 DataSource 中獲取對應的連接
*/
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
/*
由于當前的事務已經交由 Spring 進行管理,那么在這種情況下,原有數據庫連接的自動提交
必須是關閉的,因為如果開啟了自動提交,那么實際上就相當于每一次的 SQL 都會執行一次事務的提交,
這種情況下事務的管理沒有意義
*/
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
/*
如果是新創建的事務,那么需要綁定這個數據庫連接對象到這個事務中,使得后續再進來的業務處理
能夠順利地進入原有的事務中
*/
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
事務提交相關的代碼:
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
/*
一些事務提交時的鉤子方法,使得第三方的數據庫持久話框架(如 MyBatis)的
事務能夠被 Spring 管理
*/
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
unexpectedRollback = status.isGlobalRollbackOnly();
doCommit(status);
}
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// can only be caused by doCommit
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
// 事務正常提交后的鉤子方法
triggerAfterCommit(status);
}
finally {
// 事務正常提交后有關資源清理的鉤子方法
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
事務回滾的相關代碼:
private void doRollbackOnCommitException(DefaultTransactionStatus status, Throwable ex) throws TransactionException {
try {
if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback after commit exception", ex);
}
doRollback(status);
}
else if (status.hasTransaction() && isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Marking existing transaction as rollback-only after commit exception", ex);
}
doSetRollbackOnly(status);
}
}
catch (RuntimeException | Error rbex) {
logger.error("Commit exception overridden by rollback exception", ex);
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw rbex;
}
// 一些事務相關的鉤子方法
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
MyBatis 與 Spring 事務的整合
在 MyBatis 中,實際獲取連接是通過 BaseExecutor 中 Transaction 屬性來獲取對應的連接,實際上 MyBatis 執行時并不會意識到當前上下文是否處于一個事務中,在整合到 Spring 的過程中,默認的 Transaction 實現類為 org.mybatis.spring.transaction.SpringManagedTransaction:
public class SpringManagedTransaction implements Transaction {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringManagedTransaction.class);
private final DataSource dataSource;
private Connection connection;
private boolean isConnectionTransactional;
private boolean autoCommit;
public SpringManagedTransaction(DataSource dataSource) {
notNull(dataSource, "No DataSource specified");
this.dataSource = dataSource;
}
/**
* {@inheritDoc}
*/
@Override
public Connection getConnection() throws SQLException {
if (this.connection == null) {
openConnection();
}
return this.connection;
}
/*
從當前的數據源對象 dataSource 中獲取一個連接對象,而結合上文 Spring 中對于事務的處理,如果已經將
dataSource 屬性綁定到了當前的線程,那么在這里就會獲取到原有創建事務時已經創建的連接,而不是從頭重新生成一個連接
*/
private void openConnection() throws SQLException {
this.connection = DataSourceUtils.getConnection(this.dataSource);
this.autoCommit = this.connection.getAutoCommit();
/*
這里的目的是為了處理 MyBatis 部分關于事務提交的處理,因為 MyBatis 會將自己的事務處理放入到 Spring 事務中的
鉤子方法中進行處理,如果此時持有的連接對象與整個 Spring 事務持有的連接對象一致時,由于 MyBatis 的事務提交會
早于 Spring 的事務提交(triggerBeforeCommit() 鉤子方法),從而導致 Spring 在提交事務時出現事務重復提交的異常
*/
this.isConnectionTransactional = DataSourceUtils.isConnectionTransactional(this.connection, this.dataSource);
LOGGER.debug(() -> "JDBC Connection [" + this.connection + "] will"
+ (this.isConnectionTransactional ? " " : " not ") + "be managed by Spring");
}
@Override
public void commit() throws SQLException {
if (this.connection != null && !this.isConnectionTransactional && !this.autoCommit) {
LOGGER.debug(() -> "Committing JDBC Connection [" + this.connection + "]");
this.connection.commit();
}
}
@Override
public void rollback() throws SQLException {
if (this.connection != null && !this.isConnectionTransactional && !this.autoCommit) {
LOGGER.debug(() -> "Rolling back JDBC Connection [" + this.connection + "]");
this.connection.rollback();
}
}
@Override
public void close() throws SQLException {
DataSourceUtils.releaseConnection(this.connection, this.dataSource);
}
@Override
public Integer getTimeout() throws SQLException {
ConnectionHolder holder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (holder != null && holder.hasTimeout()) {
return holder.getTimeToLiveInSeconds();
}
return null;
}
}
而 MyBatis 對于 Transaction 中的提交處理,需要將其整合到 Spring 中,是通過向 TransactionSynchronizationManager 注冊 TransactionSynchronization 來實現的,在 MyBatis 中,實際的實現類為 SqlSessionSynchronization:
private static final class SqlSessionSynchronization extends TransactionSynchronizationAdapter {
private final SqlSessionHolder holder; // 當前持有的 SqlSession
/*
用于綁定到 TransactionSynchronizationManager 的 Key 對象,由于 Spring 對于 Bean 的單例處理,實際上每次
都是唯一的 SqlSessionFactory 實例對象,因此在 TransactionSynchronizationManager 中的 ThreadLocal 可以通過
這個對象找到當前線程綁定的實際 Value 對象
*/
private final SqlSessionFactory sessionFactory;
private boolean holderActive = true;
public SqlSessionSynchronization(SqlSessionHolder holder, SqlSessionFactory sessionFactory) {
notNull(holder, "Parameter 'holder' must be not null");
notNull(sessionFactory, "Parameter 'sessionFactory' must be not null");
this.holder = holder;
this.sessionFactory = sessionFactory;
}
@Override
public int getOrder() {
// order right before any Connection synchronization
return DataSourceUtils.CONNECTION_SYNCHRONIZATION_ORDER - 1;
}
@Override
public void suspend() {
if (this.holderActive) {
LOGGER.debug(() -> "Transaction synchronization suspending SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.unbindResource(this.sessionFactory);
}
}
@Override
public void resume() {
if (this.holderActive) {
LOGGER.debug(() -> "Transaction synchronization resuming SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder);
}
}
@Override
public void beforeCommit(boolean readOnly) {
/*
注意 Spring 事務中的 triggerBeforeCommit() 鉤子方法,在事務提交前會依次檢查 TransactionSynchronizationManager 中綁定的 TransactionSynchronization,并在事務實際提交前(即當前事務信息是新開啟的事務)前調用每個 TransactionSynchronization 的 beforeCommit 方法
*/
if (TransactionSynchronizationManager.isActualTransactionActive()) {
try {
LOGGER.debug(() -> "Transaction synchronization committing SqlSession [" + this.holder.getSqlSession() + "]");
/*
由于 SqlSession 最終的方法調用會委托給對應的 Executor 進行處理,而 executor 的 commit()
則會繼續調用 Transaction 對象的 commit() 方法,從而實現與上文 SpringManagedTransaction 對象整合
*/
this.holder.getSqlSession().commit();
} catch (PersistenceException p) {
if (this.holder.getPersistenceExceptionTranslator() != null) {
DataAccessException translated = this.holder.getPersistenceExceptionTranslator()
.translateExceptionIfPossible(p);
if (translated != null) {
throw translated;
}
}
throw p;
}
}
}
/*
triggerBeforeCompletion() 鉤子方法
*/
@Override
public void beforeCompletion() {
// Issue #18 Close SqlSession and deregister it now
// because afterCompletion may be called from a different thread
if (!this.holder.isOpen()) {
LOGGER
.debug(() -> "Transaction synchronization deregistering SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.unbindResource(sessionFactory);
this.holderActive = false;
LOGGER.debug(() -> "Transaction synchronization closing SqlSession [" + this.holder.getSqlSession() + "]");
this.holder.getSqlSession().close();
}
}
/*
triggerAfterCompletion() 鉤子方法,主要是為了清理相關 ThreadLocal 綁定的資源對象
*/
@Override
public void afterCompletion(int status) {
if (this.holderActive) {
// afterCompletion may have been called from a different thread
// so avoid failing if there is nothing in this one
LOGGER
.debug(() -> "Transaction synchronization deregistering SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.unbindResourceIfPossible(sessionFactory);
this.holderActive = false;
LOGGER.debug(() -> "Transaction synchronization closing SqlSession [" + this.holder.getSqlSession() + "]");
this.holder.getSqlSession().close();
}
this.holder.reset();
}
}
為了使得 MyBatis 在執行的過程中能夠 Spring 進行管理,因此需要代理實際執行的 SqlSession,實際執行類為 SqlSessionTemplate,在執行的過程中,實際行為在 SqlSessionInterceptor 中定義:
// InvocationHandler 為 JDK 動態代理的部分,定義了代理類需要采取的相關行為
private class SqlSessionInterceptor implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
/*
getSqlSession 為 SqlSessionUtil 的靜態方法,實際上在執行過程中也是通過 TransactionSynchronizationManager 來感知當前上下文所處的事務信息,當處于同一個事務中時,則會通過 sqlSessionFactory
作為 key 來獲取之前的 SqlSession,從而保證事務的正常運行
*/
SqlSession sqlSession = getSqlSession(SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType, SqlSessionTemplate.this.exceptionTranslator);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
// force commit even on non-dirty sessions because some databases require
// a commit/rollback before calling close()
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
Throwable unwrapped = unwrapThrowable(t);
// 省略部分異常處理代碼
throw unwrapped;
} finally {
if (sqlSession != null) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
}
}
getSqlSession 是 SqlSessionUtil 的靜態方法,實際源代碼如下所示:
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory,
ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);
/*
如果能在 TransactionSynchronizationManager 中找到和當前 SqlSessionFactory 綁定的 SqlSession
信息,則說明當前可能處于一個事務中
*/
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
/*
執行到這里,說明要么此時是第一次進入事務,或者當前的執行方式是以非事務的形式執行的,但無論是那種形式,都需要創建一個新的 SqlSession
*/
LOGGER.debug(() -> "Creating a new SqlSession");
session = sessionFactory.openSession(executorType);
/*
如果當前是以事務的形式執行的,則需要將創建的 SqlSession 注冊到當前事務上下文中
*/
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
private static SqlSession sessionHolder(ExecutorType executorType, SqlSessionHolder holder) {
SqlSession session = null;
/*
holder 在注冊到 TransactionSynchronizationManager 中時就會將 synchronizedWithTransaction
設置為 true,因此實際上只要注冊到了 TransactionSynchronizationManager 中則說明已經在一個事務中了
*/
if (holder != null && holder.isSynchronizedWithTransaction()) {
if (holder.getExecutorType() != executorType) {
throw new TransientDataAccessResourceException(
"Cannot change the ExecutorType when there is an existing transaction");
}
holder.requested();
LOGGER.debug(() -> "Fetched SqlSession [" + holder.getSqlSession() + "] from current transaction");
session = holder.getSqlSession();
}
return session;
}
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder;
/*
TransactionSynchronizationManager.isSynchronizationActive() 檢查當前是否處于一個事務上下文中,這個屬性
會在創建事務的時候進行初始化
*/
if (TransactionSynchronizationManager.isSynchronizationActive()) {
Environment environment = sessionFactory.getConfiguration().getEnvironment();
if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
LOGGER.debug(() -> "Registering transaction synchronization for SqlSession [" + session + "]");
holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
/*
注冊 sessionFactory 到事務上下文,使得能夠被后續的處理感知
*/
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
/*
注冊一個 TransactionSynchronization,這個 TransactionSynchronization 相關的方法會在 Spring 事務的鉤子方法中被調用
*/
TransactionSynchronizationManager
.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true); // 與上面 sessionHolder 同步
holder.requested();
} else {
if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
LOGGER.debug(() -> "SqlSession [" + session
+ "] was not registered for synchronization because DataSource is not transactional");
} else {
throw new TransientDataAccessResourceException(
"SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
}
}
} else {
LOGGER.debug(() -> "SqlSession [" + session
+ "] was not registered for synchronization because synchronization is not active");
}
具體整合關系如下圖所示:
動態數據源的處理
基本處理
一般在 Spring 中實現動態數據源都是基于 AbstractRoutingDataSource 并實現 determineCurrentLookupKey 來實現的,在實現的過程中,AbstractRoutingDataSource 會持有一個關于數據源 DataSource 的映射關系,通過 determineCurrentLookupKey 作為 key 來決定實際要采取的實際數據源。這種方式相當于多累加了一層,在一般的使用場景下可能不會有什么問題,但是當涉及到事務時,可能會出現一些不可思議的問題
假如現在我們有兩個數據源:MySQL 和 PostgreSQL,我們可以定義自己的數據源枚舉類(當然直接使用字符串也可以,但是使用枚舉會更好)DataSourceType:
public enum DataSourceType {
MYSQL,
POSTGRESQL
}
現在,我們需要在系統中定義我們自己的實際數據源,這里為了簡便,直接使用 DataSourceBuilder 的方式進行構建:
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
@Configuration
public class DataSourceConfig {
/*
MySQL 數據源
*/
@Bean(name = "mysqlDataSource")
public DataSource mysqlDataSource() {
return DataSourceBuilder.create()
.url("jdbc:mysql://127.0.0.1:3306/lxh_db")
.username("root")
.password("12345678")
.type(DruidDataSource.class)
.build();
}
/*
PostgreSQL 數據源
*/
@Bean(name = "psqlDataSource")
public DataSource psqlDataSource() {
return DataSourceBuilder.create()
.url("jdbc:postgresql://127.0.0.1:5432/lxh_db")
.username("postgres")
.password("12345678")
.type(DruidDataSource.class)
.build();
}
}
為了實現動態數據源,我們需要繼承 AbstractRoutingDataSource,并實現 determineCurrentLookupKey 方法。為了能夠動態地改變當前執行上下文的數據源類型,我們使用一個 ThreadLocal 來存儲當前需要的數據源類型:
public class DataSourceHolder {
private static final ThreadLocal<DataSourceType> dataSourceHolder = new ThreadLocal<>();
public static void setCurDataSource(DataSourceType type) {
dataSourceHolder.set(type);
}
public static DataSourceType getCurDataSource() {
return dataSourceHolder.get();
}
}
之后,我們重新定義我們自己的動態數據源類型:
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
public class DynamicDataSource
extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
return DataSourceHolder.getCurDataSource();
}
}
現在我們的動態數據源還沒有實際的 DataSource 映射,因此我們在實例化 DynamicDataSource 時需要手動注冊:
import com.google.common.collect.ImmutableMap;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.util.Map;
@Configuration
public class DataSourceConfig {
/*
由于我們已經在系統中定義了多個 DataSource,因此我們需要使用 @Primary 注解來標記當前定義的 DataSource 是實際需要用到的 DataSource
*/
@Primary
@Bean(name = "dynamicDataSource")
public DataSource dynamicDataSource(@Qualifier("mysqlDataSource") DataSource mysqlDataSource,
@Qualifier("psqlDataSource") DataSource psqlDataSource) {
DynamicDataSource dataSource = new DynamicDataSource();
// 綁定目標 key 到實際數據源的映射關系,并將它們注冊到我們的動態數據源中
Map<Object, Object> dataSourceMap = ImmutableMap.builder()
.put(DataSourceType.MYSQL, mysqlDataSource)
.put(DataSourceType.POSTGRESQL, psqlDataSource)
.build();
dataSource.setTargetDataSources(dataSourceMap);
// 當通過 key 無法找到對應的數據源時,默認的數據源類型
dataSource.setDefaultTargetDataSource(mysqlDataSource);
return dataSource;
}
}
這樣做就可以使用我們的動態數據源了,在使用前,只需要調用 DataSourceHolder.setCurDataSource 來進行數據源切換即可:
public class XXService {
@Resource
private BBService bbService;
public void handler() {
DataSourceType prevType = DataSourceHolder.getCurDataSource();
DataSourceHolder.setCurDataSource(DataSourceType.XXX); // 設置當前的數據源類型
bbService.handler(); // bbService 在處理時就會使用 XXX 對應的數據源
DataSourceHolder.setCurDataSource(prevType); // 還原回之前的數據源
}
}
進一步簡化
上面動態數據源的使用似乎有些繁瑣,我們可以使用 AOP 來簡化這個步驟,由于我們無法在運行中得知用戶需要使用的數據源類型,因此我們只能要求用戶決定。為了達到這一目的,我們可以自己定義一個注解來標記用戶希望使用的數據源類型:
import java.lang.annotation.*;
/**
* 用于定義處理上下文的所需要持有的數據源類型
*/
@Inherited
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface DataSource {
DataSourceType value() default DataSourceType.MYSQL;
}
這樣,用戶如果希望在 XXService 服務中都使用 MySQL 數據源,而在 BBService 中都使用 PostrgreSQL 數據源,可以這么做:
@Service
@DataSource(MYSQL)
public class XXService {
}
@Service
@DataSource(POSTGRESQL)
public class BBService {
}
現在我們已經定義了需要攔截的位置,還需要定義相關的行為來達到自動切換數據源上下文的目的:
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.xhliu.springtransaction.annotation.DataSource;
import org.xhliu.springtransaction.datasource.DataSourceHolder;
import org.xhliu.springtransaction.datasource.DataSourceType;
@Aspect
@Component
public class DataSourceAspect {
private final static Logger log = LoggerFactory.getLogger(DataSourceAspect.class);
@Around("@annotation(org.xhliu.springtransaction.annotation.DataSource)")
public Object dataSourceSelect(ProceedingJoinPoint pjp) throws Throwable {
DataSourceType prevType = DataSourceHolder.getCurDataSource();
// 獲取當前用戶需要使用的動態數據源類型
DataSource dataSource = parseDataSourceAnno(pjp);
try {
log.debug("當前執行的上下文中,數據源的所屬類型: {}", dataSource.value());
DataSourceHolder.setCurDataSource(dataSource.value());
return pjp.proceed();
} finally {
// 最終需要還原回一開始的數據源
DataSourceHolder.setCurDataSource(prevType);
}
}
private static DataSource parseDataSourceAnno(ProceedingJoinPoint pjp) {
MethodSignature signature = (MethodSignature) pjp.getSignature();
DataSource dataSource = signature.getMethod().getDeclaredAnnotation(DataSource.class);
if (dataSource != null) return dataSource;
Object target = pjp.getTarget();
return target.getClass().getDeclaredAnnotation(DataSource.class);
}
}
修改 MyBatis 事務的行為
基本處理
由于 Spring 事務是通過 TransactionSynchronizationManager 的 ThreadLocal 綁定 DataSource 和對應的 Connection 來實現事務的上下文檢測,因此我們創建的 DataSource 在事務的執行過程中是無法再動態地切換數據源。為了解決這一問題,我們需要重新定義 MyBatis 事務的處理邏輯,使得它能夠動態地切換數據源
我們定義自己的 DynamicTransaction 來替換現有的 SpringManagedTransaction:
import org.apache.ibatis.transaction.Transaction;
import org.mybatis.spring.transaction.SpringManagedTransaction;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.jdbc.datasource.DelegatingDataSource;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.xhliu.springtransaction.datasource.DataSourceType;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 用于定義在當前 MyBatis 處理上下文中,正在被使用的事務對象類型,由于現有的 {@link SpringManagedTransaction}
* 實現只能綁定到一個數據源,在基于 {@link AbstractRoutingDataSource} 的數據源中,當同屬于一個事務時,無法切換到希望的
* 數據源,為此,需要定義一個特殊的事務類型來替換現有的事務類型,從而實現在一個事務中能夠切換數據源的效果
*
* @author lxh
*/
public class DynamicTransaction
extends SpringManagedTransaction {
// 緩存當前數據源之間的映射關系
private final Map<DataSourceType, Transaction> txMap = new ConcurrentHashMap<>();
// 實際當前系統中持有的動態數據源對象
private final DataSource dataSource;
public DynamicTransaction(DataSource dataSource) {
super(dataSource);
this.dataSource = dataSource;
}
@Override
public Connection getConnection() throws SQLException {
Connection connection = getConnection(DynamicDataSourceUtils.determineDataSourceType());
if (TransactionSynchronizationManager.isActualTransactionActive()) {
connection.setAutoCommit(false); // 如果當前已經持有了事務,那么獲取到的連接應當都是非自動提交的
}
return connection;
}
@Override
public void commit() throws SQLException {
/*
由于該方法的調用發生在 Spring 事務提交之前 `triggerBeforeCommit` 鉤子方法
*/
for (Map.Entry<DataSourceType, Transaction> entry : txMap.entrySet()) {
if (!entry.getValue().getConnection().getAutoCommit()) {
entry.getValue().getConnection().commit();
}
}
}
@Override
public void rollback() throws SQLException {
/*
前面提到,MyBatis 整合 Spring 的事務過程中是通過 AbstractPlatformTransactionManager 的鉤子方法實現的,
在回滾時如果能夠檢測到事務存活,那么說明此時事務依舊被 Spring 管理,因此此時這部分的處理不應當被回滾
*/
if (TransactionSynchronizationManager.isActualTransactionActive()) return;
for (Map.Entry<DataSourceType, Transaction> entry : txMap.entrySet()) {
entry.getValue().getConnection().rollback();
}
}
@Override
public void close() throws SQLException {
if (TransactionSynchronizationManager.isActualTransactionActive()) return;
for (Map.Entry<DataSourceType, Transaction> entry : txMap.entrySet()) {
DataSourceUtils.releaseConnection(entry.getValue().getConnection(), curDataSource(entry.getKey()));
}
}
private Connection getConnection(DataSourceType type) throws SQLException {
if (txMap.containsKey(type)) {
return txMap.get(type).getConnection();
}
txMap.put(type, new SpringManagedTransaction(curDataSource(type)));
return txMap.get(type).getConnection();
}
private DataSource curDataSource(DataSourceType type) {
DataSource curDS = dataSource;
/*
由于有可能存在代理,因此需要不斷剝離現有數據源對象,直到獲取到實際的數據源對象
*/
while (curDS instanceof DelegatingDataSource) {
curDS = ((DelegatingDataSource) curDS).getTargetDataSource();
}
/*
對于動態數據源對象,需要通過對應 Key 獲取到對應的實際 DataSource 對象
*/
if (curDS instanceof AbstractRoutingDataSource) {
Map<Object, DataSource> dss = ((AbstractRoutingDataSource) curDS).getResolvedDataSources();
return dss.getOrDefault(type, ((AbstractRoutingDataSource) curDS).getResolvedDefaultDataSource());
}
return curDS; // 其它一般情況的數據源。。。。
}
}
為了使得 MyBatis能夠使用我們自定義的 Transaction,我們需要重新配置 MyBatis 的 TransactionFactory,因此我們需要重新定義自己的 TransactionFactory:
import org.apache.ibatis.session.TransactionIsolationLevel;
import org.apache.ibatis.transaction.Transaction;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import javax.sql.DataSource;
/**
* 重新定義 MyBatis 中的事務工廠,使得自定義的動態數據源事務能夠被 MyBatis 加載
*
* @author lxh
*/
public class DynamicTransactionFactory
extends SpringManagedTransactionFactory {
@Override
public Transaction newTransaction(DataSource dataSource,
TransactionIsolationLevel level,
boolean autoCommit) {
return new DynamicTransaction(dataSource);
}
}
現在,我們要做的是替換現有 MyBatis 中的 TransactionFactory,這個配置是在 MybatisAutoConfiguration (如果是第三方的擴展的 MyBatis,則是在其對應的 **AutoConfiguration 中)中完成的配置:
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
factory.setDataSource(dataSource);
factory.setVfs(SpringBootVFS.class);
if (StringUtils.hasText(this.properties.getConfigLocation())) {
factory.setConfigLocation(this.resourceLoader.getResource(this.properties.getConfigLocation()));
}
/*
應用相關的 Configuration,包括 Mapper 的路徑,日志等配置信息
*/
applyConfiguration(factory);
// 省略部分代碼。。。。
/*
這里是 MyBatis 提供的一個擴展點,用于修改 SqlSessionFactoryBean 的相關配置屬性,如 TransactionFactory 等相關信息,具體詳情可以查看 org.mybatis.spring.boot.autoconfigure.SqlSessionFactoryBeanCustomizer
*/
applySqlSessionFactoryBeanCustomizers(factory);
return factory.getObject();
}
由于 MyBatis 已經提供了相關的擴展點,因此我們可以由此將我們自定義的 TransactionFactory 替換掉 MyBatis 中默認的 TransactionFactory:
import org.apache.ibatis.transaction.TransactionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.boot.autoconfigure.SqlSessionFactoryBeanCustomizer;
public class TransactionSqlSessionFactoryBeanCustomizer
implements SqlSessionFactoryBeanCustomizer {
private final TransactionFactory txFactory;
public TransactionSqlSessionFactoryBeanCustomizer(TransactionFactory txFactory) {
this.txFactory = txFactory;
}
@Override
public void customize(SqlSessionFactoryBean factoryBean) {
factoryBean.setTransactionFactory(txFactory);
}
}
我們需要將這個類添加到 Spring 上下文中,使得 Spring 能夠發現并實例化它(這里我們使用注解的形式):
import org.mybatis.spring.SqlSessionFactoryBean;
import org.apache.ibatis.transaction.TransactionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.xhliu.springtransaction.transaction.DynamicTransactionFactory;
/**
* @author lxh
*/
@Configuration
public class MyBatisConfig {
@Bean(name = "dynamicTransactionFactory")
public TransactionFactory dynamicTransactionFactory() {
return new DynamicTransactionFactory();
}
@Bean(name = "dynamicDataSourceCustomizer")
public SqlSessionFactoryBeanCustomizer
dynamicDataSourceCustomizer(
@Qualifier("dynamicTransactionFactory") TransactionFactory dynamicTransactionFactory
) {
return new TransactionSqlSessionFactoryBeanCustomizer(dynamicTransactionFactory);
}
}
現在每個組件的關系如下:
一些可能出現的問題
TransactionFactory 無法注冊
在一些低版本的 MyBatis 或者第三方 MyBatis 組件中,可能使用 SqlSessionFactoryBeanCustomizer 來配置 SqlSessionFactoryBean,在這種情況下,最佳的解決方式是提高 MyBatis 的版本,但是在一些三方組件中,這部分可能很難發生變化(不再維護或者其它原因無法修改),這種情況下,需要我們手動替換 SqlSessionFactory 的定義,比如我們創建自己的 MineMyBatisAutoConfiguration:
/**
* @author lxh
*/
@Configuration
public class MineMyBatisAutoConfiguration {
private final static Logger log = LoggerFactory.getLogger(MineMyBatisAutoConfiguration.class);
private final MybatisProperties properties;
private final Interceptor[] interceptors;
private final TypeHandler[] typeHandlers;
private final LanguageDriver[] languageDrivers;
private final ResourceLoader resourceLoader;
private final DatabaseIdProvider databaseIdProvider;
private final List<ConfigurationCustomizer> configurationCustomizers;
private final List<SqlSessionFactoryBeanCustomizer> sqlSessionFactoryBeanCustomizers;
public MineMyBatisAutoConfiguration(
MybatisProperties properties,
ObjectProvider<Interceptor[]> interceptorsProvider,
ObjectProvider<TypeHandler[]> typeHandlersProvider,
ObjectProvider<LanguageDriver[]> languageDriversProvider,
ResourceLoader resourceLoader,
ObjectProvider<DatabaseIdProvider> databaseIdProvider,
ObjectProvider<List<ConfigurationCustomizer>> configurationCustomizersProvider,
ObjectProvider<List<SqlSessionFactoryBeanCustomizer>> sqlSessionFactoryBeanCustomizers
) {
this.properties = properties;
this.interceptors = interceptorsProvider.getIfAvailable();
this.typeHandlers = typeHandlersProvider.getIfAvailable();
this.languageDrivers = languageDriversProvider.getIfAvailable();
this.resourceLoader = resourceLoader;
this.databaseIdProvider = databaseIdProvider.getIfAvailable();
this.configurationCustomizers = configurationCustomizersProvider.getIfAvailable();
this.sqlSessionFactoryBeanCustomizers = sqlSessionFactoryBeanCustomizers.getIfAvailable();
}
@Bean
@ConditionalOnMissingBean
public SqlSessionFactory sqlSessionFactory(DataSource dataSource) throws Exception {
SqlSessionFactoryBean factory = new SqlSessionFactoryBean();
// 三方組件相關的源碼。。。。
/*
將 SqlSessionFactoryBeanCustomizer 配置到當前的 SqlSessionFactoryBean,使得我們現有的
TransactionFactory 的配置能夠生效
*/
applySqlSessionFactoryBeanCustomizers(factory);
return factory.getObject();
}
private void applySqlSessionFactoryBeanCustomizers(SqlSessionFactoryBean factory) {
if (!CollectionUtils.isEmpty(this.sqlSessionFactoryBeanCustomizers)) {
for (SqlSessionFactoryBeanCustomizer customizer : this.sqlSessionFactoryBeanCustomizers) {
customizer.customize(factory);
}
}
}
private void applyConfiguration(SqlSessionFactoryBean factory) {
org.apache.ibatis.session.Configuration configuration = this.properties.getConfiguration();
if (configuration == null && !StringUtils.hasText(this.properties.getConfigLocation())) {
configuration = new org.apache.ibatis.session.Configuration();
}
if (configuration != null && !CollectionUtils.isEmpty(this.configurationCustomizers)) {
for (ConfigurationCustomizer customizer : this.configurationCustomizers) {
customizer.customize(configuration);
}
}
factory.setConfiguration(configuration);
}
}
由于配置類的加載順序問題,可能需要手動地修改配置類定義的順序,由于 Spring 會首先加載被 @ComponentScan 注解修飾的配置類,因此在啟動類中需要將這個類作為最開始掃描的基類,從而不會被其它 MyBatis 組件替換:
/*
強制將 MineMyBatisAutoConfiguration 的配置類定義放到最前
*/
@ComponentScan(basePackageClasses = {MineMyBatisAutoConfiguration.class, DemoApplication.class})
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
死鎖
實際上,由于 Spring 事務會綁定 DataSource 作為事務的關鍵信息對象,同時會通過 DataSource 的 getConnection() 方法作為此 DataSource 對應事務的唯一連接,這在原有的事務處理中是沒有問題的。然而,由于我們修改了 MyBatis 獲取數據庫連接的方式,使得它不再是直接當前線程綁定的事務信息中的連接了,也就是說,MyBatis 獲取到的 Connection 和 Spring 事務中的存活的 Connection 不再是同一個。在這種情況下,Spring 事務在等待 MyBatis 處理的結束去釋放連接,而 MyBatis 獲取數據又需要重新從 DataSource 中再獲取一次(一般是通過數據庫連接池,如果此時連接池中的連接數已經被耗盡了,那么此時 MyBatis 的處理會被阻塞),而 MyBatis 的阻塞又會導致 Spring 事務中的數據庫連接無法被釋放,這可能導致 MyBatis 永遠無法再獲取到新的連接!
具體情況如下圖所示:
回想一下死鎖出現的幾個條件:持有互斥鎖、持有并等待、非搶占式以及構成循環回路。盡管在這個問題中并不存在實際意義上的互斥鎖,但是對于連接池的請求也間接地相當于希望獲取互斥鎖,同時內部的兩個獲取連接的操作也在形式上滿足了其余的幾個條件。
為了解決死鎖,只需要去掉其中的一個條件即可,最佳的條件去除就是互斥鎖。經過上文的分析,出現死鎖的原因是因為一個事務中多次獲取了連接,我們只需要保證在一個事務中不會出現對同一個數據源多次獲取連接即可
首先,我們需要確保在一個事務中綁定的 DataSource 為我們實際需要獲取連接的數據源,而不是 AbstractRoutingDataSource(綁定該數據源就會使得后續 MyBatis 在獲取連接時重新獲取一次),因此,我們需要修改現在的 DataSourceTransactionManager,使得它能夠綁定到正確的實際數據源:
import org.jetbrains.annotations.NotNull;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.jdbc.datasource.DelegatingDataSource;
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import javax.sql.DataSource;
import java.util.Map;
public class DynamicDataSourceTransactionManager
extends DataSourceTransactionManager {
public DynamicDataSourceTransactionManager(DataSource dataSource) {
super(dataSource);
}
@NotNull
@Override
protected DataSource obtainDataSource() {
/*
剝離 AbstractRoutingDataSource,使得事務能夠綁定實際的 DataSource,后續的 MyBatis 獲取連接時即可通過 DataSource 獲取到當前事務上下文中關聯的數據庫連接
*/
DataSource curDataSource = super.obtainDataSource();
while (curDataSource instanceof DelegatingDataSource) {
curDataSource = ((DelegatingDataSource) curDataSource).getTargetDataSource();
}
if (curDataSource instanceof AbstractRoutingDataSource) {
Map<Object, DataSource> dss = ((AbstractRoutingDataSource) curDataSource).getResolvedDataSources();
return dss.getOrDefault(DynamicDataSourceUtils.determineDataSourceType(),
((AbstractRoutingDataSource) curDataSource).getResolvedDefaultDataSource());
}
assert curDataSource != null;
return curDataSource;
}
}
為了使得這個事務管理能夠生效,我們需要替換現有的 DataSourceTransactionManager Bean 定義:
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.transaction.TransactionManagerCustomizers;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
@Configuration
public class TransactionConfiguration {
/*
由于 spring-jdbc 定義的 DataSourceTransactionManager 是被 @ConditionalOnMissingBean 修飾的,因此我們
在這里直接定義 Bean 就可以重新覆蓋原有的 DataSourceTransactionManager 定義
*/
@Bean(name = "dynamicDataSourceTransactionManager")
public DataSourceTransactionManager dynamicDataSourceTransactionManager(
@Qualifier("dynamicDataSource") DataSource dataSource,
ObjectProvider<TransactionManagerCustomizers> transactionManagerCustomizers
) {
DynamicDataSourceTransactionManager transactionManager = new DynamicDataSourceTransactionManager(dataSource);
transactionManagerCustomizers.ifAvailable((customizers) -> customizers.customize(transactionManager));
return transactionManager;
}
}
現在,死鎖的問題便得到了順利地解決
多線程中的事務
由于 Spring 的事務信息是通過 ThreadLocal 控制的,因此在不同的線程中,Spring 事務便不能很好地工作,為了解決這個問題,我們可以在線程執行任務前將現有線程關聯的事務信息綁定到當前工作線程,當出現異常時,我們可以將這個事務信息標記為 “只能回滾”,從而達到整體的一致性的目標
以下面的例子為例:
public TransactionStatus run() {
/*
txManager 為創建任務時必須的 DataSourceTransactionManager 事務管理對象
resource 為之前事務所在線程綁定的資源對象,我們知道就是 DataSourceTransactionObject,持有數據庫連接的信息對象,
這樣,當前線程中后續的 MyBatis 組件在獲取連接時也能夠復用現有的數據庫連接
*/
Object key = txManager.getResourceFactory();
TransactionSynchronizationManager.bindResource(key, resource);
TransactionStatus status = txManager.getTransaction(definition);
try {
runnable.run();
} catch (Throwable t) {
log.debug("任務執行出現異常", t);
status.setRollbackOnly(); // 出現異常時將整個事務設置為只能回滾的狀態
} finally {
// 移除與當前線程執行的關聯關系,避免任務執行過程中的資源混亂
TransactionSynchronizationManager.unbindResource(key);
}
return status;
}
具體 demo 地址:https://github.com/LiuXianghai-coder/Spring-Study/tree/master/spring-transaction
總結
以上是生活随笔為你收集整理的MyBatis—Spring 动态数据源事务的处理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 调试分析Linux 0.00引导程序
- 下一篇: [python]常用配置读取方法