Hadoop学习笔记—4.初识MapReduce
一、神馬是高大上的MapReduce
MapReduce是Google的一項(xiàng)重要技術(shù),它首先是一個(gè) 編程模型 ,用以進(jìn)行大數(shù)據(jù)量的計(jì)算。對(duì)于大 數(shù)據(jù)量的計(jì)算,通常采用的處理手法就是并行計(jì)算。但對(duì)許多開發(fā)者來說,自己完完全全實(shí)現(xiàn)一個(gè)并行計(jì)算程序難度太大,而MapReduce就是一種簡(jiǎn)化并行 計(jì)算的編程模型,它使得那些沒有多有多少并行計(jì)算經(jīng)驗(yàn)的開發(fā)人員也可以開發(fā)并行應(yīng)用程序。這也就是MapReduce的價(jià)值所在, 通過簡(jiǎn)化編程模型,降低了開發(fā)并行應(yīng)用的入門門檻 。
1.1 MapReduce是什么
Hadoop MapReduce是一個(gè)軟件框架,基于該框架能夠容易地編寫應(yīng)用程序,這些應(yīng)用程序能夠運(yùn)行在由上千個(gè)商用機(jī)器組成的大集群上,并以一種可靠的,具有容 錯(cuò)能力的方式并行地處理上TB級(jí)別的海量數(shù)據(jù)集。這個(gè)定義里面有著這些關(guān)鍵詞,一是軟件框架,二是并行處理,三是可靠且容錯(cuò),四是大規(guī)模集群,五是海量數(shù) 據(jù)集。
因此,對(duì)于MapReduce,可以簡(jiǎn)潔地認(rèn)為,它是一個(gè)軟件框架,海量數(shù)據(jù)是它的“菜”,它在大規(guī)模集群上以一種可靠且容錯(cuò)的方式并行地“烹飪這道菜”。
1.2 MapReduce做什么
簡(jiǎn)單地講,MapReduce可以做 大數(shù)據(jù)處理 。所謂大數(shù)據(jù)處理,即以價(jià)值為導(dǎo)向,對(duì)大數(shù)據(jù)加工、挖掘和優(yōu)化等各種處理。
MapReduce擅長(zhǎng)處理大數(shù)據(jù),它為什么具有這種能力呢?這可由MapReduce的設(shè)計(jì)思想發(fā)覺。MapReduce的思想就是“ 分而治之 ”。
(1)Mapper負(fù)責(zé)“分”,即把復(fù)雜的任務(wù)分解為若干個(gè)“簡(jiǎn)單的任務(wù)”來處理。“簡(jiǎn)單的任務(wù)”包含三層含義:一是數(shù)據(jù)或計(jì)算的規(guī)模相對(duì)原任 務(wù)要大大縮小;二是就近計(jì)算原則,即任務(wù)會(huì)分配到存放著所需數(shù)據(jù)的節(jié)點(diǎn)上進(jìn)行計(jì)算;三是這些小任務(wù)可以并行計(jì)算,彼此間幾乎沒有依賴關(guān)系。
(2)Reducer負(fù)責(zé)對(duì)map階段的結(jié)果進(jìn)行匯總。至于需要多少個(gè)Reducer,用戶可以根據(jù)具體問題,通過在mapred-site.xml配置文件里設(shè)置參數(shù)mapred.reduce.tasks的值,缺省值為1。
一個(gè)比較形象的語言解釋MapReduce:
We want to count all the books in the library. You count up shelf #1, I count up shelf #2. That’s map. The more people we get, the faster it goes.
我們要數(shù)圖書館中的所有書。你數(shù)1號(hào)書架,我數(shù)2號(hào)書架。這就是“ Map ”。我們?nèi)嗽蕉?#xff0c;數(shù)書就更快。
Now we get together and add our individual counts. That’s reduce.
現(xiàn)在我們到一起,把所有人的統(tǒng)計(jì)數(shù)加在一起。這就是“ Reduce ”。
1.3 MapReduce工作機(jī)制
MapReduce的整個(gè)工作過程如上圖所示,它包含如下4個(gè)獨(dú)立的實(shí)體:
實(shí)體一: 客戶端 ,用來提交MapReduce作業(yè)。
實(shí)體二: JobTracker ,用來協(xié)調(diào)作業(yè)的運(yùn)行。
實(shí)體三: TaskTracker ,用來處理作業(yè)劃分后的任務(wù)。
實(shí)體四: HDFS ,用來在其它實(shí)體間共享作業(yè)文件。
通過審閱MapReduce的工作流程圖,可以看出MapReduce整個(gè)工作過程有序地包含如下工作環(huán)節(jié):
二、Hadoop中的MapReduce框架
在Hadoop中,一個(gè)MapReduce作業(yè)通常會(huì)把輸入的數(shù)據(jù)集切分為若干獨(dú)立的數(shù)據(jù)塊,由Map任務(wù)以完全并行的方式去處理它們。框架會(huì) 對(duì)Map的輸出先進(jìn)行排序,然后把結(jié)果輸入給Reduce任務(wù)。通常作業(yè)的輸入和輸出都會(huì)被存儲(chǔ)在文件系統(tǒng)中,整個(gè)框架負(fù)責(zé)任務(wù)的調(diào)度和監(jiān)控,以及重新執(zhí) 行已經(jīng)關(guān)閉的任務(wù)。
通常,MapReduce框架和分布式文件系統(tǒng)是運(yùn)行在一組相同的節(jié)點(diǎn)上,也就是說,計(jì)算節(jié)點(diǎn)和存儲(chǔ)節(jié)點(diǎn)通常都是在一起的。這種配置允許框架在那些已經(jīng)存好數(shù)據(jù)的節(jié)點(diǎn)上高效地調(diào)度任務(wù),這可以使得整個(gè)集群的網(wǎng)絡(luò)帶寬被非常高效地利用。
2.1 MapReduce框架的組成
(1)JobTracker
JobTracker負(fù)責(zé)調(diào)度構(gòu)成一個(gè)作業(yè)的所有任務(wù),這些任務(wù)分布在不同的TaskTracker上(由上圖的JobTracker可以看到 2 assign map 和 3 assign reduce)。你可以將其理解為公司的項(xiàng)目經(jīng)理,項(xiàng)目經(jīng)理接受項(xiàng)目需求,并劃分具體的任務(wù)給下面的開發(fā)工程師。
(2)TaskTracker
TaskTracker負(fù)責(zé)執(zhí)行由JobTracker指派的任務(wù),這里我們就可以將其理解為開發(fā)工程師,完成項(xiàng)目經(jīng)理安排的開發(fā)任務(wù)即可。
2.2 MapReduce的輸入輸出
MapReduce框架運(yùn)轉(zhuǎn)在 <key,value> 鍵值對(duì)上,也就是說,框架把作業(yè)的輸入看成是一組<key,value>鍵值對(duì),同樣也產(chǎn)生一組<key,value>鍵值對(duì)作為作業(yè)的輸出,這兩組鍵值對(duì)有可能是不同的。
一個(gè)MapReduce作業(yè)的輸入和輸出類型如下圖所示:可以看出在整個(gè)流程中,會(huì)有三組<key,value>鍵值對(duì)類型的存在。
2.3 MapReduce的處理流程
這里以WordCount單詞計(jì)數(shù)為例,介紹map和reduce兩個(gè)階段需要進(jìn)行哪些處理。單詞計(jì)數(shù)主要完成的功能是:統(tǒng)計(jì)一系列文本文件中每個(gè)單詞出現(xiàn)的次數(shù),如圖所示:
(1)map任務(wù)處理
(2)reduce任務(wù)處理
三、第一個(gè)MapReduce程序:WordCount
WordCount單詞計(jì)數(shù)是最簡(jiǎn)單也是最能體現(xiàn)MapReduce思想的程序之一,該程序完整的代碼可以在Hadoop安裝包的src/examples目錄下找到。
WordCount單詞計(jì)數(shù)主要完成的功能是: 統(tǒng)計(jì)一系列文本文件中每個(gè)單詞出現(xiàn)的次數(shù) ;
3.1 初始化一個(gè)words.txt文件并上傳HDFS
首先在Linux中通過Vim編輯一個(gè)簡(jiǎn)單的words.txt,其內(nèi)容很簡(jiǎn)單如下所示:
Hello Edison Chou Hello Hadoop RPC Hello Wncud Chou Hello Hadoop MapReduce Hello Dick Gu通過Shell命令將其上傳到一個(gè)指定目錄中,這里指定為:/testdir/input
3.2 自定義Map函數(shù)
在Hadoop 中,?map 函數(shù)位于內(nèi)置類org.apache.hadoop.mapreduce. Mapper <KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函數(shù)位于內(nèi)置類org.apache.hadoop. mapreduce. Reducer <KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。
我們要做的就是 覆蓋map 函數(shù)和reduce 函數(shù) ,首先我們來覆蓋map函數(shù):繼承Mapper類并重寫map方法
/*** @author Edison Chou* @version 1.0* @param KEYIN* →k1 表示每一行的起始位置(偏移量offset)* @param VALUEIN* →v1 表示每一行的文本內(nèi)容* @param KEYOUT* →k2 表示每一行中的每個(gè)單詞* @param VALUEOUT* →v2 表示每一行中的每個(gè)單詞的出現(xiàn)次數(shù),固定值為1*/ public static class MyMapper extendsMapper<LongWritable, Text, Text, LongWritable> {protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {String[] spilted = value.toString().split(" ");for (String word : spilted) {context.write(new Text(word), new LongWritable(1L));}}; }Mapper 類,有四個(gè)泛型,分別是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面兩個(gè)KEYIN、VALUEIN 指的是map 函數(shù)輸入的參數(shù)key、value 的類型;后面兩個(gè)KEYOUT、VALUEOUT 指的是map 函數(shù)輸出的key、value 的類型;
從代碼中可以看出,在Mapper類和Reducer類中都使用了Hadoop自帶的基本數(shù)據(jù)類型,例如String對(duì)應(yīng)Text,long對(duì)應(yīng) LongWritable,int對(duì)應(yīng)IntWritable。這是因?yàn)镠DFS涉及到序列化的問題,Hadoop的基本數(shù)據(jù)類型都實(shí)現(xiàn)了一個(gè) Writable接口,而實(shí)現(xiàn)了這個(gè)接口的類型都支持序列化。
這里的map函數(shù)中通過空格符號(hào)來分割文本內(nèi)容,并對(duì)其進(jìn)行記錄;
3.3 自定義Reduce函數(shù)
現(xiàn)在我們來覆蓋reduce函數(shù):繼承Reducer類并重寫reduce方法
/*** @author Edison Chou* @version 1.0* @param KEYIN* →k2 表示每一行中的每個(gè)單詞* @param VALUEIN* →v2 表示每一行中的每個(gè)單詞的出現(xiàn)次數(shù),固定值為1* @param KEYOUT* →k3 表示每一行中的每個(gè)單詞* @param VALUEOUT* →v3 表示每一行中的每個(gè)單詞的出現(xiàn)次數(shù)之和*/ public static class MyReducer extendsReducer<Text, LongWritable, Text, LongWritable> {protected void reduce(Text key,java.lang.Iterable<LongWritable> values,Reducer<Text, LongWritable, Text, LongWritable>.Context context)throws java.io.IOException, InterruptedException {long count = 0L;for (LongWritable value : values) {count += value.get();}context.write(key, new LongWritable(count));}; }Reducer 類,也有四個(gè)泛型,同理,分別指的是reduce 函數(shù)輸入的key、value類型(這里輸入的key、value類型通常和map的輸出key、value類型保持一致)和輸出的key、value 類型。
這里的reduce函數(shù)主要是將傳入的<k2,v2>進(jìn)行最后的合并統(tǒng)計(jì),形成最后的統(tǒng)計(jì)結(jié)果。
3.4 設(shè)置Main函數(shù)
(1)設(shè)定輸入目錄,當(dāng)然也可以作為參數(shù)傳入
public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";(2)設(shè)定輸出目錄( 輸出目錄需要是空目錄 ),當(dāng)然也可以作為參數(shù)傳入
public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";(3)Main函數(shù)的主要代碼
public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 0.0:首先刪除輸出路徑的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(conf, "WordCount"); job.setJarByClass(MyWordCountJob.class); // 1.0:指定輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 1.1:指定對(duì)輸入數(shù)據(jù)進(jìn)行格式化處理的類(可以省略) job.setInputFormatClass(TextInputFormat.class); // 1.2:指定自定義的Mapper類 job.setMapperClass(MyMapper.class); // 1.3:指定map輸出的<K,V>類型(如果<k3,v3>的類型與<k2,v2>的類型一致則可以省略) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 1.4:分區(qū)(可以省略) job.setPartitionerClass(HashPartitioner.class); // 1.5:設(shè)置要運(yùn)行的Reducer的數(shù)量(可以省略) job.setNumReduceTasks(1); // 1.6:指定自定義的Reducer類 job.setReducerClass(MyReducer.class); // 1.7:指定reduce輸出的<K,V>類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 1.8:指定輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); // 1.9:指定對(duì)輸出數(shù)據(jù)進(jìn)行格式化處理的類(可以省略) job.setOutputFormatClass(TextOutputFormat.class); // 2.0:提交作業(yè) boolean success = job.waitForCompletion(true); if (success) { System.out.println("Success"); System.exit(0); } else { System.out.println("Failed"); System.exit(1); } }在Main函數(shù)中,主要做了三件事:一是指定輸入、輸出目錄;二是指定自定義的Mapper類和Reducer類;三是提交作業(yè);匆匆看下來,代碼有點(diǎn)多,但有些其實(shí)是可以省略的。
(4)完整代碼如下所示
View Code
3.5 運(yùn)行吧小DEMO
(1)調(diào)試查看控制臺(tái)狀態(tài)信息
(2)通過Shell命令查看統(tǒng)計(jì)結(jié)果
四、使用ToolRunner類改寫WordCount
Hadoop有個(gè)ToolRunner類,它是個(gè)好東西,簡(jiǎn)單好用。無論在《Hadoop權(quán)威指南》還是Hadoop項(xiàng)目源碼自帶的example,都推薦使用ToolRunner。
4.1 最初的寫法
下面我們看下src/example目錄下WordCount.java文件,它的代碼結(jié)構(gòu)是這樣的:
public class WordCount {// 略...public static void main(String[] args) throws Exception {Configuration conf = new Configuration();String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();// 略...Job job = new Job(conf, "word count");// 略...System.exit(job.waitForCompletion(true) ? 0 : 1);} }WordCount.java中使用到了GenericOptionsParser這個(gè)類,它的作用是 將命令行中參數(shù)自動(dòng)設(shè)置到變量conf中 。舉個(gè)例子,比如我希望通過命令行設(shè)置reduce task數(shù)量,就這么寫:
bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5上面這樣就可以了,不需要將其硬編碼到j(luò)ava代碼中,很輕松就可以將參數(shù)與代碼分離開。
4.2 加入ToolRunner的寫法
至此,我們還沒有說到ToolRunner,上面的代碼我們使用了GenericOptionsParser幫我們解析命令行參數(shù),編寫 ToolRunner的程序員更懶,它將 GenericOptionsParser調(diào)用隱藏到自身run方法,被自動(dòng)執(zhí)行了,修改后的代碼變成了這樣:
public class WordCount extends Configured implements Tool { @Override public int run(String[] arg0) throws Exception { Job job = new Job(getConf(), "word count"); // 略... System.exit(job.waitForCompletion(true) ? 0 : 1); return 0; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new WordCount(), args); System.exit(res); } }看看這段代碼上有什么不同:
(1)讓W(xué)ordCount 繼承Configured并實(shí)現(xiàn)Tool接口 。
(2) 重寫Tool接口的run方法 ,run方法不是static類型,這很好。
(3)在WordCount中我們將 通過getConf()獲取Configuration對(duì)象 。
可以看出,通過簡(jiǎn)單的幾步,就可以實(shí)現(xiàn)代碼與配置隔離、上傳文件到DistributeCache等功能。修改MapReduce參數(shù)不需要修改java代碼、打包、部署,提高工作效率。
4.3 重寫WordCount程序
public class MyJob extends Configured implements Tool {public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... } };}public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, java.lang.Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws java.io.IOException, InterruptedException { ...... };}// 輸入文件路徑public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";// 輸出文件路徑public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";@Overridepublic int run(String[] args) throws Exception { // 首先刪除輸出路徑的已有生成文件 FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf()); Path outPath = new Path(OUTPUT_PATH); if (fs.exists(outPath)) { fs.delete(outPath, true); } Job job = new Job(getConf(), "WordCount"); // 設(shè)置輸入目錄 FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); // 設(shè)置自定義Mapper job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 設(shè)置自定義Reducer job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); // 設(shè)置輸出目錄 FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH)); System.exit(job.waitForCompletion(true) ? 0 : 1); return 0;}public static void main(String[] args) { Configuration conf = new Configuration(); try { int res = ToolRunner.run(conf, new MyJob(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); }} }
推薦閱讀:
- Hadoop學(xué)習(xí)筆記—4.初識(shí)MapReduce ?
- Hadoop學(xué)習(xí)筆記—3.Hadoop RPC機(jī)制 ?
- Hadoop學(xué)習(xí)筆記—2.不怕故障的海量 ?
- Hadoop學(xué)習(xí)筆記—1.基礎(chǔ)概論與環(huán)境
作者:周旭龍
出處:http://edisonchou.cnblogs.com/
總結(jié)
以上是生活随笔為你收集整理的Hadoop学习笔记—4.初识MapReduce的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 怎样在 Markdown 中使程序代码带
- 下一篇: Hadoop学习笔记系列文章导航