Hive JOIN使用详解
Hive是基于Hadoop平臺的,它提供了類似SQL一樣的查詢語言HQL。有了Hive,如果使用過SQL語言,并且不理解Hadoop MapReduce運行原理,也就無法通過編程來實現MR,但是你仍然可以很容易地編寫出特定查詢分析的HQL語句,通過使用類似SQL的語法,將HQL查詢語句提交Hive系統執行查詢分析,最終Hive會幫你轉換成底層Hadoop能夠理解的MR Job。
對于最基本的HQL查詢我們不再累述,這里主要說明Hive中進行統計分析時使用到的JOIN操作。在說明Hive JOIN之前,我們先簡單說明一下,Hadoop執行MR Job的基本過程(運行機制),能更好的幫助我們理解HQL轉換到底層的MR Job后是如何執行的。我們重點說明MapReduce執行過程中,從Map端到Reduce端這個過程(Shuffle)的執行情況,如圖所示(來自《Hadoop: The Definitive Guide》):
基本執行過程,描述如下:
通過上面的描述我們看到,在MR執行過程中,存在Shuffle過程的MR需要在網絡中的節點之間(Mapper節點和Reducer節點)拷貝數據,如果傳輸的數據量很大會造成一定的網絡開銷。而且,Map端和Reduce端都會通過一個特定的buffer來在內存中臨時緩存數據,如果無法根據實際應用場景中數據的規模來使用Hive,尤其是執行表的JOIN操作,有可能很浪費資源,降低了系統處理任務的效率,還可能因為內存不足造成OOME問題,導致計算任務失敗。
下面,我們說明Hive中的JOIN操作,針對不同的JOIN方式,應該如何來實現和優化:
生成一個MR Job
多表連接,如果多個表中每個表都使用同一個列進行連接(出現在JOIN子句中),則只會生成一個MR Job,例如:
SELECT a.val, b.val, c.valFROM a JOINb ON (a.key = b.key1) JOINc ON (c.key = b.key1)三個表a、b、c都分別使用了同一個字段進行連接,亦即同一個字段同時出現在兩個JOIN子句中,從而只生成一個MR Job。
生成多個MR Job
多表連接,如果多表中,其中存在一個表使用了至少2個字段進行連接(同一個表的至少2個列出現在JOIN子句中),則會至少生成2個MR Job,例如:
SELECT a.val, b.val, c.valFROM a JOINb ON (a.key = b.key1) JOINc ON (c.key = b.key2)三個表基于2個字段進行連接,這兩個字段b.key1和b.key2同時出現在b表中。連接的過程是這樣的:首先a和b表基于a.key和b.key1進行連接,對應著第一個MR Job;表a和b連接的結果,再和c進行連接,對應著第二個MR Job。
表連接順序優化
多表連接,會轉換成多個MR Job,每一個MR Job在Hive中稱為JOIN階段(Stage)。在每一個Stage,按照JOIN順序中的最后一個表應該盡量是大表,因為JOIN前一階段生成的數據會存在于Reducer的buffer中,通過stream最后面的表,直接從Reducer的buffer中讀取已經緩沖的中間結果數據(這個中間結果數據可能是JOIN順序中,前面表連接的結果的Key,數據量相對較小,內存開銷就小),這樣,與后面的大表進行連接時,只需要從buffer中讀取緩存的Key,與大表中的指定Key進行連接,速度會更快,也可能避免內存緩沖區溢出。例如:
SELECT a.val, b.val, c.valFROM a JOINb ON (a.key = b.key1) JOINc ON (c.key = b.key1)這個JOIN語句,會生成一個MR Job,在選擇JOIN順序的時候,數據量相比應該是b < c,表a和b基于a.key = b.key1進行連接,得到的結果(基于a和b進行連接的Key)會在Reducer上緩存在buffer中,在與c進行連接時,從buffer中讀取Key(a.key=b.key1)來與表c的c.key進行連接。
另外,也可以通過給出一些Hint信息來啟發JOIN操作,這指定了將哪個表作為大表,從而得到優化。例如:
?
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.valFROM a JOINb ON (a.key = b.key1) JOINc ON (c.key = b.key1)?
上述JOIN語句中,a表被視為大表,則首先會對表b和c進行JOIN,然后再將得到的結果與表a進行JOIN。
基于條件的LEFT OUTER JOIN優化
左連接時,左表中出現的JOIN字段都保留,右表沒有連接上的都為空。對于帶WHERE條件的JOIN語句,例如:
?SELECT a.val, b.valFROM a LEFTOUTER JOIN b ON (a.key=b.key)
WHERE a.ds='2009-07-07'AND b.ds='2009-07-07'
執行順序是,首先完成2表JOIN,然后再通過WHERE條件進行過濾,這樣在JOIN過程中可能會輸出大量結果,再對這些結果進行過濾,比較耗時。可以進行優化,將WHERE條件放在ON后,例如:
?SELECT a.val, b.valFROM a LEFTOUTER JOIN b
ON (a.key=b.keyAND b.ds='2009-07-07'AND a.ds='2009-07-07')
這樣,在JOIN的過程中,就對不滿足條件的記錄進行了預先過濾,可能會有更好的表現。
左半連接(LEFT SEMI JOIN)
左半連接實現了類似IN/EXISTS的查詢語義,使用關系數據庫子查詢的方式實現查詢SQL,例如:
SELECT a.key, a.valueFROM a WHEREa.key IN (SELECT b.key FROM b);使用Hive對應于如下語句:
SELECT a.key, a.valFROM a LEFTSEMI JOIN b ON (a.key= b.key)需要注意的是,在LEFT SEMI JOIN中,表b只能出現在ON子句后面,不能夠出現在SELECT和WHERE子句中。
關于子查詢,這里提一下,Hive支持情況如下:
- 在0.12版本,只支持FROM子句中的子查詢;
- 在0.13版本,也支持WHERE子句中的子查詢。
Map Side JOIN
Map Side JOIN優化的出發點是,Map任務輸出后,不需要將數據拷貝到Reducer節點,降低的數據在網絡節點之間傳輸的開銷。
多表連接,如果只有一個表比較大,其他表都很小,則JOIN操作會轉換成一個只包含Map的Job,例如:
對于表a數據的每一個Map,都能夠完全讀取表b的數據。這里,表a與b不允許執行FULL OUTER JOIN、RIGHT OUTER JOIN。
BUCKET Map Side JOIN
我們先看兩個表a和b的DDL,表a為:
?CREATE TABLEa(key INT, othera STRING)
CLUSTERED BY(key)INTO 4 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY'\001'
COLLECTION ITEMS TERMINATED BY'\002'
MAP KEYS TERMINATED BY'\003'
STORED ASSEQUENCEFILE;
表b為:
?CREATE TABLEb(key INT, otherb STRING)
CLUSTERED BY(key)INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY'\001'
COLLECTION ITEMS TERMINATED BY'\002'
MAP KEYS TERMINATED BY'\003'
STORED ASSEQUENCEFILE;
現在要基于a.key和b.key進行JOIN操作,此時JOIN列同時也是BUCKET列,JOIN語句如下:
SELECT /*+ MAPJOIN(b) */ a.key, a.valueFROM a JOINb ON a.key = b.key并且表a有4個BUCKET,表b有32個BUCKET,默認情況下,對于表a的每一個BUCKET,都會去獲取表b中的每一個BUCKET來進行JOIN,這回造成一定的開銷,因為只有表b中滿足JOIN條件的BUCKET才會真正與表a的BUCKET進行連接。
這種默認行為可以進行優化,通過改變默認JOIN行為,只需要設置變量:
這樣,JOIN的過程是,表a的BUCKET 1只會與表b中的BUCKET 1進行JOIN,而不再考慮表b中的其他BUCKET 2~32。
如果上述表具有相同的BUCKET,如都是32個,而且還是排序的,亦即,在表定義中在CLUSTERED BY(key)后面增加如下約束:
則上述JOIN語句會執行一個Sort-Merge-Bucket (SMB) JOIN,同樣需要設置如下參數來改變默認行為,優化JOIN時只遍歷相關的BUCKET即可:
?
?set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin =true;
set hive.optimize.bucketmapjoin.sortedmerge =true;
?
關于更多的有關JOIN優化,可以參考后面的鏈接。
參考內容
- 《Hadoop: The Definitive Guide》
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Joins
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+JoinOptimization
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SubQueries
- https://cwiki.apache.org/confluence/display/Hive/OuterJoinBehavior
原文鏈接:http://shiyanjun.cn/archives/588.html
總結
以上是生活随笔為你收集整理的Hive JOIN使用详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark读取配置源码剖析
- 下一篇: 数据仓库中的SQL性能优化 - Hive