mapreduce
MapReduce原語
hadoop MapReduce框架可以讓你的應用在集群中
可靠地
容錯地
并行
處理TB級別的數據
1024TB=1PB 1024PB=1EB 1024EB=1ZB
MapReduce原語
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-jWz2TKVZ-1617443696182)(media/c60e85f9619bb7b2fb877f4d1bac2ca2.emf)]
**“相同”**key的鍵值對為一組調用一次reduce方法,方法內迭代這一組數據進行計算
分組比較器
YARN:資源管理框架
ResourceManager:一個 主
NodeManager:很多,每個DataNode上有一個 從
Container(容器):CPU、內存
公司為了營業,掙錢租老王家的寫字樓
公司相當于MR作業
MR任務相當于公司員工,員工干活,相當于MR的任務運行。
員工在辦公室干活,任務在容器運行。
每個容器同時運行一個任務
客人提出訂幾間房
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-9a6YZbr8-1617443696184)(media/bbfa9b838f96a98e608931306c1f703e.gif)]
1、一個ResourceManager主節點
2、每個DataNode上一個NodeManager從節點
3、每個運行于MapReduce的程序有一個MRAppMaster
公司的運作流程
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-GFJgTixZ-1617443696190)(media/9d191214bcec7a942dea5b7fcd1ee99c.png)]
1、MapReduce將輸入的數據集邏輯切片 split
2、map任務以并行方式處理切片數據
3、框架對map輸出排序,然后將數據發送給reduce
4、MapReduce的輸入輸出數據存在于同一個文件系統(HDFS)
5、框架負責任務調度、任務監控和失敗任務的重新執行
容錯地、可靠地、并行計算
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-DqilgIpr-1617443696193)(media/c60e85f9619bb7b2fb877f4d1bac2ca2.emf)]
1、MapReduce處理鍵值對形式的很多鍵值對輸入,生成鍵值對形式的很多鍵值對輸出
2、框架會對鍵和值序列化,因此鍵類型和值類型需要實現Writable接口。框架會對鍵進行排序,因此必須實現WritableComparable接口。
3、map輸出鍵值對類型和reduce鍵值對輸入類型一致
4、map的輸入鍵值對類型和輸出鍵值對類型一般不一致
5、reduce的輸入鍵值對類型和輸出鍵值對類型一般不一致
盡管hadoop框架是java開發的,MapReduce應用不一定得java開發。
hadoop
streaming允許用戶使用可執行文件的方式提供mapper和reducer,創建和執行作業。
hadoop pipes是一個跟SWIG兼容的C++ API,用于開發MapReduce應用(不基于JNI)。
mapreduce工作流程
為什么叫MapReduce:MapTask & ReduceTask
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-3dww0Wir-1617443696195)(media/9d191214bcec7a942dea5b7fcd1ee99c.png)]
1、每個block會有map任務
2、block切分為切片,每個切片對應一個map任務,默認一個block一個切片,一個map
3、map默認按行讀取切片數據,組成鍵值對<當前行字節偏移量, “讀到的行字符串”>
4、map函數對該鍵值對進行計算,輸出若干鍵值對。<key, value, partition>
partition指定該鍵值對由哪個reducer進行處理
5、map輸出的kvp寫到環形緩沖區,環形緩沖區默認100MB,閾值80%,當環緩達到80%就向磁盤溢寫小文件,該小文件首先按照分區號排序,相同分區號的按key進行排序。
6、默認如果落磁盤的小文件達到了3個,則進行歸并,歸并的大文件也是按分區號排序,相同分區號按照key進行排序。只是一個歸并。
7、如果map任務處理完了,它的輸出被下載到reducer所在主機
按照HTTP GET的方式下載到reducer:
reducer發送HTTP GET請求到mapper主機下載數據,該過程是洗牌shuffle
8、每個map任務都要經歷運行結束洗牌的過程
9、可以設置combinClass,先在map端對數據進行一個壓縮,比如10w個<hello,1>壓縮為1個<hello,
10w>通過網絡IO洗牌,肯定要快很多。一般情況下,combineClass就是一個reducerClass。
combinerClass的設置要求數據算法滿足結合律。
交換律
1+2=2+1
結合律
1+2+3=(1+2)+3=1+(2+3)
map1 5/3
map2 7/6 reduce: 5/3+7/6+8/11 =? reduce:(5+7+8)/(3+6+11)
map3 8/11
map任務結束
reeduce任務開始
9、等所有map任務都運行結束,并且洗牌結束,每個reducer獲取到它自己應得的所有數據,此時開始reducer處理過程。
10、如果有時間,reduce會對洗牌獲取的數據進行歸并落磁盤
如果沒有時間,也歸并,只是可能不落磁盤,直接交給reduce方法進行迭代處理了。
洗牌獲取到的數據也可能不落磁盤,此時歸并的鍵值對來源可能是磁盤的和內存的一個混合。
11、reduce按照key進行分組,每個分組調用一次reduce方法,該方法迭代計算,將結果寫到HDFS輸出。
當一個map任務計算結束,所有的reduce需要使用http
get請求獲取各自分區編號的數據,當所有map任務結束后,開始reduce計算階段。
blk按照設置進行切片,一個切片對應一個map任務,map按行讀取切片內容,以鍵值對的形式發給map方法(<“偏移量”,
“zifuchuan”>)
當map對當前簡直對計算完成,要寫到環形緩沖區,在寫之前要計算該鍵值對的分區編號
默認情況下,key的hash值對reduce個數取模。
當環形緩沖區大小達到到80%的時候,需要向磁盤溢寫數據,在溢寫的時候需要對鍵值對按照分區排序,分區內按照key的字典序排序(快排排序)
溢寫的小文件如果達到3個,則進行歸并,歸并為大文件,大文件也是按照分區排序,分區內按照key的字典序排序。
當一個map任務處理完它的切片的數據,此時所有的reduce任務到該map的機器以http
get請求獲取各自編號分區的數據,下載到reduce本地
reduce獲取到map的數據后,如果有時間,也會進行歸并
并不能保證此時所有的map都計算結束了。
只有當所有的map計算結束,同時reduce獲取到所有的數據之后,才開始進行reduce計算。
按照原語,相同key的鍵值對為一組,調用一次reduce方法,方法內迭代這組數據計算,結果輸出到HDFS中。
mapreduce是一套分布式計算的流程、框架
數單詞游戲:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-H8HqM0Wh-1617443696202)(media/bb5b3192202e5c554768df019932206a.png)]
getFileBlockLocations(new Path(), offset, len);
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-29JDtWXU-1617443696203)(media/2bd3d6e6214eef0ddccf84f4d9e256a2.jpeg)]
reduce從map端拉取數據的過程稱為洗牌shuffle
通過網絡拉取,慢!!!
要對map端數據進行壓縮:
Combiner:
<hello, 1> 1000萬個 <hello, 1000萬>
但是不能保證combiner什么時候都能用:
需要計算滿足結合律:(A+B)+C=A+(B+C)
job.setCombinerClass(MyReducer.class)
8/9
4/7 REDUCE: (8+4+2)/(9+7+11)
2/11
也不能保證combiner什么時候都用得上:
環形緩沖區小文件歸并,進行combiner,如果不歸并,沒有combiner過程。
reducer通過HTTP按照分區號獲取map輸出文件的數據。map端有一個HTTP服務處理該reducer的HTTP請求。該HTTP服務最大線程數由mapreduce.shuffle.Max.threads屬性指定。這個屬性指定nodemanager的線程數,而不是對map任務指定線程數(該數字在多個不同的任務之間共享),因為nodemanager上有可能運行了好幾個map任務。默認值是0,表示最大線程數是服務器處理器核心數的兩倍。
map輸出文件位于運行map任務的本地磁盤。一個reduce任務需要從集群中多個map任務獲取指定分區的數據。多個map任務有可能是在不同時間完成的,每當一個map任務運行完,reduce就從該map任務獲取指定分區數據。reduce任務會以多線程的方式從多個map任務并行獲取指定分區數據。默認線程數是5,可以通過mapreduce.reduce.shuffle.parallelcopies屬性指定。
reducer拷貝map的輸出如果很小,則放在內存中(mapreduce.reduce.shuffle.input.buffer.percent指定堆空間百分比)否則拷貝到磁盤。當內存緩沖區數據大小達到閾值(mapreduce.reduce.shuffle.merge.percent
)或map輸出文件個數達到閾值(mapreduce.reduce.merge.inmem.threshold
),就發生文件合并溢寫到磁盤上。如果指定combiner,此處也會進行combine。
二次排序(先了解)
在map階段按照key對鍵值對進行排序,對值不排序。如果相對value進行排序,就需要二次排序。
需求:查找每年的最高氣溫
數據格式:年份為key,每天的氣溫是value
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-dInwPhpb-1617443696216)(media/4c0ee2608106c3eba3cd4c7b6de0ac90.png)]
所謂二次排序:
1、新的key應該是輸入的key和value的組合
2、按照復合key進行比較排序
3、分區比較器和分組比較器只對復合key中的原生key進行分區和分組
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ANIzevjh-1617443696217)(media/b401109fdf220e7d8e8f806744b93bef.jpeg)]
總結
Map:
1、根據業務需求處理數據并映射為KV模型
2、并行分布式
3、計算向數據移動
Reduce:
1、數據全量/分量加工
2、Reducer中可以包含不同的key 分區的范圍大于分組
3、相同分區的Key匯聚到一個Reducer中
4、“相同”的Key調用一次reduce方法
5、排序和比較實現key的匯聚
K,V使用自定義數據類型 MyKey:WritableComparable
MyValue:Writable
1、節省開發成本,提高程序自由度
2、框架會對鍵和值序列化,因此鍵類型和值類型需要實現Writable接口。
3、框架會對鍵進行排序,因此必須實現WritableComparable接口。
作業:
mapreduce處理過程,自己的語言寫
java API操作HDFS
MR作業提交流程
YARN
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-u3keUIAM-1617443696219)(media/bbfa9b838f96a98e608931306c1f703e.gif)]
ResourceManager管理集群中所有的資源
通過NodeManager管理
NodeManager通過Container管理資源
Container包裝資源:CPU/內存/IO
MapReduce作業
AppMaster 調度
向RM申請資源
MapTask
ReduceTask
客戶端:
RM客戶端:用于申請資源
AM客戶端:用于跟AppMaster交互
YARN:解耦資源與計算
ResourceManager
主,核心
集群節點資源管理
NodeManager
與RM匯報資源
管理Container生命周期
計算框架中的資源都以Container表示
Container:【由節點NM管理,CPU,MEM,I/O大小,啟動命令】
內存:1024MB
CPU:1個虛擬核心 vcore
默認NodeManager啟動線程監控Container大小,超出申請資源額度,kill
支持Linux內核的Cgroup
MR :
AppMaster 擁有 RM客戶端
作業為單位,避免單點故障,負載到不同的節點
創建Task,需要和RM申請資源(Container)
Task-Container
Map任務
Reduce任務
Client:
RM-Client:請求資源創建AM
AM-Client:與AM交互
YARN:Yet Another Resource Negotiator;
Hadoop 2.0新引入的資源管理系統,直接從MRv1演化而來的;
核心思想:將MRv1中JobTracker的資源管理和任務調度兩個功能分開,分別由ResourceManager和ApplicationMaster進程實現
ResourceManager:負責整個集群的資源管理和調度
ApplicationMaster:負責應用程序相關的事務,比如任務調度、任務監控和容錯等
YARN的引入,使得多個計算框架可運行在一個集群中
每個應用程序對應一個ApplicationMaster
目前多個計算框架可以運行在YARN上,比如MapReduce、Spark、Storm等
MapReduce On YARN:MRv2
將MapReduce作業直接運行在YARN上,而不是由JobTracker和TaskTracker構建的MRv1系統中
基本功能模塊
YARN:負責資源管理和調度
MRAppMaster:負責任務切分、任務調度、任務監控和容錯等
MapTask/ReduceTask:任務驅動引擎,與MRv1一致
每個MapRduce作業對應一個MRAppMaster
MRAppMaster任務調度
YARN將資源分配給MRAppMaster
MRAppMaster進一步將資源分配給內部的任務
MRAppMaster容錯
失敗后,由YARN重新啟動
任務失敗后,MRAppMaster重新申請資源
ResourceManager掛怎么辦?RM-HA
流程
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ATFguCez-1617443696223)(media/c6fd6979490a8e495297ad6ef26c1b16.png)]
1、客戶端,提交MapReduce作業
2、YARN的資源管理器(Resource Manager),協調集群中計算資源的分配
3、YARN的節點管理器(Node Manager),啟動并監控集群中的計算容器
4、MapReduce的Application Master,協調MapReduce作業中任務的運行。Application
Master和MapReduce任務運行于容器中,這些容器由resourcemanager調度,由nodemanager管理。
5、分布式文件系統(一般是HDFS),在組件之間共享作業數據。
Job對象的submit方法創建了一個內部的JobSubmitter實例并調用該實例的submitJobInternal方法。一旦提交了作業,waitForCompletion方法每秒鐘輪詢作業的執行進度,如果進度發生了變化,則向控制臺報告進度。當作業成功完成,展示作業計數器的數據。否則展示作業失敗的錯誤日志信息。
客戶端:JobSubmitter實現的作業提交的過程有如下幾個步驟:
1、向resourcemanager申請一個新的application ID,用于MapReduce作業的ID
2、檢查作業的輸出。如果沒有指定輸出或者輸出路徑已經存在,則不提交作業,MapReduce程序拋異常
3、計算作業的輸入切片。如果不能計算切片(比如輸入路徑不存在等),不提交作業,MR程序拋異常。
4、拷貝執行作業需要的資源到共享文件系統的以作業ID命名的目錄中,這些資源包括作業的jar包,配置文件,計算好的輸入切片。作業的jar包有一個很高的副本數量(mapreduce.client.submit.file.replication指定,默認值是10),這樣當nodemanager如果運行作業中的任務,會有很多副本可以訪問。
5、調用resourcemanager的submitApplication方法提交作業。
1、YARN為請求分配一個容器,resourcemanager通過容器所在節點上的nodemanager在該容器中啟動application
master進程。
2、MapReduce作業的application master是一個java
app,主入口類是MRAppMaster。從HDFS抽取客戶端計算好的輸入切片,為每一個切片創建一個map任務對象,以及一定數量的reduce任務對象.
application
master會為作業中所有的map任務以及reduce任務向resourcemanager請求容器。為map任務的請求會首先進行并且相對于reduce任務請求有更高的優先級。當map任務完成率達到了5%之后才會為reduce任務發送容器請求。
appmaster從hdfs抽取客戶端上傳的信息,計算好map對象和reduce對象,首先向resourcemanager為map任務申請資源,當map任務完成5%之后為reduce任務申請資源
reduce任務可以運行于集群中的任意位置,而map任務會有本地讀取數據的限制。移動計算而不是數據。數據本地。次之為機架本地。
請求會指定每個任務需要的內存和cpu資源。默認情況下為每個map任務或reduce任務分配1024MB的內存和一個虛擬核心。這些值對于每個作業都是可以配置的:mapreduce.map.memory.mb,
mapreduce.reduce.memory.mb
mapreduce.map.cpu.vcores
以及mapreduce.reduce.cpu.vcores。
一旦resourcemanager在一個節點上的一個容器中為一個任務分配了資源,application
master與nodemanager通信,啟動容器。任務通過一個java
app來執行,該app的主入口類是YarnChild。在它可以開始任務的執行之前,它要本地化任務需要的資源,包括jar包,配置文件,以及分布式緩存中存儲的其他共享文件。最后,它開始運行map任務或者reduce任務。
當作業的最后一個任務完成并通知application
master,AppMaster就更改作業的狀態為”successfully”。作業就打印信息告知客戶端,客戶端waitForCompletion方法返回。此時也會在控制臺打印作業的統計信息和計數器的信息。
作業完成,application
master所在容器和任務所在容器銷毀工作狀態(中間的輸出結果刪除)。作業的信息被作業歷史服務器存檔以備以后查詢使用。
YARN RM-HA搭建
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-wyGLn1kt-1617443696224)(media/38e4465d3690ae6ac907e6a4a9929403.jpeg)]
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-5s7YOIJU-1617443696226)(media/53d92bd970ab864252bb6f27178aba0d.png)]
mapred-site.xml
local/classic/yarn
指定mr作業運行的框架:要么本地運行,要么使用MRv1,要么使用yarn
yarn-site.xml
將配置文件在四臺服務器同步
scp node[234]:`pwd`
首先啟動HDFS
start-ha.sh
在node3或node4上執行命令:
start-yarn.sh
在node4或者node3上執行命令:
yarn-daemon.sh start resourcemanager
停止:
在node3或者node4上執行:
stop-yarn.sh
在node4或者node3上執行:
yarn-deamon.sh stop resourcemanager
http://node3:8088
http://node4:8088
訪問resourcemanager的web頁面
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-cIOfTNJL-1617443696228)(media/d34354b54c107b394a5f48b070ddff97.png)]
運行自帶的wordcount
運行的命令:
cd $HADOOP_HOME
cd share/hadoop/mapreduce
hadoop jar hadoop-mapreduce-examples-2.6.5.jar wordcount /input /output
*input:是hdfs文件系統中數據所在的目錄
*ouput:是hdfs中不存在的目錄,mr程序運行的結果會輸出到該目錄
輸出目錄內容:
-rw-r–r-- 3 root supergroup 0 2017-07-02 02:49 /mr/test/output/_SUCCESS
-rw-r–r-- 3 root supergroup 49 2017-07-02 02:49 /mr/test/output/part-r-00000
/_SUCCESS:是信號/標志文件
/part-r-00000:是reduce輸出的數據文件
r:reduce的意思,00000是對應的reduce編號,多個reduce會有多個數據文件
啟動腳本和停止腳本:
start-hdfs-ha-rm-ha.sh
stop-hdfs-ha-rm-ha.sh
動手寫wordcount
1、新建eclipse的java項目
2、添加hadoop的jar包依賴
121個jar包
$HADOOP_HOME/share/hadoop/{common,common/lib,hdfs,hdfs/lib,mapreduce,mapreduce/lib,tools/lib,yarn,yarn/lib}.jar
3、添加hadoop的配置文件到類路徑
從集群拷貝這四個文件到當前項目類路徑
core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
4、編寫Mapper、Reducer以及MainClass
wordcount
WCMapper.java
WCReducer.java
MainClass.java
5、打包
只打包三個類就可以。
6、上傳
7、運行
yarn jar </path/to/your/jar.jar> /<inputpath> /<outputpath>
如果想本地運行,則可以如此設置:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-Uof8DGaX-1617443696230)(media/1a2f3abbec656874452bccf4caf08cd3.png)]
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
5、打包
只打包三個類就可以。
6、上傳
7、運行
yarn jar </path/to/your/jar.jar> /<inputpath> /<outputpath>
如果想本地運行,則可以如此設置:
[外鏈圖片轉存中…(img-Uof8DGaX-1617443696230)]
總結
- 上一篇: hdu 2977 Color Squar
- 下一篇: Java进阶——注解