2022-02-09大数据学习日志——Hadoop离线阶段——Hive窗口函数、性能调优
學習目標
掌握窗口函數的使用 知道Hive數據壓縮、文件存儲格式 掌握Hive通用調優(重要的見下述大綱)內容大綱
#Hive窗口函數(Window function)開窗函數分組TopN級聯累加問題連續登陸 #Hive的性能調優hive的數據文件格式 數據壓縮行式存儲 列式存儲(ORC parquet)hive通用調優 *join優化*group by數據傾斜優化*task并行度問題其他通用調優01_Apache Hive 窗口函數 快速理解與語法
1.1 快速理解窗口函數功能
-
window function 窗口函數、開窗函數、olap分析函數。
-
窗口:可以理解為操作數據的范圍,窗口有大有小,本窗口中操作的數據有多有少。
-
可以簡單地解釋為類似于聚合函數的計算函數,但是通過GROUP BY子句組合的常規聚合會隱藏正在聚合的各個行,最終輸出一行;而窗口函數聚合后還可以訪問當中的各個行,并且可以將這些行中的某些屬性添加到結果集中。
1.2 窗口函數語法
具有OVER語句的函數叫做窗口函數。
Function(arg1,..., argn) OVER ([PARTITION BY <...>] [ORDER BY <....>] [<window_expression>])--其中Function(arg1,..., argn) 可以是下面分類中的任意一個--聚合函數:比如sum max avg等--排序函數:比如rank row_number等--分析函數:比如lead lag first_value等--OVER [PARTITION BY <...>] 類似于group by 用于指定分組 每個分組你可以把它叫做窗口 --如果沒有PARTITION BY 那么整張表的所有行就是一組--[ORDER BY <....>] 用于指定每個分組內的數據排序規則 支持ASC、DESC--[<window_expression>] 用于指定每個窗口中 操作的數據范圍 默認是窗口中所有行- 建表加載數據 后續練習使用
02_Apache Hive 窗口函數 聚合函數和窗口表達式
2.1 聚合函數
-
語法
sum|max|min|avg OVER ([PARTITION BY <...>] [ORDER BY <....>] [<window_expression>]) -
重點:有PARTITION BY 沒有PARTITION BY的區別;有ORDER BY沒有ORDER BY的區別。
- 有沒有partition by 影響的是全局聚合 還是分組之后 每個組內聚合。
- 有沒有order by的區別:
- 沒有order by,聚合的時候是組內所有的數據聚合再一起 全局聚合
- 如果有order by,聚合的時候是累加聚合,默認是第一行聚合到當前行。
-
栗子
--1、求出每個用戶總pv數 sum+group by普通常規聚合操作 select cookieid,sum(pv) as total_pv from website_pv_info group by cookieid; +-----------+-----------+ | cookieid | total_pv | +-----------+-----------+ | cookie1 | 26 | | cookie2 | 35 | +-----------+-----------+--2、sum+窗口函數 總共有四種用法 注意是整體聚合 還是累積聚合 --sum(...) over( )對表所有行求和 --sum(...) over( order by ... ) 連續累積求和 --sum(...) over( partition by... ) 同組內所行求和 --sum(...) over( partition by... order by ... ) 在每個分組內,連續累積求和--需求:求出網站總的pv數 所有用戶所有訪問加起來 --sum(...) over( )對表所有行求和 select cookieid,createtime,pv,sum(pv) over() as total_pv from website_pv_info;--需求:求出每個用戶總pv數 --sum(...) over( partition by... ),同組內所行求和 select cookieid,createtime,pv,sum(pv) over(partition by cookieid) as total_pv from website_pv_info;--需求:求出每個用戶截止到當天,累積的總pv數 --sum(...) over( partition by... order by ... ),在每個分組內,連續累積求和 select cookieid,createtime,pv,sum(pv) over(partition by cookieid order by createtime) as current_total_pv from website_pv_info; +-----------+-------------+-----+-------------------+ | cookieid | createtime | pv | current_total_pv | +-----------+-------------+-----+-------------------+ | cookie1 | 2018-04-10 | 1 | 1 | | cookie1 | 2018-04-11 | 5 | 6 | | cookie1 | 2018-04-12 | 7 | 13 | | cookie1 | 2018-04-13 | 3 | 16 | | cookie1 | 2018-04-14 | 2 | 18 | | cookie1 | 2018-04-15 | 4 | 22 | | cookie1 | 2018-04-16 | 4 | 26 | | cookie2 | 2018-04-10 | 2 | 2 | | cookie2 | 2018-04-11 | 3 | 5 | | cookie2 | 2018-04-12 | 5 | 10 | | cookie2 | 2018-04-13 | 6 | 16 | | cookie2 | 2018-04-14 | 3 | 19 | | cookie2 | 2018-04-15 | 9 | 28 | | cookie2 | 2018-04-16 | 7 | 35 | +-----------+-------------+-----+-------------------+
2.2 窗口表達式 window_expression
直譯叫做window表達式 ,通俗叫法稱之為window子句。
-
功能:控制窗口操作的范圍。
-
語法
rows between- preceding:往前- following:往后- current row:當前行- unbounded:起點- unbounded preceding 表示從前面的起點 第一行- unbounded following:表示到后面的終點 最后一行 -
栗子
--默認從第一行到當前行 select cookieid,createtime,pv,sum(pv) over(partition by cookieid order by createtime) as pv1 from website_pv_info;--第一行到當前行 等效于rows between不寫 默認就是第一行到當前行 select cookieid,createtime,pv,sum(pv) over(partition by cookieid order by createtime rows between unbounded preceding and current row) as pv2 from website_pv_info;--向前3行至當前行 select cookieid,createtime,pv,sum(pv) over(partition by cookieid order by createtime rows between 3 preceding and current row) as pv4 from website_pv_info;--向前3行 向后1行 select cookieid,createtime,pv,sum(pv) over(partition by cookieid order by createtime rows between 3 preceding and 1 following) as pv5 from website_pv_info;--當前行至最后一行 select cookieid,createtime,pv,sum(pv) over(partition by cookieid order by createtime rows between current row and unbounded following) as pv6 from website_pv_info;--第一行到最后一行 也就是分組內的所有行 select cookieid,createtime,pv,sum(pv) over(partition by cookieid order by createtime rows between unbounded preceding and unbounded following) as pv6 from website_pv_info;
03_Apache Hive 窗口函數 排序函數(row_number等)
-
功能:主要對數據分組排序之后,組內順序標號。
-
核心函數:row_number、rank、dense_rank
-
適合場景:分組TopN問題(注意哦 不是全局topN)
-
栗子
SELECTcookieid,createtime,pv,RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn1,DENSE_RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn2,ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY pv DESC) AS rn3 FROM website_pv_info;--需求:找出每個用戶訪問pv最多的Top3 重復并列的不考慮 SELECT * from (SELECTcookieid,createtime,pv,ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY pv DESC) AS seq FROM website_pv_info) tmp where tmp.seq <4; -
ntile函數
-
功能:將分組排序之后的數據分成指定的若干個部分(若干個桶)
-
規則:盡量平均分配 ,優先滿足最小的桶,彼此最多不相差1個。
-
栗子
--把每個分組內的數據分為3桶 SELECTcookieid,createtime,pv,NTILE(3) OVER(PARTITION BY cookieid ORDER BY createtime) AS rn2 FROM website_pv_info ORDER BY cookieid,createtime;--需求:統計每個用戶pv數最多的前3分之1天。 --理解:將數據根據cookieid分 根據pv倒序排序 排序之后分為3個部分 取第一部分 SELECT * from (SELECTcookieid,createtime,pv,NTILE(3) OVER(PARTITION BY cookieid ORDER BY pv DESC) AS rnFROM website_pv_info) tmp where rn =1;
-
04_Apache Hive 窗口函數 lag、lead函數
--LAG 用于統計窗口內往上第n行值 SELECT cookieid,createtime,url,ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,LAG(createtime,1,'1970-01-01 00:00:00') OVER(PARTITION BY cookieid ORDER BY createtime) AS last_1_time,LAG(createtime,2) OVER(PARTITION BY cookieid ORDER BY createtime) AS last_2_time FROM website_url_info;--LEAD 用于統計窗口內往下第n行值 SELECT cookieid,createtime,url,ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,LEAD(createtime,1,'1970-01-01 00:00:00') OVER(PARTITION BY cookieid ORDER BY createtime) AS next_1_time,LEAD(createtime,2) OVER(PARTITION BY cookieid ORDER BY createtime) AS next_2_time FROM website_url_info;--FIRST_VALUE 取分組內排序后,截止到當前行,第一個值 SELECT cookieid,createtime,url,ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,FIRST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime) AS first1 FROM website_url_info;--LAST_VALUE 取分組內排序后,截止到當前行,最后一個值 SELECT cookieid,createtime,url,ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY createtime) AS rn,LAST_VALUE(url) OVER(PARTITION BY cookieid ORDER BY createtime) AS last1 FROM website_url_info;05_Apache Hive 文件存儲格式(text、ORC、parquet)
-
列式存儲、行式存儲
- 數據最終在文件中底層以什么樣的形成保存。
-
Hive中表的數據存儲格式,不是只支持text文本格式,還支持其他很多格式。
-
hive表的文件格式是如何指定的呢? 建表的時候通過STORED AS 語法指定。如果沒有指定默認都是textfile。
-
Hive中主流的幾種文件格式。
-
textfile 文件格式
-
ORC、Parquet 列式存儲格式。
都是列式存儲格式,底層是以二進制形式存儲。數據存儲效率極高,對于查詢賊方便。 二進制意味著肉眼無法直接解析,hive可以自解析。 -
栗子
分別使用3種不同格式存儲數據,去HDFS上查看底層文件存儲空間的差異。
--1、創建表,存儲數據格式為TEXTFILE create table log_text ( track_time string, url string, session_id string, referer string, ip string, end_user_id string, city_id string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE; --如果不寫stored as textfile 默認就是textfile--加載數據 load data local inpath '/root/hivedata/log.data' into table log_text;--2、創建表,存儲數據格式為ORC create table log_orc( track_time string, url string, session_id string, referer string, ip string, end_user_id string, city_id string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS orc ;--向表中插入數據 思考為什么不能使用load命令加載? 因為load是純復制移動操作 不會調整文件格式。 insert into table log_orc select * from log_text;--3、創建表,存儲數據格式為parquet create table log_parquet( track_time string, url string, session_id string, referer string, ip string, end_user_id string, city_id string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS PARQUET ;--向表中插入數據 insert into table log_parquet select * from log_text ;
-
06_Apache Hive 數據壓縮和文件格式搭配(ORC+snappy)
-
Hive的默認執行引擎是MapReduce,因此通常所說的Hive壓縮指的是MapReduce的壓縮。
-
壓縮是指通過算法對數據進行重新編排,降低存儲空間。無損壓縮。
-
MapReduce可以在兩個階段進行數據壓縮
- map的輸出
- 減少shuffle的數據量 提高shuffle時網絡IO的效率
- reduce的輸出
- 減少輸出文件的大小 降低磁盤的存儲空間
- map的輸出
-
壓縮的弊端
- 浪費時間
- 消耗CPU、內存
- 某些優秀的壓縮算法需要錢
-
壓縮的算法(推薦使用snappy)
Snappy org.apache.hadoop.io.compress.SnappyCodec -
Hive中壓縮的設置:注意 本質還是指的是MapReduce的壓縮
--設置Hive的中間壓縮 也就是map的輸出壓縮 1)開啟 hive 中間傳輸數據壓縮功能 set hive.exec.compress.intermediate=true; 2)開啟 mapreduce 中 map 輸出壓縮功能 set mapreduce.map.output.compress=true; 3)設置 mapreduce 中 map 輸出數據的壓縮方式 set mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;--設置Hive的最終輸出壓縮,也就是Reduce輸出壓縮 1)開啟 hive 最終輸出數據壓縮功能 set hive.exec.compress.output=true; 2)開啟 mapreduce 最終輸出數據壓縮 set mapreduce.output.fileoutputformat.compress=true; 3)設置 mapreduce 最終數據輸出壓縮方式 set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec; 4)設置 mapreduce 最終數據輸出壓縮為塊壓縮 還可以指定RECORD set mapreduce.output.fileoutputformat.compress.type=BLOCK; --設置完畢之后 只有當HiveSQL底層通過MapReduce程序執行 才會涉及壓縮。 --已有普通格式的表 select * from student_hdfs;--ctas語句 create table student_snappy as select * from student_hdfs ; -
在實際開發中,可以根據需求選擇不同的文件格式并且搭配不同的壓縮算法。可以得到更好的存儲效果。
--不指定壓縮格式 代表什么呢? --orc 存儲文件默認采用ZLIB 壓縮。比 snappy 壓縮的小 STORED AS orc; --2.78M--以ORC格式存儲 不壓縮 STORED AS orc tblproperties ("orc.compress"="NONE"); --7.69M--以ORC格式存儲 使用snappy壓縮 STORED AS orc tblproperties ("orc.compress"="SNAPPY"); --3.78M
07_Apache Hive 通用調優 fetch抓取機制、MR本地模式
7.1 Fetch抓取機制
-
功能:在執行sql的時候,能不走MapReduce程序處理就盡量不走MapReduce程序處理。
-
盡量直接去操作數據文件。
-
官方描述
Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incur RS – ReduceSinkOperator, requiring a MapReduce task), lateral views and joins.應該是單一數據源 沒有子查詢 沒有聚合操作 沒有去重操作 沒有側視圖 沒有join -
設置: hive.fetch.task.conversion= more。
--在下述3種情況下 sql不走mr程序--全局查找 select * from student; --字段查找 列裁剪 select num,name from student; --limit 限制查找 select num,name from student limit 2;
7.2 mapreduce本地模式
-
功能:如果非要執行MapReduce程序,能夠本地執行的,盡量不提交yarn上執行。
-
默認是關閉的。意味著只要走MapReduce就提交yarn執行。
mapreduce.framework.name = local 本地模式 mapreduce.framework.name = yarn 集群模式 -
Hive提供了一個參數,自動切換MapReduce程序為本地模式,如果不滿足條件,就執行yarn模式。
set hive.exec.mode.local.auto = true;--3個條件必須都滿足 自動切換本地模式 The total input size of the job is lower than: hive.exec.mode.local.auto.inputbytes.max (128MB by default) --數據量小于128MThe total number of map-tasks is less than: hive.exec.mode.local.auto.tasks.max (4 by default) --maptask個數少于4個The total number of reduce tasks required is 1 or 0. --reducetask個數是0 或者 1 -
切換Hive的執行引擎
WARNING: Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.如果針對Hive的調優依然無法滿足你的需求 還是效率低, 嘗試使用spark計算引擎 或者Tez.
08_Apache Hive 通用調優 join優化
底層還是MapReduce的join優化
8.1 map join
適合于小表join大表或者小表Join小表
#是否開啟自動轉為mapjoin 在滿足條件的情況下 默認true hive.auto.convert.join=trueHive老版本 #如果參與的一個表大小滿足條件 轉換為map join hive.mapjoin.smalltable.filesize=25000000 Hive2.0之后版本 #是否啟用基于輸入文件的大小,將reduce join轉化為Map join的優化機制。 hive.auto.convert.join.noconditionaltask=true #如果上述參數為true,假設參與join的表(或分區)有N個,并且有N-1個表(或分區)的大小總和小于下述參數指定的值,那么會直接轉為Map join。 hive.auto.convert.join.noconditionaltask.size=10000000 默認10M8.2 reduce join
適合于大表Join大表
如果不滿足map端join,那么就只能走reduce端join了,在hive中也把reduce端join叫做common join.8.3 bucket join
適合于大表Join大表
-
方式1:Bucktet Map Join
將表進行分桶,每次join時分桶參與而不是整張表參與,相當于小表join了。
語法: clustered by colName(參與join的字段) 參數: set hive.optimize.bucketmapjoin = true 要求: 分桶字段 = Join字段 ,分桶的個數相等或者成倍數,必須是在map join中 -
方式2:Sort Merge Bucket Join(SMB)
基于有序的數據Join 語法:clustered by colName sorted by (colName) 參數set hive.optimize.bucketmapjoin = true;set hive.auto.convert.sortmerge.join=true;set hive.optimize.bucketmapjoin.sortedmerge = true;set hive.auto.convert.sortmerge.join.noconditionaltask=true;要求: 分桶字段 = Join字段 = 排序字段,分桶的個數相等或者成倍數
09_Apache Hive 通用調優 數據傾斜優化
9.1 group by數據傾斜
-
方案一:開啟Map端聚合
hive.map.aggr=true; #是否在Hive Group By 查詢中使用map端聚合。 #這個設置可以將頂層的部分聚合操作放在Map階段執行,從而減輕清洗階段數據傳輸和Reduce階段的執行時間,提升總體性能。但是指標不治本。 -
方案二:實現隨機分區
select * from table distribute by rand(); -
方案三:數據傾斜時自動負載均衡
9.2 join數據傾斜
-
方案一:提前過濾(比如分區裁剪),將大表數據變成小表數據,爭取實現Map Join
-
方案二:使用Bucket Join
-
方案三:使用Skew Join
#Skew Join原理:將Map Join和Reduce Join進行合并使用。 如果某個值出現了數據傾斜,就會將產生數據傾斜的數據單獨使用Map Join來實現,如果不是傾斜的,則按正常的reduce端join流程進行。其他沒有產生數據傾斜的數據由Reduce Join來實現,這樣就避免了Reduce Join中產生數據傾斜的問題最終將Map Join的結果和Reduce Join的結果進行Union合并#開啟運行過程中skewjoin set hive.optimize.skewjoin=true; #如果這個key的出現的次數超過這個范圍 set hive.skewjoin.key=100000; #在編譯時判斷是否會產生數據傾斜 set hive.optimize.skewjoin.compiletime=true; set hive.optimize.union.remove=true; #如果Hive的底層走的是MapReduce,必須開啟這個屬性,才能實現不合并 set mapreduce.input.fileinputformat.input.dir.recursive=true;
10_Apache Hive 通用調優 MR程序task個數調整
10.1 maptask個數
-
如果是在MapReduce中 maptask是通過邏輯切片機制決定的。
-
但是在hive中,影響的因素很多。比如邏輯切片機制,文件是否壓縮、壓縮之后是否支持切割。
-
因此在Hive中,調整MapTask的個數,直接去HDFS調整文件的大小和個數,效率較高。
如果小文件多,就進行小文件的合并 合并的大小最好=block size 如果大文件多,就調整blocl size
10.2 reducetask個數
-
如果在MapReduce中,通過代碼可以直接指定 job.setNumReduceTasks(N)
-
在Hive中,reducetask個數受以下幾個條件控制的
(1)每個 Reduce 處理的數據量默認是 256MB hive.exec.reducers.bytes.per.reducer=256000000 (2)每個任務最大的 reduce 數,默認為 1009 hive.exec.reducsers.max=1009 (3)mapreduce.job.reduces 該值默認為-1,由 hive 自己根據任務情況進行判斷。--如果用戶用戶不設置 hive將會根據數據量或者sql需求自己評估reducetask個數。 --用戶可以自己通過參數設置reducetask的個數set mapreduce.job.reduces = N --用戶設置的不一定生效,如果用戶設置的和sql執行邏輯有沖突,比如order by,在sql編譯期間,hive又會將reducetask設置為合理的個數。 Number of reduce tasks determined at compile time: 1
11_Apache Hive 通用調優 執行計劃
-
通過執行計劃可以看出hive接下來是如何打算執行這條sql的。
-
語法格式:explain + sql語句
-
栗子
explain select * from student;+----------------------------------------------------+ | Explain | +----------------------------------------------------+ | STAGE DEPENDENCIES: | | Stage-0 is a root stage | | | | STAGE PLANS: | | Stage: Stage-0 | | Fetch Operator | | limit: -1 | | Processor Tree: | | TableScan | | alias: student | | Statistics: Num rows: 1 Data size: 5260 Basic stats: COMPLETE Column stats: NONE | | Select Operator | | expressions: num (type: int), name (type: string), sex (type: string), age (type: int), dept (type: string) | | outputColumnNames: _col0, _col1, _col2, _col3, _col4 | | Statistics: Num rows: 1 Data size: 5260 Basic stats: COMPLETE Column stats: NONE | | ListSink | | | +----------------------------------------------------+
12_Apache Hive 通用調優 并行機制、推測執行機制
12.1 并行執行機制
-
如果hivesql的底層某些stage階段可以并行執行,就可以提高執行效率。
-
前提是stage之間沒有依賴 并行的弊端是瞬時服務器壓力變大。
-
參數
set hive.exec.parallel=true; --是否并行執行作業。適用于可以并行運行的 MapReduce 作業,例如在多次插入期間移動文件以插入目標 set hive.exec.parallel.thread.number=16; --最多可以并行執行多少個作業。默認為8。
12.2 Hive的嚴格模式
-
注意。不要和動態分區的嚴格模式搞混淆。
-
這里的嚴格模式指的是開啟之后 hive會禁止一些用戶都影響不到的錯誤包括效率低下的操作,不允許運行一些有風險的查詢。
-
設置
set hive.mapred.mode = strict --默認是嚴格模式 nonstrict -
解釋
1、如果是分區表,沒有where進行分區裁剪 禁止執行 2、order by語句必須+limit限制
12.3 推測執行機制
- MapReduce中task的一個機制。
- 功能:
- 一個job底層可能有多個task執行,如果某些拖后腿的task執行慢,可能會導致最終job失敗。
- 所謂的推測執行機制就是通過算法找出拖后腿的task,為其啟動備份的task。
- 兩個task同時處理一份數據,誰先處理完,誰的結果作為最終結果。
- 推測執行機制默認是開啟的,但是在企業生產環境中建議關閉。
總結
以上是生活随笔為你收集整理的2022-02-09大数据学习日志——Hadoop离线阶段——Hive窗口函数、性能调优的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Knoll Light Factory
- 下一篇: 西北农林科技大学研究生学位论文“参考文献