数据仓库中的SQL性能优化 - Hive篇
一個Hive查詢生成多個map reduce job,一個map reduce job又有map,reduce,spill,shuffle,sort等多個階段,所以針對hive查詢的優化可以大致分為針對MR中單個步驟的優化(其中又會有細分),針對MR全局的優化,和針對整個查詢(多MR job)的優化,下文會分別闡述。
在開始之前,先把MR的流程圖帖出來(摘自Hadoop權威指南),方便后面對照。另外要說明的是,這個優化只是針對Hive 0.9版本,而不是后來Hortonwork發起Stinger項目之后的版本。相對應的Hadoop版本是1.x而非2.x。
Map階段的優化(map phase)
Map階段的優化,主要是確定合適的map數。那么首先要了解map數的計算公式:
?num_map_tasks = max[${mapred.min.split.size},
min(${dfs.block.size}, ${mapred.max.split.size})]
- mapred.min.split.size指的是數據的最小分割單元大小。
- mapred.max.split.size指的是數據的最大分割單元大小。
- dfs.block.size指的是HDFS設置的數據塊大小。
一般來說dfs.block.size這個值是一個已經指定好的值,而且這個參數hive是識別不到的:
?hive> set dfs.block.size;
dfs.block.size is undefined
所以實際上只有mapred.min.split.size和mapred.max.split.size這兩個參數(本節內容后面就以min和max指代這兩個參數)來決定map數量。在hive中min的默認值是1B,max的默認值是256MB:
?hive> set mapred.min.split.size;
mapred.min.split.size=1
hive> set mapred.max.split.size;
mapred.max.split.size=256000000
所以如果不做修改的話,就是1個map task處理256MB數據,我們就以調整max為主。通過調整max可以起到調整map數的作用,減小max可以增加map數,增大max可以減少map數。需要提醒的是,直接調整mapred.map.tasks這個參數是沒有效果的。
調整大小的時機根據查詢的不同而不同,總的來講可以通過觀察map task的完成時間來確定是否需要增加map資源。如果map task的完成時間都是接近1分鐘,甚至幾分鐘了,那么往往增加map數量,使得每個map task處理的數據量減少,能夠讓map task更快完成;而如果map task的運行時間已經很少了,比如10-20秒,這個時候增加map不太可能讓map task更快完成,反而可能因為map需要的初始化時間反而讓job總體速度變慢,這個時候反而需要考慮是否可以把map的數量減少,這樣可以節省更多資源給其他Job。
Reduce階段的優化(reduce phase)
這里說的reduce階段,是指前面流程圖中的reduce phase(實際的reduce計算)而非圖中整個reduce task。Reduce階段優化的主要工作也是選擇合適的reduce task數量,跟上面的map優化類似。
與map優化不同的是,reduce優化時,可以直接設置mapred.reduce.tasks參數從而直接指定reduce的個數。當然直接指定reduce個數雖然比較方便,但是不利于自動擴展。Reduce數的設置雖然相較map更靈活,但是也可以像map一樣設定一個自動生成規則,這樣運行定時job的時候就不用擔心原來設置的固定reduce數會由于數據量的變化而不合適。
Hive估算reduce數量的時候,使用的是下面的公式:
?num_reduce_tasks = min[${hive.exec.reducers.max},
(${input.size} / ${ hive.exec.reducers.bytes.per.reducer})]
hive.exec.reducers.bytes.per.reducer默認為1G,也就是每個reduce處理相當于job輸入文件中1G大小的對應數據量,而且reduce個數不能超過一個上限參數值,這個參數的默認取值為999。所以我們也可以用調整這個公式的方式調整reduce數量,在靈活性和定制性上取得一個平衡。
設置reduce數同樣也是根據運行時間作為參考調整,并且可以根據特定的業務需求、工作負載類型總結出經驗,所以不再贅述。
Map與Reduce之間的優化(spill, copy, sort phase)
map phase和reduce phase之間主要有3道工序。首先要把map輸出的結果進行排序后做成中間文件,其次這個中間文件就能分發到各個reduce,最后reduce端在執行reduce phase之前把收集到的排序子文件合并成一個排序文件。這個部分可以調的參數挺多,但是一般都是不要調整的,不必重點關注。
Spill 與 Sort
在spill階段,由于內存不夠,數據可能沒辦法在內存中一次性排序完成,那么就只能把局部排序的文件先保存到磁盤上,這個動作叫spill,然后spill出來的多個文件可以在最后進行merge。如果發生spill,可以通過設置io.sort.mb來增大mapper輸出buffer的大小,避免spill的發生。另外合并時可以通過設置io.sort.factor來使得一次性能夠合并更多的數據。調試參數的時候,一個要看spill的時間成本,一個要看merge的時間成本,還需要注意不要撐爆內存(io.sort.mb是算在map的內存里面的)。Reduce端的merge也是一樣可以用io.sort.factor。一般情況下這兩個參數很少需要調整,除非很明確知道這個地方是瓶頸。如果map端的輸出太大,考慮到map數不一定能很方便的調整,那么這個時候就要考慮調大io.sort.mb(不過即使調大也要注意不能超過jvm heap size)。map端的輸出很大,要么是每個map讀入了很大的文件(比如不能split的大gz壓縮文件),要么是計算邏輯導致輸出膨脹了很多倍,都是比較少見的情況。
Copy
copy階段是把文件從map端copy到reduce端。默認情況下在5%的map完成的情況下reduce就開始啟動copy,這個有時候是很浪費資源的,因為reduce一旦啟動就被占用,一直等到map全部完成,收集到所有數據才可以進行后面的動作,所以我們可以等比較多的map完成之后再啟動reduce流程,這個比例可以通mapred.reduce.slowstart.completed.maps去調整,他的默認值就是5%。如果覺得這么做會減慢reduce端copy的進度,可以把copy過程的線程增大。tasktracker.http.threads可以決定作為server端的map用于提供數據傳輸服務的線程,mapred.reduce.parallel.copies可以決定作為client端的reduce同時從map端拉取數據的并行度(一次同時從多少個map拉數據),修改參數的時候這兩個注意協調一下,server端能處理client端的請求即可。
文件格式的優化
文件格式方面有兩個問題,一個是給輸入和輸出選擇合適的文件格式,另一個則是小文件問題。小文件問題在目前的hive環境下已經得到了比較好的解決,hive的默認配置中就可以在小文件輸入時自動把多個文件合并給1個map處理,輸出時如果文件很小也會進行一輪單獨的合并,所以這里就不專門討論了。相關的參數可以在這里找到。
關于文件格式,Hive0.9版本有3種,textfile,sequencefile和rcfile。總體上來說,rcfile的壓縮比例和查詢時間稍好一點,所以推薦使用。
關于使用方法,可以在建表結構時可以指定格式,然后指定壓縮插入:
?create table rc_file_test( col int ) stored as rcfile;
set hive.exec.compress.output = true;
insert overwrite table rc_file_test
select * from source_table;
另外時也可以指定輸出格式,也可以通過hive.default.fileformat來設定輸出格式,適用于create table as select的情況:
?set hive.default.fileformat = SequenceFile;
set hive.exec.compress.output = true;
/*對于sequencefile,有record和block兩種壓縮方式可選,block壓縮比更高*/
set mapred.output.compression.type = BLOCK;
create table seq_file_test
as select * from source_table;
上面的文件格式轉換,其實是由hive完成的(也就是插入動作)。但是也可以由外部直接導入純文本(可以按照這里的做法預先壓縮),或者是由MapReduce Job生成的數據。
值得注意的是,hive讀取sequencefile的時候,是把key忽略的,也就是直接讀value并且按照指定分隔符分隔字段。但是如果hive的數據來源是從mr生成的,那么寫sequencefile的時候,key和value都是有意義的,key不能被忽略,而是應該當成第一個字段。為了解決這種不匹配的情況,有兩種辦法。一種是要求凡是結果會給hive用的mr job輸出value的時候帶上key。但是這樣的話對于開發是一個負擔,讀寫數據的時候都要注意這個情況。所以更好的方法是第二種,也就是把這個源自于hive的問題交給hive解決,寫一個InputFormat包裝一下,把value輸出加上key即可。以下是核心代碼,修改了RecordReader的next方法:
?//注意:這里為了簡化,假定了key和value都是Text類型,所以MR的輸出的k/v都要是Text類型。
//這個簡化還會造成數據為空時,出現org.apache.hadoop.io.BytesWritable cannot be cast to org.apache.hadoop.io.Text的錯誤,因為默認hive的sequencefile的key是一個空的ByteWritable。
public synchronized boolean next(K key, V value) throws IOException
{
Text tKey = (Text) key;
Text tValue = (Text) value;
if (!super.next(innerKey, innerValue))
return false;
Text inner_key = (Text) innerKey; //在構造函數中用createKey()生成
Text inner_value = (Text) innerValue; //在構造函數中用createValue()生成
tKey.set(inner_key);
tValue.set(inner_key.toString() + '\t' + inner_value.toString()); // 分隔符注意自己定義
return true;
}
Job整體優化
有一些問題必須從job的整體角度去觀察。這里討論幾個問題:Job執行模式(本地執行v.s.分布式執行)、JVM重用、索引、Join算法、數據傾斜。
Job執行模式
Hadoop的map reduce job可以有3種模式執行,即本地模式,偽分布式,還有真正的分布式。本地模式和偽分布式都是在最初學習hadoop的時候往往被說成是做單機開發的時候用到。但是實際上對于處理數據量非常小的job,直接啟動分布式job會消耗大量資源,而真正執行計算的時間反而非常少。這個時候就應該使用本地模式執行mr job,這樣執行的時候不會啟動分布式job,執行速度就會快很多。比如一般來說啟動分布式job,無論多小的數據量,執行時間一般不會少于20s,而使用本地mr模式,10秒左右就能出結果。
設置執行模式的主要參數有三個,一個是hive.exec.mode.local.auto,把他設為true就能夠自動開啟local mr模式。但是這還不足以啟動local mr,輸入的文件數量和數據量大小必須要控制,這兩個參數分別為hive.exec.mode.local.auto.tasks.max和hive.exec.mode.local.auto.inputbytes.max,默認值分別為4和128MB,即默認情況下,map處理的文件數不超過4個并且總大小小于128MB就啟用local mr模式。
另外,如果是簡單的select語句,比如select某個列取個10條數據看看sample,那么在hive0.10之后有專門的fetch task優化,使用參數hive.fetch.task.conversion即可。
JVM重用
正常情況下,MapReduce啟動的JVM在完成一個task之后就退出了,但是如果任務花費時間很短,又要多次啟動JVM的情況下(比如對很大數據量進行計數操作),JVM的啟動時間就會變成一個比較大的overhead。在這種情況下,可以使用jvm重用的參數:
set mapred.job.reuse.jvm.num.tasks = 5;他的作用是讓一個jvm運行多次任務之后再退出。這樣一來也能節約不少JVM啟動時間。
索引
總體上來說,hive的索引目前還是一個不太適合使用的東西,這里只是考慮到敘述完整性,對其進行基本的介紹。
Hive中的索引架構開放了一個接口,允許你根據這個接口去實現自己的索引。目前hive自己有一個參考的索引實現(CompactIndex),后來在0.8版本中又加入位圖索引。這里就講講CompactIndex。
CompactIndex的實現原理類似一個lookup table,而非傳統數據庫中的B樹。如果你對table A的col1做了索引,索引文件本身就是一個table,這個table會有3列,分別是col1的枚舉值,每個值對應的數據文件位置,以及在這個文件位置中的偏移量。通過這種方式,可以減少你查詢的數據量(偏移量可以告訴你從哪個位置開始找,自然只需要定位到相應的block),起到減少資源消耗的作用。但是就其性能來說,并沒有很大的改善,很可能還不如構建索引需要花的時間。所以在集群資源充足的情況下,沒有太大必要考慮索引。
CompactIndex的還有一個缺點就是使用起來不友好,索引建完之后,使用之前還需要根據查詢條件做一個同樣剪裁才能使用,索引的內部結構完全暴露,而且還要花費額外的時間。具體看看下面的使用方法就了解了:
?/*在index_test_table表的id字段上創建索引*/
create index idx on table index_test_table(id)
as 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler' with deferred rebuild;
alter index idx on index_test_table rebuild;
/*索引的剪裁。找到上面建的索引表,根據你最終要用的查詢條件剪裁一下。*/
/*如果你想跟RDBMS一樣建完索引就用,那是不行的,會直接報錯,這也是其麻煩的地方*/
create table my_index
as select _bucketname, `_offsets`
from default__index_test_table_idx__ where id = 10;
/*現在可以用索引了,注意最終查詢條件跟上面的剪裁條件一致*/
set hive.index.compact.file = /user/hive/warehouse/my_index;
set hive.input.format = org.apache.hadoop.hive.ql.index.compact.HiveCompactIndexInputFormat;
select count(*) from index_test_table where id = 10;
Join算法
處理分布式join,一般有兩種方法:
- replication join:把其中一個表復制到所有節點,這樣另一個表在每個節點上面的分片就可以跟這個完整的表join了;
- repartition join:把兩份數據按照join key進行hash重分布,讓每個節點處理hash值相同的join key數據,也就是做局部的join。
這兩種方式在M/R Job中分別對應了map side join和reduce side join。在一些MPP數據庫中,數據可以按照某列字段預先進行hash分布,這樣在跟這個表以這個字段為join key進行join的時候,該表肯定不需要做數據重分布了。這種功能是以HDFS作為底層文件系統的hive所沒有的,即使是hive中的bucket也只能到文件級別的hash,而非節點級別的hash。
在默認情況下,hive的join策略是進行reduce side join。當兩個表中有一個是小表的時候,就可以考慮用map join了,因為小表復制的代價會好過大表shuffle的代價。使用map join的配置方法有兩種,一種直接在sql中寫hint,語法是/*+MAPJOIN (tbl)*/,其中tbl就是你想要做replication的表。另一種方法是設置hive.auto.convert.join = true,這樣hive會自動判斷當前的join操作是否合適做map join,主要是找join的兩個表中有沒有小表。至于多大的表算小表,則是由hive.smalltable.filesize決定,默認25MB。
但是有的時候,沒有一個表足夠小到能夠放進內存,但是還是想用map join怎么辦?這個時候就要用到bucket map join。其方法是兩個join表在join key上都做hash bucket,并且把你打算復制的那個(相對)小表的bucket數設置為大表的倍數。這樣數據就會按照join key做hash bucket。小表依然復制到所有節點,map join的時候,小表的每一組bucket加載成hashtable,與對應的一個大表bucket做局部join,這樣每次只需要加載部分hashtable就可以了。
然后在兩個表的join key都具有唯一性的時候(也就是可做主鍵),還可以進一步做sort merge bucket map join。做法還是兩邊要做hash bucket,而且每個bucket內部要進行排序。這樣一來當兩邊bucket要做局部join的時候,只需要用類似merge sort算法中的merge操作一樣把兩個bucket順序遍歷一遍即可完成,這樣甚至都不用把一個bucket完整的加載成hashtable,這對性能的提升會有很大幫助。
然后這里以一個完整的實驗說明這幾種join算法如何操作。
首先建表要帶上bucket:
create table map_join_test(id int)
clustered by (id) sorted by (id) into 32 buckets
stored as textfile;
然后插入我們準備好的800萬行數據,注意要強制劃分成bucket(也就是用reduce劃分hash值相同的數據到相同的文件):
?set hive.enforce.bucketing = true;
insert overwrite table map_join_test
select * from map_join_source_data;
這樣這個表就有了800萬id值(且里面沒有重復值,所以可以做sort merge),占用80MB左右。
接下來我們就可以一一嘗試map join的算法了。首先是普通的map join:
select /*+mapjoin(a) */count(*)
from map_join_test a
join map_join_test b on a.id = b.id;
然后就會看到分發hash table的過程:
?2013-08-31 09:08:43 Starting to launch local task to process map join; maximum memory = 1004929024
2013-08-31 09:08:45 Processing rows: 200000 Hashtable size: 199999 Memory usage: 38823016 rate: 0.039
2013-08-31 09:08:46 Processing rows: 300000 Hashtable size: 299999 Memory usage: 56166968 rate: 0.056
……
2013-08-31 09:12:39 Processing rows: 4900000 Hashtable size: 4899999 Memory usage: 896968104 rate: 0.893
2013-08-31 09:12:47 Processing rows: 5000000 Hashtable size: 4999999 Memory usage: 922733048 rate: 0.918
Execution failed with exit status: 2
Obtaining error information
Task failed!
Task ID:
Stage-4
?
不幸的是,居然內存不夠了,直接做map join失敗了。但是80MB的大小為何用1G的heap size都放不下?觀察整個過程就會發現,平均一條記錄需要用到200字節的存儲空間,這個overhead太大了,對于map join的小表size一定要好好評估,如果有幾十萬記錄數就要小心了。雖然不太清楚其中的構造原理,但是在互聯網上也能找到其他的例證,比如這里和這里,平均一行500字節左右。這個明顯比一般的表一行占用的數據量要大。不過hive也在做這方面的改進,爭取縮小hash table,比如HIVE-6430。
所以接下來我們就用bucket map join,之前分的bucket就派上用處了。只需要在上述sql的前面加上如下的設置:
set hive.optimize.bucketmapjoin = true;然后還是會看到hash table分發:
?2013-08-31 09:20:39 Starting to launch local task to process map join; maximum memory = 1004929024
2013-08-31 09:20:41 Processing rows: 200000 Hashtable size: 199999 Memory usage: 38844832 rate: 0.039
2013-08-31 09:20:42 Processing rows: 275567 Hashtable size: 275567 Memory usage: 51873632 rate: 0.052
2013-08-31 09:20:42 Dump the hashtable into file: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0.hashtable
2013-08-31 09:20:46 Upload 1 File to: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000000_0.hashtable File size: 11022975
2013-08-31 09:20:47 Processing rows: 300000 Hashtable size: 24432 Memory usage: 8470976 rate: 0.008
2013-08-31 09:20:47 Processing rows: 400000 Hashtable size: 124432 Memory usage: 25368080 rate: 0.025
2013-08-31 09:20:48 Processing rows: 500000 Hashtable size: 224432 Memory usage: 42968080 rate: 0.043
2013-08-31 09:20:49 Processing rows: 551527 Hashtable size: 275960 Memory usage: 52022488 rate: 0.052
2013-08-31 09:20:49 Dump the hashtable into file: file:/tmp/hadoop/hive_2013-08-31_21-20-37_444_1135806892100127714/-local-10003/HashTable-Stage-1/MapJoin-a-10-000001_0.hashtable
……
這次就會看到每次構建完一個hash table(也就是所對應的對應一個bucket),會把這個hash table寫入文件,重新構建新的hash table。這樣一來由于每個hash table的量比較小,也就不會有內存不足的問題,整個sql也能成功運行。不過光光是這個復制動作就要花去3分半的時間,所以如果整個job本來就花不了多少時間的,那這個時間就不可小視。
最后我們試試sort merge bucket map join,在bucket map join的基礎上加上下面的設置即可:
?set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
sort merge bucket map join是不會產生hash table復制的步驟的,直接開始做實際map端join操作了,數據在join的時候邊做邊讀。跳過復制的步驟,外加join算法的改進,使得sort merge bucket map join的效率要明顯好于bucket map join。
關于join的算法雖然有這么些選擇,但是個人覺得,對于日常使用,掌握默認的reduce join和普通的(無bucket)map join已經能解決大多數問題。如果小表不能完全放內存,但是小表相對大表的size量級差別也非常大的時候,或者是必須要做cross join,那也可以試試bucket map join,不過其hash table分發的過程會浪費不少時間,需要評估下是否能夠比reduce join更高效。而sort merge bucket map join雖然性能不錯,但是把數據做成bucket本身也需要時間,另外其發動條件比較特殊,就是兩邊join key必須都唯一(很多介紹資料中都不提這一點。強調下必須都是唯一,哪怕只有一個表不唯一,出來的結果也是錯的。當然,其實這點完全可以根據其算法原理推敲出來)。這樣的場景相對比較少見,“用戶基本表 join 用戶擴展表”以及“用戶今天的數據快照 join 用戶昨天的數據快照”這類場景可能比較合適。
這里順便說個題外話,在數據倉庫中,小表往往是維度表,而小表map join這件事情其實用udf代替還會更快,因為不用單獨啟動一輪job,所以這也是一種可選方案。當然前提條件是維度表是固定的自然屬性(比如日期),只增加不修改(比如網站的頁面編號)的情況也可以考慮。如果維度有更新,要做緩慢變化維的,當然還是維表好維護。至于維表原本的一個主要用途OLAP,以Hive目前的性能是沒法實現的,也就不需要多慮了。
數據傾斜
所謂數據傾斜,說的是由于數據分布不均勻,個別值集中占據大部分數據量,加上hadoop的計算模式,導致計算資源不均勻引起性能下降。下圖就是一個例子:
還是拿網站的訪問日志說事吧。假設網站訪問日志中會記錄用戶的user_id,并且對于注冊用戶使用其用戶表的user_id,對于非注冊用戶使用一個user_id = 0代表。那么鑒于大多數用戶是非注冊用戶(只看不寫),所以user_id = 0占據了絕大多數。而如果進行計算的時候如果以user_id作為group by的維度或者是join key,那么個別reduce會收到比其他reduce多得多的數據——因為它要接收所有user_id = 0的記錄進行處理,使得其處理效果會非常差,其他reduce都跑完很久了它還在運行。
傾斜分成group by造成的傾斜和join造成的傾斜,需要分開看。
group by造成的傾斜有兩個參數可以解決,一個是hive.map.aggr,默認值已經為true,他的意思是做map aggregation,也就是在mapper里面做聚合。這個方法不同于直接寫mapreduce的時候可以實現的combiner,事實上各種基于mr的框架如pig,cascading等等用的都是map aggregation(或者叫partial aggregation)而非combiner的策略,也就是在mapper里面直接做聚合操作而不是輸出到buffer給combiner做聚合。對于map aggregation,hive還會做檢查,如果aggregation的效果不好,那么hive會自動放棄map aggregation。判斷效果的依據就是經過一小批數據的處理之后,檢查聚合后的數據量是否減小到一定的比例,默認是0.5,由hive.map.aggr.hash.min.reduction這個參數控制。所以如果確認數據里面確實有個別取值傾斜,但是大部分值是比較稀疏的,這個時候可以把比例強制設為1,避免極端情況下map aggr失效。hive.map.aggr還有一些相關參數,比如map aggr的內存占用等,具體可以參考這篇文章。另一個參數是hive.groupby.skewindata。這個參數的意思是做reduce操作的時候,拿到的key并不是所有相同值給同一個reduce,而是隨機分發,然后reduce做聚合,做完之后再做一輪MR,拿前面聚合過的數據再算結果。所以這個參數其實跟hive.map.aggr做的是類似的事情,只是拿到reduce端來做,而且要額外啟動一輪job,所以其實不怎么推薦用,效果不明顯。
如果碰到count distinct的情況需要優化,改寫SQL是一個比較簡便的方法,可以按照下面這么做:
?/*改寫前*/
select a, count(distinct b) as c from tbl group by a;
/*改寫后*/
select a, count(*) as c
from (select distinct a, b from tbl) group by a;
join造成的傾斜,就比如上面描述的網站訪問日志和用戶表兩個表join:
select a.* from logs a join users b on a.user_id = b.user_id;hive給出的解決方案叫skew join,其原理把這種user_id = 0的特殊值先不在reduce端計算掉,而是先寫入hdfs,然后啟動一輪map join專門做這個特殊值的計算,期望能提高計算這部分值的處理速度。當然你要告訴hive這個join是個skew join,即:
set hive.optimize.skewjoin = true;還有要告訴hive如何判斷特殊值,根據hive.skewjoin.key設置的數量hive可以知道,比如默認值是100000,那么超過100000條記錄的值就是特殊值。
skew join的流程可以用下圖描述:
另外對于特殊值的處理往往跟業務有關系,所以也可以從業務角度重寫sql解決。比如前面這種傾斜join,可以把特殊值隔離開來(從業務角度說,users表應該不存在user_id = 0的情況,但是這里還是假設有這個值,使得這個寫法更加具有通用性):
?select a.* from
(
select a.*
from (select * from logs where user_id = 0) a
join (select * from users where user_id = 0) b
on a.user_id = b.user_id
union all
select a.*
from logs a join users b
on a.user_id <> 0 and a.user_id = b.user_id
)t;
?
大部分時候傾斜是因為某一個特殊值,但是也有極端的情況是因為某一類特殊值,這往往是業務設計造成。比如對于商品item_id的編碼,除了本身的id序列,還人為的把item的類型也作為編碼放在最后兩位,這樣如果類型1的編碼是00,類型2的編碼是01,并且類型1是主要商品類,將會造成以00為結尾的商品整體傾斜。這時,如果reduce的數量恰好是100的整數倍,會造成partitioner把00結尾的item_id都hash到同一個reducer,引爆問題。當然,這種情況解決不難,只需要設置合適的reduce值,但是這種坑就會比較隱蔽。
SQL整體優化
前面對于單個job如何做優化已經做過詳細討論,但是hive查詢會生成多個job,針對多個job,有什么地方需要優化?
Job間并行
首先,在hive生成的多個job中,在有些情況下job之間是可以并行的,典型的就是子查詢。當需要執行多個子查詢union all或者join操作的時候,job間并行就可以使用了。比如下面的代碼就是一個可以并行的場景示意:
?select * from
(
select count(*) from logs
where log_date = 20130801 and item_id = 1
union all
select count(*) from logs
where log_date = 20130802 and item_id = 2
union all
select count(*) from logs
where log_date = 20130803 and item_id = 3
)t
?
設置job間并行的參數是hive.exec.parallel,將其設為true即可。默認的并行度為8,也就是最多允許sql中8個job并行。如果想要更高的并行度,可以通過hive.exec.parallel. thread.number參數進行設置,但要避免設置過大而占用過多資源。
減少Job數
另外在實際開發過程中也發現,一些實現思路會導致生成多余的job而顯得不夠高效。比如這個需求:查詢某網站日志中同時訪問過頁面a和頁面b的用戶數量。低效的思路是面向明細的,先取出看過頁面a的用戶,再取出看過頁面b的用戶,然后取交集,代碼如下:
?select count(*)
from
(select distinct user_id
from logs where page_name = 'a') a
join
(select distinct user_id
from logs where blog_owner = 'b') b
on a.user_id = b.user_id;
?
這樣一來,就要產生2個求子查詢的job,一個用于關聯的job,還有一個計數的job,一共有4個job。
但是我們直接用面向統計的方法去計算的話(也就是用group by替代join),則會更加符合M/R的模式,只需要用兩個job就能跑完:
select count (*) from (
select user_id
from logs group by user_id
having (count(case when page_name = 'a' then 1 end) *
count(case when page_name = 'b' then 1 end) > 0)
)t;
第一種查詢方法符合思考問題的直覺,是工程師和分析師在實際查數據中最先想到的寫法,但是如果在目前hive的query planner不是那么智能的情況下,想要更加快速的跑出結果,懂一點工具的內部機理也是必須的。
2015.01 updated:?最近本文被CSDN轉載。時隔一年多,hive已經有了很多變化,當然本文中的方法都還是適用的。本文中的一些內容(比如存儲格式)已經有了更好的解決辦法,在我比較新的blog中也有間接的體現。但是礙于精力有限,不會專門在本文中更新相關內容了。另外有網友指出原來文章中最后一段代碼是有問題的,經檢查確實是我的疏忽,描述也略有問題,現已在本文中改正。當然原有代碼體現出來的思路是沒有問題的,主要是語法細節的錯誤。
2015.12 updated:?更新了關于hive.map.aggr的解釋,并且補充了因為對字段人為編碼而造成的數據傾斜的案例。
原文鏈接:http://my.oschina.net/leejun2005/blog/308427?fromerr=eCxcpQ1Q
總結
以上是生活随笔為你收集整理的数据仓库中的SQL性能优化 - Hive篇的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hive JOIN使用详解
- 下一篇: Hive 基础(1):分区、桶、Sort