【详谈 Delta Lake 】系列技术专题 之 特性(Features)
簡介: 本文翻譯自大數據技術公司 Databricks 針對數據湖 Delta Lake 的系列技術文章。眾所周知,Databricks 主導著開源大數據社區 Apache Spark、Delta Lake 以及 ML Flow 等眾多熱門技術,而 Delta Lake 作為數據湖核心存儲引擎方案給企業帶來諸多的優勢。本系列技術文章,將詳細展開介紹 Delta Lake。
前言
本文翻譯自大數據技術公司 Databricks 針對數據湖 Delta Lake 系列技術文章。眾所周知,Databricks 主導著開源大數據社區 Apache Spark、Delta Lake 以及 ML Flow 等眾多熱門技術,而 Delta Lake 作為數據湖核心存儲引擎方案給企業帶來諸多的優勢。
此外,阿里云和 Apache Spark 及 Delta Lake 的原廠 Databricks 引擎團隊合作,推出了基于阿里云的企業版全托管 Spark 產品——Databricks 數據洞察,該產品原生集成企業版 Delta Engine 引擎,無需額外配置,提供高性能計算能力。有興趣的同學可以搜索` Databricks 數據洞察`或`阿里云 Databricks `進入官網,或者直接訪問https://www.aliyun.com/product/bigdata/spark 了解詳情。
譯者:張鵬(卓昇),阿里云計算平臺事業部技術專家
Delta Lake 技術系列 - 特性(Features)
——使用 Delta Lake 穩定的特性來可靠的管理您的數據
目錄
- Chapter-01 ?為什么使用 Delta Lake 的 MERGE 功能?
- Chapter-02 ?使用 Python API 在 Delta Lake 數據表上進行簡單,可靠的更新和刪除操作
- Chapter-03 ?大型數據湖的 Time Travel 功能
- Chapter-04 ?輕松克隆您的 Delta Lake 以方便測試,數據共享以及進行重復的機器學習
- Chapter-05 ?在 Apache Spark 上的 Delta Lake 中啟用 Spark SQL 的 DDL 和 DML 語句
本文介紹內容
Delta Lake 系列電子書由 Databricks 出版,阿里云計算平臺事業部大數據生態企業團隊翻譯,旨在幫助領導者和實踐者了解 Delta Lake 的全部功能以及它所處的場景。在本文 Delta Lake 系列 - 特性( Features )中,重點介紹 Delta Lake 的特性。
后續
讀完本文后,您不僅可以了解 Delta Lake 提供了那些特性,還可以理解這些的特性是如何帶來實質性的性能改進的。
什么是 Delta Lake?
Delta Lake 是一個統一的數據管理系統,為云上數據湖帶來數據可靠性和快速分析。Delta Lake 運行在現有數據湖之上,并且與 Apache Spark 的 API 完全兼容。
在 Databricks 中,我們看到了 Delta Lake 如何為數據湖帶來可靠性、高性能和生命周期管理。我們的客戶已經驗證,Delta Lake 解決了以下挑戰:從復雜的數據格式中提取數據、很難刪除符合要求的數據、以及為了進行數據捕獲從而修改數據所帶來的問題。
通過使用 Delta Lake,您可以加快高質量數據導入數據湖的速度,團隊也可以在安全且可擴展云服務上快速使用這些數據。
Chapter-01 為什么使用 Delta Lake 的 MERGE 功能?
Delta Lake 是在 Apache Spark 之上構建的下一代引擎,支持 MERGE 命令,該命令使您可以有效地在數據湖中上傳和刪除記錄。
MERGE 命令大大簡化了許多通用數據管道的構建方式-所有重寫整個分區的低效且復雜的多跳步驟現在都可以由簡單的 MERGE 查詢代替。
這種更細粒度的更新功能簡化了如何為各種用例(從變更數據捕獲到 GDPR )構建大數據管道的方式。您不再需要編寫復雜的邏輯來覆蓋表同時克服快照隔離的不足。
隨著數據的變化,另一個重要的功能是在發生錯誤寫入時能夠進行回滾。 Delta Lake 還提供了帶有時間旅行特性的回滾功能,因此如果您合并不當,則可以輕松回滾到早期版本。
在本章中,我們將討論需要更新或刪除現有數據的常見用例。我們還將探討新增和更新固有的挑戰,并說明 MERGE 如何解決這些挑戰。
什么時候需要 upserts?
在許多常見場景中,都需要更新或刪除數據湖中的現有數據:
- 遵守通用數據保護法規(GDPR):隨著 GDPR 中數據遺忘規則(也稱為數據擦除)的推出,組織必須根據要求刪除用戶的信息。數據擦除還包括刪除數據湖中的用戶信息。
- 更改傳統數據庫中獲得的數據:在面向服務的體系結構中,典型的 web 和移動應用程序采用微服務架構,這些微服務架構一般是基于具有低延遲性能的傳統 SQL/NoSQL 數據庫搭建的。組織面臨的最大挑戰之一是將許多孤立的數據系統建立連接,因此數據工程師建立了管道,可以將所有數據源整合到中央數據湖中以加快分析。這些管道必須定期讀取傳統 SQL/NoSQL 表所做的更改,并將其應用于數據湖中的對應表中。此類更改可以支持多種形式:變化緩慢的表,所有插入/更新/刪除數據的數據變更等。
- 會話化:從產品分析,到目標廣告,再到預測性維護的許多領域,將多個事件分組為一個會話是常見的例子。建立連續的應用來跟蹤會話并記錄寫入數據湖的結果是非常困難的,因為數據湖經常因為追加的數據而進行優化。
- 重復數據刪除:常見的數據管道用例是通過追加數據的方式來將系統日志收集到 Delta Lake 表中。但是數據源通常會生成重復記錄,并且需要下游刪除重復數據來處理它們。
為什么對數據湖的 upserts 在傳統上具有挑戰性
由于數據湖基本上是基于文件的,它們經常針對新增數據而不是更改現有數據進行優化。因此構建上述用例一直是具有挑戰性的。
用戶通常會讀取整個表(或分區的子集),然后將其覆蓋。因此,每個組織都嘗試通過編寫復雜的查詢 SQL,Spark 等方式來重新造輪子,來滿足他們的需求。這種方法的特點是:
- 低效:為了更新很少的記錄而讀取和重寫整個分區(或整個表)會導致管道運行緩慢且成本高昂。手動調整表布局以及優化查詢是很繁瑣的,而且需要深厚的領域知識。
- 有可能出錯:手寫代碼來修改數據很容易出現邏輯和人為錯誤。例如,多個管道在沒有任何事務支持的情況下同時修改同一張表可能會導致不可預測的數據不一致,在最壞的情況下有可能會導致數據丟失。通常,即使是單一的手寫管道也可能由于業務邏輯中的錯誤,從而導致數據損壞。
- 難以維護:從根本上來說,這類手寫代碼難以理解,跟蹤和維護。從長遠來看,僅此一項就會顯著增加組織和基礎設施成本。
介紹 Delta Lake 中 MERGE 命令
使用 Delta Lake,您可以使用以下 MERGE 命令輕松解決上述用例,并且不會遇到任何上述問題:
讓我們通過一個簡單的示例來了解如何使用 MERGE。 假設您有一個變化緩慢的用戶數據表,該表維護著諸如地址之類的用戶信息。 此外您還有一個現有用戶和新用戶的新地址表。 要將所有新地址合并到主用戶表中,可以運行以下命令:
MERGE INTO users USING updates ON users.userId = updates.userId WHEN MATCHED THEN UPDATE SET address = updates.addresses WHEN NOT MATCHED THENINSERT (userId, address) VALUES (updates.userId, updates.address)這完全符合語法的要求-對于現有用戶(即 MATCHED 子句),它將更新 address 列,對于新用戶(即 NOT MATCHED 子句),它將插入所有列。 對于具有 TB 規模的大型數據表,Delta Lake MERGE 操作比覆蓋整個分區或表要快N個數量級,因為 Delta Lake 僅讀取相關文件并更新它們。 具體來說,Delta Lake 的 MERGE 命令具有以下優勢:
- 細粒度:該操作以文件而不是分區的粒度重寫數據,這樣解決了重寫分區,使用 MSCK 更新 Hive 元數據庫等所有復雜問題。
- 高效:Delta Lake 的數據 skip 功能使 MERGE 在查找要重寫的文件方面更高效,從而無需手動優化管道。 此外 Delta Lake 對所有 I/O 和處理過程進行了優化,使得 MERGE 進行所有數據的讀寫速度明顯快于 Apache Spark 中的類似操作。
- 事務性:Delta Lake 使用樂觀并發控制來確保并發寫入程序使用 ACID 事務來正確更新數據,同時并發讀取程序始終會看到一致的數據快照。
下圖是 MERGE 與手寫管道的直觀對比。
使用 MERGE 簡化用例
遵守 GDPR 而刪除數據
遵守 GDPR 的“被遺忘權”條款對數據湖中的數據進行任何處理都不容易。您可以使用示例代碼來設置一個簡單的定時計劃作業,如下所示,刪除所有選擇退出服務的用戶。
MERGE INTO users USING opted_out_users ON opted_out_users.userId = users.userId WHEN MATCHED THEN DELETE數據庫中的數據變更應用
您可以使用 MERGE 語法輕松地將外部數據庫的所有數據更改(更新,刪除,插入)應用到 Delta Lake 表中,如下所示:
MERGE INTO users USING ( SELECT userId, latest.address AS address, latest.deleted AS deleted FROM ( SELECT userId, MAX(struct(TIME, address, deleted)) AS latest FROM changes GROUP BY userId ) ) latestChange ON latestChange.userId = users.userId WHEN MATCHED AND latestChange.deleted = TRUE THEN DELETE WHEN MATCHED THEN UPDATE SET address = latestChange.address WHEN NOT MATCHED AND latestChange.deleted = FALSE THEN INSERT (userId, address) VALUES (userId, address)從 streaming 管道更新會話信息
如果您有流事件的數據流入,并且想要對流事件數據進行會話化,同時增量更新會話并將其存儲在 Delta Lake 表中,則可以使用結構化數據流和 MERGE 中的 foreachBatch 來完成此操作。 例如,假設您有一個結構化流數據框架,該框架為每個用戶計算更新的 session 信息。 您可以在所有會話應用中啟動流查詢,更新數據到 Delta Lake 表中,如下所示(Scala 語言)。
streamingSessionUpdatesDF.writeStream .foreachBatch { (microBatchOutputDF: DataFrame, batchId: Long) => microBatchOutputDF.createOrReplaceTempView(“updates”) microBatchOutputDF.sparkSession.sql(s””” MERGE INTO sessions USING updates ON sessions.sessionId = updates.sessionId WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT * “””) }.start()Chapter-02 使用Python API在Delta Lake數據表上進行簡單,可靠的更新和刪除操作
在本章中,我們將演示在飛機時刻表的場景中,如何在 Delta Lake 中使用 Python 和新的 Python API。 我們將展示如何新增,更新和刪除數據,如何使用 time travle 功能來查詢舊版本數據,以及如何清理較舊的版本。
Delta Lake 使用入門
Delta Lake 軟件包可以通過 PySpark 的--packages 選項來進行安裝。在我們的示例中,我們還將演示在 VACUUM 文件和 Apache Spark 中執行 Delta Lake SQL 命令的功能。 由于這是一個簡短的演示,因此我們還將啟用以下配置:
spark.databricks.delta.retentionDurationCheck.enabled=false允許我們清理文件的時間短于默認的保留時間7天。 注意,這僅是對于 SQL 命令 VACUUM 是必需的。
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension在 Apache Spark 中啟用 Delta Lake SQL 命令;這對于 Python 或 Scala API 調用不是必需的。
# Using Spark Packages ./bin/pyspark --packages io.delta:delta-core_2.11:0.4.0 --conf “spark. databricks.delta.retentionDurationCheck.enabled=false” --conf “spark. sql.extensions=io.delta.sql.DeltaSparkSessionExtension”Delta Lake 數據的加載和保存
這次將使用準時飛行數據或離港延誤數據,這些數據是從 RITA BTS 航班離崗統計中心生成的;這些數據的一些示例包括 2014 Flight Departure Performance via d3.js Crossfilter 和 針對Apache Spark的具有圖形化結構的準時飛行數據。 在 PySpark 中,首先讀取數據集。
# Location variables tripdelaysFilePath = “/root/data/departuredelays.csv” pathToEventsTable = “/root/deltalake/departureDelays.delta”# Read flight delay data departureDelays = spark.read \ .option(“header”, “true”) \ .option(“inferSchema”, “true”) \ .csv(tripdelaysFilePath)接下來,我們將離港延遲數據保存到 Delta Lake 表中。 在保存的過程中,我們能夠利用它的優勢功能,包括 ACID 事務,統一批處理,streaming 和 time travel。
# Save flight delay data into Delta Lake format departureDelays \ .write \ .format(“delta”) \ .mode(“overwrite”) \ .save(“departureDelays.delta”)注意,這種方法類似于保存 Parquet 數據的常用方式。 現在您將指定格式(“delta”)而不是指定格式(“parquet”)。如果要查看基礎文件系統,您會注意到為 Delta Lake 的離港延遲表創建了四個文件。
/departureDelays.delta$ ls -l . .. _delta_log part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet Part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet現在,讓我們重新加載數據,但是這次我們的數據格式將由 Delta Lake 支持。
# Load flight delay data in Delta Lake format delays_delta = spark \ .read \ .format(“delta”) \ .load(“departureDelays.delta”) # Create temporary view delays_delta.createOrReplaceTempView(“delays_delta”)# How many flights are between Seattle and San Francisco spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()運行結果:
最后,我們確定了從西雅圖飛往舊金山的航班數量;在此數據集中,有1698個航班。
立馬轉換到 Delta Lake
如果您有現成的 Parquet 表,則可以將它們轉換為 Delta Lake 格式,從而無需重寫表。 如果要轉換表,可以運行以下命令。
from delta.tables import *# Convert non partitioned parquet table at path ‘/path/to/table’ deltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/ table`”)# Convert partitioned parquet table at path ‘/path/to/table’ and partitioned by integer column named ‘part’ partitionedDeltaTable = DeltaTable.convertToDelta(spark, “parquet.`/path/to/table`”, “part int”)刪除我們的航班數據
要從傳統的數據湖表中刪除數據,您將需要:
從上面的查詢中可以看到,我們刪除了所有準時航班和早班航班(更多信息,請參見下文),從西雅圖到舊金山的航班有837班延誤。 如果您查看文件系統,會注意到即使刪除了一些數據,還是有更多文件。
/departureDelays.delta$ ls -l _delta_log part-00000-a2a19ba4-17e9-4931-9bbf-3c9d4997780b-c000.snappy.parquet part-00000-df6f69ea-e6aa-424b-bc0e-f3674c4f1906-c000.snappy.parquet part-00001-711bcce3-fe9e-466e-a22c-8256f8b54930-c000.snappy.parquet part-00001-a0423a18-62eb-46b3-a82f-ca9aac1f1e93-c000.snappy.parquet part-00002-778ba97d-89b8-4942-a495-5f6238830b68-c000.snappy.parquet part-00002-bfaa0a2a-0a31-4abf-aa63-162402f802cc-c000.snappy.parquet part-00003-1a791c4a-6f11-49a8-8837-8093a3220581-c000.snappy.parquet part-00003-b0247e1d-f5ce-4b45-91cd-16413c784a66-c000.snappy.parquet在傳統的數據湖中,刪除是通過重寫整個表(不包括要刪除的值)來執行的。 使用 Delta Lake,可以通過有選擇地寫入包含要刪除數據的文件的新版本來執行刪除操作,同時僅將以前的文件標記為已刪除。 這是因為 Delta Lake 使用多版本并發控制(MVCC)對表執行原子操作:例如,當一個用戶正在刪除數據時,另一用戶可能正在查詢之前的版本。這種多版本模型還使我們能夠回溯時間(即 time travel)并查詢以前的版本,這個功能稍后我們將看到。
更新我們的航班數據
要更新傳統數據湖表中的數據,您需要:
代替上面的步驟,使用 Delta Lake 我們可以通過運行 UPDATE 語句來簡化此過程。 為了顯示這一點,讓我們更新所有從底特律到西雅圖的航班。
# Update all flights originating from Detroit to now be originating from Seattle deltaTable.update(“origin = ‘DTW’”, { “origin”: “’SEA’” } )# How many flights are between Seattle and San Francisco spark.sql(“select count(1) from delays_delta where origin = ‘SEA’ and destination = ‘SFO’”).show()如今底特律航班已被標記為西雅圖航班,現在我們有986航班從西雅圖飛往舊金山。如果您要列出您的離崗延遲文件系統(即 $ ../departureDelays/ls -l),您會注意到現在有11個文件(而不是刪除文件后的8個文件和表創建后的4個文件)。
合并我們的航班數據
使用數據湖時,常見的情況是將數據連續追加到表中。這通常會導致數據重復(您不想再次將其插入表中),需要插入的新行以及一些需要更新的行。 使用 Delta Lake,所有這些都可以通過使用合并操作(類似于 SQL MERGE 語句)來實現。
讓我們從一個樣本數據集開始,您將通過以下查詢對其進行更新,插入或刪除重復數據。
# What flights between SEA and SFO for these date periods spark.sql(“select * from delays_delta where origin = ‘SEA’ and destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()該查詢的輸出如下表所示。 請注意,已添加顏色編碼以清楚地標識哪些行是已刪除的重復數據(藍色),已更新的數據(黃色)和已插入的數據(綠色)。
接下來,讓我們生成自己的 merge_table,其中包含將插入,更新或刪除重復的數據。具體看以下代碼段
items = [(1010710, 31, 590, ‘SEA’, ‘SFO’), (1010521, 10, 590, ‘SEA’, ‘SFO’), (1010822, 31, 590, ‘SEA’, ‘SFO’)] cols = [‘date’, ‘delay’, ‘distance’, ‘origin’, ‘destination’] merge_table = spark.createDataFrame(items, cols) merge_table.toPandas()在上表(merge_table)中,有三行不同的日期值:
使用 Delta Lake,可以通過合并語句輕松實現,具體看下面代碼片段。
# Merge merge_table with flights deltaTable.alias(“flights”) \ .merge(merge_table.alias(“updates”),”flights.date = updates.date”) \ .whenMatchedUpdate(set = { “delay” : “updates.delay” } ) \ .whenNotMatchedInsertAll() \ .execute() # What flights between SEA and SFO for these date periods spark.sql(“select * from delays_delta where origin = ‘SEA’ and destination = ‘SFO’ and date like ‘1010%’ limit 10”).show()一條語句即可有效完成刪除重復數據,更新和插入這三個操作。
查看數據表歷史記錄
如前所述,在我們進行每個事務(刪除,更新)之后,在文件系統中創建了更多文件。 這是因為對于每個事務,都有不同版本的 Delta Lake 表。
這可以通過使用 DeltaTable.history() 方法看到,如下所示。
注意,您還可以使用 SQL 執行相同的任務:
spark.sql(“DESCRIBE HISTORY ‘” + pathToEventsTable + “’”).show()
如您所見,對于每個操作(創建表,刪除和更新),都有三行代表表的不同版本(以下為簡化版本,以幫助簡化閱讀):
回溯數據表的歷史
借助 Time Travel,您可以查看帶有版本或時間戳的 Delta Lake 表。要查看歷史數據,請指定版本或時間戳選項。 在以下代碼段中,我們將指定版本選項。
# Load DataFrames for each version dfv0 = spark.read.format(“delta”).option(“versionAsOf”, 0).load(“departureDelays.delta”) dfv1 = spark.read.format(“delta”).option(“versionAsOf”, 1).load(“departureDelays.delta”) dfv2 = spark.read.format(“delta”).option(“versionAsOf”, 2).load(“departureDelays.delta”)# Calculate the SEA to SFO flight counts for each version of history cnt0 = dfv0.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count() cnt1 = dfv1.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count() cnt2 = dfv2.where(“origin = ‘SEA’”).where(“destination = ‘SFO’”).count()# Print out the value print(“SEA -> SFO Counts: Create Table: %s, Delete: %s, Update: %s” % (cnt0, cnt1, cnt2))## Output SEA -> SFO Counts: Create Table: 1698, Delete: 837, Update: 986無論是用于治理,風險管理,合規(GRC)還是錯誤時進行回滾,Delta Lake 表都包含元數據(例如,記錄操作員刪除的事實)和數據(例如,實際刪除的行)。但是出于合規性或大小原因,我們如何刪除數據文件?
使用 vacuum 清理舊版本的數據表
默認情況下,Delta Lake vacuum 方法將刪除所有超過7天參考時間的行(和文件)。如果要查看文件系統,您會注意到表的11個文件。
/departureDelays.delta$ ls -l _delta_logpart-00000-5e52736b-0e63-48f3-8d56-50f7cfa0494d-c000.snappy.parquet part-00000-69eb53d5-34b4-408f-a7e4-86e000428c37-c000.snappy.parquet part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet part-00001-20893eed-9d4f-4c1f-b619-3e6ea1fdd05f-c000.snappy.parquet part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet part-00001-d4823d2e-8f9d-42e3-918d-4060969e5844-c000.snappy.parquet part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet part-00002-3027786c-20a9-4b19-868d-dc7586c275d4-c000.snappy.parquet part-00002-f2609f27-3478-4bf9-aeb7-2c78a05e6ec1-c000.snappy.parquet part-00003-850436a6-c4dd-4535-a1c0-5dc0f01d3d55-c000.snappy.parquet Part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet要刪除所有文件,以便僅保留當前數據快照,您可以 vacuum 方法指定一個較小的值(而不是默認保留7天)。
# Remove all files older than 0 hours old. deltaTable.vacuum(0) Note, you perform the same task via SQL syntax: ? # Remove all files older than 0 hours old spark.sql(“VACUUM ‘” + pathToEventsTable + “‘ RETAIN 0 HOURS”)清理完成后,當您查看文件系統時,由于歷史數據已被刪除,您會看到更少的文件。
/departureDelays.delta$ ls -l _delta_log part-00000-f8edaf04-712e-4ac4-8b42-368d0bbdb95b-c000.snappy.parquet part-00001-9b68b9f6-bad3-434f-9498-f92dc4f503e3-c000.snappy.parquet part-00002-24da7f4e-7e8d-40d1-b664-95bf93ffeadb-c000.snappy.parquet part-00003-b9292122-99a7-4223-aaa9-8646c281f199-c000.snappy.parquet請注意,運行 vacuum 之后,回溯到比保留期更早的版本的功能將會失效。
Chapter-03 大型數據湖的 Time Travel 功能
Delta Lake 提供 Time Travel 功能。 Delta Lake 是一個開源存儲層,可為數據湖帶來可靠性。 Delta Lake 提供 ACID 事務,可伸縮的元數據處理,以及批流一體數據處理。 Delta Lake 在您現有的數據湖之上運行,并且與 Apache Spark API 完全兼容。
使用此功能,Delta Lake 會自動對您存儲在數據湖中的大數據進行版本控制,同時您可以訪問該數據的任何歷史版本。這種臨時數據管理可以簡化您的數據管道,包括簡化審核,在誤寫入或刪除的情況下回滾數據以及重現實驗和報告。
您的組織最終可以在一個干凈,集中化,版本化的云上大數據存儲庫上實現標準化,以此進行分析。
更改數據的常見挑戰
- 審核數據更改:審核數據更改對于數據合規性以及簡單的調試(以了解數據如何隨時間變化)都至關重要。在這種情況下,傳統數據系統都轉向大數據技術和云服務。
- 重現實驗和報告:在模型訓練期間,數據科學家對給定的數據集執行不同參數的各種實驗。當科學家在一段時間后重新訪問實驗以重現模型時,通常源數據已被上游管道修改。很多時候他們不知道這些上游數據發生了更改,因此很難重現他們的實驗。一些科學家和最好的工程師通過創建數據的多個副本來進行實踐,從而增加了存儲量的費用。對于生成報告的分析師而言,情況也是如此。
- 回滾:數據管道有時會向下游消費者寫入臟數據。發生這種情況的原因可能是基礎架構不穩定或者混亂的數據或者管道中的 Bug 等問題。對目錄或表進行簡單追加的管道,可以通過基于日期的分區輕松完成回滾。隨著更新和刪除,這可能變得非常復雜,數據工程師通常必須設計復雜的管道來應對這種情況。
使用Time Travel功能
Delta Lake 的 time travel 功能簡化了上述用例的數據管道構建。Delta Lake 中的 Time Travel 極大地提高了開發人員的生產力。它有助于:
- 數據科學家可以更好地管理實驗
- 數據工程師簡化了管道同時可以回滾臟數據
- 數據分析師可以輕松地分析報告
企業最終可以在干凈,集中化,版本化的云存儲中的大數據存儲庫上建立標準化,在此基礎上進行數據分析。我們很高興看到您將能夠使用此功能完成工作。
當您寫入 Delta Lake 表或目錄時,每個操作都會自動進行版本控制。您可以通過兩種不同的方式訪問數據的不同版本:
使用時間戳
Scala 語法
您可以將時間戳或日期字符串作為 DataFrame 閱讀器的選項來提供:
val df = spark.read .format(“delta”) . option(“timestampAsOf”, “2019-01-01”) .load(“/path/to/my/table”) df = spark.read \ .format(“delta”) \ .option(“timestampAsOf”, “2019-01-01”) \ .load(“/path/to/my/table”) SQL語法 SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01” SELECT count(*) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) SELECT count(*) FROM my_table TIMESTAMP AS OF “2019-01-01 01:30:00.000”如果您無權訪問閱讀器的代碼庫,您可以將輸入參數傳遞給該庫以讀取數據,通過將 yyyyMMddHHmmssSSS 格式的時間戳傳遞給表來進行數據回滾:
val inputPath = “/path/to/my/table@20190101000000000” val df = loadData(inputPath) // Function in a library that you don’t have access to def loadData(inputPath : String) : DataFrame = { spark.read .format(“delta”) .load(inputPath) } inputPath = “/path/to/my/table@20190101000000000” df = loadData(inputPath)# Function in a library that you don’t have access to def loadData(inputPath): return spark.read \ .format(“delta”) \ .load(inputPath) }使用版本號
在 Delta Lake 中,每次寫入都有一個版本號,您也可以使用該版本號來進行回溯。
Scala語法
val df = spark.read .format(“delta”) .option(“versionAsOf”, “5238”) .load(“/path/to/my/table”)val df = spark.read .format(“delta”) .load(“/path/to/my/table@v5238”)Python語法
df = spark.read \.format(“delta”) \ .option(“versionAsOf”, “5238”) \ .load(“/path/to/my/table”)df = spark.read \.format(“delta”) \ .load(“/path/to/my/table@v5238”)SQL語法
SELECT count(*) FROM my_table VERSION AS OF 5238審核數據變更
您可以使用 DESCRIBE HISTORY 命令或通過 UI 來查看表更改的歷史記錄。
重做實驗和報告
Time travel 在機器學習和數據科學中也起著重要作用。模型和實驗的可重復性是數據科學家的關鍵考慮因素,因為他們通常在投入生產之前會創建數百個模型,并且在那個耗時的過程中,有可能想回到之前早期的模型。 但是由于數據管理通常與數據科學工具是分開的,因此確實很難實現。
Databricks 將 Delta Lake 的 Time Travel 功能與 MLflow(機器學習生命周期的開源平臺)相集成來解決可重復實驗的問題。 為了重新進行機器學習培訓,您只需將帶有時間戳的 URL 路徑作為 MLflow 參數來跟蹤每個訓練作業的數據版本。
這使您可以返回到較早的設置和數據集以重現較早的模型。 您無需與上游團隊就數據進行協調,也不必擔心為不同的實驗克隆數據。 這就是統一分析的力量,數據科學與數據工程緊密結合在一起。
回滾
Time travel 可以在產生臟數據的情況下方便回滾。 例如,如果您的 GDPR 管道作業有一個意外刪除用戶信息的 bug,您可以用下面方法輕松修復管道:
INSERT INTO my_table SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1) WHERE userId = 111 You can also fix incorrect updates as follows: MERGE INTO my_table target USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source ON source.userId = target.userId WHEN MATCHED THEN UPDATE SET *如果您只想回滾到表的之前版本,則可以使用以下任一命令來完成:
RESTORE TABLE my_table VERSION AS OF [version_number] RESTORE TABLE my_table TIMESTAMP AS OF [timestamp]固定視圖的不斷更新跨多個下游作業的 Delta Lake 表
通過 AS OF 查詢,您現在可以為多個下游作業固定不斷更新的 Delta Lake 表的快照。考慮一種情況,其中 Delta Lake 表正在不斷更新,例如每15秒更新一次,并且有一個下游作業會定期從此 Delta Lake 表中讀取數據并更新不同的目標表。 在這種情況下,通常需要一個源 Delta Lake 表的一致視圖,以便所有目標表都反映相同的狀態。
現在,您可以按照下面的方式輕松處理這種情況:
version = spark.sql(“SELECT max(version) FROM (DESCRIBE HISTORY my_table)”).collect()# Will use the latest version of the table for all operations belowdata = spark.table(“my_table@v%s” % version[0][0]data.where(“event_type = e1”).write.jdbc(“table1”)data.where(“event_type = e2”).write.jdbc(“table2”) ...data.where(“event_type = e10”).write.jdbc(“table10”)時間序列分析查詢變得簡單
Time travel 還簡化了時間序列分析。例如,如果您想了解上周添加了多少新客戶,則查詢可能是一個非常簡單的方式,如下所示:
SELECT count(distinct userId) - ( SELECT count(distinct userId) FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7)) FROM my_tableChapter-04 輕松克隆您的 Delta Lake 以方便測試,數據共享以及重復進行機器學習
Delta Lake 有一個表克隆的功能,可以輕松進行測試,共享和重新創建表以實現 ML 的多次訓練。在數據湖或數據倉庫中創建表的副本有幾種實際用途。但是考慮到數據湖中表的數據量及其增長速度,進行表的物理副本是一項昂貴的操作。
借助表克隆,Delta Lake 現在使該過程更簡單且更省成本。
什么是克隆?
克隆是源表在給定時間點的副本。它們具有與源表相同的元數據:相同表結構,約束,列描述,統計信息和分區。但是它們是一個單獨的表,具有單獨的體系或歷史記錄。對克隆所做的任何更改只會影響克隆表,而不會影響源表。由于快照隔離,在克隆過程中或之后發生的源表更改也不會反映到克隆表中。在 Delta Lake 中,我們有兩種克隆方式:淺克隆或深克隆。
淺克隆
淺克隆(也稱為零拷貝)僅復制要克隆的表的元數據;表本身的數據文件不會被復制。這種類型的克隆不會創建數據的另一物理副本,從而將存儲成本降至最低。淺克隆很便宜,而且創建起來非常快。
這些克隆表自己不作為數據源,而是依賴于它們的源文件作為數據源。如果刪除了克隆表所依賴的源文件,例如使用 VACUUM,則淺克隆可能會變得不可用。因此,淺克隆通常用于短期使用案例,例如測試和實驗。
深克隆
淺克隆非常適合短暫的用例,但某些情況下需要表數據的獨立副本。深克隆會復制源表的元數據和數據文件全部信息。從這個意義上講,它的功能類似于使用 CTAS 命令(CREATE TABLE .. AS ... SELECT ...)進行復制。但是由于它可以按指定版本復制原始表,因此復制起來更簡單,同時您無需像使用 CTAS 一樣重新指定分區,約束和其他信息。此外它更快,更健壯,也可以針對故障使用增量方式進行工作。
使用深克隆,我們將復制額外的元數據,例如 streaming 應用程序事務和 COPY INTO 事務。因此您可以在深克隆之后繼續運行 ETL 應用程序。
克隆的適用場景?
有時候我希望有一個克隆人來幫助我做家務或魔術。但是我們這里不是在談論人類克隆。在許多情況下,您需要數據集的副本-用于探索,共享或測試 ML 模型或分析查詢。以下是一些客戶用例的示例。
用生產表進行測試和試驗
當用戶需要測試其數據管道的新版本時,他們通常依賴一些測試數據集,這些測試數據跟其生產環境中的數據還是有很大不同。數據團隊可能也想嘗試各種索引技術,以提高針對海量表的查詢性能。這些實驗和測試想在生產環境進行,就得冒影響線上數據和用戶的風險。
為測試或開發環境拷貝線上數據表可能需要花費數小時甚至數天的時間。此外,開發環境保存所有重復的數據會產生額外的存儲成本-設置反映生產數據的測試環境會產生很大的開銷。 對于淺克隆,這是微不足道的:
-- SQL CREATE TABLE delta.`/some/test/location` SHALLOW CLONE prod.events# Python DeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=True)// Scala DeltaTable.forName(“spark”, “prod.events”).clone(“/some/test/location”, isShallow=true)在幾秒鐘內創建完表的淺克隆之后,您可以開始運行管道的副本以測試新代碼,或者嘗試在不同維度上優化表,可以看到查詢性能提高了很多很多。 這些更改只會影響您的淺克隆,而不會影響原始表。
暫存對生產表的重大更改
有時,您可能需要對生產表進行一些重大更改。 這些更改可能包含許多步驟,并且您不希望其他用戶看到您所做的更改,直到您完成所有工作。 淺克隆可以在這里為您提供幫助:
-- SQL CREATE TABLE temp.staged_changes SHALLOW CLONE prod.events; DELETE FROM temp.staged_changes WHERE event_id is null; UPDATE temp.staged_changes SET change_date = current_date() WHERE change_date is null; ... -- Perform your verifications對結果滿意后,您有兩種選擇。 如果未對源表進行任何更改,則可以用克隆替換源表。如果對源表進行了更改,則可以將更改合并到源表中。
-- If no changes have been made to the source REPLACE TABLE prod.events CLONE temp.staged_changes; -- If the source table has changed MERGE INTO prod.events USING temp.staged_changes ON events.event_id <=> staged_changes.event_id WHEN MATCHED THEN UPDATE SET *; -- Drop the staged table DROP TABLE temp.staged_changes;機器學習結果的可重復性
訓練出有效的 ML 模型是一個反復的過程。在調整模型不同部分的過程中,數據科學家需要根據固定的數據集來評估模型的準確性。
這是很難做到的,特別是在數據不斷被加載或更新的系統中。 在訓練和測試模型時需要一個數據快照。 此快照支持了 ML 模型的重復訓練和模型治理。
我們建議利用 Time Travel 在一個快照上運行多個實驗;在 Machine Learning Data Lineage With MLflow and Delta Lake 中可以看到一個實際的例子。
當您對結果感到滿意并希望將數據存檔以供以后檢索時(例如,下一個黑色星期五),可以使用深克隆來簡化歸檔過程。 MLflow 與 Delta Lake 的集成非常好,并且自動記錄功能(mlflow.spark.autolog()方法)將告訴您使用哪個數據表版本進行了一組實驗。
# Run your ML workloads using Python and then DeltaTable.forName(spark, “feature_store”).cloneAtVersion(128, “feature_ store_bf2020”)數據遷移
出于性能或管理方面的原因,可能需要將大量表移至新的專用存儲系統。原始表將不再接收新的更新,并且將在以后的某個時間點停用和刪除。深度克隆使海量表的復制更加健壯和可擴展。
-- SQL CREATE TABLE delta.`zz://my-new-bucket/events` CLONE prod.events; ALTER TABLE prod.events SET LOCATION ‘zz://my-new-bucket/events’;由于借助深克隆,我們復制了流應用程序事務和 COPY INTO 事務,因此您可以從遷移后停止的確切位置繼續ETL應用程序!
資料共享
在一個組織中,來自不同部門的用戶通常都在尋找可用于豐富其分析或模型的數據集。您可能希望與組織中的其他用戶共享數據。 但不是建立復雜的管道將數據移動到另一個里,而是創建相關數據集的副本通常更加容易和經濟。這些副本以供用戶瀏覽和測試數據來確認其是否適合他們的需求而不影響您自己生產系統的數據。在這里深克隆再次起到關鍵作用。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE data_science.events CLONE prod.events;數據存檔
出于監管或存檔的目的,表中的所有數據需要保留一定的年限,而活動表則將數據保留幾個月。如果您希望盡快更新數據,但又要求將數據保存幾年,那么將這些數據存儲在一個表中并進行 time travel 可能會變得非常昂貴。
在這種情況下,每天,每周,每月歸檔數據是一個更好的解決方案。深克隆的增量克隆功能將在這里為您提供真正的幫助。
-- The following code can be scheduled to run at your convenience CREATE OR REPLACE TABLE archive.events CLONE prod.events;請注意,與源表相比此表將具有獨立的歷史記錄,因此根據您的存檔頻率,源表和克隆表上的 time travel 查詢可能會返回不同的結果。
看起來真棒!有問題嗎?
這里只是重申上述一些陷阱,請注意以下幾點:
- 克隆是在你的快照上進行的。對克隆開始后的源表變化不會反映在克隆中。
- 淺克隆不像深克隆那樣是自包含的表。如果在源表中刪除了數據(例如通過 VACUUM),那么您的淺克隆可能無法使用。
- 克隆與源表具有獨立的歷史記錄。在源表和克隆表上的 time travel 查詢可能不會返回相同的結果。
- 淺克隆不復制流事務或將副本復制到元數據。使用深層克隆來遷移表,可以從上次暫停的地方繼續進行 ETL 處理。
我該如何使用?
淺克隆和深克隆支持數據團隊在測試和管理其新型云數據湖和倉庫如何開展新功能。表克隆可以幫助您的團隊對其管道實施生產級別的測試,微調索引以實現最佳查詢性能,創建表副本以進行共享-所有這些都以最小的開銷和費用實現。如果您的組織需要這樣做,我們希望您能嘗試克隆表并提供反饋意見-我們期待聽到您將來的新用例和擴展。
Chapter-05 在 Apache Spark 3.0 上的 Delta Lake 中啟用 Spark SQL DDL 和 DML 功能
Delta Lake 0.7.0 的發布與 Apache Spark 3.0 的發布相吻合,從而啟用了一組新功能,這些功能使用了 Delta Lake 的 SQL 功能進行了簡化。以下是一些關鍵功能。
在 Hive Metastore 中定義表支持 SQL DDL 命令
現在,您可以在 Hive Metastore 中定義 Delta 表,并在創建(或替換)表時在所有 SQL 操作中使用表名。
創建或替換表
-- Create table in the metastore CREATE TABLE events (date DATE, eventId STRING, eventType STRING, data STRING) USING DELTA PARTITIONED BY (date) LOCATION ‘/delta/events’ -- If a table with the same name already exists, the table is replaced with the new configuration, else it is created CREATE OR REPLACE TABLE events (date DATE, eventId STRING,eventType STRING,data STRING) USING DELTA PARTITIONED BY (date) LOCATION ‘/delta/events’顯式更改表架構
-- Alter table and schema ALTER TABLE table_name ADD COLUMNS (col_name data_type[COMMENT col_comment] [FIRST|AFTER colA_name], ...)您還可以使用 Scala / Java / Python API:
- DataFrame.saveAsTable(tableName) 和 DataFrameWriterV2 APIs。
- DeltaTable.forName(tableName) 這個 API 用于創建 io.delta.tables.DeltaTable 實例,對于在 Scala/Java/Python 中執行 Update/Delete/Merge 操作是非常有用。
支持 SQL 插入,刪除,更新和合并
通過 Delta Lake Tech Talks,最常見的問題之一是何時可以在 Spark SQL 中使用 DML 操作(如刪除,更新和合并)?不用再等了,這些操作現在已經可以在 SQL 中使用了! 以下是有關如何編寫刪除,更新和合并(使用 Spark SQL 進行插入,更新,刪除和重復數據刪除操作)的示例。
-- Using append mode, you can atomically add new data to an existing Delta table INSERT INTO events SELECT * FROM newEvents -- To atomically replace all of the data in a table, you can use overwrite mode INSERT OVERWRITE events SELECT * FROM newEvents-- Delete events DELETE FROM events WHERE date < ‘2017-01-01’-- Update events UPDATE events SET eventType = ‘click’ WHERE eventType = ‘click’-- Upsert data to a target Delta -- table using merge MERGE INTO events USING updatesON events.eventId = updates.eventId WHEN MATCHED THEN UPDATESET events.data = updates.data WHEN NOT MATCHED THEN INSERT (date, eventId, data)VALUES (date, eventId, data)值得注意的是,Delta Lake 中的合并操作比標準 ANSI SQL 語法支持更高級的語法。例如,合并支持
- 刪除操作-刪除與源數據行匹配的目標。 例如,“...配對后刪除...”
- 帶有子句條件的多個匹配操作-當目標和數據行匹配時具有更大的靈活性。 例如:
- 星形語法-用于使用名稱相似的源列來設置目標列值的簡寫。 例如:
自動和增量式 Presto/Athena 清單生成
正如 Query Delta Lake Tables From Presto and Athena, Improved Operations Concurrency,andMergePerformance 文章中所述,Delta Lake 支持其他處理引擎通過 manifest 文件來讀取 Delta Lake。manifest 文件包含清單生成時的最新版本。如上一章所述,您將需要:
- 生成 Delta Lake 清單文件
- 配置 Presto 或 Athena 讀取生成的清單
- 手動重新生成(更新)清單文件
Delta Lake 0.7.0的新增功能是使用以下命令自動更新清單文件:
ALTER TABLE delta.`pathToDeltaTable` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true )通過表屬性文件來配置表
通過使用 ALTER TABLE SET TBLPROPERTIES,您可以在表上設置表屬性,可以啟用,禁用或配置 Delta Lake 的許多功能,就像自動清單生成那樣。例如使用表屬性,您可以使用 delta.appendOnly=true 阻止 Delta 表中數據的刪除和更新。
您還可以通過以下屬性輕松控制 Delta Lake 表保留的歷史記錄:
- delta.logRetentionDuration:控制表的歷史記錄(即事務日志歷史記錄)保留的時間。默認情況下會保留30天的歷史記錄,但是您可能需要根據自己的要求(例如GDPR歷史記錄上下文)更改此值。 ?
- delta.deletedFileRetentionDuration:控制文件成為 VACUUM 的候選時必須在多久被刪除。默認情況下會刪除7天以上的數據文件。
從 Delta Lake 0.7.0 開始,您可以使用 ALTER TABLE SET TBLPROPERTIES 來配置這些屬性。
ALTER TABLE delta.`pathToDeltaTable` SET TBLPROPERTIES(delta.logRetentionDuration = “interval “delta.deletedFileRetentionDuration = “interval “ )在 Delta Lake 表中提交支持添加用戶定義的元數據
您可以指定自定義的字符串來作為元數據,通過 Delta Lake 表操作進行的提交,也可以使用DataFrameWriter選項userMetadata,或者 SparkSession 的配置spark.databricks.delta.commitInfo。 userMetadata。
在以下示例中,我們將根據每個用戶請求從數據湖中刪除一個用戶(1xsdf1)。為確保我們將用戶的請求與刪除相關聯,我們還將 DELETE 請求 ID 添加到了 userMetadata中。
SET spark.databricks.delta.commitInfo.userMetadata={ “GDPR”:”DELETE Request 1x891jb23” }; DELETE FROM user_table WHERE user_id = ‘1xsdf1’當查看用戶表(user_table)的歷史記錄操作時,可以輕松地在事務日志中標識關聯的刪除請求。
其他亮點
Delta Lake 0.7.0 版本的其他亮點包括:
- 支持 Azure Data Lake Storage Gen2-Spark 3.0 已經支持 Hadoop 3.2 庫,也被 Azure Data Lake Storage Gen2 支持。
- 改進了對流式一次觸發的支持-使用 Spark 3.0,我們確保一次觸發(Trigger.Once)在單個微批處理中處理 Delta Lake 表中的所有未完成數據,即使使用 DataStreamReader 選項 maxFilesPerTriggers 速度受限。
在 AMA 期間,關于結構化流和使用 trigger.once 的問題又很多。
有關更多信息,一些解釋此概念的有用資源包括:
- 每天運行一次流作業,可節省10倍的成本
- 超越 Lambda:引入Delta架構:特別是成本與延遲的對比
后續
您已經了解了 Delta Lake 及其特性,以及如何進行性能優化,本系列還包括其他內容:
- Delta Lake 技術系列-基礎和性能
- Delta Lake 技術系列-Lakehouse
- Delta Lake 技術系列-Streaming
- Delta Lake 技術系列-客戶用例(Use Case)
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的【详谈 Delta Lake 】系列技术专题 之 特性(Features)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 唯品会:在 Flink 容器化与平台化上
- 下一篇: Vite + React 组件开发实践