MSSQL · 实现分析 · SQL Server实现审计日志的方案探索
摘要
這篇文章介紹四種實(shí)現(xiàn)MSSQL Server審計(jì)日志功能的方法探索,即解析數(shù)據(jù)庫事務(wù)日志、SQL Profiler、SQL Audit以及Extended Event。詳細(xì)介紹了這四種方法的具體實(shí)現(xiàn),以及通過優(yōu)缺點(diǎn)的對比和總結(jié),最終得出結(jié)論,使用Extended Event實(shí)現(xiàn)審計(jì)日志是最好的選擇,為產(chǎn)品化選型提供參考。
審計(jì)日志需求分析
對于關(guān)系型數(shù)據(jù)庫來而言,在生產(chǎn)環(huán)境SQL Server數(shù)據(jù)庫實(shí)例中,審計(jì)日志是一個(gè)非常重要且必須的強(qiáng)需求功能,主要體現(xiàn)在以下幾個(gè)方面。
- 安全審計(jì)
- 問題排查
- 性能調(diào)優(yōu)
安全審計(jì)
在一些存取敏感信息的產(chǎn)品環(huán)境數(shù)據(jù)庫SQL Server實(shí)例中(比如:財(cái)務(wù)系統(tǒng)、設(shè)計(jì)到國家安全層面的數(shù)據(jù)庫系統(tǒng)),對數(shù)據(jù)操作要求十分謹(jǐn)慎,安全要求等級十分嚴(yán)密,需要對每一條數(shù)據(jù)操作語句進(jìn)行審計(jì),以便做到每次數(shù)據(jù)變動(dòng)或查看均可追溯。在這個(gè)場景中,對敏感信息操作的審計(jì)是基于數(shù)據(jù)安全性的要求。
問題排查
在日常生產(chǎn)系統(tǒng)管理維護(hù)過程中,我們經(jīng)常會(huì)遇到類似的場景和疑問:能否找到是誰在哪個(gè)時(shí)間點(diǎn)執(zhí)行了什么語句把數(shù)據(jù)XXX給刪除(更新)了呢?筆者在從事DBA行業(yè)的幾年工作經(jīng)歷過程中,無數(shù)次被問及到類似的問題。要解決這個(gè)場景中的問題,審計(jì)日志功能是不二選擇。
性能調(diào)優(yōu)
利用審計(jì)日志對數(shù)據(jù)庫系統(tǒng)進(jìn)行性能調(diào)優(yōu)是審計(jì)日志非常重要的功能和用途。比如,以下是幾個(gè)審計(jì)日志典型的應(yīng)用場景:
- 找出某段時(shí)間內(nèi)哪些語句導(dǎo)致了系統(tǒng)性能消耗嚴(yán)重(比如:CPU、IOPS等)
- 找出某段時(shí)間內(nèi)的TOP CPU SQL語句
- 找出某段時(shí)間內(nèi)的TOP IO SQL語句
- 找出某段時(shí)間內(nèi)的TOP Time Cost SQL語句
- 找出某段時(shí)間內(nèi)哪個(gè)用戶使用的數(shù)據(jù)庫系統(tǒng)資源最多
- 找出某段時(shí)間內(nèi)哪個(gè)應(yīng)用使用的數(shù)據(jù)庫系統(tǒng)資源最多
- ……
實(shí)現(xiàn)審計(jì)日志的方法
基于以上對審計(jì)日志的需求分析,我們了解到審計(jì)日志的功能是關(guān)系型數(shù)據(jù)至關(guān)重要的強(qiáng)需求,讓我們來看看SQL Server數(shù)據(jù)庫系統(tǒng)有哪些實(shí)現(xiàn)審計(jì)日志功能的方法和具體實(shí)現(xiàn),以及這些方法的優(yōu)缺點(diǎn)對比。
數(shù)據(jù)庫日志分析
在SQL Server數(shù)據(jù)庫事務(wù)日志中,記錄了每個(gè)事務(wù)的數(shù)據(jù)變更操作的詳細(xì)信息,包含誰在哪個(gè)時(shí)間點(diǎn)做了什么操作。所以,我們可以基于SQL Server數(shù)據(jù)庫事務(wù)日志的分析,來獲取數(shù)據(jù)變更的詳細(xì)審計(jì)日志信息。使用這個(gè)方法來實(shí)現(xiàn)審計(jì)日志功能的,有一家叫著ApexSQL的公司產(chǎn)品做的很不錯(cuò),產(chǎn)品ApexSQL Log就是通過數(shù)據(jù)庫事務(wù)日志來實(shí)現(xiàn)審計(jì)日志功能的產(chǎn)品,詳情參見:ApexSQL Log。附一張來自ApexSQL官網(wǎng)的截圖:
但是,由于SQL Server本身是微軟的閉源產(chǎn)品,對于事務(wù)日志格式外界很難知道,所以這個(gè)方法的實(shí)現(xiàn)門檻很高,實(shí)現(xiàn)難度極大。加之,有可能不同版本的SQL Server事務(wù)日志格式存在差異,必須要對每個(gè)版本的事務(wù)日志解析做相應(yīng)的適配,導(dǎo)致維護(hù)成本極高,產(chǎn)品功能延續(xù)性存在極大風(fēng)險(xiǎn)和挑戰(zhàn)。
SQL Profiler
SQL Profiler是微軟從SQL Server 2000開始引入的數(shù)據(jù)庫引擎跟蹤工具,具有使用界面操作的接口、使用SQL語句創(chuàng)建接口以及使用SMO編程創(chuàng)建接口。使用SQL Profiler,可以實(shí)現(xiàn)非常多的功能,比如:
- 圖形化監(jiān)控?cái)?shù)據(jù)庫引擎執(zhí)行的SQL語句(也可以將執(zhí)行語句保存到表中)
- 查看執(zhí)行語句實(shí)時(shí)的執(zhí)行計(jì)劃
- 數(shù)據(jù)庫引擎錯(cuò)誤信息排查
- 數(shù)據(jù)庫性能分析
- 阻塞,鎖等待、鎖升級及死鎖跟蹤
- 后臺(tái)收集查詢語句信息
- ……
所以,從功能完整性角度來說,我們完全可以使用SQL Profiler來實(shí)現(xiàn)就數(shù)據(jù)庫實(shí)例級別的審計(jì)日志的功能。那么接下來讓我們看看如何使用SQL Profiler實(shí)現(xiàn)審計(jì)日志的功能。
圖形化創(chuàng)建
開始 => 運(yùn)行 => 鍵入“Profiler” => 回車,打開Profiler工具后,點(diǎn)擊“New Trace” => Server Name => Authentication => Connect,如下圖所示:
然后,選擇General => Save to table => 選擇要保留到的實(shí)例名、數(shù)據(jù)庫名、架構(gòu)名和表名 => OK
接下來選擇要跟蹤的事件,Events Selection => SQL:StmtCompleted => Column Filters => LoginName => Not Like %sa% => OK => Run
使用SQL語句創(chuàng)建
使用圖形化界面創(chuàng)建SQL Profiler實(shí)現(xiàn)審計(jì)日志功能,簡單易用,很容易上手。但是,過程繁瑣、效率不高,難于自動(dòng)化。這個(gè)時(shí)候,就需要使用SQL語句來創(chuàng)建SQL Profiler功能,實(shí)現(xiàn)一鍵創(chuàng)建的方法了。
use master GOset nocount ondeclare @trace_folder nvarchar(256),@trace_file nvarchar(256) ,@max_files_size bigint,@stop_time datetime,@file_count int,@int_filter_cpu int,@int_filter_duration bigint,@int_filter_spid int,@set_trace_status int ;select @trace_folder=N'C:\Temp\perfmon',@max_files_size = 50 --max file size for each trace file,@file_count = 10 --max file count,@stop_time = '6/13/2017 10:50' --null: stop trace manully; specify time (stop at the specify time),@int_filter_cpu = NULL -- >= @int_filter_cpu ms will be traced. or else, skipped.--NULL: ignore this filter,@int_filter_duration = NULL --execution duration filter: millisecond--NULL: ignore this filter--,@int_filter_spid = 151 --integer: specify a spid to trace-- ,@set_trace_status = 1 --0: Stops the specified trace.; --1: Starts the specified trace.;--2: Closes the specified trace and deletes its definition from the server.; ;/*select * from sys.traces*/ --private variables declare@trace_id int,@do int,@loop int,@trace_event_id int,@trace_column_id int,@return_code tinyint,@return_decription varchar(200),@field_separator char(1); select @field_separator = ',' --trace columns list separator ;IF right(ltrim(rtrim(@trace_folder)), 1 ) <> '\' BEGINSELECT @trace_folder = ltrim(rtrim(@trace_folder)) + N'\' ;exec sys.xp_create_subdir @trace_folder END ;select@trace_file = @trace_folder + REPLACE(@@SERVERNAME, N'\', N'') ;IF @int_filter_spid IS NOT NULL BEGINselect@trace_file = @trace_file + cast(@int_filter_spid as varchar); END--select @trace_fileselect top 1@trace_id = id from sys.traces where path like @trace_file + N'%' if @trace_id is not null begin-- Start Trace (status 1 = start)EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status return endif OBJECT_ID('tempdb..#trace_event','u') is not nulldrop table #trace_event create table #trace_event (id int identity(1,1) not null primary key,trace_event_id int not null,trace_column_id int not null,event_name sysname null,trace_column_name sysname null );with trace_event as ( --select * from sys.trace_events order by trace_event_idselect is_trace = 1 , event_name = 'SQL:StmtCompleted',trace_column_list = 'NestLevel,ClientProcessID,EndTime, DatabaseID,GroupID,ServerName,SPID,DatabaseName,NTUserName,IntegerData2,RequestID,EventClass,SessionLoginName,NTDomainName,TextData,XactSequence, CPU,ApplicationName,Offset,LoginSid,TransactionID,IntegerData,Duration,SourceDatabaseID,LineNumber,ObjectID,Reads,RowCounts,Writes,IsSystem,ObjectName, LoginName,ObjectType,StartTime,HostName,EventSequence,' ), trace_column as(select *,trace_column_list_xml = CAST('<V><![CDATA[' + REPLACE(REPLACE(REPLACE(trace_column_list,CHAR(10),']]></V><V><![CDATA['),@field_separator,']]></V><V><![CDATA['),CHAR(13),']]></V><V><![CDATA[') + ']]></V>'as xml)from trace_eventwhere is_trace = 1 ) ,data as(select trace_column = T.C.value('(./text())[1]','sysname'),event_namefrom trace_column AS aCROSS APPLY trace_column_list_xml.nodes('./V') AS T(C) ) INSERT INTO #trace_event select trace_event_id = ev.trace_event_id,trace_column_id = col.trace_column_id,a.event_name,trace_column_name = a.trace_column from data as ainner join sys.trace_columns as colon a.trace_column = col.nameinner join sys.trace_events as evon a.event_name = ev.name where col.trace_column_id is not null order by ev.trace_event_id ;--select * from #trace_event---private variables select @trace_id = 0,@do = 1,@loop = @@ROWCOUNT,@trace_event_id = 0,@trace_column_id = 0,@return_code = 0,@return_decription = '' ;--create trace exec @return_code = sys.sp_trace_create @traceid = @trace_id OUTPUT , @options = 2 , @tracefile = @trace_file, @maxfilesize = @max_files_size, @stoptime = @stop_time, @filecount = @file_count ;select trace_id = @trace_id,[current_time] = getdate(),[stop_time] = @stop_time ;set@return_decription = case @return_code when 0 then 'No error.' when 1 then 'Unknown error.' when 10 then 'Invalid options. Returned when options specified are incompatible.' when 12 then 'File not created.' when 13 then 'Out of memory. Returned when there is not enough memory to perform the specified action.' when 14 then 'Invalid stop time. Returned when the stop time specified has already happened.' when 15 then 'Invalid parameters. Returned when the user supplied incompatible parameters.' else ''end ;raiserror('Trace create with: %s',10,1,@return_decription) with nowait--loop set trace event & event column while @do <= @loop beginselect top 1 @trace_event_id = trace_event_id,@trace_column_id = trace_column_idfrom #trace_eventwhere id = @do;--set trace eventexec sys.sp_trace_setevent @trace_id, @trace_event_id, @trace_column_id, 1raiserror('exec sys.sp_trace_setevent @trace_id, %d, %d, 1',10,1,@trace_event_id,@trace_column_id) with nowaitset @do = @do + 1; end--CPU >= 500/ cpu columnid = 18 IF @int_filter_cpu IS NOT NULLEXEC sys.sp_trace_setfilter @trace_id, 18, 0, 4, @int_filter_cpu--duration filter/ duration columnid=13 IF @int_filter_duration IS NOT NULLEXEC sys.sp_trace_setfilter @trace_id, 13, 0, 4, @int_filter_duration--spid filter/ spid columnid=12 IF @int_filter_spid IS NOT NULLexec sys.sp_trace_setfilter @trace_id, 12, 0, 0, @int_filter_spid--applicationName not like 'SQL Server Profiler%' EXEC sys.sp_trace_setfilter @trace_id, 10, 0, 7, N'SQL Server Profiler%'-- Start Trace (status 1 = start) EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status GO其中輸入?yún)?shù)表達(dá)的含義解釋如下:
@trace_folder:Trace文件存放的位置
@max_files_size:每一個(gè)Trace文件大小
@file_count:Trace滾動(dòng)最多的文件數(shù)量
@stop_time:Trace停止的時(shí)間
@int_filter_cpu:CPU過濾閾值,CPU使用率超過這個(gè)值會(huì)被記錄下來,單位毫秒
@int_filter_duration:執(zhí)行時(shí)間過濾閾值,執(zhí)行時(shí)間超過這個(gè)值會(huì)被記錄,單位毫秒
@set_trace_status:Trace的狀態(tài):0停止;1啟動(dòng);2刪除
SMO編程創(chuàng)建
SQL Profiler除了使用圖形化界面創(chuàng)建,使用系統(tǒng)存儲(chǔ)過程創(chuàng)建兩種方法以外,還可以使用SMO編程方法來創(chuàng)建。
SQL Audit
使用SQL Audit實(shí)現(xiàn)SQL Server審計(jì)日志功能需要以下三個(gè)步驟來完成:
- 創(chuàng)建實(shí)例級別的Audit并啟動(dòng)
- 創(chuàng)建數(shù)據(jù)庫級別的Audit Specification
- 讀取審計(jì)日志文件
創(chuàng)建實(shí)例級別Audit
使用Create Server Audit語句創(chuàng)建實(shí)例級別的Audit,方法如下:
USE [master] GO CREATE SERVER AUDIT [Audit_Svr_User_Defined_for_Testing] TO FILE ( FILEPATH = N'C:\Temp\Audit',MAXSIZE = 10 MB,MAX_ROLLOVER_FILES = 10,RESERVE_DISK_SPACE = OFF ) WITH ( QUEUE_DELAY = 1000,ON_FAILURE = CONTINUE ) GO啟動(dòng)實(shí)例級別的Audit,代碼如下
USE [master] GO ALTER SERVER AUDIT [Audit_Svr_User_Defined_for_Testing] WITH(STATE=ON) ; GO創(chuàng)建數(shù)據(jù)庫級別Audit Specification
實(shí)例級別Audit創(chuàng)建完畢后,接下來是對需要審計(jì)的數(shù)據(jù)庫建立對于的Audit Specification,方法如下:
USE [testdb] GO CREATE DATABASE AUDIT SPECIFICATION [Audit_Spec_for_TestDB] FOR SERVER AUDIT [Audit_Svr_User_Defined_for_Testing] ADD (SELECT, INSERT, UPDATE, DELETE, EXECUTE ON DATABASE::[testdb] BY [public]) WITH (STATE = ON); GO由于SQL Audit Specification是基于數(shù)據(jù)庫級別的,所以存在以下場景的維護(hù)性復(fù)雜度增加:
- 用戶需要審計(jì)實(shí)例中某些或者所有數(shù)據(jù)庫,必須在每個(gè)需要審計(jì)的數(shù)據(jù)庫下創(chuàng)建對象
- 用戶實(shí)例有新數(shù)據(jù)庫創(chuàng)建,并需要審計(jì)日志功能時(shí),必須在新的數(shù)據(jù)庫下創(chuàng)建對象
讀取審計(jì)日志文件
最后,我們需要將審計(jì)日志文件中存放的內(nèi)容讀取出來,使用SQL Server提供的系統(tǒng)函數(shù)sys.fn_get_audit_file,方法如下:
DECLARE @AuditFilePath sysname ; SELECT @AuditFilePath = audit_file_path FROM sys.dm_server_audit_status WHERE name = 'Audit_Svr_User_Defined_for_Testing' SELECT statement,* FROM sys.fn_get_audit_file(@AuditFilePath,default,default) ;Extended Event
微軟SQL Server產(chǎn)品長期的規(guī)劃是逐漸使用Extended Event來替換SQL Profiler工具,因?yàn)镋xtended Event更加輕量級,性能消耗比SQL Profiler大幅降低,因此對用戶系統(tǒng)性能影響也大幅減輕。在審計(jì)日志的應(yīng)用場景中,只需要在實(shí)例級別創(chuàng)建一個(gè)Extended Event Session對象,然后啟用即可。既滿足了功能性的需求,又能夠做到很好后期維護(hù),不需要為某一個(gè)數(shù)據(jù)庫創(chuàng)建相應(yīng)對象,對實(shí)例的性能消耗大幅降低到5%左右。
創(chuàng)建Extended Event Session
使用Create Event Session On Server語句創(chuàng)建基于實(shí)例級別的Extended Event。語句如下:
USE master GO CREATE EVENT SESSION [svrXEvent_User_Define_Testing] ON SERVER ADD EVENT sqlserver.sql_statement_completed ( ACTION ( sqlserver.database_id,sqlserver.database_name,sqlserver.session_id, sqlserver.username, sqlserver.client_hostname,sqlserver.client_app_name,sqlserver.sql_text, sqlserver.query_hash,sqlserver.query_plan_hash,sqlserver.plan_handle,sqlserver.tsql_stack,sqlserver.is_system,package0.collect_system_time) WHERE sqlserver.username <> N'NT AUTHORITY\SYSTEM' AND sqlserver.username <> 'sa' AND (NOT sqlserver.like_i_sql_unicode_string(sqlserver.client_app_name, '%IntelliSense'))AND sqlserver.is_system = 0) ADD TARGET package0.asynchronous_file_target ( SET FILENAME = N'C:\Temp\svrXEvent_User_Define_Testing.xel', MAX_FILE_SIZE = 10,MAX_ROLLOVER_FILES = 100 ) WITH (EVENT_RETENTION_MODE = NO_EVENT_LOSS,MAX_DISPATCH_LATENCY = 5 SECONDS ); GO啟用Extended Event Session
Extended Event Session對象創(chuàng)建完畢后,需要啟動(dòng)這個(gè)session對象,方法如下:
USE master GO -- We need to enable event session to capture event and event data ALTER EVENT SESSION [svrXEvent_User_Define_Testing] ON SERVER STATE = START; GO讀取審計(jì)日志文件
Extend Event生成審計(jì)日志文件以后,我們可以使用sys.fn_xe_file_target_read_file系統(tǒng)函數(shù)來讀取,然后分析event_data列所記錄的詳細(xì)信息。
USE master GO SELECT * FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel', null, null, null)方案對比
根據(jù)前面章節(jié)“實(shí)現(xiàn)審計(jì)日志的方法”部分的介紹,我們從可靠性、對象級別、可維護(hù)性、開銷和對數(shù)據(jù)庫系統(tǒng)的影響五個(gè)方面來總結(jié)這四種技術(shù)的優(yōu)缺點(diǎn)。
- 可靠性:這四種實(shí)現(xiàn)審計(jì)日志的方法可靠性都有保障,如果使用數(shù)字化衡量可維護(hù)性,得滿分100分
- 對象級別:SQL Profiler和Extended Event是基于實(shí)例級別的技術(shù)方案;解析事務(wù)日志解析和SQL Audit方法是基于數(shù)據(jù)庫級別的技術(shù),一旦有數(shù)據(jù)庫創(chuàng)建或者刪除操作,需要做相應(yīng)的適配,所以維護(hù)成本也相對高。基于數(shù)據(jù)庫級別的方案得分為0,基于實(shí)例級別得分為100
- 維護(hù)性:基于實(shí)例級別的實(shí)現(xiàn)方法可維護(hù)性(得分100)顯然優(yōu)于基于數(shù)據(jù)庫級別(得分為0)的實(shí)現(xiàn)方式
- 開銷:SQL Profiler對數(shù)據(jù)庫系統(tǒng)開銷很大,大概20%左右(得分100 - 20 = 80),其他三種開銷較小5%左右(得分100 - 5 = 95)
- 影響:開銷大的技術(shù)方案自然影響就大,反之亦然。得分與開銷部分類似。
四種技術(shù)方案優(yōu)缺點(diǎn)匯總?cè)缦卤硭?#xff1a;
以下是對四種實(shí)現(xiàn)審計(jì)日志方法五個(gè)維度打分,得分統(tǒng)計(jì)匯總?cè)缦卤硭?#xff1a;
將匯總得分做成雷達(dá)圖,如下圖所示:
從雷達(dá)圖我們可以很清楚的看到,綜合考慮可靠性、可維護(hù)性、系統(tǒng)開銷和影響來看,使用Extended Event實(shí)現(xiàn)審計(jì)日志的方法是最優(yōu)的選擇。
最后總結(jié)
本期分享了SQL Server實(shí)現(xiàn)審計(jì)日志功能的四種技術(shù)方案和詳細(xì)實(shí)現(xiàn),并從可靠性、可維護(hù)性、對象級別、系統(tǒng)開銷和影響五個(gè)維度分析了四種方案各自的優(yōu)缺點(diǎn),最后的結(jié)論是使用Extended Event實(shí)現(xiàn)審計(jì)日志方法是最優(yōu)選擇,以此來為我們的產(chǎn)品化做出正確的選擇。
總結(jié)
以上是生活随笔為你收集整理的MSSQL · 实现分析 · SQL Server实现审计日志的方案探索的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: nas做服务器虚拟化共享存储,NAS虚拟
- 下一篇: 物联网资料大全