HIVE查询优化
所有的調優都離不開對CPU、內存、IO這三樣資源的權衡及調整
Hive QL的執行本質上是MR任務的運行,因此優化主要考慮到兩個方面:Mapreduce任務優化、SQL語句優化
一、Mapreduce任務優化
1、設置合理的task數量(map task、reduce task)
這里有幾個考慮的點,一方面Hadoop MR task的啟動及初始化時間較長,如果task過多,可能會導致任務啟動和初始化時間遠超邏輯處理時間,這種情況白白浪費了計算資源。另一方面,如果任務復雜,task過少又會導致任務遲遲不能完成,這種情況又使計算資源沒有充分利用。
因為其讀取輸入使用Hadoop API,所以其map數量由以下參數共同決定:
minSize = Math.max(job.getLong(“mapred.min.split.size”, 1), minSplitSize);
mapred.min.split.size:設置每個map處理的最小數據量
minSplitSize:一般默認為都為1,可由子類復寫函數protected void setMinSplitSize(long minSplitSize) 重新設置。
blockSize:默認的HDFS文件塊大小
goalSize=totalSize/numSplits 期望的每個Map處理的split大小,僅僅是期望的
numSplits :是在 job 啟動時通過JobConf.setNumMapTasks(int n) 設置的值,是給框架的map數量的提示 totalSize :整個job所有輸入的總大小
splitSize的計算方式如下:
max( minSize, min(blockSize, goalSize) )
最終map數量由以下方式計算得出:
map數量=totalSize/splitSize
可以看出在調整map數量時,可通過調整blockSize和mapred.min.split.size的方式實現,但是調整blockSize可能并不現實,所以程序執行時通過設置mapred.min.split.size參數來設定。
當然,需要特別注意的是,如果文件特別大,需要支持分割才能進行分片,產生多個map,否則單個文件不可分割那就一個map。如果文件都特別小(比blockSize都小),可以使用CombineFileInputFormat將input path中的小文件合并成再送給mapper處理。
reduce task個數確定:
1)設置set mapred.reduce.tasks=?
2)若1)沒有設置,hive通過下面兩個參數來計算
set hive.exec.reducers.maxs=? (每個job最大的task數量)
set hive.exec.reducers.bytes.per.reducer = ? (每個reduce任務處理的數據量,默認為1G)
2、對小文件合并
過多的小文件對于執行引擎起map reduce任務進行處理非常不利,一個是每個小文件起一個task去處理,非常浪費資源,另一個是過多的小文件也對NameNode造成很大壓力(每個小文件都要記錄一個元數據 150 byte)。所以,減少小文件的數量也是一個優化措施。
盡可能避免產生:
1)在執行hive查詢時合理設置reduce的數量
2)可以使用hadoop archive命令將多個小文件進行歸檔
3)動態分區插入數據可能會產生大量小文件,所以在使用時盡量避免動態分區
4)對map和reduce的輸出進行merge
set hive.merge.mapfiles = true //設置map端輸出進行合并,默認為true
set hive.merge.mapredfiles = true //設置reduce端輸出進行合并,默認為false
set hive.merge.size.per.task = 25610001000 //設置合并文件的大小
set hive.merge.smallfiles.avgsize=16000000 //當輸出文件的平均大小小于該值時,啟動一個獨立的MapReduce任務進行文件merge
若已產生小文件:
1)set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat 用于設置在執行map前對小文件進行合并,以下參數設置決定了合并文件的大小,并最終減小map任務數
set mapred.max.split.size=256000000 //每個Map最大輸入大小(這個值決定了合并后文件的數量)
set mapred.min.split.size.per.node=100000000 //一個節點上split的最小大小(這個值決定了多個DataNode上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000 //一個機架下split的最小大小(這個值決定了多個機架上的文件是否需要合并)
3、避免數據傾斜
不管task數量是多少,發生數據傾斜就會大大影響查詢效率。
1)設置hive.map.aggr=true
該配置可將頂層的聚合操作放在map端進行,以減輕reduce端的壓力。
4、并行執行
Hive會將查詢轉化成一個或者多個階段,這些階段可能包括:MR階段、抽樣階段、合并階段、limit階段等。默認單次執行一個階段,可進行如下配置,使得兩個并不沖突的階段可以并行執行,提高查詢效率。
set hive.exec.parallel=true // 可以開啟并行執行
set hive.exec.parallel.thread.number=16 // 同一個sql允許最大并行度,默認為8
5、本地模式
這一情況適用于數據集小的情況,并且啟動和運行MR任務消耗的時間可能超過邏輯處理的時候,可配置hive在適當情況下自動進行此項優化
條件如下:
輸入數據量大小必須小于 hive.exec.mode.local.auto.inputbytes.max(默認128MB)
map任務數量必須小于 hive.exec.mode.local.auto.tasks.max(默認4)
需開啟設置:
set hive.exec.mode.local.auto=true // 默認為false
6、JVM重用
hadoop也通過JVM來執行map或者reduce任務,當任務非常多時,啟動并初始化任務的耗時會大大增加,有時甚至比邏輯執行時間都要長,所以可通過JVM重用使得JVM實例在同一個job時可以由多task重用,避免增加過多的啟動和初始化時間??赏ㄟ^如下參數設置:
set mapred.job.reuse.jvm.num.tasks=10; // 10為重用個數
二、SQL語句優化
SQL優化有些原則是通用的,部分可參考前一篇總結:
這里針對HIVE來進行一些說明:
1、列裁剪分區裁剪這些基本縮小查找范圍的自不必說。
2、join優化
1)在查詢語句中如果小表在join的左邊,那么hive可能將小表放入分布式緩存,實現map-side join。我們不用考慮哪張表是小表而調整join順序,新版hive提供了 hive.auto.convert.join 可以來優化,默認是啟動的。
hive.mapjoin.smalltable.filesize=25000000(默認值) // 小于該值的表在join時被hive認為是小表
2)在特定情況下,大表也可以使用map-side join,需要滿足以下條件
- 表數據必須按照ON語句中的鍵進行分桶
- 其中一張表的分桶數必須是另一張表的分桶數的倍數
hive這種情況下在map階段可以進行分桶連接,這一功能需要通過如下設置進行開啟
set hive.enforce.sortmergebucketmapjoin=true
如果分桶表數據是按照連接鍵或者桶的鍵進行排序的,hive可以進行一個更快的合并-連接(sort-merge join),需要如下配置
hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat
hive.auto.convert.sortmerge.join=true
hive.enforce.sortmergebucketmapjoin=true
3、group by操作
可先進行map端部分聚合,然后在reduce端最終聚合,進行如下設置
set hive.map.aggr=true; // 開啟Map端聚合參數設置
set hive.grouby.mapaggr.checkinterval=100000; // 聚合的鍵對應的記錄條數超過這個值則會進行分拆
如下設置可在出現數據傾斜時分兩個MR作業來完成,而不是在一個job中完成
set hive.groupby.skewindata = true (默認為false) // 在第一步mapreduce中map的結果隨機分布于到reduce,于是這一步中每個reduce只做部分聚合,在下一個mapreduce中再進行最終聚合。
4、使用LEFT SEMI JOIN (左半連接)替代 IN/EXISTS 子查詢
hive中前者 LEFT SEMI JOIN 是后者 IN/EXISTS 更高效的實現
例如:
select a.id, a.name from a where a.id in (select b.id from b);
應該改為 select a.id, a.name from a left semi job b on a.id=b.id;
5、選擇合適的分區、分桶、排序方式
distribute by : 用來在map端按鍵對數據進行拆分,根據reduce的個數進行數據分發,默認是采用hash算法
cluster by :除了有distribute by的功能外,還能對查詢結果進行排序,等于distribute by + sort by
sort by :數據在進入reducer之前就排好序,根據指定值對進入到同一reducer的所有行進行排序
order by :對所有數據進行排序(全局有序),若數據量很大,可能需要很多時間
注:嚴格模式( hive.mapred.mode=strict 默認為 nonstrict ) 用于 hive 在特定情況下阻止任務的提交,針對以下三種情況
(1)對于分區表,不加分區字段過濾條件,不能執行
(2)對于order by語句,必須使用limit語句
(3)限制笛卡爾積的查詢(join的時候不使用on,而使用where的)
三、其它
1、存儲格式
優先選擇列式存儲格式如orc、parquet,這將使hive查詢時避免全部掃描,列數據存儲在一起,只需拿相應列即可,能夠大大縮短響應時間
2、壓縮方式
在mapreduce的世界中,性能瓶頸主要來自于磁盤IO和網絡IO,CPU基本不構成壓力,根據需要選擇合適的壓縮方式,如下圖可參考
3、vectorized query( 向量化查詢 Hive 0.13中引入 )
通過一次性批量執行1024行而不是每次單行執行,提供掃描、聚合、篩選器和連接等操作的性能。通過如下方式啟用:
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
4、CBO(cost based query optimization)基于成本的優化
hive 0.14.0 加入,hive1.1.0 后默認開啟,hive底層自動優化多個join的順序并選擇合適的join算法,優化每個查詢的執行邏輯和物理執行計劃。使用時需進行如下設置
set hive.cbo.enable = true;
set hive.compute.query.using.stats = true;
set hive.stats.fetch.column.stats = true;
set hive.stats.fetch.partition.stats = true;
5、推測執行
偵測執行緩慢的任務(通常由于負載不均衡或者資源分布原因導致),通過執行其備份任務對相同數據進行重算,加速獲取單個task的結果,以此來提高整體的執行效率,可由如下參數控制:
set mapred.map.tasks.speculative.execution=true
set mapred.reduce.tasks.speculative.execution=true
以上內容,如有錯誤,請看到的小伙伴幫忙指正,多謝。
總結
- 上一篇: 使用medusa进行ssh爆破
- 下一篇: 跳槽没有20%以上的加薪就等于降薪?我: