如何在Hadoop上编写MapReduce程序
1970年,IBM的研究員E.F.Codd博士在刊物《Communication of the ACM》上發(fā)表了一篇名為“A Relational Model of Data for Large Shared Data Banks”的論文,提出了關(guān)系模型的概念,標(biāo)志著關(guān)系數(shù)據(jù)庫(kù)的誕生,隨后幾十年,關(guān)系數(shù)據(jù)庫(kù)及其結(jié)構(gòu)化查詢語言SQL成為程序員必須掌握的基本技能之一。
2005年4月,Jeffrey Dean和Sanjay Ghemawat在國(guó)際會(huì)議OSDI上發(fā)表“MapReduce: Simplified Data Processing on Large Cluster”,標(biāo)志著google的大規(guī)模數(shù)據(jù)處理系統(tǒng)MapReduce公開。受這篇論文的啟發(fā),當(dāng)年秋天,Hadoop 由 Apache Software Foundation 公司作為 Lucene 的子項(xiàng)目 Nutch 的一部分正式被引入,2006 年 3 月份,MapReduce 和 Nutch Distributed File System (NDFS) 分別被納入稱為 Hadoop 的項(xiàng)目中。如今,Hadoop已經(jīng)被超過50%的互聯(lián)網(wǎng)公司使用,其他很多公司正準(zhǔn)備使用Hadoop來處理海量數(shù)據(jù),隨著Hadoop越來越受歡迎,也許在將來的某段時(shí)間,Hadoop會(huì)成為程序員必須掌握的技能之一,如果真是這樣的話,學(xué)會(huì)如何在Hadoop上編寫MapReduce程序便是學(xué)習(xí)Hadoop的開始。
本文介紹了在Hadoop上編寫MapReduce程序的基本方法,包括MapReduce程序的構(gòu)成,不同語言開發(fā)MapReduce的方法等。
2. Hadoop 作業(yè)構(gòu)成
2.1 Hadoop作業(yè)執(zhí)行流程
用戶配置并將一個(gè)Hadoop作業(yè)提到Hadoop框架中,Hadoop框架會(huì)把這個(gè)作業(yè)分解成一系列map tasks 和reduce tasks。Hadoop框架負(fù)責(zé)task分發(fā)和執(zhí)行,結(jié)果收集和作業(yè)進(jìn)度監(jiān)控。
下圖給出了一個(gè)作業(yè)從開始執(zhí)行到結(jié)束所經(jīng)歷的階段和每個(gè)階段被誰控制(用戶 or Hadoop框架)。
下圖詳細(xì)給出了用戶編寫MapRedue作業(yè)時(shí)需要進(jìn)行那些工作以及Hadoop框架自動(dòng)完成的工作:
在編寫MapReduce程序時(shí),用戶分別通過InputFormat和OutputFormat指定輸入和輸出格式,并定義Mapper和Reducer指定map階段和reduce階段的要做的工作。在Mapper或者Reducer中,用戶只需指定一對(duì)key/value的處理邏輯,Hadoop框架會(huì)自動(dòng)順序迭代解析所有key/value,并將每對(duì)key/value交給Mapper或者Reducer處理。表面上看來,Hadoop限定數(shù)據(jù)格式必須為key/value形式,過于簡(jiǎn)單,很難解決復(fù)雜問題,實(shí)際上,可以通過組合的方法使key或者value(比如在key或者value中保存多個(gè)字段,每個(gè)字段用分隔符分開,或者value是個(gè)序列化后的對(duì)象,在Mapper中使用時(shí),將其反序列化等)保存多重信息,以解決輸入格式較復(fù)雜的應(yīng)用。
2.2 用戶的工作
用戶編寫MapReduce需要實(shí)現(xiàn)的類或者方法有:
(1) InputFormat接口
用戶需要實(shí)現(xiàn)該接口以指定輸入文件的內(nèi)容格式。該接口有兩個(gè)方法
| 1 2 3 4 5 6 7 8 9 10 11 | public interface InputFormat<K, V> { ?????InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; ?????RecordReader<K, V> getRecordReader(InputSplit split, ?????JobConf job, ?????Reporter reporter) throws IOException; } |
其中g(shù)etSplits函數(shù)將所有輸入數(shù)據(jù)分成numSplits個(gè)split,每個(gè)split交給一個(gè)map task處理。getRecordReader函數(shù)提供一個(gè)用戶解析split的迭代器對(duì)象,它將split中的每個(gè)record解析成key/value對(duì)。
Hadoop本身提供了一些InputFormat:
(2)Mapper接口
用戶需繼承Mapper接口實(shí)現(xiàn)自己的Mapper,Mapper中必須實(shí)現(xiàn)的函數(shù)是
| 1 2 3 4 5 6 7 8 9 | void map(K1 key, ????V1 value, ????OutputCollector<K2,V2> output, ????Reporter reporter ) throws IOException |
其中,<K1 V1>是通過Inputformat中的RecordReader對(duì)象解析處理 的,OutputCollector獲取map()的輸出結(jié)果,Reporter保存了當(dāng)前task處理進(jìn)度。
Hadoop本身提供了一些Mapper供用戶使用:
(3)Partitioner接口
用戶需繼承該接口實(shí)現(xiàn)自己的Partitioner以指定map task產(chǎn)生的key/value對(duì)交給哪個(gè)reduce task處理,好的Partitioner能讓每個(gè)reduce task處理的數(shù)據(jù)相近,從而達(dá)到負(fù)載均衡。Partitioner中需實(shí)現(xiàn)的函數(shù)是
getPartition(? K2?? key, V2 value, int numPartitions)
該函數(shù)返回<K2 V2>對(duì)應(yīng)的reduce task ID。
用戶如果不提供Partitioner,Hadoop會(huì)使用默認(rèn)的(實(shí)際上是個(gè)hash函數(shù))。
(4)Combiner
Combiner使得map task與reduce task之間的數(shù)據(jù)傳輸量大大減小,可明顯提高性能。大多數(shù)情況下,Combiner與Reducer相同。
(5)Reducer接口
用戶需繼承Reducer接口實(shí)現(xiàn)自己的Reducer,Reducer中必須實(shí)現(xiàn)的函數(shù)是
| 1 2 3 4 5 6 7 8 9 | void reduce(K2 key, ?????Iterator<V2> values, ?????OutputCollector<K3,V3> output, ?????Reporter reporter ) throws IOException |
Hadoop本身提供了一些Reducer供用戶使用:
(6)OutputFormat
用戶通過OutputFormat指定輸出文件的內(nèi)容格式,不過它沒有split。每個(gè)reduce task將其數(shù)據(jù)寫入自己的文件,文件名為part-nnnnn,其中nnnnn為reduce task的ID。
Hadoop本身提供了幾個(gè)OutputFormat:
3. 分布式緩存
Haoop中自帶了一個(gè)分布式緩存,即DistributedCache對(duì)象,方便map task之間或者reduce task之間共享一些信息,比如某些實(shí)際應(yīng)用中,所有map task要讀取同一個(gè)配置文件或者字典,則可將該配置文件或者字典放到分布式緩存中。
4.?多語言編寫MapReduce作業(yè)
Hadoop采用java編寫,因而Hadoop天生支持java語言編寫作業(yè),但在實(shí)際應(yīng)用中,有時(shí)候,因要用到非java的第三方庫(kù)或者其他原因,要采用C/C++或者其他語言編寫MapReduce作業(yè),這時(shí)候可能要用到Hadoop提供的一些工具。
如果你要用C/C++編寫MpaReduce作業(yè),可使用的工具有Hadoop Streaming或者Hadoop Pipes。
如果你要用Python編寫MapReduce作業(yè),可以使用Hadoop Streaming或者Pydoop。
如果你要使用其他語言,如shell,php,ruby等,可使用Hadoop Streaming。
關(guān)于Hadoop Streaming編程,可參見我的這篇博文:《Hadoop Streaming編程》(http://dongxicheng.org/mapreduce/hadoop-streaming-programming/?)
關(guān)于Pydoop編程,可參見其官方網(wǎng)站:http://sourceforge.net/projects/pydoop/
關(guān)于Hadoop pipes編程,可參見《Hadoop Tutorial 2.2 — Running C++ Programs on Hadoop》。
5.?編程方式比較
(1)java。 Hadoop支持的最好最全面的語言,而且提供了很多工具方便程序員開發(fā)。
(2)Hadoop Streaming。 它最大的優(yōu)點(diǎn)是支持多種語言,但效率較低,reduce task需等到map 階段完成后才能啟動(dòng);它不支持用戶自定義InputFormat,如果用戶想指定輸入文件格式,可使用java語言編寫或者在命令行中指定分隔符;它采用標(biāo)準(zhǔn)輸入輸出讓C/C++與java通信,因而只支持text數(shù)據(jù)格式。
(3)Hadoop Pipes。 專門為C/C++語言設(shè)計(jì),由于其采用了socket方式讓C/C++與java通信,因而其效率較低(其優(yōu)勢(shì)在于,但作業(yè)需要大量,速度很快)。它支持用戶(用C/C++)編寫RecordReader。
(4)Pydoop。它是專門方便python程序員編寫MapReduce作業(yè)設(shè)計(jì)的,其底層使用了Hadoop Streaming接口和libhdfs庫(kù)。
6. 總結(jié)
Hadoop使得分布式程序的編寫變得異常簡(jiǎn)單,很多情況下,用戶只需寫map()和reduce()兩個(gè)函數(shù)即可(InputFormat,Outputformat可用系統(tǒng)缺省的)。正是由于Hadoop編程的簡(jiǎn)單性,越來越多的公司或者研究單位開始使用Hadoop。
7. 注意事項(xiàng)
(1) Hadoop默認(rèn)的InputFormat是TextInputFormat,它將文件中的每一行作為value,該行的偏移量為key。
(2)如果你的輸入是文本文件,且每一行包括key,value,則可使用Hadoop中自帶的KeyValueTextInputFormat,它默認(rèn)的每行是一個(gè)key/value對(duì),且key與value的分割如為TAB(’\t‘),如果想修改key/value之間的分隔符,如將分割符改為“,”,可使用conf.set(“key.value.separator.in.input.line”,”,”);或者-D key.value.separator.in.input.line=,。
8. 參考資料
(1) 書籍 Jason Venner?《Pro Hadoop》
(2) 書籍 Chuck Lam?《Hadoop in Action》
(3) 書籍 Tom White?《Hadoop The Definitive Guide》
(4) Hadoop分布式緩存例子:書籍《Hadoop The Definitive Guide》 第八章? 最后一節(jié)“Distributed Cache”
(5) Hadoop Pipes例子:源代碼中$HAOOOP_HOME/src/examples/pipes路徑下。
(6) Hadoop Pipes資料:
http://developer.yahoo.com/hadoop/tutorial/module4.html#pipes
http://wiki.apache.org/hadoop/C%2B%2BWordCount
原創(chuàng)文章,轉(zhuǎn)載請(qǐng)注明:?轉(zhuǎn)載自董的博客
本文鏈接地址:?http://dongxicheng.org/mapreduce/writing-hadoop-programes/
總結(jié)
以上是生活随笔為你收集整理的如何在Hadoop上编写MapReduce程序的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。