用 Hadoop 进行分布式并行编程, 第 2 部分 程序实例与分析
前言
在上一篇文章:“用 Hadoop 進(jìn)行分布式并行編程 第一部分 基本概念與安裝部署”中,介紹了 MapReduce 計(jì)算模型,分布式文件系統(tǒng) HDFS,分布式并行計(jì)算等的基本原理, 并且詳細(xì)介紹了如何安裝 Hadoop,如何運(yùn)行基于 Hadoop 的并行程序。在本文中,將針對(duì)一個(gè)具體的計(jì)算任務(wù),介紹如何基于 Hadoop 編寫并行程序,如何使用 IBM 開發(fā)的 Hadoop Eclipse plugin 在 Eclipse 環(huán)境中編譯并運(yùn)行程序。
回頁首
分析 WordCount 程序
我們先來看看 Hadoop 自帶的示例程序 WordCount,這個(gè)程序用于統(tǒng)計(jì)一批文本文件中單詞出現(xiàn)的頻率,完整的代碼可在下載的 Hadoop 安裝包中得到(在 src/examples 目錄中)。
1.實(shí)現(xiàn)Map類
見代碼清單1。這個(gè)類實(shí)現(xiàn) Mapper 接口中的 map 方法,輸入?yún)?shù)中的 value 是文本文件中的一行,利用 StringTokenizer 將這個(gè)字符串拆成單詞,然后將輸出結(jié)果 <單詞,1> 寫入到 org.apache.hadoop.mapred.OutputCollector 中。OutputCollector 由 Hadoop 框架提供, 負(fù)責(zé)收集 Mapper 和 Reducer 的輸出數(shù)據(jù),實(shí)現(xiàn) map 函數(shù)和 reduce 函數(shù)時(shí),只需要簡單地將其輸出的 <key,value> 對(duì)往 OutputCollector 中一丟即可,剩余的事框架自會(huì)幫你處理好。
代碼中 LongWritable, IntWritable, Text 均是 Hadoop 中實(shí)現(xiàn)的用于封裝 Java 數(shù)據(jù)類型的類,這些類都能夠被串行化從而便于在分布式環(huán)境中進(jìn)行數(shù)據(jù)交換,你可以將它們分別視為 long, int, String 的替代品。Reporter 則可用于報(bào)告整個(gè)應(yīng)用的運(yùn)行進(jìn)度,本例中未使用。
代碼清單1
public static class MapClass extends MapReduceBaseimplements Mapper<LongWritable, Text, Text, IntWritable>{private final static IntWritable one = new IntWritable(1);private Text word = new Text();public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {String line = value.toString();StringTokenizer itr = new StringTokenizer(line);while (itr.hasMoreTokens()) {word.set(itr.nextToken());output.collect(word, one);}}}2.實(shí)現(xiàn) Reduce 類
見代碼清單 2。這個(gè)類實(shí)現(xiàn) Reducer 接口中的 reduce 方法, 輸入?yún)?shù)中的 key, values 是由 Map 任務(wù)輸出的中間結(jié)果,values 是一個(gè) Iterator, 遍歷這個(gè) Iterator, 就可以得到屬于同一個(gè) key 的所有 value. 此處,key 是一個(gè)單詞,value 是詞頻。只需要將所有的 value 相加,就可以得到這個(gè)單詞的總的出現(xiàn)次數(shù)。
代碼清單 2
public static class Reduce extends MapReduceBaseimplements Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key, Iterator<IntWritable> values,OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {int sum = 0;while (values.hasNext()) {sum += values.next().get();}output.collect(key, new IntWritable(sum));}}3.運(yùn)行 Job
在 Hadoop 中一次計(jì)算任務(wù)稱之為一個(gè) job, 可以通過一個(gè) JobConf 對(duì)象設(shè)置如何運(yùn)行這個(gè) job。此處定義了輸出的 key 的類型是 Text, value 的類型是 IntWritable, 指定使用代碼清單1中實(shí)現(xiàn)的 MapClass 作為 Mapper 類, 使用代碼清單2中實(shí)現(xiàn)的 Reduce 作為 Reducer 類和 Combiner 類, 任務(wù)的輸入路徑和輸出路徑由命令行參數(shù)指定,這樣 job 運(yùn)行時(shí)會(huì)處理輸入路徑下的所有文件,并將計(jì)算結(jié)果寫到輸出路徑下。
然后將 JobConf 對(duì)象作為參數(shù),調(diào)用 JobClient 的 runJob, 開始執(zhí)行這個(gè)計(jì)算任務(wù)。至于 main 方法中使用的 ToolRunner 是一個(gè)運(yùn)行 MapReduce 任務(wù)的輔助工具類,依樣畫葫蘆用之即可。
代碼清單 3
public int run(String[] args) throws Exception {JobConf conf = new JobConf(getConf(), WordCount.class);conf.setJobName("wordcount");conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(MapClass.class); conf.setCombinerClass(Reduce.class);conf.setReducerClass(Reduce.class);conf.setInputPath(new Path(args[0]));conf.setOutputPath(new Path(args[1]));JobClient.runJob(conf);return 0;}public static void main(String[] args) throws Exception {if(args.length != 2){System.err.println("Usage: WordCount <input path> <output path>");System.exit(-1);}int res = ToolRunner.run(new Configuration(), new WordCount(), args);System.exit(res);} }以上就是 WordCount 程序的全部細(xì)節(jié),簡單到讓人吃驚,您都不敢相信就這么幾行代碼就可以分布式運(yùn)行于大規(guī)模集群上,并行處理海量數(shù)據(jù)集。
4. 通過 JobConf 定制計(jì)算任務(wù)
通過上文所述的 JobConf 對(duì)象,程序員可以設(shè)定各種參數(shù),定制如何完成一個(gè)計(jì)算任務(wù)。這些參數(shù)很多情況下就是一個(gè) java 接口,通過注入這些接口的特定實(shí)現(xiàn),可以定義一個(gè)計(jì)算任務(wù)( job )的全部細(xì)節(jié)。了解這些參數(shù)及其缺省設(shè)置,您才能在編寫自己的并行計(jì)算程序時(shí)做到輕車熟路,游刃有余,明白哪些類是需要自己實(shí)現(xiàn)的,哪些類用 Hadoop 的缺省實(shí)現(xiàn)即可。表一是對(duì) JobConf 對(duì)象中可以設(shè)置的一些重要參數(shù)的總結(jié)和說明,表中第一列中的參數(shù)在 JobConf 中均會(huì)有相應(yīng)的 get/set 方法,對(duì)程序員來說,只有在表中第三列中的缺省值無法滿足您的需求時(shí),才需要調(diào)用這些 set 方法,設(shè)定合適的參數(shù)值,實(shí)現(xiàn)自己的計(jì)算目的。針對(duì)表格中第一列中的接口,除了第三列的缺省實(shí)現(xiàn)之外,Hadoop 通常還會(huì)有一些其它的實(shí)現(xiàn),我在表格第四列中列出了部分,您可以查閱 Hadoop 的 API 文檔或源代碼獲得更詳細(xì)的信息,在很多的情況下,您都不用實(shí)現(xiàn)自己的 Mapper 和 Reducer, 直接使用 Hadoop 自帶的一些實(shí)現(xiàn)即可。
表一 JobConf 常用可定制參數(shù)
| 將輸入的數(shù)據(jù)集切割成小數(shù)據(jù)集 InputSplits, 每一個(gè) InputSplit 將由一個(gè) Mapper 負(fù)責(zé)處理。此外 InputFormat 中還提供一個(gè) RecordReader 的實(shí)現(xiàn), 將一個(gè) InputSplit 解析成 <key,value> 對(duì)提供給 map 函數(shù)。 | TextInputFormat (針對(duì)文本文件,按行將文本文件切割成 InputSplits, 并用 LineRecordReader 將 InputSplit 解析成 <key,value> 對(duì),key 是行在文件中的位置,value 是文件中的一行) | SequenceFileInputFormat |
| 提供一個(gè) RecordWriter 的實(shí)現(xiàn),負(fù)責(zé)輸出最終結(jié)果 | TextOutputFormat (用 LineRecordWriter 將最終結(jié)果寫成純文件文件,每個(gè) <key,value> 對(duì)一行,key 和 value 之間用 tab 分隔) | SequenceFileOutputFormat |
| 輸出的最終結(jié)果中 key 的類型 | LongWritable | ? |
| 輸出的最終結(jié)果中 value 的類型 | Text | ? |
| Mapper 類,實(shí)現(xiàn) map 函數(shù),完成輸入的 <key,value> 到中間結(jié)果的映射 | IdentityMapper (將輸入的 <key,value> 原封不動(dòng)的輸出為中間結(jié)果) | LongSumReducer, LogRegexMapper, InverseMapper |
| 實(shí)現(xiàn) combine 函數(shù),將中間結(jié)果中的重復(fù) key 做合并 | null (不對(duì)中間結(jié)果中的重復(fù) key 做合并) | ? |
| Reducer 類,實(shí)現(xiàn) reduce 函數(shù),對(duì)中間結(jié)果做合并,形成最終結(jié)果 | IdentityReducer (將中間結(jié)果直接輸出為最終結(jié)果) | AccumulatingReducer, LongSumReducer |
| 設(shè)定 job 的輸入目錄, job 運(yùn)行時(shí)會(huì)處理輸入目錄下的所有文件 | null | ? |
| 設(shè)定 job 的輸出目錄,job 的最終結(jié)果會(huì)寫入輸出目錄下 | null | ? |
| 設(shè)定 map 函數(shù)輸出的中間結(jié)果中 key 的類型 | 如果用戶沒有設(shè)定的話,使用 OutputKeyClass | ? |
| 設(shè)定 map 函數(shù)輸出的中間結(jié)果中 value 的類型 | 如果用戶沒有設(shè)定的話,使用 OutputValuesClass | ? |
| 對(duì)結(jié)果中的 key 進(jìn)行排序時(shí)的使用的比較器 | WritableComparable | ? |
| 對(duì)中間結(jié)果的 key 排序后,用此 Partition 函數(shù)將其劃分為R份,每份由一個(gè) Reducer 負(fù)責(zé)處理。 | HashPartitioner (使用 Hash 函數(shù)做 partition) | KeyFieldBasedPartitioner PipesPartitioner |
回頁首
改進(jìn)的 WordCount 程序
現(xiàn)在你對(duì) Hadoop 并行程序的細(xì)節(jié)已經(jīng)有了比較深入的了解,我們來把 WordCount 程序改進(jìn)一下,目標(biāo): (1)原 WordCount 程序僅按空格切分單詞,導(dǎo)致各類標(biāo)點(diǎn)符號(hào)與單詞混雜在一起,改進(jìn)后的程序應(yīng)該能夠正確的切出單詞,并且單詞不要區(qū)分大小寫。(2)在最終結(jié)果中,按單詞出現(xiàn)頻率的降序進(jìn)行排序。
1.修改 Mapper 類,實(shí)現(xiàn)目標(biāo)(1)
實(shí)現(xiàn)很簡單,見代碼清單4中的注釋。
代碼清單 4
public static class MapClass extends MapReduceBaseimplements Mapper<LongWritable, Text, Text, IntWritable> {private final static IntWritable one = new IntWritable(1);private Text word = new Text();private String pattern="[^\\w]"; //正則表達(dá)式,代表不是0-9, a-z, A-Z的所有其它字符public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {String line = value.toString().toLowerCase(); //全部轉(zhuǎn)為小寫字母line = line.replaceAll(pattern, " "); //將非0-9, a-z, A-Z的字符替換為空格StringTokenizer itr = new StringTokenizer(line);while (itr.hasMoreTokens()) {word.set(itr.nextToken());output.collect(word, one);}}}2.實(shí)現(xiàn)目標(biāo)(2)
用一個(gè)并行計(jì)算任務(wù)顯然是無法同時(shí)完成單詞詞頻統(tǒng)計(jì)和排序的,這時(shí)我們可以利用 Hadoop 的任務(wù)管道能力,用上一個(gè)任務(wù)(詞頻統(tǒng)計(jì))的輸出做為下一個(gè)任務(wù)(排序)的輸入,順序執(zhí)行兩個(gè)并行計(jì)算任務(wù)。主要工作是修改代碼清單3中的 run 函數(shù),在其中定義一個(gè)排序任務(wù)并運(yùn)行之。
在 Hadoop 中要實(shí)現(xiàn)排序是很簡單的,因?yàn)樵?MapReduce 的過程中,會(huì)把中間結(jié)果根據(jù) key 排序并按 key 切成 R 份交給 R 個(gè) Reduce 函數(shù),而 Reduce 函數(shù)在處理中間結(jié)果之前也會(huì)有一個(gè)按 key 進(jìn)行排序的過程,故 MapReduce 輸出的最終結(jié)果實(shí)際上已經(jīng)按 key 排好序。詞頻統(tǒng)計(jì)任務(wù)輸出的 key 是單詞,value 是詞頻,為了實(shí)現(xiàn)按詞頻排序,我們指定使用 InverseMapper 類作為排序任務(wù)的 Mapper 類( sortJob.setMapperClass(InverseMapper.class );),這個(gè)類的 map 函數(shù)簡單地將輸入的 key 和 value 互換后作為中間結(jié)果輸出,在本例中即是將詞頻作為 key,單詞作為 value 輸出, 這樣自然就能得到按詞頻排好序的最終結(jié)果。我們無需指定 Reduce 類,Hadoop 會(huì)使用缺省的 IdentityReducer 類,將中間結(jié)果原樣輸出。
還有一個(gè)問題需要解決: 排序任務(wù)中的 Key 的類型是 IntWritable, (sortJob.setOutputKeyClass(IntWritable.class)), Hadoop 默認(rèn)對(duì) IntWritable 按升序排序,而我們需要的是按降序排列。因此我們實(shí)現(xiàn)了一個(gè) IntWritableDecreasingComparator 類, 并指定使用這個(gè)自定義的 Comparator 類對(duì)輸出結(jié)果中的 key (詞頻)進(jìn)行排序:sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class)
詳見代碼清單 5 及其中的注釋。
代碼清單 5
public int run(String[] args) throws Exception {Path tempDir = new Path("wordcount-temp-" + Integer.toString(new Random().nextInt(Integer.MAX_VALUE))); //定義一個(gè)臨時(shí)目錄JobConf conf = new JobConf(getConf(), WordCount.class);try {conf.setJobName("wordcount");conf.setOutputKeyClass(Text.class);conf.setOutputValueClass(IntWritable.class);conf.setMapperClass(MapClass.class);conf.setCombinerClass(Reduce.class);conf.setReducerClass(Reduce.class);conf.setInputPath(new Path(args[0]));conf.setOutputPath(tempDir); //先將詞頻統(tǒng)計(jì)任務(wù)的輸出結(jié)果寫到臨時(shí)目//錄中, 下一個(gè)排序任務(wù)以臨時(shí)目錄為輸入目錄。conf.setOutputFormat(SequenceFileOutputFormat.class);JobClient.runJob(conf);JobConf sortJob = new JobConf(getConf(), WordCount.class);sortJob.setJobName("sort");sortJob.setInputPath(tempDir);sortJob.setInputFormat(SequenceFileInputFormat.class);sortJob.setMapperClass(InverseMapper.class);sortJob.setNumReduceTasks(1); //將 Reducer 的個(gè)數(shù)限定為1, 最終輸出的結(jié)果//文件就是一個(gè)。sortJob.setOutputPath(new Path(args[1]));sortJob.setOutputKeyClass(IntWritable.class);sortJob.setOutputValueClass(Text.class);sortJob.setOutputKeyComparatorClass(IntWritableDecreasingComparator.class);JobClient.runJob(sortJob);} finally {FileSystem.get(conf).delete(tempDir); //刪除臨時(shí)目錄}return 0;}private static class IntWritableDecreasingComparator extends IntWritable.Comparator {public int compare(WritableComparable a, WritableComparable b) {return -super.compare(a, b);}public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return -super.compare(b1, s1, l1, b2, s2, l2);}}回頁首
在 Eclipse 環(huán)境下進(jìn)行開發(fā)和調(diào)試
在 Eclipse 環(huán)境下可以方便地進(jìn)行 Hadoop 并行程序的開發(fā)和調(diào)試。推薦使用 IBM MapReduce Tools for Eclipse, 使用這個(gè) Eclipse plugin 可以簡化開發(fā)和部署 Hadoop 并行程序的過程。基于這個(gè) plugin, 可以在 Eclipse 中創(chuàng)建一個(gè) Hadoop MapReduce 應(yīng)用程序,并且提供了一些基于 MapReduce 框架的類開發(fā)的向?qū)?#xff0c;可以打包成 JAR 文件,部署一個(gè) Hadoop MapReduce 應(yīng)用程序到一個(gè) Hadoop 服務(wù)器(本地和遠(yuǎn)程均可),可以通過一個(gè)專門的視圖 ( perspective ) 查看 Hadoop 服務(wù)器、Hadoop 分布式文件系統(tǒng)( DFS )和當(dāng)前運(yùn)行的任務(wù)的狀態(tài)。
可在 IBM alphaWorks 網(wǎng)站下載這個(gè)?MapReduce Tool, 或在本文的下載清單中下載。將下載后的壓縮包解壓到你 Eclipse 安裝目錄,重新啟動(dòng) Eclipse 即可使用了。
設(shè)置 Hadoop 主目錄
點(diǎn)擊 Eclipse 主菜單上 Windows->Preferences, 然后在左側(cè)選擇 Hadoop Home Directory,設(shè)定你的 Hadoop 主目錄,如圖一所示:
圖 1
創(chuàng)立一個(gè) MapReduce Project
點(diǎn)擊 Eclipse 主菜單上 File->New->Project, 在彈出的對(duì)話框中選擇 MapReduce Project, 輸入 project name 如 wordcount, 然后點(diǎn)擊 Finish 即可。,如圖 2 所示:
圖 2
此后,你就可以象一個(gè)普通的 Eclipse Java project 那樣,添加入 Java 類,比如你可以定義一個(gè) WordCount 類,然后將本文代碼清單1,2,3中的代碼寫到此類中,添加入必要的 import 語句 ( Eclipse 快捷鍵 ctrl+shift+o 可以幫你),即可形成一個(gè)完整的 wordcount 程序。
在我們這個(gè)簡單的 wordcount 程序中,我們把全部的內(nèi)容都放在一個(gè) WordCount 類中。實(shí)際上 IBM MapReduce tools 還提供了幾個(gè)實(shí)用的向?qū)?( wizard ) 工具,幫你創(chuàng)建單獨(dú)的 Mapper 類,Reducer 類,MapReduce Driver 類(就是代碼清單3中那部分內(nèi)容),在編寫比較復(fù)雜的 MapReduce 程序時(shí),將這些類獨(dú)立出來是非常有必要的,也有利于在不同的計(jì)算任務(wù)中重用你編寫的各種 Mapper 類和 Reducer 類。
在 Eclipse 中運(yùn)行
如圖三所示,設(shè)定程序的運(yùn)行參數(shù):輸入目錄和輸出目錄之后,你就可以在 Eclipse 中運(yùn)行 wordcount 程序了,當(dāng)然,你也可以設(shè)定斷點(diǎn),調(diào)試程序。
圖 3
回頁首
結(jié)束語
到目前為止,我們已經(jīng)介紹了 MapReduce 計(jì)算模型,分布式文件系統(tǒng) HDFS,分布式并行計(jì)算等的基本原理, 如何安裝和部署單機(jī) Hadoop 環(huán)境,實(shí)際編寫了一個(gè) Hadoop 并行計(jì)算程序,并了解了一些重要的編程細(xì)節(jié),了解了如何使用 IBM MapReduce Tools 在 Eclipse 環(huán)境中編譯,運(yùn)行和調(diào)試你的 Hadoop 并行計(jì)算程序。但一個(gè) Hadoop 并行計(jì)算程序,只有部署運(yùn)行在分布式集群環(huán)境中,才能發(fā)揮其真正的優(yōu)勢,在這篇系列文章的第 3 部分中,你將了解到如何部署你的分布式 Hadoop 環(huán)境,如何利用 IBM MapReduce Tools 將你的程序部署到分布式環(huán)境中運(yùn)行等內(nèi)容。
聲明:本文僅代表作者個(gè)人之觀點(diǎn),不代表 IBM 公司之觀點(diǎn)。
回頁首
下載
| wordcount.zip | 8KB |
| mapreduce_plugin.zip | 324KB |
參考資料
學(xué)習(xí)
- 訪問?Hadoop 官方網(wǎng)站,了解 Hadoop 及其子項(xiàng)目 HBase 的信息。
- Hadoop wiki?上, 有許多 Hadoop 的用戶文檔,開發(fā)文檔,示例程序等。
- 閱讀 Google Mapreduce 論文:?MapReduce: Simplified Data Processing on Large Clusters?, 深入了解 Mapreduce 計(jì)算模型。
- 學(xué)習(xí) Hadoop 分布式文件系統(tǒng) HDFS:?The Hadoop Distributed File System:Architecture and Design
- 學(xué)習(xí) Google 文件系統(tǒng) GFS:?The Google File System, Hadoop HDFS 實(shí)現(xiàn)了與 GFS 類似的功能。
- 到 IBM alphaWorks 網(wǎng)站了解并且下載 IBM MapReduce Tools:http://www.alphaworks.ibm.com/tech/mapreducetools,
總結(jié)
以上是生活随笔為你收集整理的用 Hadoop 进行分布式并行编程, 第 2 部分 程序实例与分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 用 Hadoop 进行分布式并行编程,
- 下一篇: 用 Hadoop 进行分布式并行编程,