Hadoop pipes编程
Hadoop pipes允許C++程序員編寫mapreduce程序,它允許用戶混用C++和Java的RecordReader, Mapper, Partitioner,Rducer和RecordWriter等五個組件。關于Hadoop pipes的設計思想,可參見我這篇文章:Hadoop Pipes設計原理。
本文介紹了Hadoop pipes編程的基本方法,并給出了若干編程示例,最后介紹了Hadoop pipes高級編程方法,包括怎樣在MapReduce中加載詞典,怎么傳遞參數,怎樣提高效率等。
2. Hadoop pipes編程初體驗
Hadoop-0.20.2源代碼中自帶了三個pipes編程示例,它們位于目錄src/examples/pipes/impl中,分別為wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面簡要介紹一下這三個程序。
(1) wordcount-simple.cc:Mapper和Reducer組件采用C++語言編寫,RecordReader, Partitioner和RecordWriter采用Java語言編寫,其中,RecordReader 為LineRecordReader(位于InputTextInputFormat中,按行讀取數據,行所在的偏移量為key,行中的字符串為value),Partitioner為PipesPartitioner,RecordWriter為LineRecordWriter(位于InputTextOutputFormat中,輸出格式為”key\tvalue\n”)
(2) wordcount-part.cc:Mapper,Partitioner和Reducer組件采用C++語言編寫,其他采用Java編寫
(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++編寫
接下來簡單介紹一下wordcount-simple.cc的編譯和運行方法。
在Hadoop的安裝目錄下,執行下面命令:
| 1 | ant -Dcompile.c++=yes examples |
則wordcount-simple.cc生成的可執行文件wordcount-simple被保存到了目錄build/c++-examples/Linux-amd64-64/bin/中,然后將該可執行文件上傳到HDFS的某一個目錄下,如/user/XXX/ bin下:
| 1 | bin/hadoop? -put? build/c++-examples/Linux-amd64-64/bin/wordcount-simple? /user/XXX/ bin/ |
上傳一份數據到HDFS的/user/XXX /pipes_test_data目錄下:
| 1 | bin/hadoop? -put? data.txt? /user/XXX /pipes_test_data |
直接使用下面命令提交作業:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 | bin/hadoop pipes \ -D hadoop.pipes.java.recordreader=true \ -D hadoop.pipes.java.recordwriter=true \ -D mapred.job.name= wordcount \ -input /user/XXX /pipes_test_data \ -output /user/XXX /pipes_test_output \ -program /user/XXX/ bin/wordcount-simple |
3. Hadoop pipes編程方法
先從最基礎的兩個組件Mapper和Reducer說起。
(1) Mapper編寫方法
用戶若要實現Mapper組件,需繼承HadoopPipes::Mapper虛基類,它的定義如下:
| 1 2 3 4 5 6 7 | class Mapper: public Closable { public: virtual void map(MapContext& context) = 0; }; |
用戶必須實現map函數,它的參數是MapContext,該類的聲明如下:
| 1 2 3 4 5 6 7 8 9 10 11 | class MapContext: public TaskContext { public: virtual const std::string& getInputSplit() = 0; virtual const std::string& getInputKeyClass() = 0; virtual const std::string& getInputValueClass() = 0; }; |
而TaskContext類地聲明如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | class TaskContext { public: class Counter { …… public: Counter(int counterId) : id(counterId) {} Counter(const Counter& counter) : id(counter.id) {} …… }; virtual const JobConf* getJobConf() = 0; virtual const std::string& getInputKey() = 0; virtual const std::string& getInputValue() = 0; virtual void emit(const std::string& key, const std::string& value) = 0; virtual void progress() = 0; ……. }; |
用戶可以從context參數中獲取當前的key,value,progress和inputsplit等數據信息,此外,還可以調用emit將結果回傳給Java代碼。
Mapper的構造函數帶有一個HadoopPipes::TaskContext參數,用戶可以通過它注冊一些全局counter,對于程序調試和跟蹤作業進度非常有用:
如果你想注冊全局counter,在構造函數添加一些類似的代碼:
| 1 2 3 4 5 6 7 | WordCountMap(HadoopPipes::TaskContext& context) { inputWords1 = context.getCounter(“group”, ”counter1”); inputWords2 = context.getCounter(“group”, ”counter2”); } |
當需要增加counter值時,可以這樣:
| 1 2 3 | context.incrementCounter(inputWords1, 1); context.incrementCounter(inputWords2, 1); |
其中getCounter的兩個參數分別為組名和組內計數器名,一個組中可以存在多個counter。
用戶自定義的counter會在程序結束時,輸出到屏幕上,當然,用戶可以用通過web界面看到。
(2) Reducer編寫方法
Reducer組件的編寫方法跟Mapper組件類似,它需要繼承虛基類public HadoopPipes::Reducer。
與Mapper組件唯一不同的地方時,map函數的參數類型為HadoopPipes::ReduceContext,它包含一個nextValue()方法,這允許用于遍歷當前key對應的value列表,依次進行處理。
接下來介紹RecordReader, Partitioner和RecordWriter的編寫方法:
(3) RecordReader編寫方法
用戶自定義的RecordReader類需要繼承虛基類HadoopPipes::RecordReader,它的聲明如下:
| 1 2 3 4 5 6 7 8 9 | class RecordReader: public Closable { public: virtual bool next(std::string& key, std::string& value) = 0; virtual float getProgress() = 0; }; |
用戶需要實現next和 getProgress兩個方法。
用戶自定義的RecordReader的構造函數可攜帶類型為HadoopPipes::MapContext的參數,通過該參數的getInputSplit()的方法,用戶可以獲取經過序列化的InpuSplit對象,Java端采用不同的InputFormat可導致InputSplit對象格式不同,但對于大多數InpuSplit對象,它們可以提供至少三個信息:當前要處理的InputSplit所在的文件名,所在文件中的偏移量,它的長度。用戶獲取這三個信息后,可使用libhdfs庫讀取文件,以實現next方法。
下面介紹一下反序列化InputSplit對象的方法:
【1】 如果Java端采用的InputFormat為WordCountInpuFormat,可以這樣:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | class XXXReader: public HadoopPipes::RecordReader { public: XXXReader (HadoopPipes::MapContext& context) { std::string filename; HadoopUtils::StringInStream stream(context.getInputSplit()); HadoopUtils::deserializeString(filename, stream); …… }; |
【2】 如果Java端采用的InputFormat為TextInpuFormat,可以這樣:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 | class XXXReader: public HadoopPipes::RecordReader { public: XXXReader (HadoopPipes::MapContext& context) { std::string filename; HadoopUtils::StringInStream stream(context.getInputSplit()); readString(filename, stream); int start = (int)readLong(stream); int len = (int)readLong(stream); …… private: void readString(std::string& t, HadoopUtils::StringInStream& stream) { int len = readShort(stream); if (len > 0) { // resize the string to the right length t.resize(len); // read into the string in 64k chunks const int bufSize = 65536; int offset = 0; char buf[bufSize]; while (len > 0) { int chunkLength = len > bufSize ? bufSize : len; stream.read(buf, chunkLength); t.replace(offset, chunkLength, buf, chunkLength); offset += chunkLength; len -= chunkLength; } } else { t.clear(); } } long readLong(HadoopUtils::StringInStream& stream) { long n; char b; stream.read(&b, 1); n = (long)(b & 0xff) << 56 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 48 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 40 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 32 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 24 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 16 ; stream.read(&b, 1); n |= (long)(b & 0xff) << 8 ; stream.read(&b, 1); n |= (long)(b & 0xff) ; return n; } }; |
(4) Partitioner編寫方法
用戶自定義的Partitioner類需要繼承虛基類HadoopPipes:: Partitioner,它的聲明如下:
| 1 2 3 4 5 6 7 8 9 | class Partitioner { public: virtual int partition(const std::string& key, int numOfReduces) = 0; virtual ~Partitioner() {} }; |
用戶需要實現partition方法和 析構函數。
對于partition方法,框架會自動為它傳入兩個參數,分別為key值和reduce task的個數numOfReduces,用戶只需返回一個0~ numOfReduces-1的值即可。
(5) RecordWriter編寫方法
用戶自定義的RecordWriter類需要繼承虛基類HadoopPipes:: RecordWriter,它的聲明如下:
| 1 2 3 4 5 6 7 8 9 | class RecordWriter: public Closable { public: virtual void emit(const std::string& key, const std::string& value) = 0; }; |
用戶自定的RecordWriter的構造函數可攜帶類型為HadoopPipes::MapContext的參數,通過該參數的getJobConf()可獲取一個HadoopPipes::JobConf的對象,用戶可從該對象中獲取該reduce task的各種參數,如:該reduce task的編號(這對于確定輸出文件名有用),reduce task的輸出目錄等。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | class MyWriter: public HadoopPipes::RecordWriter { public: MyWriter(HadoopPipes::ReduceContext& context) { const HadoopPipes::JobConf* job = context.getJobConf(); int part = job->getInt("mapred.task.partition"); std::string outDir = job->get("mapred.work.output.dir"); …… } } |
用戶需實現emit方法,將數據寫入某個文件。
4. Hadoop pipes編程示例
網上有很多人懷疑Hadoop pipes自帶的程序wordcount-nopipe.cc不能運行,各個論壇都有討論,在此介紹該程序的設計原理和運行方法。
該運行需要具備以下前提:
(1) 采用的InputFormat為WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中
(2) 輸入目錄和輸出目錄需位于各個datanode的本地磁盤上,格式為:file:///home/xxx/pipes_test (注意,hdfs中的各種接口同時支持本地路徑和HDFS路徑,如果是HDFS上的路徑,需要使用hdfs://host:9000/user/xxx,表示/user/xxx為namenode 為host的hdfs上的路徑,而本地路徑,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test為本地路徑。例如,bin/hadoop fs –ls file:///home/xxx/pipes_test表示列出本地磁盤上/home/xxx/pipes_tes下的文件)
待確定好各個datanode的本地磁盤上有輸入數據/home/xxx/pipes_test/data.txt后,用戶首先上傳可執行文件到HDFS中:
| 1 | bin/hadoop? -put? build/c++-examples/Linux-amd64-64/bin/wordcount-nopipe? /user/XXX/bin/ |
然后使用下面命令提交該作業:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | bin/hadoop pipes \ -D hadoop.pipes.java.recordreader=false \ -D hadoop.pipes.java.recordwriter=false \ -D mapred.job.name=wordcount \ -D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \ -libjars hadoop-0.20.2-test.jar \ -input file:///home/xxx/pipes_test/data.txt \ -output file:///home/xxx/pipes_output \ -program /user/XXX/bin/wordcount-nopipe |
5. Hadoop pipes高級編程
如果用戶需要在mapreduce作業中加載詞典或者傳遞參數,可這樣做:
(1) 提交作業時,用-files選項,將詞典(需要傳遞參數可以放到一個配置文件中)上傳給各個datanode,如:
| 1 2 3 4 5 6 7 8 9 10 11 | bin/hadoop pipes \ -D hadoop.pipes.java.recordreader=false \ -D hadoop.pipes.java.recordwriter=false \ -D mapred.job.name=wordcount \ -files dic.txt \ …. |
(2) 在Mapper或者Reducer的構造函數中,將字典文件以本地文件的形式打開,并把內容保存到一個map或者set中,然后再map()或者reduce()函數中使用即可,如:
| 1 2 3 4 5 6 7 | WordCountMap(HadoopPipes::TaskContext& context) { file = fopen(“dic.txt”, "r"); //C庫函數 ……. } |
為了提高系能,RecordReader和RecordWriter最好采用Java代碼實現(或者重用Hadoop中自帶的),這是因為Hadoop自帶的C++庫libhdfs采用JNI實現,底層還是要調用Java相關接口,效率很低,此外,如果要處理的文件為二進制文件或者其他非文本文件,libhdfs可能不好處理。
6. 總結
Hadoop pipes使C++程序員編寫MapReduce作業變得可能,它簡單好用,提供了用戶所需的大部分功能。
1.Hadoop pipes編程介紹
Hadoop pipes允許C++程序員編寫mapreduce程序,它允許用戶混用C++和Java的RecordReader,Mapper,Partitioner,Rducer和RecordWriter等五個組件。關于Hadoop pipes的設計思想,可參見我這篇文章:
本文介紹了Hadoop pipes編程的基本方法,并給出了若干編程示例,最后介紹了Hadoop pipes高級編程方法,包括怎樣在MapReduce中加載詞典,怎么傳遞參數,怎樣提高效率等。
2.Hadoop pipes編程初體驗
Hadoop-0.20.2源代碼中自帶了三個pipes編程示例,它們位于目錄src/examples/pipes/impl中,分別為wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面簡要介紹一下這三個程序。
(1)wordcount-simple.cc:Mapper和Reducer組件采用C++語言編寫,RecordReader, Partitioner和RecordWriter采用Java語言編寫,其中,RecordReader為LineRecordReader(位于InputTextInputFormat中,按行讀取數據,行所在的偏移量為key,行中的字符串為value),Partitioner為PipesPartitioner,RecordWriter為LineRecordWriter(位于InputTextOutputFormat中,輸出格式為”key\tvalue\n”)
(2)wordcount-part.cc:Mapper,Partitioner和Reducer組件采用C++語言編寫,其他采用Java編寫
(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++編寫
接下來簡單介紹一下wordcount-simple.cc的編譯和運行方法。
在Hadoop的安裝目錄下,執行下面命令:
ant -Dcompile.c++=yes examples
則wordcount-simple.cc生成的可執行文件wordcount-simple被保存到了目錄build/c++-examples/Linux-amd64-64/bin/中,然后將該可執行文件上傳到HDFS的某一個目錄下,如/user/XXX/ bin下:
bin/hadoop-putbuild/c++-examples/Linux-amd64-64/bin/wordcount-simple/user/XXX/ bin/
上傳一份數據到HDFS的/user/XXX /pipes_test_data目錄下:
bin/hadoop-putdata.txt/user/XXX /pipes_test_data
直接使用下面命令提交作業:
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-D mapred.job.name= wordcount \
-input /user/XXX /pipes_test_data \
-output /user/XXX /pipes_test_output \
-program /user/XXX/ bin/wordcount-simple
3.Hadoop pipes編程方法
先從最基礎的兩個組件Mapper和Reducer說起。
(1)Mapper編寫方法
用戶若要實現Mapper組件,需繼承HadoopPipes::Mapper虛基類,它的定義如下:
class Mapper: public Closable {
public:
virtual void map(MapContext& context) = 0;
};
用戶必須實現map函數,它的參數是MapContext,該類的聲明如下:
class MapContext: public TaskContext {
public:
virtual const std::string& getInputSplit() = 0;
virtual const std::string& getInputKeyClass() = 0;
virtual const std::string& getInputValueClass() = 0;
};
而TaskContext類地聲明如下:
class TaskContext {
public:
class Counter {
……
public:
Counter(int counterId) : id(counterId) {}
Counter(const Counter& counter) : id(counter.id) {}
……
};
virtual const JobConf* getJobConf() = 0;
virtual const std::string& getInputKey() = 0;
virtual const std::string& getInputValue() = 0;
virtual void emit(const std::string& key, const std::string& value) = 0;
virtual void progress() = 0;
…….
};
用戶可以從context參數中獲取當前的key,value,progress和inputsplit等數據信息,此外,還可以調用emit將結果回傳給Java代碼。
Mapper的構造函數帶有一個HadoopPipes::TaskContext參數,用戶可以通過它注冊一些全局counter,對于程序調試和跟蹤作業進度非常有用:
如果你想注冊全局counter,在構造函數添加一些類似的代碼:
WordCountMap(HadoopPipes::TaskContext& context) {
inputWords1 = context.getCounter(“group”, ”counter1”);
inputWords2 = context.getCounter(“group”, ”counter2”);
}
當需要增加counter值時,可以這樣:
context.incrementCounter(inputWords1, 1);
context.incrementCounter(inputWords2, 1);
其中getCounter的兩個參數分別為組名和組內計數器名,一個組中可以存在多個counter。
用戶自定義的counter會在程序結束時,輸出到屏幕上,當然,用戶可以用通過web界面看到。
(2)Reducer編寫方法
Reducer組件的編寫方法跟Mapper組件類似,它需要繼承虛基類public HadoopPipes::Reducer。
與Mapper組件唯一不同的地方時,map函數的參數類型為HadoopPipes::ReduceContext,它包含一個nextValue()方法,這允許用于遍歷當前key對應的value列表,依次進行處理。
接下來介紹RecordReader,Partitioner和RecordWriter的編寫方法:
(3)RecordReader編寫方法
用戶自定義的RecordReader類需要繼承虛基類HadoopPipes::RecordReader,它的聲明如下:
class RecordReader: public Closable {
public:
virtual bool next(std::string& key, std::string& value) = 0;
virtual float getProgress() = 0;
};
用戶需要實現next和getProgress兩個方法。
用戶自定義的RecordReader的構造函數可攜帶類型為HadoopPipes::MapContext的參數,通過該參數的getInputSplit()的方法,用戶可以獲取經過序列化的InpuSplit對象,Java端采用不同的InputFormat可導致InputSplit對象格式不同,但對于大多數InpuSplit對象,它們可以提供至少三個信息:當前要處理的InputSplit所在的文件名,所在文件中的偏移量,它的長度。用戶獲取這三個信息后,可使用libhdfs庫讀取文件,以實現next方法。
(4)Partitioner編寫方法
用戶自定義的Partitioner類需要繼承虛基類HadoopPipes:: Partitioner,它的聲明如下:
class Partitioner {
public:
virtual int partition(const std::string& key, int numOfReduces) = 0;
virtual ~Partitioner() {}
};
用戶需要實現partition方法和析構函數。
對于partition方法,框架會自動為它傳入兩個參數,分別為key值和reduce task的個數numOfReduces,用戶只需返回一個0~ numOfReduces-1的值即可。
(5)RecordWriter編寫方法
用戶自定義的RecordWriter類需要繼承虛基類HadoopPipes:: RecordWriter,它的聲明如下:
class RecordWriter: public Closable {
public:
virtual void emit(const std::string& key,
const std::string& value) = 0;
};
用戶自定的RecordWriter的構造函數可攜帶類型為HadoopPipes::MapContext的參數,通過該參數的getJobConf()可獲取一個HadoopPipes::JobConf的對象,用戶可從該對象中獲取該reduce task的各種參數,如:該reduce task的編號(這對于確定輸出文件名有用),reduce task的輸出目錄等。
class WordCountWriter: public HadoopPipes::RecordWriter {
public:
MyWriter(HadoopPipes::ReduceContext& context) {
const HadoopPipes::JobConf* job = context.getJobConf();
int part = job->getInt(“mapred.task.partition”);
std::string outDir = job->get(“mapred.work.output.dir”);
……
}
}
用戶需實現emit方法,將數據寫入某個文件。
4.Hadoop pipes編程示例
網上有很多人懷疑Hadoop pipes自帶的程序wordcount-nopipe.cc不能運行,各個論壇都有討論,在此介紹該程序的設計原理和運行方法。
該運行需要具備以下前提:
(1)?采用的InputFormat為WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中
(2)?輸入目錄和輸出目錄需位于各個datanode的本地磁盤上,格式為:file:///home/xxx/pipes_test(注意,hdfs中的各種接口同時支持本地路徑和HDFS路徑,如果是HDFS上的路徑,需要使用hdfs://host:9000/user/xxx,表示/user/xxx為namenode為host的hdfs上的路徑,而本地路徑,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test為本地路徑)
待確定好各個datanode的本地磁盤上有輸入數據/home/xxx/pipes_test/data.txt后,用戶首先上傳可執行文件到HDFS中:
bin/hadoop-putbuild/c++-examples/Linux-amd64-64/bin/wordcount-simple/user/XXX/ bin/
然后使用下面命令運行該程序:
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=false \
-D hadoop.pipes.java.recordwriter=false \
-D mapred.job.name=wordcount \
-D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \
-libjars hadoop-0.20.2-test.jar \
-input file:/home/xxx/pipes_test/data.txt \
-output file:/home/xxx/pipes_output \
-program /user/XXX/ bin/wordcount-nopipe
5.Hadoop pipes高級編程
如果用戶需要在mapreduce作業中加載詞典或者傳遞參數,可這樣做:
(1)?提交作業時,用-files選項,將詞典(需要傳遞參數可以放到一個配置文件中)上傳給各個datanode,如
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=false \
-D hadoop.pipes.java.recordwriter=false \
-D mapred.job.name=wordcount \
-files dic.txt \
….
(2)在Mapper或者Reducer的構造函數中,將字典文件以本地文件的形式打開,并把內容保存到一個map或者set中,然后再map()或者reduce()函數中使用即可,如
WordCountMap(HadoopPipes::TaskContext& context) {
file = fopen(“dic.txt”, “r”); //C庫函數
…….
}
為了提高系能,RecordReader和RecordWriter最好采用Java代碼實現(或者重用Hadoop中自帶的),這是因為Hadoop自帶的C++庫libhdfs采用JNI實現,底層還是要調用Java相關接口,效率很低,此外,如果要處理的文件為二進制文件或者其他非文本文件,libhdfs可能不好處理。
6.總結
1. Hadoop pipes編程介紹
Hadoop pipes允許C++程序員編寫mapreduce程序,它允許用戶混用C++和Java的RecordReader,Mapper,Partitioner,Rducer和RecordWriter等五個組件。關于Hadoop pipes的設計思想,可參見我這篇文章:
本文介紹了Hadoop pipes編程的基本方法,并給出了若干編程示例,最后介紹了Hadoop pipes高級編程方法,包括怎樣在MapReduce中加載詞典,怎么傳遞參數,怎樣提高效率等。
2. Hadoop pipes編程初體驗
Hadoop-0.20.2源代碼中自帶了三個pipes編程示例,它們位于目錄src/examples/pipes/impl中,分別為wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面簡要介紹一下這三個程序。
(1) wordcount-simple.cc:Mapper和Reducer組件采用C++語言編寫,RecordReader, Partitioner和RecordWriter采用Java語言編寫,其中,RecordReader 為LineRecordReader(位于InputTextInputFormat中,按行讀取數據,行所在的偏移量為key,行中的字符串為value),Partitioner為PipesPartitioner,RecordWriter為LineRecordWriter(位于InputTextOutputFormat中,輸出格式為”key\tvalue\n”)
(2) wordcount-part.cc:Mapper,Partitioner和Reducer組件采用C++語言編寫,其他采用Java編寫
(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++編寫
接下來簡單介紹一下wordcount-simple.cc的編譯和運行方法。
在Hadoop的安裝目錄下,執行下面命令:
ant -Dcompile.c++=yes examples
則wordcount-simple.cc生成的可執行文件wordcount-simple被保存到了目錄build/c++-examples/Linux-amd64-64/bin/中,然后將該可執行文件上傳到HDFS的某一個目錄下,如/user/XXX/ bin下:
bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/
上傳一份數據到HDFS的/user/XXX /pipes_test_data目錄下:
bin/hadoop -put data.txt /user/XXX /pipes_test_data
直接使用下面命令提交作業:
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-D mapred.job.name= wordcount \
-input /user/XXX /pipes_test_data \
-output /user/XXX /pipes_test_output \
-program /user/XXX/ bin/wordcount-simple
3. Hadoop pipes編程方法
先從最基礎的兩個組件Mapper和Reducer說起。
(1) Mapper編寫方法
用戶若要實現Mapper組件,需繼承HadoopPipes::Mapper虛基類,它的定義如下:
class Mapper: public Closable {
public:
virtual void map(MapContext& context) = 0;
};
用戶必須實現map函數,它的參數是MapContext,該類的聲明如下:
class MapContext: public TaskContext {
public:
virtual const std::string& getInputSplit() = 0;
virtual const std::string& getInputKeyClass() = 0;
virtual const std::string& getInputValueClass() = 0;
};
而TaskContext類地聲明如下:
class TaskContext {
public:
class Counter {
……
public:
Counter(int counterId) : id(counterId) {}
Counter(const Counter& counter) : id(counter.id) {}
……
};
virtual const JobConf* getJobConf() = 0;
virtual const std::string& getInputKey() = 0;
virtual const std::string& getInputValue() = 0;
virtual void emit(const std::string& key, const std::string& value) = 0;
virtual void progress() = 0;
…….
};
用戶可以從context參數中獲取當前的key,value,progress和inputsplit等數據信息,此外,還可以調用emit將結果回傳給Java代碼。
Mapper的構造函數帶有一個HadoopPipes::TaskContext參數,用戶可以通過它注冊一些全局counter,對于程序調試和跟蹤作業進度非常有用:
如果你想注冊全局counter,在構造函數添加一些類似的代碼:
WordCountMap(HadoopPipes::TaskContext& context) {
inputWords1 = context.getCounter(“group”, ”counter1”);
inputWords2 = context.getCounter(“group”, ”counter2”);
}
當需要增加counter值時,可以這樣:
context.incrementCounter(inputWords1, 1);
context.incrementCounter(inputWords2, 1);
其中getCounter的兩個參數分別為組名和組內計數器名,一個組中可以存在多個counter。
用戶自定義的counter會在程序結束時,輸出到屏幕上,當然,用戶可以用通過web界面看到。
(2) Reducer編寫方法
Reducer組件的編寫方法跟Mapper組件類似,它需要繼承虛基類public HadoopPipes::Reducer。
與Mapper組件唯一不同的地方時,map函數的參數類型為HadoopPipes::ReduceContext,它包含一個nextValue()方法,這允許用于遍歷當前key對應的value列表,依次進行處理。
接下來介紹RecordReader, Partitioner和RecordWriter的編寫方法:
(3) RecordReader編寫方法
用戶自定義的RecordReader類需要繼承虛基類HadoopPipes::RecordReader,它的聲明如下:
class RecordReader: public Closable {
public:
virtual bool next(std::string& key, std::string& value) = 0;
virtual float getProgress() = 0;
};
用戶需要實現next和 getProgress兩個方法。
用戶自定義的RecordReader的構造函數可攜帶類型為HadoopPipes::MapContext的參數,通過該參數的getInputSplit()的方法,用戶可以獲取經過序列化的InpuSplit對象,Java端采用不同的InputFormat可導致InputSplit對象格式不同,但對于大多數InpuSplit對象,它們可以提供至少三個信息:當前要處理的InputSplit所在的文件名,所在文件中的偏移量,它的長度。用戶獲取這三個信息后,可使用libhdfs庫讀取文件,以實現next方法。
(4) Partitioner編寫方法
用戶自定義的Partitioner類需要繼承虛基類HadoopPipes:: Partitioner,它的聲明如下:
class Partitioner {
public:
virtual int partition(const std::string& key, int numOfReduces) = 0;
virtual ~Partitioner() {}
};
用戶需要實現partition方法和 析構函數。
對于partition方法,框架會自動為它傳入兩個參數,分別為key值和reduce task的個數numOfReduces,用戶只需返回一個0~ numOfReduces-1的值即可。
(5) RecordWriter編寫方法
用戶自定義的RecordWriter類需要繼承虛基類HadoopPipes:: RecordWriter,它的聲明如下:
class RecordWriter: public Closable {
public:
virtual void emit(const std::string& key,
const std::string& value) = 0;
};
用戶自定的RecordWriter的構造函數可攜帶類型為HadoopPipes::MapContext的參數,通過該參數的getJobConf()可獲取一個HadoopPipes::JobConf的對象,用戶可從該對象中獲取該reduce task的各種參數,如:該reduce task的編號(這對于確定輸出文件名有用),reduce task的輸出目錄等。
class WordCountWriter: public HadoopPipes::RecordWriter {
public:
MyWriter(HadoopPipes::ReduceContext& context) {
const HadoopPipes::JobConf* job = context.getJobConf();
int part = job->getInt(“mapred.task.partition”);
std::string outDir = job->get(“mapred.work.output.dir”);
……
}
}
用戶需實現emit方法,將數據寫入某個文件。
4. Hadoop pipes編程示例
網上有很多人懷疑Hadoop pipes自帶的程序wordcount-nopipe.cc不能運行,各個論壇都有討論,在此介紹該程序的設計原理和運行方法。
該運行需要具備以下前提:
(1) 采用的InputFormat為WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中
(2) 輸入目錄和輸出目錄需位于各個datanode的本地磁盤上,格式為:file:///home/xxx/pipes_test (注意,hdfs中的各種接口同時支持本地路徑和HDFS路徑,如果是HDFS上的路徑,需要使用hdfs://host:9000/user/xxx,表示/user/xxx為namenode 為host的hdfs上的路徑,而本地路徑,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test為本地路徑)
待確定好各個datanode的本地磁盤上有輸入數據/home/xxx/pipes_test/data.txt后,用戶首先上傳可執行文件到HDFS中:
bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/
然后使用下面命令運行該程序:
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=false \
-D hadoop.pipes.java.recordwriter=false \
-D mapred.job.name=wordcount \
-D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \
-libjars hadoop-0.20.2-test.jar \
-input file:/home/xxx/pipes_test/data.txt \
-output file:/home/xxx/pipes_output \
-program /user/XXX/ bin/wordcount-nopipe
5. Hadoop pipes高級編程
如果用戶需要在mapreduce作業中加載詞典或者傳遞參數,可這樣做:
(1) 提交作業時,用-files選項,將詞典(需要傳遞參數可以放到一個配置文件中)上傳給各個datanode,如
bin/hadoop pipes \
-D hadoop.pipes.java.recordreader=false \
-D hadoop.pipes.java.recordwriter=false \
-D mapred.job.name=wordcount \
-files dic.txt \
….
(2) 在Mapper或者Reducer的構造函數中,將字典文件以本地文件的形式打開,并把內容保存到一個map或者set中,然后再map()或者reduce()函數中使用即可,如
WordCountMap(HadoopPipes::TaskContext& context) {
file = fopen(“dic.txt”, “r”); //C庫函數
…….
}
為了提高系能,RecordReader和RecordWriter最好采用Java代碼實現(或者重用Hadoop中自帶的),這是因為Hadoop自帶的C++庫libhdfs采用JNI實現,底層還是要調用Java相關接口,效率很低,此外,如果要處理的文件為二進制文件或者其他非文本文件,libhdfs可能不好處理。
6. 總結
Hadoop pipes使C++程序員編寫MapReduce作業變得可能,它簡單好用,提供了用戶所需的大部分功能。
Hadoop pipes使C++程序員編寫MapReduce作業變得可能,它簡單好用,提供了用戶所需的大部分功能。
原創文章,轉載請注明:?轉載自董的博客
本文鏈接地址:?http://dongxicheng.org/mapreduce/hadoop-pipes-programming/
總結
以上是生活随笔為你收集整理的Hadoop pipes编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Hadoop pipes设计原理
- 下一篇: YARN编程实例—Unmanaged A