基于MaxCompute SQL 的半结构化数据处理实践
簡介:?MaxCompute作為企業級數據倉庫服務,集中存儲和管理企業數據資產、面向數據應用處理和分析數據,將數據轉換為業務洞察。通過與阿里云內、外部服務靈活組合,可構建豐富的數據應用。全托管的數據與分析解決方案,可簡化平臺運維、管理投入,提升面向業務的服務能力,加速價值實現。
本文作者 孔亮 阿里云智能 產品專家
一、MaxCompute 基礎介紹
阿里云數據與分析產品解決方案
MaxCompute作為企業級數據倉庫服務,集中存儲和管理企業數據資產、面向數據應用處理和分析數據,將數據轉換為業務洞察。通過與阿里云內、外部服務靈活組合,可構建豐富的數據應用。全托管的數據與分析解決方案,可簡化平臺運維、管理投入,提升面向業務的服務能力,加速價值實現。
從下圖可以看出 MaxCompute 是處于一個核心位置,首先 MaxCompute 是一個數據倉庫,而且是一個基于 Serverless架構超大規模集群產品,具備安全管理能力等企業級能力。之前是偏離線數據處理平臺,當前已經具備BI分析能力的企業級數據倉庫。
一般企業的離線數據鏈路,從數據源包含關系型數據庫、非結構化存儲、大數據存儲、消息隊列等等,都可以通過數據集成離線的方式,批的方式,進入到數據倉庫中。前端的各種分析應用,也可以有一些實時的分析,通過 MaxCompute-Hologres做查詢加速,這是離線的場景。實時鏈路是從消息隊列數據源,通過Datahub數據總線,到實時計算Flink,對接到實時數倉Hologres,再對接到前臺。中間的數據是通過 Hologres 和 MaxCompute 還有 Flink,做流批一體。MaxCompute在數據倉庫的基礎上,擴展了數據庫的一些聯邦查詢能力,包括Data Lake、Mysql、Hbase等,數據的集成,包括元數據的同步和查詢去同步數據數據湖數據處理的能力。還包含基于數據倉庫數據之后,人工智能PAI機器學習的能力。構成了完整的大數據底座。在這之上,DataWorks提供了,一站式的開發治理平臺,可以做任務調度、元數據數據數據質量的血緣管理、數據開發等能力。
MaxCompute 大數據計算服務簡介
MaxCompute(大數據計算服務)是一款多功能、高性能、易于使用的數據倉庫服務。
MaxCompute內建完善的企業級安全及管理功能、支持開放數據生態,以統一平臺滿足多使用場景(數據倉庫/BI、數據湖分析、機器學習)需要,被廣泛用于數據化運營、畫像及推薦、智能預測等應用場景。
MaxCompute 底層有統一Iass層的存儲和計算調度,存儲是盤古,調度是伏羲,把存儲跟計算資源做了一層封裝,用資源池的方式對上層應用無感知的提供使用,上層應用只需要使用這個資源組,而不需要知道具體的任務運行在什么資源上面。MaxCompute 可以提供結構化數倉的存儲能力,也可以提供數據湖包括開放格式、半結構化、非結構化等數據處理的能力。對于用戶來說,所有使用場景都在項目里面,每個項目之間租戶隔離,可以有本項目的資源,也可以共享資源。項目直接通過安全共享的方式,可以同步數據。再上層用統一的訪問認證、管理、安全、監控、元數據等能力。
可以總結出以下幾點:
- 簡單易用的SQL 端到端開發方式,支持Spark,分布式Python(Mars)等開源技術棧,內置完善的企業管理功能和開放的接口,簡單易用開放
- 統一元數據、統一數據存儲,一份統一的企業數據資產,云原生的多租戶系統,最高效的跨業務&跨組織數據連接與共享
- 自適應的按需彈性資源,精準匹配業務規模的變化,避免資源浪費或不足,業務負載隔離,消除業務間資源爭搶
- 存儲與計算獨立伸縮,支持TB到EB級的存儲擴展;連接廣泛外部數據源,開展聯邦計算
- 深度優化,自動數倉,集成多年雙11優化能力,智能調優+專家服務支持
MaxCompute 功能介紹
MaxCompute 功能可以分為下圖幾類。最核心的存儲計算能力是不對外開發的。存儲主要用的是數據庫表,計算資源會在SQL任務或者其他計算模型上使用時體現出來。看SQL計算能力,可以端到端完成整個數倉的數據處理和數據模型管理等能力,包括一些基礎的數據類型,內部表做一些分區,外部表處理非機構化數據等。支持流式寫入,流式upsert插入數據,刪改數據等能力。查詢時可以用非常復雜的查詢方法,可以看解析計劃Explain。UDF側,MaxCompute支持Java UDF和Python UDF,還包括內容安全UDF。
管理能力,下圖深色部分是在專有云里會有獨立的增強包來對企業提供額外的計算能力。公共云上是直接Serverless方式提供給用戶,只收資源費用。 管理能力也包含計量計費的能力,包括預付費和按量計費。任務有任務管理,查詢有查詢加速,專有云部署時,大規模跨域計算等企業級能力。MaxCompute除了SQL引擎外,還有向量檢索,TensorFlow,Spark,Mars,Hologres,這些都可以基于MaxCompute的底層存儲計算資源,用不同的引擎,提供對應場景的能力。合規治理部分,灰色部分有一些是DataWorks提供,包括本身數倉安全管理能力,元數據管理能力,審計能力,數據加密,數據脫敏,數據質量等功能。再通過SDK/API和MaxCompute提供的配套工具來完成數據開發,數據上傳下載,還有一些三方應用,二方應用來完成整個數倉的生態構建。
MaxCompute 主要解決方案
企業數據倉庫/數據中臺
將原始數據整合為可被廣泛使用的知識,用于后續消費使用,包括:
?集成存儲:收集、存儲和集中管理企業內外數據;
?處理分析:清洗、加工、整合多方數據;面向業務需求統計、挖掘;統一的存儲和處理可以提供彈性伸縮的存儲計算能力,減少成本
?標準化:建立企業數據倉庫模型(分層/分主題),建立數據標準,形成可復用數據資產,并且通過數據治理,進行數據生命周期安全、成本治理等、持續保障數據質量和標準化
?數據互通:在企業內流轉共享標準數據,打通數據孤島,讓關聯的數據發揮更大的價值
數據中臺不僅是技術平臺,還包含組織和管理流程要求,強調以公共數據產品服務業務,實現”數據業務化”,可認為數據倉庫的一種最佳實踐。MaxCompute+DataWorks是開箱即用的數據倉庫解決方案。
BI分析/數據分析
BI分析并不必然要使用數據倉庫,如可直接基于交易數據庫分析
數據倉庫能夠幫助提供BI分析需要的企業視角的全面數據
通過數據倉庫的數據資產管理,BI分析人員可更好地檢索、理解數據
數據倉庫還能夠以強大的性能,滿足多用戶并發、分析不同數據規模需求
MaxCompute提供數據的集中管理、處理分析,可直接對接BI或者將數據同步到外部分析型數據庫(OLAP)進行BI分析
預測分析/智能應用
數據倉庫與AI集成日益緊密
數據倉庫為機器學習進行數據加工、數據準備
機器學習對數據進行模型訓練,數據預測,結果可直接固化在數據倉庫進行知識共享,如用戶畫像分析對客戶性別、偏好的預測
MaxCompute無縫集成PAI、SparkML,1個平臺無需數據移動即可在企業數據之上建設基于機器學習的智能應用,如CTR預估、個性化推薦
二、MaxCompute半結構化數據處理
什么是半結構化數據
本文的主題是MaxCompute半結構化處理能力,我們先看一下什么是半結構化數據。
結構化數據,即行數據,存儲在數據庫里,可以用二維表結構來邏輯表達實現的數據
非結構化數據,包括所有格式的辦公文檔、文本、圖片、XML、HTML、各類報表、圖像和音頻/視頻信息等等
半結構化數據,就是介于完全結構化數據(如關系型數據庫、面向對象數據庫中的數據)和完全無結構的數據(如聲音、圖像文件等)之間的數據,HTML文檔就屬于半結構化數據。它一般是自描述的,經常變化的,數據的結構和內容混在一起,一般由一個三元組表示,包括標記、類型和對象的值。
通過數據模型比較:
結構化數據:關系型(二維表)
半結構化數據:由一個由節點集合和弧段集合組成的具根有向圖結構。(樹、圖)
非結構化數據:無
最后從wiki帶的定義看,半結構化的特點是復雜類型結構,易變,需要從自描述結構中提取數據進行計算。
Semi-structured data[1] is a form of structured data that does not obey the tabular structure of data models associated with relational databases or other forms of data tables, but nonetheless contains tags or other markers to separate semantic elements and enforce hierarchies of records and fields within the data. Therefore, it is also known as self-describing structure.
In semi-structured data, the entities belonging to the same class may have different attributes even though they are grouped together, and the attributes' order is not important.
Semi-structured data are increasingly occurring since the advent of the Internet where full-text documents and databases are not the only forms of data anymore, and different applications need a medium for exchanging information.
半結構化數據應用廣泛,因為:
?簡潔、簡單、體積小等。
?上手容易,高效。
?跨語言,用于Web項目的前后端交互接口,配置文件,文件存儲等等。移動端應用的火爆,進一步帶動json等半結構化數據的使用(json示例見下圖,來源于網絡)。這些數據都可以作為數據源,存入數據倉庫做分析。
所以半結構化數據處理的能力是數據倉庫的一個典型應用場景。
半結構化數據處理
一般的場景,按照數倉的流程來看,從數據源->數據處理->數據存儲,這個階段主要是半結構化處理的主要環節,因為往上層看,可能數據已經加工完成,直接面向應用了,半結構化數據體現的就沒有那么明顯。
這里的半結構化數據處理,有兩種做法,一種是把數據同步到一個字段里面,每次應用時用一些復雜類型,或者是json函數直接提取,就是按需提取,但數據是放到一個字段里面。這種優點是不用考慮半結構化數據結構變化。缺點是性能不佳,每次選用適合的處理函數和方法,開發復雜。不管什么樣的數據,都是一個大string存進去,還非常大,比如MaxCompute,一般的情況支持8M,但為了處理這種情況,MaxCompute也可以開到最大256M。 另一種方法是導入時或者批處理時按照json結構拆成一張寬表,再隨著json結構修改而修改/重建表結構。這樣做的優點是存儲和計算都能得到優化。但缺點是表機構經常修改,修改不便。
MaxCompute 半結構化數據處理
MaxCompute提供了以下四方面能力處理半結構化數據
提供復雜類型支持存儲半結構化數據
首先提供了復雜數據類型 存儲對應的半結構化數據
Schema evolution(表結構演進) 對應半結構化數據結構定義的修改
然后提供了schema evolution的能力,可以修改表和嵌套列,包括:
- 刪除列
- 添加列
- 修改列順序
- 修改列名
- 修改列數據類型(兼容類型)
- 修改復雜類型嵌套結構(與修改表結構相同)
Semi-structured data processing function 用于處理半結構化數據各節點的值
- MaxCompute SQL 為提升復雜數據類型(ARRAY、MAP、STRUCT)數據的處理能力和效率,增加了大量內建函數,可以使用內建函數對輸入的復雜數據類型數據進行處理,或經過函數處理輸出復雜數據類型數據。
- 同時提供了高階函數增強復雜數據類型數據的處理能力,相較于普通函數的輸入參數只能是數據,高階函數的輸入參數本身可以是一個函數。因此高階函數可以處理輸入的復雜數據類型數據,并使用lambda表達式簡化處理邏輯語法表達。
直接使用半結構化數據節點value進行計算
CREATE TABLE evol_t2 (id int, name struct<given: string, family: string>,phones array<struct<type: string, num: string>>) ; insert into table evol_t2 select 1, STRUCT('Bill', 'Gates'), array(STRUCT('work', '1234567890'),STRUCT('cell', '9876543210')); insert into table evol_t2 select 2, STRUCT('Michael', 'Jordan'), array(STRUCT('work', '1111111111'),STRUCT('cell', '9999999999'));插入結果如下:
select name.given as firstname,c.phones[1].num as phonenum from evol_t2 c where c.phones[1].type = 'cell';查詢結果如下:
MaxCompute 表結構修改
靈活修改表結構,既可以支持半結構化數據源schema的變化,也方便數倉建模模型調整,方便對存量表增補、剔除字段,然后把相同的字段放在一起或修改類型。
語法定義和示例如下:
刪除列
ALTER TABLE <table_name> DROP COLUMN <column_name>; create table if not exists evol_t(id bigint,value1 bigint,value2 bigint); ALTER TABLE evol_t DROP COLUMN value2;添加列
ALTER TABLE <table_name> ADD COLUMNS (col_name1 type1[, col_name2 type2...]); create table if not exists evol_t(id bigint,value1 bigint,value2 bigint); ALTER TABLE evol_t ADD COLUMNS value3 STRING;說明:添加的新列不支持指定順序,默認在最后一列。
修改列順序
ALTER TABLE <table_name> CHANGE COLUMN <original_column_name> <new_column_name> <column_type> AFTER <column_name>; create table if not exists evol_t(id bigint,value1 bigint,value2 bigint); ALTER TABLE evol_t CHANGE COLUMN value2 value3 bigint AFTER id;說明:目前不支持BEFORE關鍵詞,可以通過AFTER實現,如有必要可以在后續功能中增加。
修改列名
ALTER TABLE <table_name> CHANGE COLUMN <original_column_name> RENAME TO <new_column_name>;MaxCompute 復雜類型數據結構修改
復雜類型數據的各層嵌套列的schema也支持靈活修改,嵌套列和表結構一樣都可以享受列存優化的性能和直接查詢的便捷
CREATE TABLE evol_t (id int, point struct<x: double, y: double>) ; ALTER TABLE evol_t ADD COLUMNS (points_map map<string, struct<x: double, y: double>>); ALTER TABLE evol_t ADD COLUMNS (points_arr array<struct<x: double, y: double>>);因為所有的嵌套列都當作一張嵌套的表處理和識別,那么嵌套列也可以獲得如下能力:
- 表的結構可以修改(增、刪、改名字、改順序、改類型)
- 更精細的列存儲和壓縮
- 針對數據類型的存儲和計算優化
- 直接用節點值進行計算
- 更豐富的函數進行半結構化數據處理
MaxCompute 復雜類型數據處理函數
豐富的復雜類型數據處理函數方便直接對半結構化數據進行處理,且更多更易用的函數在不斷推出中
| 函數類別 | 函數 | 功能 |
| ARRAY函數 | ALL_MATCH | 判斷ARRAY數組中是否所有元素都滿足指定條件。 |
| ANY_MATCH | 判斷ARRAY數組中是否存在滿足指定條件的元素。 | |
| ARRAY | 使用給定的值構造ARRAY。 | |
| ARRAY_CONTAINS | 檢測指定的ARRAY中是否包含指定的值。 | |
| ARRAY_DISTINCT | 去除ARRAY數組中的重復元素。 | |
| ARRAY_EXCEPT | 找出在ARRAY A中,但不在ARRAY B中的元素,并去掉重復的元素后,以ARRAY形式返回結果。 | |
| ARRAY_INTERSECT | 計算兩個ARRAY數組的交集。 | |
| ARRAY_JOIN | 將ARRAY數組中的元素按照指定字符串進行拼接。 | |
| ARRAY_MAX | 計算ARRAY數組中的最大值。 | |
| ARRAY_MIN | 計算ARRAY數組中的最小值。 | |
| ARRAY_POSITION | 計算指定元素在ARRAY數組中第一次出現的位置。 | |
| ARRAY_REDUCE | 將ARRAY數組的元素進行聚合。 | |
| ARRAY_REMOVE | 在ARRAY數組中刪除指定元素。 | |
| ARRAY_REPEAT | 返回將指定元素重復指定次數后的ARRAY數組。 | |
| ARRAY_SORT | 將ARRAY數組的元素進行排序。 | |
| ARRAY_UNION | 計算兩個ARRAY數組的并集并去掉重復元素。 | |
| ARRAYS_OVERLAP | 判斷兩個ARRAY數組中是否包含相同元素。 | |
| ARRAYS_ZIP | 合并多個ARRAY數組。 | |
| CONCAT | 將ARRAY數組或字符串連接在一起。 | |
| EXPLODE | 將一行數據轉為多行的UDTF。 | |
| FILTER | 將ARRAY數組中的元素進行過濾。 | |
| INDEX | 返回ARRAY數組指定位置的元素值。 | |
| POSEXPLODE | 將指定的ARRAY展開,每個Value一行,每行兩列分別對應數組從0開始的下標和數組元素。 | |
| SIZE | 返回指定ARRAY中的元素數目。 | |
| SLICE | 對ARRAY數據切片,返回從指定位置開始、指定長度的數組。 | |
| SORT_ARRAY | 為指定的數組中的元素排序。 | |
| TRANSFORM | 將ARRAY數組中的元素進行轉換。 | |
| ZIP_WITH | 將2個ARRAY數組按照位置進行元素級別的合并。 |
| 函數類別 | 函數 | 功能 |
| MAP函數 | EXPLODE | 將一行數據轉為多行的UDTF。 |
| INDEX | 返回MAP類型參數中滿足指定條件的Value。 | |
| MAP | 使用指定的Key-Value對建立MAP。 | |
| MAP_CONCAT | 返回多個MAP的并集。 | |
| MAP_ENTRIES | 將MAP中的Key、Value鍵值映射轉換為STRUCT結構數組。 | |
| MAP_FILTER | 將MAP中的元素進行過濾。 | |
| MAP_FROM_ARRAYS | 通過給定的ARRAY數組構造MAP。 | |
| MAP_FROM_ENTRIES | 通過給定的結構體數組構造MAP。 | |
| MAP_KEYS | 將參數MAP中的所有Key作為數組返回。 | |
| MAP_VALUES | 將參數MAP中的所有Value作為數組返回。 | |
| MAP_ZIP_WITH | 對輸入的兩個MAP進行合并得到一個新MAP。 | |
| SIZE | 返回指定MAP中的K/V對數。 | |
| TRANSFORM_KEYS | 對MAP進行變換,保持Value不變,根據指定函數計算新的Key。 | |
| TRANSFORM_VALUES | 對MAP進行變換,保持Key不變,根據指定函數計算新的Value。 | |
| STRUCT函數 | FIELD | 獲取STRUCT中的成員變量的取值。 |
| INLINE | 將指定的STRUCT數組展開。每個數組元素對應一行,每行每個STRUCT元素對應一列。 | |
| STRUCT | 使用給定Value列表建立STRUCT。 | |
| NAMED_STRUCT | 使用給定的Name、Value列表建立STRUCT。 | |
| JSON函數 | FROM_JSON | 根據給定的JSON字符串和輸出格式信息,返回ARRAY、MAP或STRUCT類型。 |
| GET_JSON_OBJECT | 在一個標準JSON字符串中,按照指定方式抽取指定的字符串。 | |
| JSON_TUPLE | 在一個標準的JSON字符串中,按照輸入的一組鍵抽取各個鍵指定的字符串。 | |
| TO_JSON | 將指定的復雜類型輸出為JSON字符串。 |
MaxCompute 高階函數支持lambda表達式
復雜類型數據處理函數中高階函數包括:
ANY_MATCH、ALL_MATCH、ARRAY_REDUCE、ARRAY_SORT、FILTER、TRANSFORM、ZIP_WITH、MAP_FILTER、MAP_ZIP_WITH、TRANSFORM_KEYS、TRANSFORM_VALUES函數
支持lambda表達式語法,簡化了對復雜數據類型數據處理的表達。
部分函數的說明和示例如下:
判斷ARRAY數組array(1, 2, -10, 100, -30)中是否有元素滿足x-> x > 3條件。命令示例如下: --返回true。 selectany_match(array(1, 2, -10, 100, -30), x-> x > 3); 將ARRAY數組a中的元素利用func進行過濾,返回一個新的ARRAY數組。 --返回[2, 3]。 selectfilter(array(1, 2, 3), x -> x > 1); 將ARRAY數組a和b的元素按照位置,使用combiner進行元素級別的合并,返回一個新的ARRAY數組。 --返回[2, 4, 6, NULL]。 selectzip_with(array(1,2,3), array(1,2,3,4), (x,y) -> x + y); 將MAP對象input的元素進行過濾,只保留滿足predicate條件的元素。 --返回{-30:100, 20:50}。 selectmap_filter(map(10, -20, 20, 50, -30, 100, 21, null), (k, v) -> (k+v) > 10); 對輸入MAP對象input進行變換,保持Key不變,通過func計算新的Value值。 --返回{-30:71, 10:-10, 20:NULL}。 selecttransform_values(map(10, -20, 20, null, -30, 101), (k, v) -> k + v);三、實操演示
MaxCompute半結構化數據處理和schema evolution
請點擊視頻查看演示demo
功能說明
列舉幾個常用的修改復雜類型的節點schema的命令示例:
-- 給 struct 新增一列 ALTER TABLE evol_t ADD COLUMNS (point.z double); -- map 的 value 是 struct,新增一列 ALTER TABLE evol_t ADD COLUMNS (points_map.value.z double); -- array 的元素是 struct,新增一列 ALTER TABLE evol_t ADD COLUMNS (points_arr.element.z double);--示例中的一些用法 --增列 ALTER TABLE evol_t2 ADD COLUMNS (phones.element.type2 string); --刪列 ALTER TABLE evol_t2 DROP COLUMNS (phones.element.type); --改名 ALTER TABLE evol_t2 CHANGE COLUMN phones.type2 phones.type0 string; --改順序 ALTER TABLE evol_t2 CHANGE phones.num phones.num string AFTER type0;Demo中完整功能演示腳本
DROP table evol_t2;CREATE TABLE evol_t2 (id int, name struct<given: string, family: string>,phones array<struct<type: string, num: string>>) ; insert into table evol_t2 select 1, STRUCT('Bill', 'Gates'), array(STRUCT('work', '1234567890'),STRUCT('cell', '9876543210')); insert into table evol_t2 select 2, STRUCT('Michael', 'Jordan'), array(STRUCT('work', '1111111111'),STRUCT('cell', '9999999999')); select * from evol_t2;ALTER TABLE evol_t2 ADD COLUMNS (position map<string, struct<x: double, y: double>>); insert into table evol_t2 select 3, STRUCT('Michael', 'Jackson'), array(STRUCT('work', '1231231231'),STRUCT('cell', '1231231233')),map('p1',struct(1.1,1.2),'p2',struct(1.5,1.3)); select * from evol_t2;ALTER TABLE evol_t2 ADD COLUMNS (position.value.z double); insert into table evol_t2 select 4, STRUCT('Ming', 'Yao'), array(STRUCT('work', '5555555555'),STRUCT('cell', '6666666666')),map('p1',struct(5.5,1.0,12.0),'p2',struct(6.5,3.0,8.1)); select * from evol_t2;ALTER TABLE evol_t2 DROP COLUMNS (phones.element.type); --刪列 select * from evol_t2;ALTER TABLE evol_t2 ADD COLUMNS (phones.element.type2 string); --增列 select * from evol_t2;ALTER TABLE evol_t2 CHANGE COLUMN phones.type2 phones.type0 string; --改名 select * from evol_t2;insert into table evol_t2 select 5, STRUCT('Lei', 'Li'), array(STRUCT('9999999999','work'),STRUCT('8888888888','cell')),map('p1',struct(9.5,6.0,10.0),'p2',struct(5.5,2.0,3.0)); select * from evol_t2;ALTER TABLE evol_t2 CHANGE phones.num phones.num string AFTER type0; --改順序 select * from evol_t2;select name.given as firstname,c.phones[1].num as phonenum from evol_t2 c where c.phones[1].type0 = 'cell';select c.name.family||c.name.Given,c.phones[1].num,SQRT(POW(position['p2'].x-position['p1'].x,2)+POW(position['p2'].y-position['p1'].y,2)+POW(position['p2'].z-position['p1'].z,2)) from evol_t2 c where name.given in ('Ming','Lei');目前支持的數據類型轉化關系
近期將會灰度發布修改類型的feature,具體支持的數據類型轉化關系如下:
四、演進方向
繼續增強的功能和演進方向
- 增加更多的復雜數據類型處理函數
- 更多的數據類型兼容轉換
- 自動識別復雜類型數據類型的schema并存儲優化
- 更靈活的節點值提取和計算
- 更高性能的列分析能力
- Timetravel對于schema evolution和數據修改的版本管理
- ……
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的基于MaxCompute SQL 的半结构化数据处理实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 代理网关设计与实现(基于NETTY)
- 下一篇: 透析阿里云视频云「低代码音视频工厂」之能