MapReduce编程基础
MapReduce編程基礎
1.?WordCount示例及MapReduce程序框架
2. ?MapReduce程序執行流程
3. ?深入學習MapReduce編程(1)
4. 參考資料及代碼下載?
<1>. WordCount示例及MapReduce程序框架?
首先通過一個簡單的程序來實際運行一個MapReduce程序,然后通過這個程序我們來哦那個結一下MapReduce編程模型。
下載源程序:/Files/xuqiang/WordCount.rar,將該程序打包成wordcount.jar下面的命令,隨便寫一個文本文件,這里是WordCountMrtrial,并上傳到hdfs上,這里的路徑是/tmp/WordCountMrtrial,運行下面的命令:
?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ ./bin/hadoop jar wordcount.jar WordCount /tmp/WordCountMrtrial /tmp/result
?如果該任務運行完成之后,將在hdfs的/tmp/result目錄下生成類似于這樣的結果:
?gentleman 11
get 12 give 8 go 6 good 9 government 16運行一個程序的基本上就是這樣一個過程,我們來看看具體程序:
main函數中首先生成一個Job對象,?Job job = new Job(conf, "word count");然后設置job的MapperClass,ReducerClass,設置輸入文件路徑FileInputFormat.addInputPath(job, new Path(otherArgs[0]));設置輸出文件路徑:FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));等待程序運行完成:System.exit(job.waitForCompletion(true) ? 0 : 1);可以看出main中僅僅是啟動了一個job,然后設置該job相關的參數,具體實現MapReduce是mapper類和reducer類。
?TokenizerMapper類中map函數將一行分割成<K2, V2>,然后IntSumReducer的reduce將<K2, list<V2>>轉換成最終結果<K3, V3>。
?
通過這個示例基本上也能總結出簡單的MapReduce編程的模型:一個Mapper類,一個Reducer類,一個Driver類。
?
<2>. MapReduce程序執行流程?
?這里所描述的執行流程更加注重是從程序的角度去理解,更加全面的流程可參考[這里]。
?
首先用戶指定待處理的文件,在WordCount就是文件WordCountMrtrial,這是hadoop根據設定的InputDataFormat來將輸入文件分割成一個record(key/value對),然后將這些record傳遞給map函數,在WordCount示例中,對應的record就是<line_number行號, line_content該行內容>;
然后map函數根據輸入的record,形成<K2, V2>,在WordCount示例中形成<K2, V2>就是<single_word, word_count>,例如<"a", 1>;
如果map過程完成之后,hadoop將這些生成的<K2, V2>按照K2進行分組,形成<K2,list(V2) >,之后傳遞給reduce函數,在該函數中最終得到程序的輸出結果<K3, V3>。
<3>. 深入學習MapReduce編程(1)
3.1 hadoop data types
由于在hadoop需要將key/value對序列化,然后通過網絡network發送到集群中的其他機器上,所以說hadoop中的類型需要能夠序列化。
具體而言,自定義的類型,如果一個類class實現了Writable interface的話,那么這個可以作為value類型,如果一個class實現了WritableComparable<T> interface的話,那么這個class可以作為value類型或者是key類型。?
hadoop本身已經實現了一些預定義的類型predefined classes,并且這些類型實現了WritableComparable<T>接口。
3.2 Mapper
如果一個類想要成為一個mapper,那么該類需要實現Mapper接口,同時繼承自MapReduceBase。在MapReduceBase類中,兩個方法是特別需要注意的:
void configure( JobConf job):這個方法是在任務被運行之前調用?
void close():在任務運行完成之后調用
剩下的工作就是編寫map方法,原型如下:
void map(Object key, Text value, Context context?? ? ? ? ? ? ? ? ? ?) throws IOException, InterruptedException;
?這個方法根據<K1, V1>生成<K2, V2>,然后通過context輸出。
同樣的在hadoop中預先定義了如下的Mapper:
?
3.3 Reducer
如果一個類想要成為Reducer的話,需要首先實現Reducer接口,然后需要繼承自MapReduceBase。
當reducer接收從mapper傳遞而來的key/value對,然后根據key來排序,分組,最終生成<K2, list<V2>> ,然后reducer根據<K2, list<V2>>生成<K3, V3>.
同樣在hadoop中預定義了一些Reducer:
?
3.4?Partitioner
?Partitioner的作用主要是將mapper運行的結果“導向directing”到reducer。如果一個類想要成為Partitioner,那么需要實現Partitioner接口,該接口繼承自JobConfigurable,定義如下:
public?interface?Partitioner<K2,?V2>?extends?JobConfigurable?{??/**?
???*?Get?the?paritition?number?for?a?given?key?(hence?record)?given?the?total?
???*?number?of?partitions?i.e.?number?of?reduce-tasks?for?the?job.
???*???
???*?<p>Typically?a?hash?function?on?a?all?or?a?subset?of?the?key.</p>
???*
???*?@param?key?the?key?to?be?paritioned.
???*?@param?value?the?entry?value.
???*?@param?numPartitions?the?total?number?of?partitions.
???*?@return?the?partition?number?for?the?<code>key</code>.
???*/
??int?getPartition(K2?key,?V2?value,?int?numPartitions);
}?
hadoop將根據方法getPartition的返回值確定將mapper的值發送到那個reducer上。返回值相同的key/value對將被“導向“至同一個reducer。
3.5 Input Data Format and Output Data Format
3.5.1 Input Data Format?
上面我們的假設是MapReduce程序的輸入是key/value對,也就是<K1, V1>,但是實際上一般情況下MapReduce程序的輸入是big file的形式,那么如何將這個文件轉換成<K1, V1>,即file -> <K1, V1>。這就需要使用InputFormat接口了。?
下面是幾個常用InputFormat的實現類:
?
當然除了使用hadoop預先定義的InputDataFormat之外,還可以自定義,這是需要實現InputFormat接口。該接口僅僅包含兩個方法:
?
InputSplit[] getSplits(JobConf job, int numSplits) throws ?IOException;該接口實現將大文件分割成小塊split。 RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,??? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? Reporter reporter) throws IOException;?
該方法輸入分割成的split,然后返回RecordReader,通過RecordReader來遍歷該split內的record。?
3.5.2 Output Data Format
每個reducer將自己的輸出寫入到結果文件中,這是使用output data format來配置輸出的文件的格式。hadoop預先實現了:
?
3.6 Streaming in Hadoop
3.6.1 執行流程
我們知道在linux中存在所謂的“流”的概念,也就是說我們可以使用下面的命令:
cat input.txt | RandomSample.py 10 >sampled_output.txt?
同樣在hadoop中我們也可以使用類似的命令,顯然這樣能夠在很大程度上加快程序的開發進程。下面來看看hadoop中流執行的過程:
?
hadoop streaming從標磚輸入STDIN讀取數據,默認的情況下使用\t來分割每行,如果不存在\t的話,那么這時正行的內容將被看作是key,而此時的value內容為空;
然后調用mapper程序,輸出<K2, V2>;
之后,調用Partitioner來將<K2, V2>輸出到對應的reducer上;
reducer根據輸入的<K2, list(V2)> 得到最終結果<K3, V3>并輸出到STDOUT上。?
3.6.2 簡單示例程序?
下面我們假設需要做這樣一個工作,輸入一個文件,文件中每行是一個數字,然后得到該文件中的數字的最大值(當然這里可以使用streaming中自帶的Aggregate)。 首先我們編寫一個python文件(如果對python不是很熟悉,看看[這里]):
3.6.2.1 準備數據
?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >url1
?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >url2
上傳到hdfs上:
?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -mkdir urls
xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url1 urls/ xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -put url2 urls/3.6.2.2 編寫mapper?multifetch.py
#!/usr/bin/env?pythonimport?sys,?urllib,?re
title_re?=?re.compile("<title>(.*?)</title>",
????????re.MULTILINE?|?re.DOTALL?|?re.IGNORECASE)
for?line?in?sys.stdin:
????#?We?assume?that?we?are?fed?a?series?of?URLs,?one?per?line
????url?=?line.strip()
????#?Fetch?the?content?and?output?the?title?(pairs?are?tab-delimited)
????match?=?title_re.search(urllib.urlopen(url).read())
????if?match:
????????print?url,?"\t",?match.group(1).strip()
該文件的主要作用是給定一個url,然后輸出該url代表的html頁面的title部分。
在本地測試一下該程序:
xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.cs.brandeis.edu" >urls xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ echo "http://www.nytimes.com" >>urlsxuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ sudo chmod u+x ./multifetch.py?
?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ cat urls | ./multifetch.py 將輸出:
?http://www.cs.brandeis.edu? Computer Science Department | Brandeis University
http://www.nytimes.com The New York Times - Breaking News, World News & Multimedia3.6.2.3 編寫reducer reducer.py
編寫reducer.py文件?
?#!/usr/bin/env?python from?operator?import?itemgetterimport?sys
for?line?in?sys.stdin:
????line?=?line.strip()
????print?line
xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ chmod u+x ./reducer.py??
現在我們的mapper和reducer已經準備好了,那么首先在本地上運行測試一下程序的功能,下面的命令模擬在hadoop上運行的過程:
首先mapper從stdin讀取數據,這里是一行;
然后讀取該行的內容作為一個url,然后得到該url代表的html的title的內容,輸出<url, url-title-content>;
調用sort命令將mapper輸出排序;
將排序完成的結果交給reducer,這里的reducer僅僅是將結果輸出。?
xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$?cat?urls?|?./multifetch.py?|?sort?|?./reducer.py?http://www.cs.brandeis.edu?????Computer?Science?Department?|?Brandeis?University
http://www.nytimes.com?????The?New?York?Times?-?Breaking?News,?World?News?&?Multimedia??
顯然程序能夠正確?
3.6.2.4 在hadoop streaming上運行
xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$?bin/hadoop?jar?./mapred/contrib/streaming/hadoop-0.21.0-streaming.jar?\>?-mapper?/home/xuqiang/hadoop/src/hadoop-0.21.0/multifetch.py?\
>?-reducer?/home/xuqiang/hadoop/src/hadoop-0.21.0/reducer.py?\
>?-input?urls/*?\
>?-output?titles?
?程序運行完成之后,查看運行結果:
?xuqiang@ubuntu:~/hadoop/src/hadoop-0.21.0$ bin/hadoop dfs -cat titles/part-00000
<4>. 參考資料及代碼下載
http://pages.cs.brandeis.edu/~cs147a/lab/hadoop-example/?
http://hadoop.apache.org/mapreduce/docs/r0.21.0/streaming.html#Hadoop+Streaming?
<Hadoop In Action>
出處:http://www.cnblogs.com/xuqiang/archive/2011/06/05/2071935.html
總結
以上是生活随笔為你收集整理的MapReduce编程基础的全部內容,希望文章能夠幫你解決所遇到的問題。