如何管理和记录 SSIS 各个 Task 的开始执行时间和结束时间以及 Task 中添加|删除|修改的记录数...
開篇語
在這篇日志中?如何在 ETL 項目中統(tǒng)一管理上百個 SSIS 包的日志和包配置框架?我介紹到了包級別的日志管理框架,那么這個主要是針對包這一個層級的 Log 信息,包括包開始執(zhí)行和結(jié)束時間,以及各個包的執(zhí)行成功或者失敗狀態(tài)。
但是我們可以更加深一層次的將日志記錄 Logging 以及數(shù)據(jù)信息 Auditing 信息延伸到包中的重要 Task 中。
通常情況下,SSIS 包從各個數(shù)據(jù)源加載數(shù)據(jù)到 Staging 表中,數(shù)據(jù)源可以是文件,也可以是其它數(shù)據(jù)庫。然后經(jīng)過數(shù)據(jù)倉庫 SCD 以及 Lookup 等操作,將 Staging 中的數(shù)據(jù)清理并整理加載到各個維度以及事實表中。
假設(shè)我們需要知道在當(dāng)前操作中,各個 Staging 表加載了多少數(shù)據(jù),使用了多長時間。各個處理維度和事實的 Task 使用了多少時間,新增了多少數(shù)據(jù),修改了多少數(shù)據(jù)。這些我們也是有能力做到的,如果再配合?如何在 ETL 項目中統(tǒng)一管理上百個 SSIS 包的日志和包配置框架?這篇文章中提到的包級別日志記錄,那么我們將非常清晰的知道我們的 SSIS 包無論是在包級別,還是在各個重要 Task 級別的各種日志,數(shù)據(jù)信息。這些信息對于我們的包維護,性能分析,錯誤糾正,錯誤修復(fù)都是非常有價值的。
比如,我可以很輕松的通過自定義的報表瀏覽哪些 Task 在同等記錄情況下最耗時間,各個 Task 在整個包的執(zhí)行過程中所用的時間比。
不同的 ETL 項目在 Auditing 上會采取不同的策略,比如以文件加載為主的 ETL 是允許有部分錯誤數(shù)據(jù)加載失敗的,但是以數(shù)據(jù)倉庫為主的 ETL 則不希望出現(xiàn)錯誤數(shù)據(jù)加載的。因此在設(shè)計 Auditing 的時候要考慮到這些情況,比如設(shè)計的時候多出一個 失敗數(shù)據(jù)總數(shù)的記錄用于跟蹤文件數(shù)據(jù)等。
在這里只講如何實現(xiàn) Auditing,簡單的介紹一下核心操作,大家可以在這個基礎(chǔ)之上去擴充。
關(guān)鍵點
實現(xiàn) Auditing 的關(guān)鍵點就是要借用控制流 Task 中的 Event Handler 下的 OnPostExecute 和 OnPreExecute 功能。
通過這樣兩個事件我們很容易實現(xiàn)對 Task 執(zhí)行前后表數(shù)據(jù)變化的操作記錄。
數(shù)據(jù)源,目標表及其它數(shù)據(jù)庫對象
測試數(shù)據(jù)源是 AdventureWorksLT2012
BIWORK_SSIS 數(shù)據(jù)庫中的目標表?
IF OBJECT_ID('dbo.SalesOrderDetail') IS NOT NULL DROP TABLE dbo.SalesOrderDetail GOCREATE TABLE [dbo].[SalesOrderDetail]([SalesOrderID] [int] NOT NULL,[SalesOrderDetailID] [int] NOT NULL,[OrderQty] [smallint] NOT NULL,[ProductID] [int] NOT NULL,[UnitPrice] [money] NOT NULL,[UnitPriceDiscount] [money] NOT NULL,[LineTotal] [numeric](38, 6) NOT NULL,[rowguid] [uniqueidentifier] NOT NULL,[ModifiedDate] [datetime] NOT NULL ) ON [PRIMARY] GOTask 執(zhí)行狀態(tài)表
EXECUTION_ID 應(yīng)該使用?如何在 ETL 項目中統(tǒng)一管理上百個 SSIS 包的日志和包配置框架?中的 PROCESS_LOG_ID, 這樣就將 SSIS 包日志和 TASK 關(guān)聯(lián)起來了。
USE BIWORK_SSIS GOIF OBJECT_ID('TASK_EXECUTION_STATUS') IS NOT NULL DROP TABLE TASK_EXECUTION_STATUS GOCREATE TABLE TASK_EXECUTION_STATUS (EXECUTION_ID NVARCHAR(255), PACKAGE_NAME NVARCHAR(100),TASK_ID NVARCHAR(250),TASK_NAME NVARCHAR(250),TABLE_NAME NVARCHAR(250),ExistingRowsBefore BIGINT,StartTime DATETIME,DeletedRows BIGINT,UpdatedRows BIGINT,InsertedRows BIGINT,ExistingRowsAfter BIGINT,EndTime DATETIME,ExecutionStatus INT )獲取表的條數(shù)
IF OBJECT_ID('dbo.GET_TABLE_COUNT') IS NOT NULL DROP PROCEDURE dbo.GET_TABLE_COUNT GOCREATE PROCEDURE dbo.GET_TABLE_COUNT @TABLE_NAME NVARCHAR(50), @ROW_COUNT BIGINT OUTPUT AS BEGIN SELECT @ROW_COUNT = SUM(PART.rows) FROM sys.tables TBLINNER JOIN sys.partitions PART ON TBL.object_id = PART.object_idINNER JOIN sys.indexes IDX ON PART.object_id = IDX.object_idAND PART.index_id = IDX.index_idWHERE TBL.name = @TABLE_NAMEAND IDX.index_id < 2GROUP BY TBL.object_id, TBL.name RETURN @ROW_COUNT END GO記錄時間
這個存儲過程用來每次在執(zhí)行 Task 之前獲取目標表中的條數(shù),并且插入 Task 啟動時間 -
IF OBJECT_ID('dbo.USP_INSERT_TASK_EXECUTION','P') IS NOT NULL DROP PROCEDURE dbo.USP_INSERT_TASK_EXECUTION GOCREATE PROCEDURE USP_INSERT_TASK_EXECUTION@TARGET_TABLE_NAME NVARCHAR(50),@EXECUTION_ID NVARCHAR(255) , @PACKAGE_NAME NVARCHAR(100),@TASK_ID NVARCHAR(255),@TASK_NAME NVARCHAR(250) AS BEGINDECLARE @ExistingRowsBefore BIGINTEXECUTE dbo.GET_TABLE_COUNT@TABLE_NAME = @TARGET_TABLE_NAME,@ROW_COUNT = @ExistingRowsBefore OUTPUT INSERT INTO TASK_EXECUTION_STATUS(EXECUTION_ID, PACKAGE_NAME,TASK_ID,TASK_NAME,TABLE_NAME,ExistingRowsBefore,StartTime,DeletedRows,UpdatedRows,InsertedRows,ExistingRowsAfter,EndTime,ExecutionStatus)VALUES(@EXECUTION_ID, @PACKAGE_NAME,@TASK_ID,@TASK_NAME,@TARGET_TABLE_NAME,@ExistingRowsBefore,GETDATE(),NULL, --@DeletedRows,NULL, --@UpdatedRows,NULL, --@InsertedRows,NULL, --@ExistingRowsAfterNULL, --@EndTime0 -- In process ) END更新 Task 狀態(tài)表?
當(dāng) @DeletedRows = -1 的時候,表明操作是 Truncate 操作。
IF OBJECT_ID('dbo.USP_UPDATE_TASK_EXECUTION' ) IS NOT NULL DROP PROCEDURE dbo.USP_UPDATE_TASK_EXECUTION GOCREATE PROCEDURE dbo.USP_UPDATE_TASK_EXECUTION @ExecutionID NVARCHAR(250), @TaskID NVARCHAR(250), @DeletedRows BIGINT, @UpdatedRows BIGINT, @InsertedRows BIGINT AS BEGINUPDATE dbo.TASK_EXECUTION_STATUSSET DeletedRows = (CASE WHEN @DeletedRows = -1 THEN ExistingRowsBefore ELSE @DeletedRows END),UpdatedRows = (CASE WHEN @DeletedRows = -1 THEN 0 ELSE @UpdatedRows END),InsertedRows = (CASE WHEN @DeletedRows = -1 THEN 0 ELSE @InsertedRows END),ExistingRowsAfter = (CASE WHEN @DeletedRows = -1 THEN 0 ELSE (ExistingRowsBefore + @InsertedRows - @DeletedRows) END),EndTime = GETDATE(),ExecutionStatus = 1WHERE EXECUTION_ID = @ExecutionIDAND TASK_ID = @TaskID END GOSSIS 包中的流程實現(xiàn)
SSIS 包 - 第二個和第三個 Task 的功能完全一樣,只為了演示的目的。
EST_TRUNCATE_SALES_ORDER_DETAIL Task - 在加載數(shù)據(jù)之前刪除表數(shù)據(jù)。
它的 OnPreExecute 事件中添加了一個 Execute SQL Task 組件用來向 Task Execution 表插入操作前的記錄。
調(diào)用 USP_INSERT_TASK_EXECUTION 存儲過程根據(jù)表名查詢記錄數(shù)。
參數(shù) Mapping 關(guān)系,注意這里要用 Source ID , Source Name 而不是 Task ID, Task Name。因為 Task 是指當(dāng)前執(zhí)行這些 SQL 的 Task 自身,而我們要監(jiān)控的不是這個事件下的 Task ,而是控制流中的 Task 組件。
EST_TRUNCATE_SALES_ORDER_DETAIL 中 OnPostExecute 的配置 -
這里就是 Update 操作了,因為 EST_TRUNCATE_SALES_ORDER_DETAIL?是 Truncate 表操作,所以這里給了 DeletedRows = -1。
更新的時候直接根據(jù) Execution ID 和 Task ID 就可以了。
第二個 Task 也要配置 OnPreExecute 和 OnPostExecute 事件,也就是說每一個你需要監(jiān)控的 Task 都要配置。感覺比較復(fù)雜,但是一次配置完成以后,受用可是長期的。
要注意的是第二個 Task 是從數(shù)據(jù)源加載數(shù)據(jù),這樣需要在加載的過程中獲取記錄數(shù),通過 ROW COUNT 可以實現(xiàn)將數(shù)據(jù)流的條數(shù)賦值給變量保存。
另外要注意的是 - 這個變量的 SCOPE 是控制流組件自身,即作用域。因為可能要有很多 Task 需要用到記錄條數(shù)的變量,全部放到包級別中這個變量會非常多,并且容易出錯。可以理解為 InsertedRows 是局部變量,它的生命周期就是 Task 本身。
如果創(chuàng)建的變量位于包級別 SCOPE,可以點擊下方的小方框 Move?到當(dāng)前 Task 的 SCOPE 中。
變量的賦值。
OnPreExecute 的配置和上面的 Task 一樣,復(fù)制一份即可,這里是 OnPostExecute 的配置。
需要什么變量就記錄什么變量,就配置什么變量。
后面的兩個 Task 一模一樣,只是為了測試使用。
運行兩次的結(jié)果,數(shù)據(jù)條數(shù)的記錄是非常連貫的。
記錄 SCD 的修改和新增條數(shù)只需要在相應(yīng)的地方添加 ROW COUNT 組件來捕獲即可。
當(dāng)然除了使用 ROW COUNT 組件,在某些特定的情況下也可以使用 @@ROWCOUNT 來獲取新增,刪除或者修改所影響到的條數(shù)。
DECLARE @UpdateRowCnt INT DECLARE @InsertRowCnt INT--Inserting records from Source to Destination which does not exists insert into dbo.Client(ClientName,Country,Town) Select clientName, Country, Town from dbo.ClientSource S WHERE NOT EXISTS ( Select 1 from dbo.Client CL WHERE CL.ClientName=S.ClientName) SELECT @InsertRowCnt=@@ROWCOUNT--Update Already existing records from Source Update CL set CL.ClientType=S.CLientType from dbo.Client CL INNER JOIN dbo.ClientSource S ON Cl.ClientName=S.ClientName SELECT @UpdateRowCnt=@@ROWCOUNT最后一個問題
如果每次都在各個 Task 中的 OnPreExecute 和 OnPostExecute 中配置非常麻煩,有沒有改進的方法。
答案是有的。
我提供一個思路,有興趣的話可以自動動手嘗試 -
Task 級別的 OnPreExecute 和 OnPostExecute 事件是當(dāng) Task 被執(zhí)行前后被觸發(fā)的,要注意的是包級別的 OnPreExecute 和 OnPostExecute 也是可以捕獲 Task 級別的 OnPreExecute 和 OnPostExecute 事件。
可以定義一張表,表中記錄需要被處理的 Task 名稱,然后在包級別的 OnPreExecute 和 OnPostExecute 中處理 各個 Task 的 Auditing 信息。不在列表上的,就可以不用處理。
同時還要注意 Task 同步的問題,若是很多 Task 同時執(zhí)行,并行執(zhí)行的話,就需要在各自 Task 中定義好變量來記錄然后再賦值給 Package 級別的變量可以避免這一問題。
與本文相關(guān)的文章
如何在 ETL 項目中統(tǒng)一管理上百個 SSIS 包的日志和包配置框架
更多 BI 文章請參看?BI 系列隨筆列表 (SSIS, SSRS, SSAS, MDX, SQL Server)????? 如果覺得這篇文章看了對您有幫助,請幫助推薦,以方便他人在 BIWORK 博客推薦欄中快速看到這些文章。
總結(jié)
以上是生活随笔為你收集整理的如何管理和记录 SSIS 各个 Task 的开始执行时间和结束时间以及 Task 中添加|删除|修改的记录数...的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 微信小程序周报(第八期)
- 下一篇: 百度网盘不限速下载神器献给你