Hadoop Streaming 编程
Hadoop Streaming是Hadoop提供的一個編程工具,它允許用戶使用任何可執行文件或者腳本文件作為Mapper和Reducer,例如:
采用shell腳本語言中的一些命令作為mapper和reducer(cat作為mapper,wc作為reducer)
$HADOOP_HOME/bin/hadoop? jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper cat \
-reducer wc
本文安排如下,第二節介紹Hadoop Streaming的原理,第三節介紹Hadoop Streaming的使用方法,第四節介紹Hadoop Streaming的程序編寫方法,在這一節中,用C++、C、shell腳本 和python實現了WordCount作業,第五節總結了常見的問題。文章最后給出了程序下載地址。(本文內容基于Hadoop-0.20.2版本)
(注:如果你采用的語言為C或者C++,也可以使用Hadoop Pipes,具體可參考這篇文章:Hadoop Pipes編程。)
關于Hadoop Streaming高級編程方法,可參考這篇文章:Hadoop Streaming高級編程,Hadoop編程實例。
2、Hadoop Streaming原理
mapper和reducer會從標準輸入中讀取用戶數據,一行一行處理后發送給標準輸出。Streaming工具會創建MapReduce作業,發送給各個tasktracker,同時監控整個作業的執行過程。
如果一個文件(可執行或者腳本)作為mapper,mapper初始化時,每一個mapper任務會把該文件作為一個單獨進程啟動,mapper任務運行時,它把輸入切分成行并把每一行提供給可執行文件進程的標準輸入。 同時,mapper收集可執行文件進程標準輸出的內容,并把收到的每一行內容轉化成key/value對,作為mapper的輸出。 默認情況下,一行中第一個tab之前的部分作為key,之后的(不包括tab)作為value。如果沒有tab,整行作為key值,value值為null。
對于reducer,類似。
以上是Map/Reduce框架和streaming mapper/reducer之間的基本通信協議。
3、Hadoop Streaming用法
Usage: $HADOOP_HOME/bin/hadoop jar \
$HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar [options]
options:
(1)-input:輸入文件路徑
(2)-output:輸出文件路徑
(3)-mapper:用戶自己寫的mapper程序,可以是可執行文件或者腳本
(4)-reducer:用戶自己寫的reducer程序,可以是可執行文件或者腳本
(5)-file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。
(6)-partitioner:用戶自定義的partitioner程序
(7)-combiner:用戶自定義的combiner程序(必須用java實現)
(8)-D:作業的一些屬性(以前用的是-jonconf),具體有:
1)mapred.map.tasks:map task數目
2)mapred.reduce.tasks:reduce task數目
3)stream.map.input.field.separator/stream.map.output.field.separator: map task輸入/輸出數
據的分隔符,默認均為\t。
4)stream.num.map.output.key.fields:指定map task輸出記錄中key所占的域數目
5)stream.reduce.input.field.separator/stream.reduce.output.field.separator:reduce task輸入/輸出數據的分隔符,默認均為\t。
6)stream.num.reduce.output.key.fields:指定reduce task輸出記錄中key所占的域數目
另外,Hadoop本身還自帶一些好用的Mapper和Reducer:
(1)??? Hadoop聚集功能
Aggregate提供一個特殊的reducer類和一個特殊的combiner類,并且有一系列的“聚合器”(例如“sum”,“max”,“min”等)用于聚合一組value的序列。用戶可以使用Aggregate定義一個mapper插件類,這個類用于為mapper輸入的每個key/value對產生“可聚合項”。Combiner/reducer利用適當的聚合器聚合這些可聚合項。要使用Aggregate,只需指定“-reducer aggregate”。
(2)字段的選取(類似于Unix中的‘cut’)
Hadoop的工具類org.apache.hadoop.mapred.lib.FieldSelectionMapReduc幫助用戶高效處理文本數據,就像unix中的“cut”工具。工具類中的map函數把輸入的key/value對看作字段的列表。 用戶可以指定字段的分隔符(默認是tab),可以選擇字段列表中任意一段(由列表中一個或多個字段組成)作為map輸出的key或者value。 同樣,工具類中的reduce函數也把輸入的key/value對看作字段的列表,用戶可以選取任意一段作為reduce輸出的key或value。
4、Mapper和Reducer實現
本節試圖用盡可能多的語言編寫Mapper和Reducer,包括Java,C,C++,Shell腳本,python等(初學者運行第一個程序時,務必要閱讀第5部分 “常見問題及解決方案”!!!!)。
由于Hadoop會自動解析數據文件到Mapper或者Reducer的標準輸入中,以供它們讀取使用,所有應先了解各個語言獲取標準輸入的方法。
(1)????Java語言:
見Hadoop自帶例子
(2)????C++語言:
| 1 2 3 4 5 | string?key; while(cin>>key){ ??cin>>value; ???…. } |
(3)??C語言:
| 1 2 3 4 5 | char buffer[BUF_SIZE]; while(fgets(buffer, BUF_SIZE - 1, stdin)){ ??int len = strlen(buffer); ??… } |
(4)??Shell腳本
管道
(5)??Python腳本
| 1 2 3 | import?sys for?line?in?sys.stdin: ....... |
為了說明各種語言編寫Hadoop Streaming程序的方法,下面以WordCount為例,WordCount作業的主要功能是對用戶輸入的數據中所有字符串進行計數。
(1)C語言實現
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | //mapper #include <stdio.h> #include <string.h> #include <stdlib.h> #define BUF_SIZE??????? 2048 #define DELIM?? "\n" int main(int argc, char *argv[]){ ?????char buffer[BUF_SIZE]; ?????while(fgets(buffer, BUF_SIZE - 1, stdin)){ ????????????int len = strlen(buffer); ????????????if(buffer[len-1] == '\n') ?????????????buffer[len-1] = 0; ????????????char *querys? = index(buffer, ' '); ????????????char *query = NULL; ????????????if(querys == NULL) continue; ????????????querys += 1; /*? not to include '\t' */ ????????????query = strtok(buffer, " "); ????????????while(query){ ???????????????????printf("%s\t1\n", query); ???????????????????query = strtok(NULL, " "); ????????????} ?????} ?????return 0; } //--------------------------------------------------------------------------------------- //reducer #include <stdio.h> #include <string.h> #include <stdlib.h> #define BUFFER_SIZE???? 1024 #define DELIM?? "\t" int main(int argc, char *argv[]){ ?char strLastKey[BUFFER_SIZE]; ?char strLine[BUFFER_SIZE]; ?int count = 0; ?*strLastKey = '\0'; ?*strLine = '\0'; ?while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){ ???char *strCurrKey = NULL; ???char *strCurrNum = NULL; ???strCurrKey? = strtok(strLine, DELIM); ???strCurrNum = strtok(NULL, DELIM); /* necessary to check error but.... */ ???if( strLastKey[0] == '\0'){ ?????strcpy(strLastKey, strCurrKey); ???} ???if(strcmp(strCurrKey, strLastKey)) { ?????printf("%s\t%d\n", strLastKey, count); ?????count = atoi(strCurrNum); ???} else { ?????count += atoi(strCurrNum); ???} ???strcpy(strLastKey, strCurrKey); ?} ?printf("%s\t%d\n", strLastKey, count); /* flush the count */ ?return 0; } |
(2)C++語言實現
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | //mapper #include <stdio.h> #include <string> #include <iostream> using namespace std; int main(){ ????????string key; ????????string value = "1"; ????????while(cin>>key){ ????????????????cout<<key<<"\t"<<value<<endl; ????????} ????????return 0; } //------------------------------------------------------------------------------------------------------------ //reducer #include <string> #include <map> #include <iostream> #include <iterator> using namespace std; int main(){ ????????string key; ????????string value; ????????map<string, int> word2count; ????????map<string, int>::iterator it; ????????while(cin>>key){ ????????????????cin>>value; ????????????????it = word2count.find(key); ????????????????if(it != word2count.end()){ ????????????????????????(it->second)++; ????????????????} ????????????????else{ ????????????????????????word2count.insert(make_pair(key, 1)); ????????????????} ????????} ????????for(it = word2count.begin(); it != word2count.end(); ++it){ ????????????????cout<<it->first<<"\t"<<it->second<<endl; ????????} ????????return 0; } |
(3)shell腳本語言實現
簡約版,每行一個單詞:
| 1 2 3 4 5 | $HADOOP_HOME/bin/hadoop? jar $HADOOP_HOME/hadoop-streaming.jar \ ????-input myInputDirs \ ????-output myOutputDir \ ????-mapper cat \ ???-reducer? wc |
詳細版,每行可有多個單詞(由史江明編寫):?mapper.sh
| 1 2 3 4 5 6 7 | #! /bin/bash while read LINE; do ??for word in $LINE ??do ????echo "$word 1" ??done done |
reducer.sh
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | #! /bin/bash count=0 started=0 word="" while read LINE;do ??newword=`echo $LINE | cut -d ' '? -f 1` ??if [ "$word" != "$newword" ];then ????[ $started -ne 0 ] && echo "$word\t$count" ????word=$newword ????count=1 ????started=1 ??else ????count=$(( $count + 1 )) ??fi done echo "$word\t$count" |
(4)Python腳本語言實現
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | #!/usr/bin/env python import sys # maps words to their counts word2count = {} # input comes from STDIN (standard input) for line in sys.stdin: ????# remove leading and trailing whitespace ????line = line.strip() ????# split the line into words while removing any empty strings ????words = filter(lambda word: word, line.split()) ????# increase counters ????for word in words: ????????# write the results to STDOUT (standard output); ????????# what we output here will be the input for the ????????# Reduce step, i.e. the input for reducer.py ????????# ????????# tab-delimited; the trivial word count is 1 ????????print '%s\t%s' % (word, 1) #--------------------------------------------------------------------------------------------------------- #!/usr/bin/env python from operator import itemgetter import sys # maps words to their counts word2count = {} # input comes from STDIN for line in sys.stdin: ????# remove leading and trailing whitespace ????line = line.strip() ????# parse the input we got from mapper.py ????word, count = line.split() ????# convert count (currently a string) to int ????try: ????????count = int(count) ????????word2count[word] = word2count.get(word, 0) + count ????except ValueError: ????????# count was not a number, so silently ????????# ignore/discard this line ????????pass # sort the words lexigraphically; # # this step is NOT required, we just do it so that our # final output will look more like the official Hadoop # word count examples sorted_word2count = sorted(word2count.items(), key=itemgetter(0)) # write the results to STDOUT (standard output) for word, count in sorted_word2count: ????print '%s\t%s'% (word, count) |
5、常見問題及解決方案
(1)作業總是運行失敗,
提示找不多執行程序, 比如“Caused by: java.io.IOException: Cannot run?program “/user/hadoop/Mapper”: error=2, No such file or directory”:
可在提交作業時,采用-file選項指定這些文件, 比如上面例子中,可以使用“-file Mapper -file Reducer” 或者 “-file Mapper.py -file Reducer.py”, 這樣,Hadoop會將這兩個文件自動分發到各個節點上,比如:
$HADOOP_HOME/bin/hadoop? jar $HADOOP_HOME/contrib/streaming/hadoop-*-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper Mapper.py\
-reducer Reducerr.py\
-file?Mapper.py?\
-file Reducer.py
(2)用腳本編寫時,第一行需注明腳本解釋器,默認是shell ? (3)如何對Hadoop Streaming程序進行測試? ? Hadoop Streaming程序的一個優點是易于測試,比如在Wordcount例子中,可以運行以下命令在本地進行測試:
cat input.txt | python?Mapper.py | sort | python Reducer.py
或者
cat input.txt | ./Mapper | sort | ./Reducer
總結
以上是生活随笔為你收集整理的Hadoop Streaming 编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 使用Python实现Hadoop Map
- 下一篇: Hadoop YARN