mysql源码解读——事务管理
一、事務(wù)(Transaction)
事務(wù)是什么?按照書上說的就是系統(tǒng)的一套操作為了保持?jǐn)?shù)據(jù)的完整性必須符合ACID的特性,即原子性(Atomic)、一致性(Consistency)、隔離性(Isolation)、 持久性(Durability)。原子性比較好理解,操作要么全執(zhí)行完成,要么全不執(zhí)行完,實(shí)現(xiàn)這種方式就要支持回滾操作。而一致性指的是事務(wù)在改變狀態(tài)時(shí),要保證所有的訪問得到的結(jié)果是相同的。一致性有強(qiáng)弱之分,還有區(qū)塊鏈中常用的最終一致性。隔離性就比較好理解了,其實(shí)就是事務(wù)間的操作(并發(fā)時(shí))不能互相影響,常見的如臟讀、幻讀和不可重復(fù)讀。最后就是持久性,也即持久化的影響最終數(shù)據(jù)。
更詳細(xì)的事務(wù)的相關(guān)知識(shí)可參閱相關(guān)數(shù)據(jù)庫的知識(shí),推薦看一下《數(shù)據(jù)密集型應(yīng)用系統(tǒng)設(shè)計(jì)》。
在MySql中事務(wù)的隔離有四種,由低到高是:
1、讀未提交(read-uncommitted):都可能產(chǎn)生,即臟讀、幻讀和不可重復(fù)讀都可能產(chǎn)生。
2、不可重復(fù)讀(read-committed):它不會(huì)產(chǎn)生臟讀
3、可重復(fù)讀(repeatable-read):可能產(chǎn)生幻讀,這種是MySql默認(rèn)的事務(wù)隔離級(jí)別。
4、串行化(serializable):最高隔離方式,各種結(jié)果均無可能產(chǎn)生
二、MySql中的應(yīng)用
在MySql中可以使用命令來配置上面的四種事務(wù)級(jí)別:
SET [GLOBAL | SESSION] TRANSACTION ISOLATION LEVEL{READ UNCOMMITTED| READ COMMITTED| REPEATABLE READ| SERIALIZABLE}GLOBAL | SESSION代表全局和本次會(huì)話的設(shè)置,這里的設(shè)置都是設(shè)置的默認(rèn)事務(wù)隔離級(jí)別。
同樣在前面提到的回滾,可以使用ROLLBACK來實(shí)現(xiàn),當(dāng)然復(fù)雜的事務(wù)可能需要分步來實(shí)現(xiàn)。
設(shè)置的具體的代碼下面分析。
三、源碼分析
在這里看一下,InnoDB相關(guān)源碼的分析:
int mysql_execute_command(THD *thd, bool first_level) { ...... switch (lex->sql_command) {case SQLCOM_PREPARE: {mysql_sql_stmt_prepare(thd);break;}case SQLCOM_EXECUTE: {mysql_sql_stmt_execute(thd);break;}case SQLCOM_DEALLOCATE_PREPARE: {mysql_sql_stmt_close(thd);break;}case SQLCOM_EMPTY_QUERY:my_ok(thd);break; ...... case SQLCOM_REPLACE: case SQLCOM_INSERT: case SQLCOM_REPLACE_SELECT: case SQLCOM_INSERT_SELECT: case SQLCOM_DELETE: case SQLCOM_DELETE_MULTI: case SQLCOM_UPDATE: case SQLCOM_UPDATE_MULTI: case SQLCOM_CREATE_TABLE: case SQLCOM_CREATE_INDEX: case SQLCOM_DROP_INDEX: case SQLCOM_ASSIGN_TO_KEYCACHE: case SQLCOM_PRELOAD_KEYS: case SQLCOM_LOAD: {assert(first_table == all_tables && first_table != nullptr);assert(lex->m_sql_cmd != nullptr);res = lex->m_sql_cmd->execute(thd);break; } } ...... /* report error issued during command execution * / if (thd->killed) thd->send_kill_message(); if (thd->is_error() ||(thd->variables.option_bits & OPTION_MASTER_SQL_ERROR))trans_rollback_stmt(thd); //如果錯(cuò)誤就回滾 else {/* If commit fails, we should be able to reset the OK status. * /thd->get_stmt_da()->set_overwrite_status(true);trans_commit_stmt(thd); //否則提交thd->get_stmt_da()->set_overwrite_status(false); } ...... }提交的代碼在前面分析過,這里重點(diǎn)看一下相關(guān)的事務(wù)方面的提交控制:
/**Commit the single statement transaction.@note Note that if the autocommit is on, then the following callinside InnoDB will commit or rollback the whole transaction(= the statement). The autocommit mechanism built into InnoDBis based on counting locks, but if the user has used LOCKTABLES then that mechanism does not know to do the commit.@param[in] thd Current thread@param[in] ignore_global_read_lock Allow commit to complete even if aglobal read lock is active. This can beused to allow changes to internal tables(e.g. slave status tables, analyzetable).@retval false Success@retval true Failure */bool trans_commit_stmt(THD *thd, bool ignore_global_read_lock) {DBUG_TRACE;int res = false;/*We currently don't invoke commit/rollback at end ofa sub-statement. In future, we perhaps should takea savepoint for each nested statement, and release thesavepoint when statement has succeeded.*/assert(!thd->in_sub_stmt);/*Some code in MYSQL_BIN_LOG::commit and ha_commit_low() is not safefor attachable transactions.*/assert(!thd->is_attachable_ro_transaction_active());thd->get_transaction()->merge_unsafe_rollback_flags();if (thd->get_transaction()->is_active(Transaction_ctx::STMT)) {res = ha_commit_trans(thd, false, ignore_global_read_lock);if (!thd->in_active_multi_stmt_transaction())trans_reset_one_shot_chistics(thd);} else if (tc_log)res = tc_log->commit(thd, false);if (res == false && !thd->in_active_multi_stmt_transaction())if (thd->rpl_thd_ctx.session_gtids_ctx().notify_after_transaction_commit(thd))LogErr(WARNING_LEVEL, ER_TRX_GTID_COLLECT_REJECT);/* In autocommit=1 mode the transaction should be marked as complete in P_S */assert(thd->in_active_multi_stmt_transaction() ||thd->m_transaction_psi == nullptr);thd->get_transaction()->reset(Transaction_ctx::STMT);return res; }/**Rollback the single statement transaction.@param thd Current thread@retval false Success@retval true Failure */ bool trans_rollback_stmt(THD *thd) {DBUG_TRACE;/*We currently don't invoke commit/rollback at end ofa sub-statement. In future, we perhaps should takea savepoint for each nested statement, and release thesavepoint when statement has succeeded.對(duì)當(dāng)前保存點(diǎn)的處理目前暫時(shí)只處理主處理*/assert(!thd->in_sub_stmt);/*Some code in MYSQL_BIN_LOG::rollback and ha_rollback_low() is not safefor attachable transactions.對(duì)不安全的代碼進(jìn)行處理*/assert(!thd->is_attachable_ro_transaction_active());thd->get_transaction()->merge_unsafe_rollback_flags();if (thd->get_transaction()->is_active(Transaction_ctx::STMT)) {ha_rollback_trans(thd, false); //看這個(gè)函數(shù)if (!thd->in_active_multi_stmt_transaction())trans_reset_one_shot_chistics(thd);} else if (tc_log)tc_log->rollback(thd, false);if (!thd->owned_gtid_is_empty() && !thd->in_active_multi_stmt_transaction()) {/*To a failed single statement transaction on auto-commit mode,we roll back its owned gtid if it does not modifynon-transational table or commit its owned gtid if it has modifiednon-transactional table when rolling back it if binlog is disabled,as we did when binlog is enabled.We do not need to check if binlog is enabled here, since we alreadyreleased its owned gtid in MYSQL_BIN_LOG::rollback(...) right beforethis if binlog is enabled.*/if (thd->get_transaction()->has_modified_non_trans_table(Transaction_ctx::STMT))gtid_state->update_on_commit(thd);elsegtid_state->update_on_rollback(thd);}它的調(diào)用如下內(nèi)容:
/**@param[in] thd Thread handle.@param[in] all Session transaction if true, statementotherwise.@param[in] ignore_global_read_lock Allow commit to complete even if aglobal read lock is active. This can beused to allow changes to internal tables(e.g. slave status tables).@retval0 ok@retval1 transaction was rolled back@retval2 error during commit, data may be inconsistent@todoSince we don't support nested statement transactions in 5.0,we can't commit or rollback stmt transactions while we are insidestored functions or triggers. So we simply do nothing now.TODO: This should be fixed in later ( >= 5.1) releases.5.1以后支持嵌套事務(wù) */int ha_commit_trans(THD *thd, bool all, bool ignore_global_read_lock) {int error = 0;THD_STAGE_INFO(thd, stage_waiting_for_handler_commit);bool run_slave_post_commit = false;bool need_clear_owned_gtid = false;/*Save transaction owned gtid into table before transaction prepareif binlog is disabled, or binlog is enabled and log_slave_updatesis disabled with slave SQL thread or slave worker thread.提交自身事物*/std::tie(error, need_clear_owned_gtid) = commit_owned_gtids(thd, all);/*'all' means that this is either an explicit commit issued byuser, or an implicit commit issued by a DDL.*/Transaction_ctx *trn_ctx = thd->get_transaction();Transaction_ctx::enum_trx_scope trx_scope =all ? Transaction_ctx::SESSION : Transaction_ctx::STMT;/*"real" is a nick name for a transaction for which a commit willmake persistent changes. E.g. a 'stmt' transaction inside a 'all'transation is not 'real': even though it's possible to commit it,the changes are not durable as they might be rolled back if theenclosing 'all' transaction is rolled back.*/bool is_real_trans = all || !trn_ctx->is_active(Transaction_ctx::SESSION);Ha_trx_info *ha_info = trn_ctx->ha_trx_info(trx_scope);XID_STATE *xid_state = trn_ctx->xid_state();DBUG_TRACE;DBUG_PRINT("info", ("all=%d thd->in_sub_stmt=%d ha_info=%p is_real_trans=%d",all, thd->in_sub_stmt, ha_info, is_real_trans));/*We must not commit the normal transaction if a statementtransaction is pending. Otherwise statement transactionflags will not get propagated to its normal transaction'scounterpart.*/assert(!trn_ctx->is_active(Transaction_ctx::STMT) || !all);DBUG_EXECUTE_IF("pre_commit_error", {error = true;my_error(ER_UNKNOWN_ERROR, MYF(0));});/*When atomic DDL is executed on the slave, we would like toto update slave applier state as part of DDL's transaction.Call Relay_log_info::pre_commit() hook to do this before DDLgets committed in the following block.Failed atomic DDL statements should've been marked as executed/committedduring statement rollback, though some like GRANT may continue untilthis point.When applying a DDL statement on a slave and the statement is filteredout by a table filter, we report an error "ER_SLAVE_IGNORED_TABLE" towarn slave applier thread. We need to save the DDL statement's gtidinto mysql.gtid_executed system table if the binary log is disabledon the slave and gtids are enabled.*/if (is_real_trans && is_atomic_ddl_commit_on_slave(thd) &&(!thd->is_error() ||(thd->is_operating_gtid_table_implicitly &&thd->get_stmt_da()->mysql_errno() == ER_SLAVE_IGNORED_TABLE))) {run_slave_post_commit = true;error = error || thd->rli_slave->pre_commit();DBUG_EXECUTE_IF("rli_pre_commit_error", {error = true;my_error(ER_UNKNOWN_ERROR, MYF(0));});DBUG_EXECUTE_IF("slave_crash_before_commit", {/* This pre-commit crash aims solely at atomic DDL */DBUG_SUICIDE();});}//子事務(wù)處理if (thd->in_sub_stmt) {assert(0);/*Since we don't support nested statement transactions in 5.0,we can't commit or rollback stmt transactions while we are insidestored functions or triggers. So we simply do nothing now.TODO: This should be fixed in later ( >= 5.1) releases.*/if (!all) return 0;/*We assume that all statements which commit or rollback main transactionare prohibited inside of stored functions or triggers. So they shouldbail out with error even before ha_commit_trans() call. To be 100% safelet us throw error in non-debug builds.*/my_error(ER_COMMIT_NOT_ALLOWED_IN_SF_OR_TRG, MYF(0));return 2;}MDL_request mdl_request;bool release_mdl = false;//兩階段提交處理if (ha_info && !error) {uint rw_ha_count = 0;bool rw_trans;DBUG_EXECUTE_IF("crash_commit_before", DBUG_SUICIDE(););/*skip 2PC if the transaction is empty and it is not marked as started (whichcan happen when the slave's binlog is disabled)*/if (ha_info->is_started())rw_ha_count = ha_check_and_coalesce_trx_read_only(thd, ha_info, all);trn_ctx->set_rw_ha_count(trx_scope, rw_ha_count);/* rw_trans is true when we in a transaction changing data */rw_trans = is_real_trans && (rw_ha_count > 0);DBUG_EXECUTE_IF("dbug.enabled_commit", {const char act[] = "now signal Reached wait_for signal.commit_continue";assert(!debug_sync_set_action(thd, STRING_WITH_LEN(act)));};);DEBUG_SYNC(thd, "ha_commit_trans_before_acquire_commit_lock");if (rw_trans && !ignore_global_read_lock) {/*Acquire a metadata lock which will ensure that COMMIT is blockedby an active FLUSH TABLES WITH READ LOCK (and vice versa:COMMIT in progress blocks FTWRL).We allow the owner of FTWRL to COMMIT; we assume that it knowswhat it does.*/MDL_REQUEST_INIT(&mdl_request, MDL_key::COMMIT, "", "",MDL_INTENTION_EXCLUSIVE, MDL_EXPLICIT);DBUG_PRINT("debug", ("Acquire MDL commit lock"));if (thd->mdl_context.acquire_lock(&mdl_request,thd->variables.lock_wait_timeout)) {ha_rollback_trans(thd, all);return 1;}release_mdl = true;DEBUG_SYNC(thd, "ha_commit_trans_after_acquire_commit_lock");}if (rw_trans && stmt_has_updated_trans_table(ha_info) &&check_readonly(thd, true)) {ha_rollback_trans(thd, all);error = 1;goto end;}if (!trn_ctx->no_2pc(trx_scope) && (trn_ctx->rw_ha_count(trx_scope) > 1))error = tc_log->prepare(thd, all);}/*The state of XA transaction is changed to Prepared, intermediately.It's going to change to the regular NOTR at the end.The fact of the Prepared state is of interest to binary logger.*/if (!error && all && xid_state->has_state(XID_STATE::XA_IDLE)) {assert(thd->lex->sql_command == SQLCOM_XA_COMMIT &&static_cast<Sql_cmd_xa_commit *>(thd->lex->m_sql_cmd)->get_xa_opt() ==XA_ONE_PHASE);xid_state->set_state(XID_STATE::XA_PREPARED);}if (error || (error = tc_log->commit(thd, all))) {ha_rollback_trans(thd, all);error = 1;goto end;} /*Mark multi-statement (any autocommit mode) or single-statement(autocommit=1) transaction as rolled back */ #ifdef HAVE_PSI_TRANSACTION_INTERFACEif (is_real_trans && thd->m_transaction_psi != nullptr) {MYSQL_COMMIT_TRANSACTION(thd->m_transaction_psi);thd->m_transaction_psi = nullptr;} #endifDBUG_EXECUTE_IF("crash_commit_after",if (!thd->is_operating_gtid_table_implicitly)DBUG_SUICIDE();); end:if (release_mdl && mdl_request.ticket) {/*We do not always immediately release transactional locksafter ha_commit_trans() (see uses of ha_enable_transaction()),thus we release the commit blocker lock as soon as it'snot needed.*/DBUG_PRINT("debug", ("Releasing MDL commit lock"));thd->mdl_context.release_lock(mdl_request.ticket);}/* Free resources and perform other cleanup even for 'empty' transactions. */if (is_real_trans) {trn_ctx->cleanup();thd->tx_priority = 0;}if (need_clear_owned_gtid) {thd->server_status &= ~SERVER_STATUS_IN_TRANS;/*Release the owned GTID when binlog is disabled, or binlog isenabled and log_slave_updates is disabled with slave SQL threador slave worker thread.*/if (error)gtid_state->update_on_rollback(thd);elsegtid_state->update_on_commit(thd);} else {if (has_commit_order_manager(thd) && error) {gtid_state->update_on_rollback(thd);}}if (run_slave_post_commit) {DBUG_EXECUTE_IF("slave_crash_after_commit", DBUG_SUICIDE(););thd->rli_slave->post_commit(error != 0);/*SERVER_STATUS_IN_TRANS may've been gained by pre_commit alonewhen the main DDL transaction is filtered out of execution.In such case the status has to be reset now.TODO: move/refactor this handling onto trans_commit/commit_implicit()the caller level.*/thd->server_status &= ~SERVER_STATUS_IN_TRANS;} else {DBUG_EXECUTE_IF("slave_crash_after_commit", {if (thd->slave_thread && thd->rli_slave &&thd->rli_slave->current_event &&thd->rli_slave->current_event->get_type_code() ==binary_log::XID_EVENT &&!thd->is_operating_substatement_implicitly &&!thd->is_operating_gtid_table_implicitly)DBUG_SUICIDE();});}return error; }在上述的代碼中,可以看到對(duì)2PC的處理,其中的Prepare和Commit對(duì)就著兩個(gè)階段,這兩個(gè)階段中對(duì)相關(guān)的具體的事物的操作進(jìn)行了更深一步的處理。在處理的過程中,不斷的處理因?yàn)楦鞣N異常導(dǎo)致的ha_rollback_trans,用來回滾相關(guān)的數(shù)據(jù)。
/*Prepare the transaction in the transaction coordinator.This function will prepare the transaction in the storage engines(by calling @c ha_prepare_low) what will write a prepare recordto the log buffers.@retval 0 success@retval 1 error */ int MYSQL_BIN_LOG::prepare(THD *thd, bool all) {DBUG_TRACE;assert(opt_bin_log);/*The applier thread explicitly overrides the value of sql_log_binwith the value of log_slave_updates.*/assert(thd->slave_thread ? opt_log_slave_updates: thd->variables.sql_log_bin);/*Set HA_IGNORE_DURABILITY to not flush the prepared record of thetransaction to the log of storage engine (for example, InnoDBredo log) during the prepare phase. So that we can flush preparedrecords of transactions to the log of storage engine in a groupright before flushing them to binary log during binlog groupcommit flush stage. Reset to HA_REGULAR_DURABILITY at thebeginning of parsing next command.*/thd->durability_property = HA_IGNORE_DURABILITY;int error = ha_prepare_low(thd, all);return error; } int ha_prepare_low(THD *thd, bool all) {int error = 0;Transaction_ctx::enum_trx_scope trx_scope =all ? Transaction_ctx::SESSION : Transaction_ctx::STMT;Ha_trx_info *ha_info = thd->get_transaction()->ha_trx_info(trx_scope);DBUG_TRACE;if (ha_info) {for (; ha_info && !error; ha_info = ha_info->next()) {int err = 0;handlerton *ht = ha_info->ht();/*Do not call two-phase commit if this particulartransaction is read-only. This allows for simplerimplementation in engines that are always read-only.*/if (!ha_info->is_trx_read_write()) continue;if ((err = ht->prepare(ht, thd, all))) {char errbuf[MYSQL_ERRMSG_SIZE];my_error(ER_ERROR_DURING_COMMIT, MYF(0), err,my_strerror(errbuf, MYSQL_ERRMSG_SIZE, err));error = 1;}assert(!thd->status_var_aggregated);thd->status_var.ha_prepare_count++;}DBUG_EXECUTE_IF("crash_commit_after_prepare", DBUG_SUICIDE(););}return error; }在static int binlog_init(void *p)可以知道binlog_hton->prepare = binlog_prepare;則其函數(shù):
static int binlog_prepare(handlerton *, THD *thd, bool all) {DBUG_TRACE;if (!all) {thd->get_transaction()->store_commit_parent(mysql_bin_log.m_dependency_tracker.get_max_committed_timestamp());}return all && is_loggable_xa_prepare(thd) ? mysql_bin_log.commit(thd, true): 0; }然后再調(diào)用數(shù)據(jù)庫中的提交:
static int innodb_init(void *p) {DBUG_TRACE; ......innobase_hton->prepare = innobase_xa_prepare;innobase_hton->rollback_by_xid = innobase_rollback_by_xid;...... } static int innobase_xa_prepare(handlerton *hton, /*!< in: InnoDB handlerton */THD *thd, /*!< in: handle to the MySQL thread ofthe user whose XA transaction shouldbe prepared */bool prepare_trx) /*!< in: true - preparetransaction false - the currentSQL statement ended */ {trx_t *trx = check_trx_exists(thd);assert(hton == innodb_hton_ptr);thd_get_xid(thd, (MYSQL_XID *)trx->xid);innobase_srv_conc_force_exit_innodb(trx);TrxInInnoDB trx_in_innodb(trx);if (trx_in_innodb.is_aborted() ||DBUG_EVALUATE_IF("simulate_xa_failure_prepare_in_engine", 1, 0)) {innobase_rollback(hton, thd, prepare_trx);return (convert_error_code_to_mysql(DB_FORCED_ABORT, 0, thd));}if (!trx_is_registered_for_2pc(trx) && trx_is_started(trx)) {log_errlog(ERROR_LEVEL, ER_INNODB_UNREGISTERED_TRX_ACTIVE);}if (prepare_trx ||(!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) {/* We were instructed to prepare the whole transaction, orthis is an SQL statement end and autocommit is on */ut_ad(trx_is_registered_for_2pc(trx));dberr_t err = trx_prepare_for_mysql(trx);ut_ad(err == DB_SUCCESS || err == DB_FORCED_ABORT);if (err == DB_FORCED_ABORT) {innobase_rollback(hton, thd, prepare_trx);return (convert_error_code_to_mysql(DB_FORCED_ABORT, 0, thd));}} else {/* We just mark the SQL statement ended and do not do atransaction prepare *//* If we had reserved the auto-inc lock for sometable in this SQL statement we release it now */lock_unlock_table_autoinc(trx);/* Store the current undo_no of the transaction so that weknow where to roll back if we have to roll back the nextSQL statement */trx_mark_sql_stat_end(trx);}if (thd_sql_command(thd) != SQLCOM_XA_PREPARE &&(prepare_trx ||!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) {/* For mysqlbackup to work the order of transactions in binlogand InnoDB must be the same. Consider the situationthread1> prepare; write to binlog; ...<context switch>thread2> prepare; write to binlog; committhread1> ... commitThe server guarantees that writes to the binary logand commits are in the same order, so we do not haveto handle this case. */}return (0); } /** Does the transaction prepare for MySQL. @param[in, out] trx Transaction instance to prepare */ dberr_t trx_prepare_for_mysql(trx_t *trx) {trx_start_if_not_started_xa(trx, false);TrxInInnoDB trx_in_innodb(trx, true);if (trx_in_innodb.is_aborted() &&trx->killed_by != std::this_thread::get_id()) {return (DB_FORCED_ABORT);}/* For GTID persistence we need update undo segment. */auto db_err = trx_undo_gtid_add_update_undo(trx, true, false);if (db_err != DB_SUCCESS) {return (db_err);}trx->op_info = "preparing";trx_prepare(trx);trx->op_info = "";return (DB_SUCCESS); }前面的BINLOG中已經(jīng)調(diào)用到了Commit函數(shù),這里看最后調(diào)用innodb的commit:
/** Commits a transaction in an InnoDB database. */ void innobase_commit_low(trx_t *trx) /*!< in: transaction handle */ {if (trx_is_started(trx)) {const dberr_t error MY_ATTRIBUTE((unused)) = trx_commit_for_mysql(trx);// This is ut_ad not ut_a, because previously we did not have an assert// and nobody has noticed for a long time, so probably there is no much// harm in silencing this error. OTOH we believe it should no longer happen// after adding `true` as a second argument to TrxInInnoDB constructor call,// so we'd like to learn if the error can still happen.ut_ad(DB_SUCCESS == error);}trx->will_lock = 0; } /** Does the transaction commit for MySQL.@return DB_SUCCESS or error number */ dberr_t trx_commit_for_mysql(trx_t *trx) /*!< in/out: transaction */ {DEBUG_SYNC_C("trx_commit_for_mysql_checks_for_aborted");TrxInInnoDB trx_in_innodb(trx, true);if (trx_in_innodb.is_aborted() &&trx->killed_by != std::this_thread::get_id()) {return (DB_FORCED_ABORT);}/* Because we do not do the commit by sending an Innobasesig to the transaction, we must here make sure that trx has beenstarted. */dberr_t db_err = DB_SUCCESS;switch (trx->state) {case TRX_STATE_NOT_STARTED:case TRX_STATE_FORCED_ROLLBACK:ut_d(trx->start_file = __FILE__);ut_d(trx->start_line = __LINE__);trx_start_low(trx, true);/* fall through */case TRX_STATE_ACTIVE:case TRX_STATE_PREPARED:trx->op_info = "committing";/* For GTID persistence we need update undo segment. */db_err = trx_undo_gtid_add_update_undo(trx, false, false);if (db_err != DB_SUCCESS) {return (db_err);}if (trx->id != 0) {trx_update_mod_tables_timestamp(trx);}trx_commit(trx);MONITOR_DEC(MONITOR_TRX_ACTIVE);trx->op_info = "";return (DB_SUCCESS);case TRX_STATE_COMMITTED_IN_MEMORY:break;}ut_error;return (DB_CORRUPTION); }/** Commits a transaction. */ void trx_commit(trx_t *trx) /*!< in/out: transaction */ {mtr_t *mtr;mtr_t local_mtr;DBUG_EXECUTE_IF("ib_trx_commit_crash_before_trx_commit_start",DBUG_SUICIDE(););if (trx_is_rseg_updated(trx)) {mtr = &local_mtr;DBUG_EXECUTE_IF("ib_trx_commit_crash_rseg_updated", DBUG_SUICIDE(););mtr_start_sync(mtr);} else {mtr = nullptr;}trx_commit_low(trx, mtr); } /** Commits a transaction and a mini-transaction. @param[in,out] trx Transaction @param[in,out] mtr Mini-transaction (will be committed), or null if trx made no modifications */ void trx_commit_low(trx_t *trx, mtr_t *mtr) {assert_trx_nonlocking_or_in_list(trx);ut_ad(!trx_state_eq(trx, TRX_STATE_COMMITTED_IN_MEMORY));ut_ad(!mtr || mtr->is_active());/* undo_no is non-zero if we're doing the final commit. */if (trx->fts_trx != nullptr && trx->undo_no != 0 &&trx->lock.que_state != TRX_QUE_ROLLING_BACK) {dberr_t error;ut_a(!trx_is_autocommit_non_locking(trx));error = fts_commit(trx);/* FTS-FIXME: Temporarily tolerate DB_DUPLICATE_KEYinstead of dying. This is a possible scenario if thereis a crash between insert to DELETED table committingand transaction committing. The fix would be able toreturn error from this function */if (error != DB_SUCCESS && error != DB_DUPLICATE_KEY) {/* FTS-FIXME: once we can return values from thisfunction, we should do so and signal an errorinstead of just dying. */ut_error;}}bool serialised;if (mtr != nullptr) {mtr->set_sync();serialised = trx_write_serialisation_history(trx, mtr);/* The following call commits the mini-transaction, making thewhole transaction committed in the file-based world, at thislog sequence number. The transaction becomes 'durable' whenwe write the log to disk, but in the logical sense the commitin the file-based data structures (undo logs etc.) happenshere.NOTE that transaction numbers, which are assigned only totransactions with an update undo log, do not necessarily comein exactly the same order as commit lsn's, if the transactionshave different rollback segments. To get exactly the sameorder we should hold the kernel mutex up to this point,adding to the contention of the kernel mutex. However, ifa transaction T2 is able to see modifications made bya transaction T1, T2 will always get a bigger transactionnumber and a bigger commit lsn than T1. *//*--------------*/DBUG_EXECUTE_IF("trx_commit_to_the_end_of_log_block", {const size_t space_left = mtr->get_expected_log_size();mtr_commit_mlog_test_filling_block(*log_sys, space_left);});mtr_commit(mtr);DBUG_PRINT("trx_commit", ("commit lsn at " LSN_PF, mtr->commit_lsn()));DBUG_EXECUTE_IF("ib_crash_during_trx_commit_in_mem", if (trx_is_rseg_updated(trx)) {log_make_latest_checkpoint();DBUG_SUICIDE();});/*--------------*/} else {serialised = false;} #ifdef UNIV_DEBUG/* In case of this function is called from a stack executingTHD::release_resources -> ...innobase_connection_close() ->trx_rollback_for_mysql... -> .mysql's thd does not seem to havethd->debug_sync_control defined any longer. However the stackis possible only with a prepared trx not updating any data.*/if (trx->mysql_thd != nullptr && trx_is_redo_rseg_updated(trx)) {DEBUG_SYNC_C("before_trx_state_committed_in_memory");} #endiftrx_commit_in_memory(trx, mtr, serialised); } /** Assign the transaction its history serialisation number and write theupdate UNDO log record to the assigned rollback segment.@return true if a serialisation log was written */ static bool trx_write_serialisation_history(trx_t *trx, /*!< in/out: transaction */mtr_t *mtr) /*!< in/out: mini-transaction */ {/* Change the undo log segment states from TRX_UNDO_ACTIVE to someother state: these modifications to the file data structure definethe transaction as committed in the file based domain, at theserialization point of the log sequence number lsn obtained below. *//* We have to hold the rseg mutex because update log headers haveto be put to the history list in the (serialisation) order of theUNDO trx number. This is required for the purge in-memory datastructures too. */bool own_redo_rseg_mutex = false;bool own_temp_rseg_mutex = false;/* Get rollback segment mutex. */if (trx->rsegs.m_redo.rseg != nullptr && trx_is_redo_rseg_updated(trx)) {trx->rsegs.m_redo.rseg->latch();own_redo_rseg_mutex = true;}mtr_t temp_mtr;if (trx->rsegs.m_noredo.rseg != nullptr && trx_is_temp_rseg_updated(trx)) {trx->rsegs.m_noredo.rseg->latch();own_temp_rseg_mutex = true;mtr_start(&temp_mtr);temp_mtr.set_log_mode(MTR_LOG_NO_REDO);}/* If transaction involves insert then truncate undo logs. */if (trx->rsegs.m_redo.insert_undo != nullptr) {trx_undo_set_state_at_finish(trx->rsegs.m_redo.insert_undo, mtr);}if (trx->rsegs.m_noredo.insert_undo != nullptr) {trx_undo_set_state_at_finish(trx->rsegs.m_noredo.insert_undo, &temp_mtr);}bool serialised = false;/* If transaction involves update then add rollback segmentsto purge queue. */if (trx->rsegs.m_redo.update_undo != nullptr ||trx->rsegs.m_noredo.update_undo != nullptr) {/* Assign the transaction serialisation number and add theserollback segments to purge trx-no sorted priority queueif this is the first UNDO log being written to assignedrollback segments. */trx_undo_ptr_t *redo_rseg_undo_ptr =trx->rsegs.m_redo.update_undo != nullptr ? &trx->rsegs.m_redo : nullptr;trx_undo_ptr_t *temp_rseg_undo_ptr =trx->rsegs.m_noredo.update_undo != nullptr ? &trx->rsegs.m_noredo: nullptr;/* Will set trx->no and will add rseg to purge queue. */serialised = trx_serialisation_number_get(trx, redo_rseg_undo_ptr,temp_rseg_undo_ptr);/* It is not necessary to obtain trx->undo_mutex here becauseonly a single OS thread is allowed to do the transaction commitfor this transaction. */if (trx->rsegs.m_redo.update_undo != nullptr) {page_t *undo_hdr_page;undo_hdr_page =trx_undo_set_state_at_finish(trx->rsegs.m_redo.update_undo, mtr);/* Delay update of rseg_history_len if we plan to addnon-redo update_undo too. This is to avoid immediateinvocation of purge as we need to club these 2 segmentswith same trx-no as single unit. */bool update_rseg_len = !(trx->rsegs.m_noredo.update_undo != nullptr);/* Set flag if GTID information need to persist. */auto undo_ptr = &trx->rsegs.m_redo;trx_undo_gtid_set(trx, undo_ptr->update_undo, false);trx_undo_update_cleanup(trx, undo_ptr, undo_hdr_page, update_rseg_len,(update_rseg_len ? 1 : 0), mtr);}DBUG_EXECUTE_IF("ib_trx_crash_during_commit", DBUG_SUICIDE(););if (trx->rsegs.m_noredo.update_undo != nullptr) {page_t *undo_hdr_page;undo_hdr_page = trx_undo_set_state_at_finish(trx->rsegs.m_noredo.update_undo, &temp_mtr);ulint n_added_logs = (redo_rseg_undo_ptr != nullptr) ? 2 : 1;trx_undo_update_cleanup(trx, &trx->rsegs.m_noredo, undo_hdr_page, true,n_added_logs, &temp_mtr);}}if (own_redo_rseg_mutex) {trx->rsegs.m_redo.rseg->unlatch();own_redo_rseg_mutex = false;}if (own_temp_rseg_mutex) {trx->rsegs.m_noredo.rseg->unlatch();own_temp_rseg_mutex = false;mtr_commit(&temp_mtr);}MONITOR_INC(MONITOR_TRX_COMMIT_UNDO);/* Update the latest MySQL binlog name and offset informationin trx sys header only if MySQL binary logging is on and cloneis has ensured commit order at final stage. */if (Clone_handler::need_commit_order()) {trx_sys_update_mysql_binlog_offset(trx, mtr);}return (serialised); } /** Adds the update undo log header as the first in the history list, andfrees the memory object, or puts it to the list of cached update undo logsegments. @param[in] trx Trx owning the update undo log @param[in] undo_ptr Update undo log. @param[in] undo_page Update undo log header page, x-latched @param[in] update_rseg_history_len If true: update rseg history len else skip updating it. @param[in] n_added_logs Number of logs added @param[in] mtr Mini-transaction */ void trx_undo_update_cleanup(trx_t *trx, trx_undo_ptr_t *undo_ptr,page_t *undo_page, bool update_rseg_history_len,ulint n_added_logs, mtr_t *mtr) {trx_rseg_t *rseg;trx_undo_t *undo;undo = undo_ptr->update_undo;rseg = undo_ptr->rseg;ut_ad(mutex_own(&(rseg->mutex)));trx_purge_add_update_undo_to_history(trx, undo_ptr, undo_page, update_rseg_history_len, n_added_logs, mtr);UT_LIST_REMOVE(rseg->update_undo_list, undo);undo_ptr->update_undo = nullptr;if (undo->state == TRX_UNDO_CACHED) {UT_LIST_ADD_FIRST(rseg->update_undo_cached, undo);MONITOR_INC(MONITOR_NUM_UNDO_SLOT_CACHED);} else {ut_ad(undo->state == TRX_UNDO_TO_PURGE);trx_undo_mem_free(undo);} } /** Adds the update undo log as the first log in the history list. Removes theupdate undo log segment from the rseg slot if it is too big for reuse. */ void trx_purge_add_update_undo_to_history(trx_t *trx, /*!< in: transaction */trx_undo_ptr_t *undo_ptr, /*!< in/out: update undo log. */page_t *undo_page, /*!< in: update undo log header page,x-latched */bool update_rseg_history_len,/*!< in: if true: update rseg historylen else skip updating it. */ulint n_added_logs, /*!< in: number of logs added */mtr_t *mtr) /*!< in: mtr */ {trx_undo_t *undo;trx_rseg_t *rseg;trx_rsegf_t *rseg_header;trx_ulogf_t *undo_header;undo = undo_ptr->update_undo;rseg = undo->rseg;rseg_header = trx_rsegf_get(undo->rseg->space_id, undo->rseg->page_no,undo->rseg->page_size, mtr);undo_header = undo_page + undo->hdr_offset;if (undo->state != TRX_UNDO_CACHED) {ulint hist_size; #ifdef UNIV_DEBUGtrx_usegf_t *seg_header = undo_page + TRX_UNDO_SEG_HDR; #endif /* UNIV_DEBUG *//* The undo log segment will not be reused */if (UNIV_UNLIKELY(undo->id >= TRX_RSEG_N_SLOTS)) {ib::fatal(ER_IB_MSG_1165) << "undo->id is " << undo->id;}trx_rsegf_set_nth_undo(rseg_header, undo->id, FIL_NULL, mtr);MONITOR_DEC(MONITOR_NUM_UNDO_SLOT_USED);hist_size =mtr_read_ulint(rseg_header + TRX_RSEG_HISTORY_SIZE, MLOG_4BYTES, mtr);ut_ad(undo->size == flst_get_len(seg_header + TRX_UNDO_PAGE_LIST));mlog_write_ulint(rseg_header + TRX_RSEG_HISTORY_SIZE,hist_size + undo->size, MLOG_4BYTES, mtr);}/* Add the log as the first in the history list */flst_add_first(rseg_header + TRX_RSEG_HISTORY,undo_header + TRX_UNDO_HISTORY_NODE, mtr);if (update_rseg_history_len) {trx_sys->rseg_history_len.fetch_add(n_added_logs);if (trx_sys->rseg_history_len.load() >srv_n_purge_threads * srv_purge_batch_size) {srv_wake_purge_thread_if_not_active();}}/* Update maximum transaction number for this rollback segment. */mlog_write_ull(rseg_header + TRX_RSEG_MAX_TRX_NO, trx->no, mtr);/* Write the trx number to the undo log header */mlog_write_ull(undo_header + TRX_UNDO_TRX_NO, trx->no, mtr);/* Write information about delete markings to the undo log header */if (!undo->del_marks) {mlog_write_ulint(undo_header + TRX_UNDO_DEL_MARKS, FALSE, MLOG_2BYTES, mtr);}/* Write GTID information if there. */trx_undo_gtid_write(trx, undo_header, undo, mtr, false);if (rseg->last_page_no == FIL_NULL) {rseg->last_page_no = undo->hdr_page_no;rseg->last_offset = undo->hdr_offset;rseg->last_trx_no = trx->no;rseg->last_del_marks = undo->del_marks;} }代碼有點(diǎn)亂,分成兩部分來看,結(jié)合具體的代碼調(diào)用棧最好,這樣就不會(huì)出現(xiàn)調(diào)用找不到相關(guān)的代碼。
四、總結(jié)
事務(wù)或者相關(guān)的實(shí)現(xiàn)方式,是最近這些年一直挺火的話題,包括分布式中對(duì)數(shù)據(jù)結(jié)果的共識(shí),數(shù)據(jù)存儲(chǔ)的形式,通信中對(duì)結(jié)果的操作等等,都或多或少需要這些事務(wù)相關(guān)的行為。深入數(shù)據(jù)操作的具體的過程,研究每一個(gè)細(xì)節(jié),并把它公式化,可驗(yàn)證化,就可以最大可能的保證數(shù)據(jù)的安全性。總不能和銀行打交道,操作了一半,錢沒了,也沒啥結(jié)果,這個(gè)估計(jì)誰也不能接受吧。
深入研究,不是只看表面,這需要不斷的抽象總結(jié),最終形成理論知識(shí)。
努力吧,歸來的少年!
總結(jié)
以上是生活随笔為你收集整理的mysql源码解读——事务管理的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用Java实现JVM第五章《指令集和解释
- 下一篇: ubuntu16.04编译安装mysql