大数据之MapReduce详解(MR的运行机制及配合WordCount实例来说明运行机制)
- 目錄
- 前言:
- 1、MapReduce原理
- 2、mapreduce實踐(WordCount實例)
目錄
今天先總體說下MapReduce的相關(guān)知識,后續(xù)將會詳細說明對應(yīng)的shuffle、mr與yarn的聯(lián)系、以及mr的join操作的等知識。以下內(nèi)容全是個人學(xué)習后的見解,如有遺漏或不足請大家多多指教。
前言:
為什么要MAPREDUCE
(1)海量數(shù)據(jù)在單機上處理因為硬件資源限制,無法勝任
(2)而一旦將單機版程序擴展到集群來分布式運行,將極大增加程序的復(fù)雜度和開發(fā)難度
(3)引入mapreduce框架后,開發(fā)人員可以將絕大部分工作集中在業(yè)務(wù)邏輯的開發(fā)上,而將分布式計算中的復(fù)雜性交由框架來處理。
設(shè)想一個海量數(shù)據(jù)場景下的wordcount需求:
單機版:內(nèi)存受限,磁盤受限,運算能力受限分布式:
1、文件分布式存儲(HDFS)
2、運算邏輯需要至少分成2個階段(一個階段獨立并發(fā),一個階段匯聚)
3、運算程序如何分發(fā)
4、程序如何分配運算任務(wù)(切片)
5、兩階段的程序如何啟動?如何協(xié)調(diào)?
6、整個程序運行過程中的監(jiān)控?容錯?重試?
可見在程序由單機版擴成分布式時,會引入大量的復(fù)雜工作。為了提高開發(fā)效率,可以將分布式程序中的公共功能封裝成框架,讓開發(fā)人員可以將精力集中于業(yè)務(wù)邏輯。
而mapreduce就是這樣一個分布式程序的通用框架,其應(yīng)對以上問題的整體結(jié)構(gòu)如下:
1、MRAppMaster(mapreduce application master)
2、MapTask
3、ReduceTask
1、MapReduce原理
Mapreduce是一個分布式運算程序的編程框架,是用戶開發(fā)“基于hadoop的數(shù)據(jù)分析應(yīng)用”的核心框架;
Mapreduce核心功能是將用戶編寫的業(yè)務(wù)邏輯代碼和自帶默認組件整合成一個完整的分布式運算程序,并發(fā)運行在一個hadoop集群上;
Mapreduce框架結(jié)構(gòu)及核心運行機制
1.1、結(jié)構(gòu)
一個完整的mapreduce程序在分布式運行時有三類實例進程 :
1、MRAppMaster:負責整個程序的過程調(diào)度及狀態(tài)協(xié)調(diào)
2、mapTask:負責map階段的整個數(shù)據(jù)處理流程
3、ReduceTask:負責reduce階段的整個數(shù)據(jù)處理流程
1.2、mapreduce框架的設(shè)計思想
這里面有兩個任務(wù)的分配過程:1、總的任務(wù)切割分配給各個mapTask,不同的mapTask再將得到的hashmap按照首字母劃分,分配給各個reduceTask。
1.3、mapreduce程序運行的整體流程(wordcount運行過程的解析)
流程解析
(job.split:負責任務(wù)的切分,形成一個任務(wù)切片規(guī)劃文件。
wc.jar:要運行的jar包,包含mapper、reducer、Driver等java類。
job.xml:job的其他配置信息:如指定map是哪個類,reduce是那個類,以及輸入數(shù)據(jù)的路徑在哪,輸出數(shù)據(jù)的路徑在哪等配置信息。)
前提:客戶端提交任務(wù)給yarn后(提交前會進行任務(wù)的規(guī)劃),yarn利用ResouceManager去找到mrAppmaster.
1、 一個mr程序啟動的時候,最先啟動的是MRAppMaster,MRAppMaster啟動后根據(jù)本次job的描述信息,計算出需要的maptask實例數(shù)量,然后向集群申請機器啟動相應(yīng)數(shù)量的maptask進程
2、 maptask進程啟動之后,根據(jù)給定的數(shù)據(jù)切片范圍進行數(shù)據(jù)處理,主體流程為:
a) 利用客戶指定的inputformat來獲取RecordReader讀取數(shù)據(jù),形成輸入KV對(框架干的事)
b) 將輸入KV對傳遞給客戶定義的map()方法,做邏輯運算,并將map()方法輸出的KV對收集到緩存
c) 將緩存中的KV對按照K分區(qū)排序后不斷溢寫到磁盤文件
3、 MRAppMaster監(jiān)控到所有maptask進程任務(wù)完成之后,會根據(jù)客戶指定的參數(shù)啟動相應(yīng)數(shù)量的reducetask進程,并告知reducetask進程要處理的數(shù)據(jù)范圍(數(shù)據(jù)分區(qū))
4、 Reducetask進程啟動之后,根據(jù)MRAppMaster告知的待處理數(shù)據(jù)所在位置,從若干臺maptask運行所在機器上獲取到若干個maptask輸出結(jié)果文件,并在本地進行重新歸并排序,然后按照相同key的KV為一個組,調(diào)用客戶定義的reduce()方法進行邏輯運算,并收集運算輸出的結(jié)果KV,然后調(diào)用客戶指定的outputformat將結(jié)果數(shù)據(jù)輸出到外部存儲(對應(yīng)的就是context.write方法)
2、mapreduce實踐(WordCount實例)
編程規(guī)范:
(1)用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端) (2)Mapper的輸入數(shù)據(jù)是KV對的形式(KV的類型可自定義) (3)Mapper的輸出數(shù)據(jù)是KV對的形式(KV的類型可自定義) (4)Mapper中的業(yè)務(wù)邏輯寫在map()方法中 (5)map()方法(maptask進程)對每一個<K,V>調(diào)用一次 (6)Reducer的輸入數(shù)據(jù)類型對應(yīng)Mapper的輸出數(shù)據(jù)類型,也是KV (7)Reducer的業(yè)務(wù)邏輯寫在reduce()方法中 (8)Reducetask進程對每一組相同k的<k,v>組調(diào)用一次reduce()方法 (9)用戶自定義的Mapper和Reducer都要繼承各自的父類 (10)整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象WordCount程序
mapper類
reducer類
package mr_test; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** //生命周期:框架每傳遞進來一個k相同的value 組,reduce方法就被調(diào)用一次* KEYIN, VALUEIN 對應(yīng) mapper輸出的KEYOUT,VALUEOUT類型對應(yīng)* KEYOUT, VALUEOUT 是自定義reduce邏輯處理結(jié)果的輸出數(shù)據(jù)類型* KEYOUT是單詞* VLAUEOUT是總次數(shù)*/ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /*** <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>* <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>* <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>* 入?yún)ey,是一組相同單詞kv對的key*/@Overrideprotected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count=0;for(IntWritable value:values){count+=value.get(); }context.write(key, new IntWritable(count));} }Driver類 用來描述job并提交job
package mr_test; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /*** 相當于一個yarn集群的客戶端* 需要在此封裝我們的mr程序的相關(guān)運行參數(shù),指定jar包* 最后提交給yarn*/ public class WordcountDriver {public static void main(String[] args) throws IOException, Exception, InterruptedException {Configuration cf = new Configuration(); // 把這個程序打包成一個Job來運行Job job = Job.getInstance(); //指定本程序的jar包所在的本地路徑j(luò)ob.setJarByClass(WordcountDriver.class); //指定本業(yè)務(wù)job要使用的mapper/Reducer業(yè)務(wù)類job.setMapperClass(WorldcountMapper.class);job.setReducerClass(WordcountReducer.class); //指定mapper輸出數(shù)據(jù)的kv類型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class); //指定最終輸出的數(shù)據(jù)的kv類型job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); //指定job的輸入原始文件所在目錄FileInputFormat.setInputPaths(job, new Path(args[0])); //指定job的輸出結(jié)果所在目錄FileOutputFormat.setOutputPath(job, new Path(args[1])); //將job中配置的相關(guān)參數(shù),以及job所用的java類所在的jar包,提交給yarn去運行boolean res = job.waitForCompletion(true);System.exit(res?0:1); } }總結(jié)
以上是生活随笔為你收集整理的大数据之MapReduce详解(MR的运行机制及配合WordCount实例来说明运行机制)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 清除windows 系统中 任意已安装任
- 下一篇: win7打印服务经常无缘无故自动停止pr