EMR StarRocks 极速数据湖分析原理解析
簡介:數據湖概念日益火熱,本文由阿里云開源大數據 OLAP 團隊和 StarRocks 數據湖分析團隊共同為大家介紹“ StarRocks 極速數據湖分析 ”背后的原理。 【首月99元】EMR StarRocks 數據湖極速分析體驗,試用火熱進行中,快來申請吧 -> https://survey.aliyun.com/apps/zhiliao/Yns9d9Xxz
StarRocks 是一個強大的數據分析系統,主要宗旨是為用戶提供極速、統一并且易用的數據分析能力,以幫助用戶通過更小的使用成本來更快的洞察數據的價值。通過精簡的架構、高效的向量化引擎以及全新設計的基于成本的優化器(CBO),StarRocks 的分析性能(尤其是多表 JOIN 查詢)得以遠超同類產品。
為了能夠滿足更多用戶對于極速分析數據的需求,同時讓 StarRocks 強大的分析能力應用在更加廣泛的數據集上,阿里云開源大數據 OLAP 團隊聯合社區一起增強 StarRocks的數據湖分析能力。使其不僅能夠分析存儲在 StarRocks 本地的數據,還能夠以同樣出色的表現分析存儲在 Apache Hive、Apache Iceberg 和 Apache Hudi 等開源數據湖或數據倉庫的數據。
本文將重點介紹 StarRocks 極速數據湖分析能力背后的技術內幕,性能表現以及未來的規劃。
一、整體架構
在數據湖分析的場景中,StarRocks 主要負責數據的計算分析,而數據湖則主要負責數據的存儲、組織和維護。上圖描繪了由 StarRocks 和數據湖所構成的完成的技術棧。
StarRocks 的架構非常簡潔,整個系統的核心只有 FE(Frontend)、BE(Backend)兩類進程,不依賴任何外部組件,方便部署與維護。其中 FE 主要負責解析查詢語句(SQL),優化查詢以及查詢的調度,而 BE 則主要負責從數據湖中讀取數據,并完成一系列的 Filter 和 Aggregate 等操作。
數據湖本身是一類技術概念的集合,常見的數據湖通常包含 Table Format、File Format 和 Storage 三大模塊。其中 Table Format 是數據湖的“UI”,其主要作用是組織結構化、半結構化,甚至是非結構化的數據,使其得以存儲在像 HDFS 這樣的分布式文件系統或者像 OSS 和 S3 這樣的對象存儲中,并且對外暴露表結構的相關語義。Table Format 包含兩大流派,一種是將元數據組織成一系列文件,并同實際數據一同存儲在分布式文件系統或對象存儲中,例如 Apache Iceberg、Apache Hudi 和 Delta Lake 都屬于這種方式;還有一種是使用定制的 metadata service 來單獨存放元數據,例如 StarRocks 本地表,Snowflake 和 Apache Hive 都是這種方式。
File Format 的主要作用是給數據單元提供一種便于高效檢索和高效壓縮的表達方式,目前常見的開源文件格式有列式的 Apache Parquet 和 Apache ORC,行式的 Apache Avro 等。
Storage 是數據湖存儲數據的模塊,目前數據湖最常使用的 Storage 主要是分布式文件系統 HDFS,對象存儲 OSS 和 S3 等。
FE
FE 的主要作用將 SQL 語句轉換成 BE 能夠認識的 Fragment,如果把 BE 集群當成一個分布式的線程池的話,那么 Fragment 就是線程池中的 Task。從 SQL 文本到分布式物理執行計劃,FE 的主要工作需要經過以下幾個步驟:
- SQL Parse: 將 SQL 文本轉換成一個 AST(抽象語法樹)
- SQL Analyze:基于 AST 進行語法和語義分析
- SQL Logical Plan: 將 AST 轉換成邏輯計劃
- SQL Optimize:基于關系代數,統計信息,Cost 模型對 邏輯計劃進行重寫,轉換,選擇出 Cost “最低” 的物理執行計劃
- 生成 Plan Fragment:將 Optimizer 選擇的物理執行計劃轉換為 BE 可以直接執行的 Plan Fragment。
- 執行計劃的調度
BE
Backend 是 StarRocks 的后端節點,負責數據存儲以及 SQL 計算執行等工作。
StarRocks 的 BE 節點都是完全對等的,FE 按照一定策略將數據分配到對應的 BE 節點。在數據導入時,數據會直接寫入到 BE 節點,不會通過FE中轉,BE 負責將導入數據寫成對應的格式以及生成相關索引。在執行 SQL 計算時,一條 SQL 語句首先會按照具體的語義規劃成邏輯執行單元,然后再按照數據的分布情況拆分成具體的物理執行單元。物理執行單元會在數據存儲的節點上進行執行,這樣可以避免數據的傳輸與拷貝,從而能夠得到極致的查詢性能。
二、技術細節
StarRocks 為什么這么快
CBO 優化器
一般 SQL 越復雜,Join 的表越多,數據量越大,查詢優化器的意義就越大,因為不同執行方式的性能差別可能有成百上千倍。StarRocks 優化器主要基于 Cascades 和 ORCA 論文實現,并結合 StarRocks 執行器和調度器進行了深度定制,優化和創新。完整支持了 TPC-DS 99 條 SQL,實現了公共表達式復用,相關子查詢重寫,Lateral Join, CTE ?復用,Join Rorder,Join 分布式執行策略選擇,Runtime Filter 下推,低基數字典優化 等重要功能和優化。
CBO 優化器好壞的關鍵之一是 Cost 估計是否準確,而 Cost 估計是否準確的關鍵點之一是統計信息是否收集及時,準確。 StarRocks 目前支持表級別和列級別的統計信息,支持自動收集和手動收集兩種方式,無論自動還是手動,都支持全量和抽樣收集兩種方式。
MPP 執行
MPP (massively parallel processing) 是大規模并行計算的簡稱,核心做法是將查詢 Plan 拆分成很多可以在單個節點上執行的計算實例,然后多個節點并行執行。 每個節點不共享 CPU,內存, 磁盤資源。MPP 數據庫的查詢性能可以隨著集群的水平擴展而不斷提升。
如上圖所示,StarRocks 會將一個查詢在邏輯上切分為多個 Query Fragment(查詢片段),每個 Query Fragment 可以有一個或者多個 Fragment 執行實例,每個Fragment 執行實例 會被調度到集群某個 BE 上執行。 如上圖所示,一個 Fragment 可以包括 一個 或者多個 Operator(執行算子),圖中的 Fragment 包括了 Scan, Filter, Aggregate。如上圖所示,每個 Fragment 可以有不同的并行度。
如上圖所示,多個 Fragment 之間會以 Pipeline 的方式在內存中并行執行,而不是像批處理引擎那樣 Stage By Stage 執行。
如上圖所示,Shuffle (數據重分布)操作是 MPP 數據庫查詢性能可以隨著集群的水平擴展而不斷提升的關鍵,也是實現高基數聚合和大表 Join 的關鍵。
向量化執行引擎
隨著數據庫執行的瓶頸逐漸從 IO 轉移到 CPU,為了充分發揮 CPU 的執行性能,StarRocks 基于向量化技術重新實現了整個執行引擎。 算子和表達式向量化執行的核心是批量按列執行,批量執行,相比與單行執行,可以有更少的虛函數調用,更少的分支判斷;按列執行,相比于按行執行,對 CPU Cache 更友好,更易于 SIMD 優化。
向量化執行不僅僅是數據庫所有算子的向量化和表達式的向量化,而是一項巨大和復雜的性能優化工程,包括數據在磁盤,內存,網絡中的按列組織,數據結構和算法的重新設計,內存管理的重新設計,SIMD 指令優化,CPU Cache 優化,C++優化等。向量化執行相比之前的按行執行,整體性能提升了5到10倍。
StarRocks 如何優化數據湖分析
大數據分析領域,數據除了存儲在數倉之外,也會存儲在數據湖當中,傳統的數據湖實現方案包括 Hive/HDFS。近幾年比較火熱的是 LakeHouse 概念,常見的實現方案包括 Iceberg/Hudi/Delta。那么 StarRocks 能否幫助用戶更好地挖掘數據湖中的數據價值呢?答案是肯定的。
在前面的內容中我們介紹了 StarRocks 如何實現極速分析,如果將這些能力用于數據湖肯定會帶來更好地數據湖分析體驗。在這部分內容中,我們會介紹 StarRocks 是如何實現極速數據湖分析的。
我們先看一下全局的架構,StarRocks 和數據湖分析相關的主要幾個模塊如下圖所示。其中 Data Management 由數據湖提供,Data Storage 由對象存儲 OSS/S3,或者是分布式文件系統 HDFS 提供。
目前,StarRocks 已經支持的數據湖分析能力可以歸納為下面幾個部分:
- 支持 Iceberg v1 表查詢 https://github.com/StarRocks/starrocks/issues/1030
- 支持 Hive 外表查詢 外部表 @ External_table @ StarRocks Docs (dorisdb.com)
- 支持 Hudi COW 表查詢 https://github.com/StarRocks/starrocks/issues/2772
接下來我們從查詢優化和查詢執行這幾個方面來看一下,StarRocks 是如何實現將極速分析的能力賦予數據湖的。
查詢優化
查詢優化這部分主要是利用前面介紹的 CBO 優化器來實現,數據湖模塊需要給優化器統計信息。基于這些統計信息,優化器會利用一系列策略來實現查詢執行計劃的最優化。接下來我們通過例子看一下幾個常見的策略。
統計信息
我們看下面這個例子,生成的執行計劃中,HdfsScanNode 包含了 cardunality、avgRowSize 等統計信息的展示。
MySQL [hive_test]> explain select l_quantity from lineitem; +-----------------------------+ | Explain String | +-----------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:5: l_quantity | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 1:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 01 | | UNPARTITIONED | | | | 0:HdfsScanNode | | TABLE: lineitem | | partitions=1/1 | | cardinality=126059930 | | avgRowSize=8.0 | | numNodes=0 | +-----------------------------+在正式進入到 CBO 優化器之前,這些統計信息都會計算好。比如針對 Hive 我們有 MetaData Cache 來緩存這些信息,針對 Iceberg 我們通過 Iceberg 的 manifest 信息來計算這些統計信息。獲取到這些統計信息之后,對于后續的優化策略的效果有很大地提升。
分區裁剪
分區裁剪是只有當目標表為分區表時,才可以進行的一種優化方式。分區裁剪通過分析查詢語句中的過濾條件,只選擇可能滿足條件的分區,不掃描匹配不上的分區,進而顯著地減少計算的數據量。比如下面的例子,我們創建了一個以 ss_sold_date_sk 為分區列的外表。
create external table store_sales( ss_sold_time_sk bigint , ss_item_sk bigint , ss_customer_sk bigint , ss_coupon_amt decimal(7,2) , ss_net_paid decimal(7,2) , ss_net_paid_inc_tax decimal(7,2) , ss_net_profit decimal(7,2) , ss_sold_date_sk bigint ) ENGINE=HIVE PROPERTIES ( "resource" = "hive_tpcds", "database" = "tpcds", "table" = "store_sales" );在執行如下查詢的時候,分區2451911和2451941之間的數據才會被讀取,其他分區的數據會被過濾掉,這可以節約很大一部分的網絡 IO 的消耗。
select ss_sold_time_sk from store_sales where ss_sold_date_sk between 2451911 and 2451941 order ss_sold_time_sk;Join Reorder
多個表的 Join 的查詢效率和各個表參與 Join 的順序有很大關系。如 select * from T0, T1, T2 where T0.a=T1.a and T2.a=T1.a,這個 SQL 中可能的執行順序有下面兩種情況:
- T0 和 T1 先做 Join,然后再和 T2 做 Join
- T1 和 T2 先做 Join,然后再和 T0 做 Join
根據 T0 和 T2 的數據量及數據分布,這兩種執行順序會有不同的性能表現。針對這個情況,StarRocks 在優化器中實現了基于 DP 和貪心的 Join Reorder 機制。目前針對 Hive的數據分析,已經支持了 Join Reorder,其他的數據源的支持也正在開發中。下面是一個例子:
MySQL [hive_test]> explain select * from T0, T1, T2 where T2.str=T0.str and T1.str=T0.str; +----------------------------------------------+ | Explain String | +----------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:1: str | 2: str | 3: str | | PARTITION: UNPARTITIONED | | RESULT SINK | | 8:EXCHANGE | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: HASH_PARTITIONED: 2: str | | STREAM DATA SINK | | EXCHANGE ID: 08 | | UNPARTITIONED | | 7:HASH JOIN | | | join op: INNER JOIN (BUCKET_SHUFFLE(S)) | | | hash predicates: | | | colocate: false, reason: | | | equal join conjunct: 1: str = 3: str | | |----6:EXCHANGE | | 4:HASH JOIN | | | join op: INNER JOIN (PARTITIONED) | | | hash predicates: | | | colocate: false, reason: | | | equal join conjunct: 2: str = 1: str | | |----3:EXCHANGE | | 1:EXCHANGE | | PLAN FRAGMENT 2 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | STREAM DATA SINK | | EXCHANGE ID: 06 | | HASH_PARTITIONED: 3: str | | 5:HdfsScanNode | | TABLE: T2 | | partitions=1/1 | | cardinality=1 | | avgRowSize=16.0 | | numNodes=0 | | PLAN FRAGMENT 3 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | STREAM DATA SINK | | EXCHANGE ID: 03 | | HASH_PARTITIONED: 1: str | | 2:HdfsScanNode | | TABLE: T0 | | partitions=1/1 | | cardinality=1 | | avgRowSize=16.0 | | numNodes=0 | | PLAN FRAGMENT 4 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | STREAM DATA SINK | | EXCHANGE ID: 01 | | HASH_PARTITIONED: 2: str | | 0:HdfsScanNode | | TABLE: T1 | | partitions=1/1 | | cardinality=1 | | avgRowSize=16.0 | | numNodes=0 | +----------------------------------------------+謂詞下推
謂詞下推將查詢語句中的過濾表達式計算盡可能下推到距離數據源最近的地方,從而減少數據傳輸或計算的開銷。針對數據湖場景,我們實現了將 Min/Max 等過濾條件下推到 Parquet 中,在讀取 Parquet 文件的時候,能夠快速地過濾掉不用的 Row Group。
比如,對于下面的查詢,l_discount=1對應條件會下推到 Parquet 側。
MySQL [hive_test]> explain select l_quantity from lineitem where l_discount=1; +----------------------------------------------------+ | Explain String | +----------------------------------------------------+ | PLAN FRAGMENT 0 | | OUTPUT EXPRS:5: l_quantity | | PARTITION: UNPARTITIONED | | | | RESULT SINK | | | | 2:EXCHANGE | | | | PLAN FRAGMENT 1 | | OUTPUT EXPRS: | | PARTITION: RANDOM | | | | STREAM DATA SINK | | EXCHANGE ID: 02 | | UNPARTITIONED | | | | 1:Project | | | <slot 5> : 5: l_quantity | | | | | 0:HdfsScanNode | | TABLE: lineitem | | NON-PARTITION PREDICATES: 7: l_discount = 1.0 | | partitions=1/1 | | cardinality=63029965 | | avgRowSize=16.0 | | numNodes=0 | +----------------------------------------------------+其他策略
除了上面介紹的幾種策略,針對數據湖分析,我們還適配了如 Limit 下推、TopN 下推、子查詢優化等策略。能夠進一步地優化查詢性能。
查詢執行
前面介紹了,StarRocks 的執行引擎是全向量化、MPP 架構的,這些無疑都會給我們分析數據湖的數據帶來很大提升。接下來我們看一下 StarRocks 是如何調度和執行數據湖分析查詢的。
查詢調度
數據湖的數據一般都存儲在如 HDFS、OSS 上,考慮到混部和非混部的情況。我們對 Fragment 的調度,實現了一套負載均衡的算法。
- 做完分區裁剪之后,得到要查詢的所有 HDFS 文件 block
- 對每個 block 構造 THdfsScanRange,其中 hosts 包含 block 所有副本所在的 datanode 地址,最終得到 List
- Coordinator 維護一個所有 be 當前已經分配的 scan range 數目的 map,每個 datanode 上磁盤已分配的要讀取 block 的數目的 map>,及每個 be 平均分配的 scan range 數目 numScanRangePerBe
- 如果 block 副本所在的 datanode 有be(混部)
- 每個 scan range 優先分配給副本所在的 be 中 scan range 數目最少的 be。如果 be 已經分配的 scan range 數目大于 numScanRangePerBe,則從遠程 be 中選擇 scan range 數目最小的
- 如果有多個 be 上 scan range 數目一樣小,則考慮 be 上磁盤的情況,選擇副本所在磁盤上已分配的要讀取 block 數目小的 be
- 如果 block 副本所在的 datanode 機器沒有 be(單獨部署或者可以遠程讀)
- 選擇 scan range 數目最小的 be
查詢執行
在調度到 BE 端進行執行之后,整個執行過程都是向量化的。具體看下面 Iceberg 的例子,IcebergScanNode 對應的 BE 端目前是 HdfsScanNode 的向量化實現,其他算子也是類似,在 BE 端都是向量化的實現。
MySQL [external_db_snappy_yuzhou]> explain select c_customer_id customer_id -> ,c_first_name customer_first_name -> ,c_last_name customer_last_name -> ,c_preferred_cust_flag customer_preferred_cust_flag -> ,c_birth_country customer_birth_country -> ,c_login customer_login -> ,c_email_address customer_email_address -> ,d_year dyear -> ,'s' sale_type -> from customer, store_sales, date_dim -> where c_customer_sk = ss_customer_sk -> and ss_sold_date_sk = d_date_sk; +------------------------------------------------ | PLAN FRAGMENT 0 | OUTPUT EXPRS:2: c_customer_id | 9: c_first_name | 10: c_last_name | 11: c_preferred_cust_flag | 15: c_birth_country | 16: c_login | 17: c_email_address | 48: d_year | 70: expr | | PARTITION: UNPARTITIONED | RESULT SINK | 9:EXCHANGE | PLAN FRAGMENT 1 | OUTPUT EXPRS: | PARTITION: RANDOM | STREAM DATA SINK | EXCHANGE ID: 09 | UNPARTITIONED | 8:Project | | <slot 2> : 2: c_customer_id | | <slot 9> : 9: c_first_name | | <slot 10> : 10: c_last_name | | <slot 11> : 11: c_preferred_cust_flag | | <slot 15> : 15: c_birth_country | | <slot 16> : 16: c_login | | <slot 17> : 17: c_email_address | | <slot 48> : 48: d_year | | <slot 70> : 's' | 7:HASH JOIN | | join op: INNER JOIN (BROADCAST) | | hash predicates: | | colocate: false, reason: | | equal join conjunct: 21: ss_customer_sk = 1: c_customer_sk | 4:Project | | <slot 21> : 21: ss_customer_sk | | <slot 48> : 48: d_year | 3:HASH JOIN | | join op: INNER JOIN (BROADCAST) | | hash predicates: | | colocate: false, reason: | | equal join conjunct: 41: ss_sold_date_sk = 42: d_date_sk | 0:IcebergScanNode | TABLE: store_sales | cardinality=28800991 | avgRowSize=1.4884362 | numNodes=0 | PLAN FRAGMENT 2 | OUTPUT EXPRS: | PARTITION: RANDOM | STREAM DATA SINK | EXCHANGE ID: 06 | UNPARTITIONED | 5:IcebergScanNode | TABLE: customer | cardinality=500000 | avgRowSize=36.93911 | numNodes=0 | PLAN FRAGMENT 3 | OUTPUT EXPRS: | PARTITION: RANDOM | STREAM DATA SINK | EXCHANGE ID: 02 | UNPARTITIONED | 1:IcebergScanNode | TABLE: date_dim | cardinality=73049 | avgRowSize=4.026941 | numNodes=0三、基準測試
TPC-H 是美國交易處理效能委員會TPC(Transaction Processing Performance Council)組織制定的用來模擬決策支持類應用的測試集。 It consists of a suite of business oriented ad-hoc queries and concurrent data modifications.
TPC-H 根據真實的生產運行環境來建模,模擬了一套銷售系統的數據倉庫。該測試共包含8張表,數據量可設定從1 GB~3 TB不等。其基準測試共包含了22個查詢,主要評價指標為各個查詢的響應時間,即從提交查詢到結果返回所需時間。
測試結論
在 TPCH 100G規模的數據集上進行對比測試,共22個查詢,結果如下:
StarRocks 使用本地存儲查詢和 Hive 外表查詢兩種方式進行測試。其中,StarRocks On Hive 和 Trino On Hive 查詢的是同一份數據,數據采用 ORC 格式存儲,采用 zlib 格式壓縮。測試環境使用阿里云 EMR 進行構建。
最終,StarRocks 本地存儲查詢總耗時為21s,StarRocks Hive 外表查詢總耗時92s。Trino 查詢總耗時307s。可以看到 StarRocks On Hive 在查詢性能方面遠遠超過 Trino,但是對比本地存儲查詢還有不小的距離,主要的原因是訪問遠端存儲增加了網絡開銷,以及遠端存儲的延時和 IOPS 通常都不如本地存儲,后面的計劃是通過 Cache 等機制彌補問題,進一步縮短 StarRocks 本地表和 StarRocks On Hive 的差距。
具體測試過程請參考:StarRocks vs Trino TPCH 性能測試對比報告
四、未來規劃
得益于全面向量化執行引擎,CBO 優化器以及 MPP 執行框架等核心技術,目前 ?StarRocks 已經實現了遠超其他同類產品的極速數據湖分析能力。從長遠來看, StarRocks 在數據湖分析方向的愿景是為用戶提供極其簡單、易用和高速的數據湖分析能力。為了能夠實現這一目標,StarRocks 現在還有許多工作需要完成,其中包括:
- 集成 Pipeline 執行引擎,通過 Push Based 的流水線執行方式,進一步降低查詢響應速度
- 自動的冷熱數據分層存儲,用戶可以將頻繁更新的熱數據存儲在 StarRocks 本地表上,StarRocks 會定期自動將冷數據從本地表遷移到數據湖
- 去掉顯式建立外表的步驟,用戶只需要建立數據湖對應的 resource 即可實現數據湖庫表全自動同步
- 進一步完善 StarRocks 對于數據湖產品特性的支持,包括支持 Apache Hudi 的 MOR 表和 Apache Iceberg 的 v2 表;支持直接寫數據湖;支持 Time Travel 查詢,完善 Catalog 的支持度等
- 通過層級 Cache 來進一步提升數據湖分析的性能
五、更多信息
參考鏈接
[1] StarRocks - 開源大數據平臺E-MapReduce - 阿里云
[2] https://github.com/StarRocks/starrocks/issues/1030
[3] https://docs.dorisdb.com/zh-cn/main/using_starrocks/External_table#hive%E5%A4%96%E8%A1%A8
[4] https://github.com/StarRocks/starrocks/issues/2772
[5] StarRocks vs Trino TPCH 性能測試對比報告
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。?
總結
以上是生活随笔為你收集整理的EMR StarRocks 极速数据湖分析原理解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 揭秘阿里云 RTS SDK 是如何实现直
- 下一篇: 运行cudasift