从Google Mesa到百度PALO(数仓)
最近在研究OLAP相關的技術,正好看到Google 2014年的論文《Mesa: Geo-Replicated, Near RealTime, Scalable Data Warehousing》,以及百度最近2017年開源的基于Mesa+Impala的實現系統PALO,本篇就嘗試結合起來看下二者,主要是學習介紹性質的文章。
?
1. Mesa
Mesa是一個Google內部使用的數據倉庫系統,從論文的標題可以抓住幾個關鍵詞:可實現跨DC復制的、近實時的、可擴展的。這幾點算是Mesa的特色所在,同時和Mesa要解決的問題背景有很大關系,Mesa主要解決Google在線廣告報表和分析業務,論文里提到的use cases包括reporting, internal auditing, analysis, billing和forecasting等方面。舉個例子,廣告主需要通過Google的AdWords業務系統查看報告:2017.1整月的預算消費情況,包括所有推廣計劃(Campaign)的展現量、點擊量、消費等指標,這就是一個典型的應用場景。
?
為了滿足Google內部的業務功能需求,需要設計一個data store,它的非功能需求要滿足:
?
1. Atomic Updates. 原子更新。
一個用戶的動作,比如一個點擊行為,會被影響成百上千的視圖的指標,比如影響推廣計劃、分網站、創意等等一系列具體報表,這個點擊行為要么全部生效,要么全不生效,不能存在中間狀態。
?
2. Consistency and Correctness. 一致性和正確性。
強一致性必須保證,可重復讀,即使是跨DC也需要保證讀出來的一致,這么高的要求和廣告系統的嚴謹性有直接關系。
?
3. Availability. 高可用。
不能存在單點(SPOF),不能停服(downtime)。
?
4. Near RealTime Update Throughput. 近實時的高吞吐更新。
系統要支持增量實時更新,吞吐要達到百萬行/秒。增量在分鐘級即可被查詢到的queryability,這么高的要求和廣告系統角度來說很必要,每秒鐘Google都會有百萬級別的廣告展現,而廣告主或者系統的其他模塊需要更短的時間看到報表,輔助決策。
?
5. Query Performance. 高性能查詢。
系統既要支持低延遲的用戶報表查詢請求,也要支持高吞吐的Ad-hoc即席分析查詢。低延遲要保證99分位平響在百毫秒。
?
6. Scalability. 高擴展。
隨著數據量和訪問量增量,系統的能力可線性(linear)的增長。
?
7. Online Data and Metadata Transformation. 在線的schema變更。
業務不斷變化,對于schema的變更,包括加表、刪表、加列、減列,新建索引,修改物化視圖等的都必須不能停服的在線完成,而且不能影響數據更新和查詢。
?
有了需求,那么就一句話總結下Google把Mesa看做一個什么系統。
Mesa is a distributed, replicated, and highly available data processing, storage, and query system for structured data. Mesa ingests data generated by upstream services, aggregates and persists the data internally, and serves the data via user queries.
翻譯下,Mesa是一個分布式、多副本的、高可用的數據處理、存儲和查詢系統,針對結構化數據。一般數據從上游服務產生(比如一個批次的spark streaming作業產生),在內部做數據的聚合和存儲,最終把數據serve到外面供用戶查詢。
?
對于Mesa的技術選型,論文里提到了Mesa充分利用了Google內部已有的building blocks,包括Colossus (對應Hadoop的HDFS)、BigTable(對應Hadoop的HBase)和MapReduce。Mesa的存儲是多副本的并且分區做sharding的,很好理解,分治策略幾乎是分布式系統的必備元素。批量更新,包括大批量,小批量(mini-batch)。使用MVCC機制,每個更新都有個version。為實現跨DC工作,還需要一個分布式一致性技術支持,例如Paxos。
?
論文里還對比了業界的其他方案,比如基于數據立方體cube的方案,很難做近實時更新(當年是了,現在kylin也支持了),Google內部的系統中BigTable不支持跨行事務,Megastore、Spanner和F1都是OLTP系統,不支持海量數據的高吞吐寫入。
?
下面進入正題,在海量數據規模下,實時性和吞吐率兩個指標,魚與熊掌不可兼得,Mesa基于廣告數據可聚合性的特質,從存儲,查詢等角度進行了大量針對性的設計,那么Mesa到底提出了什么創新的設計來應對它提出的需求呢?其實就兩方面,1)存儲設計,2)系統架構。其中我認識1)是這個論文最大的contribution。
?
1.1 存儲設計
1.1.1 數據模型(data model)
Mesa僅支持結構化數據,邏輯上存儲在一張表(table)里,表包括很多列,表都有一個schema,和傳統的數據庫類似,schema會定義各個列的類型,比如int32、int64、string等。
?
Mesa的列要能分成兩類,分別是維度列(dimensional attributes)和指標列(measure attributes),這實際可以看做是一種KV模型,Keys就是維度,Values就是指標。
?
同時指標列需要定義一個聚合函數aggregation function,例如SUM,MIN,MAX,COUNT等等,用于作用于Key相同的記錄,做聚合使用,聚合函數必須滿足結合律,可以選擇性滿足交換律。
?
Mesa中定義的索引Index其實只能是被動的符合Key的順序(因為物理上沒有多余的存儲索引,全靠數據有序存儲,后面存儲格式章節會細講)。
?
一個記錄或者用戶行為,叫做single fact會原子地、一致的影響多個物化視圖(materilized view),物化視圖一般利用維度列做上卷表(roll-up),這樣就可以做多維分析(MOLAP)的下鉆(drill down)和上卷(roll up)查詢了。
?
Google中Mesa存儲了上千張表,每張表最多幾百列。
?
下圖是論文中的例子,三張典型的表。
?
Table A的維度列包括Date, PublisherId, Country,指標列是Clicks, Cost,聚合函數是SUM。
Table B的維度列包括Date, AdvertiserId, Country,指標列是Clicks, Cost,聚合函數是SUM。
Table C是Table B的物化視圖,維度列是AdvertiserId, Country,指標列是Clicks Cost,聚合函數是SUM。
?
?
1.1.2 數據更新和查詢
為實現高吞吐的更新,Mesa必須按照批量的方式來實現,這些更新的小數據集合通常從upstream系統來,一般是分鐘級別產生一個,這個可以理解為Storm或者Spark Streaming產生的數據。所有的更新批次就是串行處理的。每個更新批次都會帶一個自增的版本號,其實這就是MVCC機制,這樣就可以做到無鎖的更新,對于查詢就需要指定一個版本號。同時,Mesa要求查詢除了包含版本號,還得有一個Predicate,也就是在Key space上做filter的謂詞條件。
?
論文中舉了一個例子,如下圖所示,在剛剛的數據模型中Table A和Table B是通過兩個更新的批次來的,經歷了兩次版本變化而來,可以看做是fact table。同時Table C是Table B的物化視圖,rollup的SQL如下:
?
| SELECT SUM(Clicks), SUM(Cost) GROUP BY AdvertiserId, Country |
?對于Table B的每個mini-batch更新,物化視圖都保持了和fact table的一致原子更新。
另外對于一些backfill和回滾數據的需求,比如某天的數據有問題,通常廣告領域就是反作弊后而后知,那么Mesa提出了negative facts的概念,也就是做減法即可,從最終一致的角度來做回滾。?
?
?
1.1.3 數據版本化
上一節提到了每個批次都版本化的概念,但是具體實現的困難要考慮:
?
1)每個版本獨立存儲很昂貴,浪費空間(而聚合后的數據往往更加的小)。
2)在查詢的時候going over所有的的版本并且做聚集,考慮每個版本是分鐘級生成了,那么每天的量也會很大,這種expensive的操作很影響在線的查詢延時。
3)傻傻的針對每一次更新,都在所有的版本上做預聚合,也非常的expensive。(看看bigtable、leveldb的多級存儲結構,就知道merge sort實時做每一個批次,系統是吃不消的)
?
為了解決這三個問題,Mesa的方案是:
?
提出Delta的概念,對于每次的更新,相同的Key都做預聚合,形成一個獨立的Singleton delta,一個Singleton delta包括很多rows,以及一個version = [V1, V2]。在某些場景下可能會不存儲原始數據,也就不能drill down到最細的粒度了,但是做了上卷所以會非常節省空間。
?
Delta之間可以做merge,例如[V1, V2]和[V2+1, V3]可以合并成[V1, V3],下面物理存儲章節會提到每個delta內部數據都是有序的,所有只需要線性時間復雜度(linear time),即最簡單的merge sorted array就可以合并好兩個delta。
?
Mesa要求查詢指定的版本號不能無限的小,需要在一個時間范圍前(比如24小時之內),這是因為還會存在一個Base compaction的策略,用來歸并所有的歷史delta,這和bigtable中的概念一樣,主要從查詢效率來說,通過合并小文件來減少隨機I/O的次數。合并了base之后,這些老版本的delta就可以刪除掉了。
?
但是base compaction往往是天級別做,因為很expensive,但是考慮分鐘級別的導入,也會有成百上千的小文件需要在runtime的時候做查詢,也就多了非常多的隨機I/O。為了加速實時的在線查詢,并且平衡導入的高吞吐,Mesa提出了多級的compaction策略,這里Mesa實際用了兩級存儲,會存在一個cumulative compaction的過程,例如每當積累到10個Singleton delta,就做一次小的多路歸并,合并成一個cumulative delta。再積累了10個之后再做一次多路歸并即可。
?
舉個例子,下圖的中Base是24小時之前的文件,天粒度聚合而成。存在61-92這些個singleton delta,它們都是每個mini-batch導入的預聚合好的數據,如果不存在cumulative delta,那么假如查詢條件的版本指定到91,那么就需要base,外加61-91這32次的隨機I/O,這種延遲明顯太大了,那么如果有了cumulative就可以按照最短路徑的算法,做一次查詢只需要base,加61-90這個cumulative,加91這一個delta,一共3次隨機I/O就可以查詢出來結果。
?
1.1.4 物理存儲格式
Mesa中的delta、cumulative和base在物理存儲上格式一樣,它們都是immutable的,這樣就很方便做mini-batch的增量的更新,而不至于很影響吞吐,因為compaction過程都是異步的。
?
Mesa的存儲格式要盡可能的節約空間,同時支持點查(fast seeking to a specific key),Mesa設計了索引Index和數據Data文件,物理上Index和Data數據是分開的,每個Index實際就是Short Key的順序排列外加offset偏移量,每個Data就是Key+Value的順序存儲。每個表都是這樣多個Index和多個Data的集合。
?
Mesa對于存儲格式并沒有展開說很多,但是提到了一些重點。Data文件中的數據按照Key有序排列,按行切塊形成row block,按列存儲,這種格式和現在的ORC、Parquet很像,Row Block的大小一般不大,它是從磁盤load到內存的最小粒度,使用這種格式很容易做壓縮,因為每一列的格式都是相同的,可以做一些輕量級的編碼比如RLE、字典編碼、Bitpacking等,在這個基礎之上再做重量級的壓縮,比如LZO、Snappy、GZIP等,就可以實現壓縮比很高的存儲。
?
Index文件存儲了Short Key,Short Key關聯一個Row Block,這樣只需要把Index加載到內存,在Index文件中做naive的二分查找定位Row Block在Data文件中偏移量offset,然后load Row Block加載到內存,再做一些Predicate filter的Scan,對于Key相同的按照聚合函數做聚合即可把結果查到。
?
對于Mesa的存儲模型,實際的物理上的文件可能會存在多個,如下圖所示。
每一對Index file和Data file的格式如果實現的最簡單,可以如下圖所示。如果按列存儲可以設計的更豐富,比如Parquet的數據存儲格式就為了支持嵌套的數據結構、方便做謂詞下推做了很多的設計。
?
?
1.2 系統架構
這一部分分為兩塊,第一是單DC(Datacenter)部署,第二是跨DC部署。這里不得不說Google的論文雖然拋出來的,但是細節都是很模糊的。
?
1.2.1 單DC部署
兩個子系統Update/Maintenance Subsystem和Query Subsystem分開,這樣也是為了滿足其高吞吐準實時導入,低延遲查詢的系統要求而做的技術選型。
?
Update/Maintenance Subsystem
?
主要職責包括,
1)加載update,并且按照存儲模型保存到Mesa的物理存儲上。
2)執行多級的compaction。
3)在線做schema change。
4)執行一些表的checksum檢查。
?
系統架構圖如下:
Controller可以看做是一個metadata的cache,worker的調度和queue的管理都它來。所有的metadata都存儲在BigTable中,所以Controller可以是一個無狀態的stateless的服務。Controller管理了4類worker,就是剛在提到的4個職責,各對應4種worker,Controller通過RPC接收外部的請求,然后把任務Task投遞到queue中。
?
Worker采用隔離的策略,4種職責各4個Worker Pool。Worker采用“拉”的策略,從queue中取任務,然后執行,例如加載update,取到任務后從任務的metadata中獲取原始數據(比如CSV文件)存儲的位置以及做一些數據校驗工作,然后做預聚合形成Singleton delta,存儲在Google的HDFS即Colussus中,然后再更新metadata commit這個版本已經incorperate到系統中形成了delta,外部可供查詢。圖中還有一個GC(Garbage Collector),這個就是Worker銷毀的,防止Worker死掉從而saturate整個Worker Pool。
?
這套Controller/Worker的架構,從下面要說的查詢系統中分離出來,充分體現了分治的策略,互不干擾。這里Table可能很大,所以Controller也是做了sharding的,來更好的做擴展,同時Controller不存在單點(SPOF),一旦有問題handoff到另外一個stand by即可,因為所有的metadata都在BigTable中存儲。
?
?
Query Subsystem
?
查詢子系統架構如下圖所示。
?一次查詢的步驟如下:獲取用戶請求,例如SQL,根據metadata,做校驗、語法解析、詞法解析、查詢計劃生成等,決定了需要查詢哪些文件;發起查詢請求,并且做歸并聚合處理;將結果轉換為客戶端需要的格式,響應回去。
?
Mesa作為一個簡單的通用存儲查詢系統,只提供了有限的語義,包括filter和group-by,剩下的Higher-level的語義包括JOIN、子查詢等等都由上層系統做,比如Google的Dremel或者MySQL。
?
這里論文還提到查詢系統的lable化,因為在線的reporting要求低延遲,一般是點查,而Ad-hoc的分析查詢一般要求高吞吐,為了防止二者互相干擾,還是采用了分治策略,把不同的query system貼上不同的label,這樣在查詢的時候可以有選擇的路由。
?
圖中的global locator service是每個query system啟動時候去注冊的,這樣client就可以根據label或者要查詢的表路由到正確的query server上。
?
?
1.2.2 跨DC部署
架構圖如下。
由于每個更新批次都是版本化的,所以采用MVCC機制,存在一個committer做upstream service和mesa的橋梁,對于每個update都保存在一個versions database – a globally replicated and consistent data store build on top of the Paxos consensus algorithm,實際可以看做spanner/F1中,然后依次的下發各個DC,每個DC內部都是剛剛提到的架構,Controller負責監聽新的version,拉取update并且更新本DC,成功后notify versions database,committer不斷的檢查是否commit criteria滿足了,比如5個里面3個成功了,那么commit這個version,再繼續下個批次的更新。
?
這種方案的好處在于,多個DC無鎖化和異步化,用以滿足高吞吐的導入和低延遲的查詢。
?
最后,論文還提到了一些Enhancements,包括query server的,使用MapReduce并行化處理worker任務的,如何做在線schema變更的,如果防止數據損壞(包括存儲的checksum,和異步的檢查等等)。一些lesson learned,可以說是分布式系統設計里面的common patter和容易踩到的坑的總結,可以好好讀讀。剩下的就是metrics對比了,這里不再贅述。
?
基本來說,Mesa論文還是很偏理論的,并且集中聚焦在數據模型上,這點我認為是貢獻最大的,下面要講的PALO也是借鑒了其數據模型。
?
?
2. PALO
2.1 簡介
說完了Mesa,說說PALO,PALO是百度2017年開源的項目,由于筆者之前有百度6年的工作的經歷,也使用過該項目的前身OlapEngine,所以這里簡單的介紹下。
?
Palo名字的由來是“玩轉OLAP”,把OLAP倒過來就是PALO。還是抓住github首頁的介紹關鍵詞:
A MPP-based Interactive Data Analysis SQL DB
PALO是基于MPP架構的,一個交互式的數據分析的SQL DB。注意其定位是一個DB,而不是像大數據領域的MPP比如開源的Presto、Impala那樣的純查詢引擎(query engine),所以PALO即包含存儲引擎,也包含查詢引擎(這里借鑒了Impala),而Presto、Impala的存儲都采用了開源的格式和存儲引擎,比如ORC、Parquet等,PALO的存儲格式借鑒了Mesa,所以這就是PALO和Mesa的聯系,PALO=Mesa的存儲引擎+Impala查詢引擎的開源實現。
?
百度內部一直有各種需求,比如statistics廣告統計報表就是典型。要支持增量更新,近實時,還需要提供低延遲的查詢,又要給批量的、高吞吐的Ad-hoc查詢做多維分析(比如BI系統)。過去用Mysql、Doris支持,但是都不理想。 而大家真正需要的是一個MPP SQL Engine。所以大家就有的搞MPP類的SparkSQL、Impala、Presto、Drill,有的搞MOLAP類的Druid、Kylin,有的考慮買商業數據庫(比如Greenplum,Vertica,AtScale),有的考慮用Amazon Redshift、Google BigQuery,有的嘗試了MonetDB等,所有方案基本都是因為較為復雜,或者不免費,或者不穩定,或者并不能很好的各種滿足需求,所以才逐步研發了PALO。
?
PALO是面向百TB ~ PB級別的查詢的產品,僅支持結構化數據,可供毫秒/秒級分析,是由百度大數據部團隊研發的,經歷了三代的產品Doris -> OlapEngine ->PALO,其中Doris是2012年之前廣告團隊采用的報表查詢系統,而OlapEngine是基于MySQL的一個查詢引擎,類似InnoDB或者MyISAM,也是借鑒了Mesa,最早是James Peng在鳳巢、網盟實施指導研發的項目,2014著手改造OlapEngine到PALO,PALO代表了當下state of the art的該類系統,目前廣泛應用于百度,150+產品線使用,600+臺機器,單一業務最大百TB。
?
PALO也可以看做是一個數據倉庫DW,因為借鑒的Mesa的模型,所以兼具低延遲的點查和高吞吐的Ad-hoc查詢功能。PALO支持batch loading和mini-batch即近實時的loading。和其他SQL-on-hadoop不同的是,PALO官方給出的特殊賣點是:
?
1)低成本的構建穩定可擴展的OLAP系統,開源免費并且可工作在普通機器上。
2)簡單易用的單一系統,拒絕hybrid architectures,不依賴Hadoop那套,架構簡單,并且可以使用MySQL協議接入。
?
下圖展示了PALO的定位。
?
?
2.2 PALO的特點
- 1) 高性能的行列存儲引擎
- 2) 小批量更新,批量原子提交,多版本支持
- 3) 高效的分布式數據導入
- 4) 支持Rollup Table, Scheme Change, Data Recovery
- 5) 較完備的分布式管理框架,使得整個PALO易用易運維
- 6) Range partition: 全局key排序,自動分裂還沒有滿足
- 7) MPP Query Engine – 低并發大查詢 + 高并發低延遲小查詢
- 8) 調度和資源隔離還在完善,支持優先級劃分和多租戶
- 9) 存儲分級支持,老數據用SATA,熱的新數據用SSD
- 10) 實現了Mysql網絡協議,可以很容易與各種上層工具打通
- 11) 支持多表join(這點由于自己實現了查詢引擎,所以彌補了Mesa存儲引擎的不能實現的)
- 12) Rollup表智能選擇
- 13) 支持謂詞下推
?
2.3 PALO的系統架構
架構圖如下。
?
FE包含query coordinator and catalog manager。Query coordinator接收SQL請求,根據元數據,編譯成query plan,然后建立query plan fragments,生成一個DAG執行的pipeline用于分發給BE執行(如下圖所示,是impala中的query plan到實際物理執行的DAG的轉換,可以把at HDFS和at HBase看做是BE執行的節點),Query coordinator統籌管理調度執行,這相當于Impalad的Query coordinator。Catalog manager存metadata,包括數據庫、表、分區、副本位置等等。多個FE可以保證HA和負載均衡。
(圖片來源:Impala論文)
?
FE是非對稱的架構,這和Hive、Impala等的中心架構不同,所有的metadata不是存儲在一個公共的服務上,在FE當中做了一個基于Paxos-like consensus算法的復制狀態機,這樣可以可靠的存儲數據,并且檢具擴展性,滿足高并發的查詢。FE分為三個角色,包括leader, follower和observer,leader負責寫入,follower用分布式一致性算法做同步日志,quorum方式使得follower成功,然后再commit。高并發場景下,多個follower會有問題,就像Zookeeper一樣,所以引入了observer角色,專門做異步同步。FE中的復制狀態機用Berkeley DB java version實現,FE和BE的通信使用Thrift框架。
?
PALO易用性好的一個方面也體現在其FE兼容MySQL協議上,也就是可以用MySQL client,JDBC等直接連接FE,發起DML、DDL語句,這樣也就非常好的可以和BI系統集成。
?
BE負責存儲數據、執行query fragments(這是impala論文里面提到的),BE就是一個query engine。BE沒有依賴任何分布式存儲,例如HDFS,而是自己負責管理多個副本,副本數量是可以指定的,由寫入的updater負責寫入多個副本,文件系統全是PALO自己管理的,所以PALO是一個DB,而不是一個其他大數據開源產品,例如Presto、Impala那樣的查詢引擎。FE在做query調度的時候會考慮數據的本地性(locality)以及最大化scan的能力。多個BE部署可以達到scalability and fault-tolerance。
?
PALO中的數據是水平分區的。按照桶bucket分區,但是Single-level不管是hash或者range都可能會有問題,比如hash(userid)或者range(date)都會不均勻,存在數據傾斜現象,所以PALO把這個shard的策略做成了可以支持Two-level。第一級是range partitioning,一般采用日期,方便做冷熱數據區分不同的存儲介質(SATA或者SSD),第二級是hash partitioning,可以看做是分桶,所以PALO要求使用者做好這個分區分桶。如果使用者執意把1TB大小的數據放到一個桶里面,那么這種不合理的使用和規劃,會影響PALO BE的查詢性能,因為BE執行一個Scan query fragments就是去按照Mesa的模型讀取Index和Data數據,一個MPP的思想就是并行化,這種大分桶也就限制了系統發揮能力,目前PALO還不支持自動分裂,不像HBase那這樣,所以這個分區和分桶的策略是做schema design的時候要提前考慮好的。
?
這里要提下PALO基于Mesa的數據模型和存儲模型,但是Mesa需要區分維度列和指標列,而一個通用的OLAP系統往往不能區分這些列,所以PALO為了做一個通用的OLAP,可以做到不區分這個維度列和指標列。即使不區分維度列和指標列,但是PALO借鑒了Mesa的存儲模型,所以如果沒有Key Space,那么就必須指定一個排序列,用于存儲需要。
?
PALO在性能追求上也是盡量做到最好。PALO的核心BE是使用C++開發的,這和Impala的思路很類似,Java的GC和內存控制一直是詬病,為了追求性能的極致,PALO選擇了C++作為開發語言更好的控制。同時PALO支持一些流行的OLAP優化手段,包括向量化執行和JIT,C++使用LLVM。PALO支持分區剪枝(Partition pruning),支持bloomfilter做某列的索引,同時Index中會存儲MIN/MAX等基本信息,方便做Predicate pushdown謂詞下推。由于基于Mesa,會利用預聚合的方式,使用物化視圖和做上卷表,在某些場景下可以大大加速查詢效率。綜上,這些都加速了OLAP的查詢性能。
?
2.4 PALO總結
這里由于筆者的精力有限,還沒有大規模使用PALO,暫且對于PALO的認識就限上面所述。作為一個前老百度人,對于百度開源產品還是很看好的,在公司內這可是明星級別的并且廣泛應用的產品,雖然現在剛剛開源,在產品化、文檔、工具、排查、穩定性等方面還需要完善和經受考驗,但是如果這個產品可以解決大家的痛點,作為building blocks可以幫助企業快速解決問題,我想社區的力量是巨大的,一定會把它發揚光大好,希望PALO未來的路越走越好。
總結
以上是生活随笔為你收集整理的从Google Mesa到百度PALO(数仓)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 浅谈数据湖
- 下一篇: k8s 基础介绍及概念