MaxCompute Hash Clustering介绍
?
背景
在MaxCompute查詢中,Join是很常見(jiàn)的場(chǎng)景。例如以下Query,就是一個(gè)簡(jiǎn)單的Inner Join把t1表和t2表通過(guò)id連接起來(lái):
SELECT t1.a, t2.b FROM t1 JOIN t2 ON t1.id = t2.id;
Join在MaxCompute內(nèi)部主要有三種實(shí)現(xiàn)方法:
Broadcast Hash Join - 當(dāng)Join存在一個(gè)很小的表時(shí),我們會(huì)采用這種方式,即把小表廣播傳遞到所有的Join Task Instance上面,然后直接和大表做Hash Join。
Shuffle Hash Join - 如果Join表比較大,我們就不能直接廣播了。這時(shí)候,我么可以把兩個(gè)表按照J(rèn)oin Key做Hash Shuffle,由于相同的鍵值Hash結(jié)果也是一樣的,這就保證了相同的Key的記錄會(huì)收集到同一個(gè)Join Task Instance上面。然后,每個(gè)Instance對(duì)數(shù)據(jù)量小的一路建Hash表,數(shù)據(jù)量大的順序讀取Join。
Sort Merge Join - 如果Join的表更大一些,#2的方法也用不了,因?yàn)閮?nèi)存已經(jīng)不足以容納建立一個(gè)Hash Table。這時(shí)我們的實(shí)現(xiàn)方法是,先按照J(rèn)oin Key做Hash Shuffle,然后再按照J(rèn)oin Key做排序,最后我們對(duì)Join雙方做一個(gè)歸并,具體流程如下圖所示:
實(shí)際上對(duì)于MaxCompute今天的數(shù)據(jù)量和規(guī)模,我們絕大多數(shù)情況下都是使用的Sort Merge Join,但這其實(shí)是非常昂貴的操作。從上圖可以看到,Shuffle的時(shí)候需要一次計(jì)算,并且中間結(jié)果需要落盤,后續(xù)Reducer讀取的時(shí)候,又需要讀取和排序的過(guò)程。對(duì)于M個(gè)Mapper和R個(gè)Reducer的場(chǎng)景,我們將產(chǎn)生M x R次的IO讀取。對(duì)應(yīng)的Fuxi物理執(zhí)行計(jì)劃如下所示,需要兩個(gè)Mapper Stage,一個(gè)Join Stage,其中紅色部分為Shuffle和Sort操作:
與此同時(shí),我們觀察到,有些Join是可能反復(fù)發(fā)生的,比如上面的Query改成了:
SELECT t1.c, t2.d FROM t1 JOIN t2 ON t1.id = t2.id;
雖然,我們選擇的列不一樣了,但是底下的Join是完全一樣的,整個(gè)Shuffle和Sort的過(guò)程也是完全一樣的。
又或者:
SELECT t1.c, t3.d FROM t1 JOIN t3 ON t1.id = t3.id;
這個(gè)時(shí)候是t1和t3來(lái)Join,但實(shí)際上對(duì)于t1而言,整個(gè)Shuffle和Sort過(guò)程還是完全一樣。
于是,我們考慮,如果我們初始表數(shù)據(jù)生成時(shí),按照Hash Shuffle和Sort的方式存儲(chǔ),那么后續(xù)查詢中將避免對(duì)數(shù)據(jù)的再次Shuffle和Sort。這樣做的好處是,雖然建表時(shí)付出了一次性的代價(jià),卻節(jié)省了將來(lái)可能產(chǎn)生的反復(fù)的Shuffle和Join。這時(shí)Join的Fuxi物理執(zhí)行計(jì)劃變成了如下所示,不僅節(jié)省了Shuffle和Sort的操作,并且查詢從3個(gè)Stage變成了1個(gè)Stage完成:
所以,總結(jié)來(lái)說(shuō),Hash Clustering通過(guò)允許用戶在建表時(shí)設(shè)置表的Shuffle和Sort屬性,進(jìn)而MaxCompute根據(jù)數(shù)據(jù)已有的存儲(chǔ)特性,優(yōu)化執(zhí)行計(jì)劃,提高效率,節(jié)省資源消耗。
功能描述
目前Hash Clustering功能已經(jīng)上線,缺省條件下即打開(kāi)支持。
- 創(chuàng)建Hash Clustering Table
用戶可以使用以下語(yǔ)句創(chuàng)建Hash Clustering表。用戶需要指定Cluster Key(即Hash Key),以及Hash分片(我們稱之為Bucket)的數(shù)目。Sort是可以選項(xiàng),但在大多數(shù)情況下,建議和Cluster Key一致,以便取得最佳的優(yōu)化效果。
CREATE TABLE [IF NOT EXISTS] table_name
[(col_name data_type [comment col_comment], ...)][comment table_comment][PARTITIONED BY (col_name data_type [comment col_comment], ...)][CLUSTERED BY (col_name [, col_name, ...]) [SORTED BY (col_name [ASC | DESC] [, col_name [ASC | DESC] ...])] INTO number_of_buckets BUCKETS]
[AS select_statement]
舉個(gè)例子如下:
CREATE TABLE T1 (a string, b string, c bigint) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;
如果是分區(qū)表,則可以用這樣的語(yǔ)句創(chuàng)建:
CREATE TABLE T1 (a string, b string, c bigint) PARTITIONED BY (dt string) CLUSTERED BY (c) SORTED by (c) INTO 1024 BUCKETS;
CLUSTERED BY
CLUSTERED BY指定Hash Key,MaxCompute將對(duì)指定列進(jìn)行Hash運(yùn)算,按照Hash值分散到各個(gè)Bucket里面。為避免數(shù)據(jù)傾斜,避免熱點(diǎn),取得較好的并行執(zhí)行效果,CLUSTERED BY列適宜選擇取值范圍大,重復(fù)鍵值少的列。此外,為了達(dá)到Join優(yōu)化的目的,也應(yīng)該考慮選取常用的Join/Aggregation Key,即類似于傳統(tǒng)數(shù)據(jù)庫(kù)中的主鍵。
SORTED BY
SORTED BY子句用于指定在Bucket內(nèi)字段的排序方式,建議Sorted By和Clustered By一致,以取得較好的性能。此外,當(dāng)SORTED BY子句指定之后,MaxCompute將自動(dòng)生成索引,并且在查詢的時(shí)候利用索引來(lái)加快執(zhí)行。
INTO number_of_buckets BUCKETS
INTO ... BUCKETS 指定了哈希桶的數(shù)目,這個(gè)數(shù)字必須提供,但用戶應(yīng)該由數(shù)據(jù)量大小來(lái)決定。Bucket越多并發(fā)度越大,Job整體運(yùn)行時(shí)間越短,但同時(shí)如果Bucket太多的話,可能導(dǎo)致小文件太多,另外并發(fā)度過(guò)高也會(huì)造成CPU時(shí)間的增加。目前推薦設(shè)置讓每個(gè)Bucket數(shù)據(jù)大小在500MB - 1GB之間,如果是特別大的表,這個(gè)數(shù)值可以再大點(diǎn)。
目前,MaxCompute只能在Bucket Number完全一致的情況下去掉Shuffle步驟,我們下一個(gè)發(fā)布,會(huì)支持Bucket的對(duì)齊,也就是說(shuō)存在Bucket倍數(shù)關(guān)系的表,也可以做Shuffle Remove。為了將來(lái)可以較好的利用這個(gè)功能,我們建議Bucket Number選用2的N次方,如512,1024,2048,最大不超過(guò)4096,否則影響性能以及資源使用。
對(duì)于Join優(yōu)化的場(chǎng)景,兩個(gè)表的Join要去掉Shuffle和Sort步驟,要求哈希桶數(shù)目一致。如果按照上述原則計(jì)算兩個(gè)表的哈希桶數(shù)不一致,怎么辦呢?這時(shí)候建議統(tǒng)一使用數(shù)字大的Bucket Number,這樣可以保證合理的并發(fā)度和執(zhí)行效率。如果表的大小實(shí)在是相差太遠(yuǎn),那么Bucket Number設(shè)置,可以采用倍數(shù)關(guān)系,比如1024和256,這樣將來(lái)我們進(jìn)一步支持哈希桶的自動(dòng)分裂和合并時(shí),也可以利用數(shù)據(jù)特性進(jìn)行優(yōu)化。
- 更改表屬性
對(duì)于分區(qū)表,我們支持通過(guò)ALTER TABLE語(yǔ)句,來(lái)增加或者去除Hash Clustering屬性:
ALTER TABLE table_name
[CLUSTERED BY (col_name [, col_name, ...]) [SORTED BY (col_name [ASC | DESC] [, col_name [ASC | DESC] ...])] INTO number_of_buckets BUCKETSALTER TABLE table_name NOT CLUSTERED;
關(guān)于ALTER TABLE,有幾點(diǎn)需要注意:
alter table改變聚集屬性,只對(duì)于分區(qū)表有效,非分區(qū)表一旦聚集屬性建立就無(wú)法改變。
alter table只會(huì)影響分區(qū)表的新建分區(qū)(包括insert overwrite生成的),新分區(qū)將按新的聚集屬性存儲(chǔ),老的數(shù)據(jù)分區(qū)保持不變。
由于alter table只影響新分區(qū),所以該語(yǔ)句不可以再指定PARTITION
ALTER TABLE語(yǔ)句適用于存量表,在增加了新的聚集屬性之后,新的分區(qū)將做hash cluster存儲(chǔ)。
- 表屬性顯示驗(yàn)證
在創(chuàng)建Hash Clustering Table之后,可以通過(guò):
DESC EXTENDED table_name;
來(lái)查看表屬性,Clustering屬性將顯示在Extended Info里面,如下圖所示:
對(duì)于分區(qū)表,除了可以使用以上命令查看Table屬性之后,于是需要通過(guò)以下命令查看分區(qū)的屬性:
DESC EXTENDED table_name partition(pt_spec);
例如:
Hash Clustering的其他優(yōu)點(diǎn)
- Bucket Pruning優(yōu)化
考慮以下查詢:
CREATE TABLE t1 (id bigint, a string, b string) CLUSTERED BY (id) SORTED BY (id) into 1000 BUCKETS;
...
SELECT t1.a, t1.b, t1.c FROM t1 WHERE t1.id=12345;
對(duì)于普通表,這個(gè)通常意味著全表掃描操作,如果表非常大的情況下,資源消耗量是非常可觀的。但是,因?yàn)槲覀円呀?jīng)對(duì)id做Hash Shuffle,并且對(duì)id做排序,我們的查詢可以大大簡(jiǎn)化:
通過(guò)查詢值"12345"找到對(duì)應(yīng)的Hash Bucket,這時(shí)候我們只需要在1個(gè)Bucket里面掃描,而不是全部1000個(gè)。我們稱之為“Bucket Pruning”。
以下是安全部基于User ID查詢場(chǎng)景的一個(gè)例子。下面這個(gè)logview是普通的表的查詢操作,可以看到,由于數(shù)據(jù)量很大,一共起了1111個(gè)Mapper,讀取了427億條記錄,最后找符合條件記錄26條,總共耗時(shí)1分48秒:
同樣的數(shù)據(jù),同樣的查詢,用Hash Clustering表來(lái)做,我們可以直接定位到單個(gè)Bucket,并利用Index只讀取包含查詢數(shù)據(jù)的Page,可以看到這里只用了4個(gè)Mapper,讀取了10000條記錄,總共耗時(shí)只需要6秒,如果用service mode這個(gè)時(shí)間還會(huì)更短:
- Aggregation優(yōu)化
例如,對(duì)于以下查詢:
SELECT department, SUM(salary) FROM employee GROUP BY (department);?
在通常情況下,我們會(huì)對(duì)department進(jìn)行Shuffle和Sort,然后做Stream Aggregate,統(tǒng)計(jì)每一個(gè)department group。但是如果表數(shù)據(jù)已經(jīng)CLUSTERED BY (department) SORTED BY (department),那么這個(gè)Shuffle和Sort的操作,也就相應(yīng)節(jié)省掉了。
- 存儲(chǔ)優(yōu)化
即便我們不考慮以上所述的各種計(jì)算上的優(yōu)化,單單是把表Shuffle并排序存儲(chǔ),都會(huì)對(duì)于存儲(chǔ)空間節(jié)省上有很大幫助。因?yàn)镸axCompute底層使用列存儲(chǔ),通過(guò)排序,鍵值相同或相近的記錄存放到一起,對(duì)于壓縮,編碼都會(huì)更加友好,從而使得壓縮效率更高。在實(shí)際測(cè)試中,某些極端情況下,排序存儲(chǔ)的表可以比無(wú)序表的存儲(chǔ)空間節(jié)省50%。對(duì)于生命周期很長(zhǎng)的表,使用Hash Clustering存儲(chǔ),是一個(gè)很值得考慮的優(yōu)化。
以下是一個(gè)簡(jiǎn)單的實(shí)驗(yàn),使用100G TPC-H lineitem表,包含了int,double,string等多種數(shù)據(jù)類型,在數(shù)據(jù)和壓縮方式等完全一樣的情況下,hash clustering的表空間節(jié)省了~10%。
測(cè)試數(shù)據(jù)及分析
對(duì)于Hash Clustering整體帶來(lái)的性能收益,我們通過(guò)標(biāo)準(zhǔn)的TPC-H測(cè)試集進(jìn)行衡量。測(cè)試使用1T數(shù)據(jù),統(tǒng)一使用500 Buckets,除了nation和region兩個(gè)極小的表以外,其余所有表均按照第一個(gè)列作為Cluster和Sort Key。
整體測(cè)試結(jié)果表明,在使用了Hash Clustering之后,總CPU時(shí)間減少17.3%,總的Job運(yùn)行時(shí)間減少12.8%。
具體各個(gè)Query CPU時(shí)間對(duì)比如下:
Job運(yùn)行時(shí)間對(duì)比如下:
需要注意到是TPC-H里并不是所有的Query都可以利用到Clustering屬性,特別是兩個(gè)耗時(shí)最長(zhǎng)的Query沒(méi)有辦法利用上,所以從總體上的效率提升并不是非常驚人。但如果單看可以利用上Clustering屬性的Query,收益還是非常明顯的,比如Q4快了68%,Q12快了62%,Q10快了47%,等等。
以下是TPC-H Q4在普通表的Fuxi執(zhí)行計(jì)劃:
而下面則是使用Hash Clustering之后的執(zhí)行計(jì)劃,可以看到,這個(gè)DAG被大大的簡(jiǎn)化,這也是性能得到大幅提升的關(guān)鍵原因:
功能限制及將來(lái)計(jì)劃
目前Hash Clustering的第一階段開(kāi)發(fā)工作完成,但還存在以下限制和不足:
?
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
?
總結(jié)
以上是生活随笔為你收集整理的MaxCompute Hash Clustering介绍的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 十余位权威专家深度解读,达摩院2019十
- 下一篇: 2018年自然语言处理最值得关注的研究、