hadoop 入门实例【转】
原文鏈接:http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html
1、數據去重
?"數據去重"主要是為了掌握和利用并行化思想來對數據進行有意義的篩選。統計大數據集上的數據種類個數、從網站日志中計算訪問地等這些看似龐雜的任務都會涉及數據去重。下面就進入這個實例的MapReduce程序設計。
1.1 實例描述
對數據文件中的數據進行去重。數據文件中的每行都是一個數據。
樣例輸入如下所示:
?????1)file1:
?
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
?
?????2)file2:
?
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-6 c
2012-3-7 d
2012-3-3 c
?
???? 樣例輸出如下所示:
?
2012-3-1 a
2012-3-1 b
2012-3-2 a
2012-3-2 b
2012-3-3 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-6 c
2012-3-7 c
2012-3-7 d
?
1.2 設計思路
數據去重的最終目標是讓原始數據中出現次數超過一次的數據在輸出文件中只出現一次。我們自然而然會想到將同一個數據的所有記錄都交給一臺reduce機器,無論這個數據出現多少次,只要在最終結果中輸出一次就可以了。具體就是reduce的輸入應該以數據作為key,而對value-list則沒有要求。當reduce接收到一個<key,value-list>時就直接將key復制到輸出的key中,并將value設置成空值。
在MapReduce流程中,map的輸出<key,value>經過shuffle過程聚集成<key,value-list>后會交給reduce。所以從設計好的reduce輸入可以反推出map的輸出key應為數據,value任意。繼續反推,map輸出數據的key為數據,而在這個實例中每個數據代表輸入文件中的一行內容,所以map階段要完成的任務就是在采用Hadoop默認的作業輸入方式之后,將value設置為key,并直接輸出(輸出中的value任意)。map中的結果經過shuffle過程之后交給reduce。reduce階段不會管每個key有多少個value,它直接將輸入的key復制為輸出的key,并輸出就可以了(輸出中的value被設置成空了)。
1.3 程序代碼
???? 程序代碼如下所示:
?
package?com.hebut.mr;
?
import?java.io.IOException;
?
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.IntWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.util.GenericOptionsParser;
?
public?class?Dedup {
?
????//map將輸入中的value復制到輸出數據的key上,并直接輸出
????public?static?class?Map?extends?Mapper<Object,Text,Text,Text>{
????????private?static?Text?line=new?Text();//每行數據
???????
????????//實現map函數
????????public?void?map(Object key,Text value,Context context)
????????????????throws?IOException,InterruptedException{
????????????line=value;
??????????? context.write(line,?new?Text(""));
??????? }
???????
??? }
???
????//reduce將輸入中的key復制到輸出數據的key上,并直接輸出
????public?static?class?Reduce?extends?Reducer<Text,Text,Text,Text>{
????????//實現reduce函數
????????public?void?reduce(Text key,Iterable<Text> values,Context context)
????????????????throws?IOException,InterruptedException{
??????????? context.write(key,?new?Text(""));
??????? }
???????
??? }
???
????public?static?void?main(String[] args)?throws?Exception{
??????? Configuration conf =?new?Configuration();
????????//這句話很關鍵
??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");
???????
??????? String[] ioArgs=new?String[]{"dedup_in","dedup_out"};
???? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();
?????if?(otherArgs.length?!= 2) {
???? System.err.println("Usage: Data Deduplication <in> <out>");
???? System.exit(2);
???? }
?????
???? Job job =?new?Job(conf,?"Data Deduplication");
???? job.setJarByClass(Dedup.class);
?????
?????//設置Map、Combine和Reduce處理類
???? job.setMapperClass(Map.class);
???? job.setCombinerClass(Reduce.class);
???? job.setReducerClass(Reduce.class);
?????
?????//設置輸出類型
???? job.setOutputKeyClass(Text.class);
???? job.setOutputValueClass(Text.class);
?????
?????//設置輸入和輸出目錄
???? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));
???? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));
???? System.exit(job.waitForCompletion(true) ? 0 : 1);
???? }
}
?
1.4 代碼結果
?????1)準備測試數據
???? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"dedup_in"文件夾(備注:"dedup_out"不需要創建。)如圖1.4-1所示,已經成功創建。
???? ????
圖1.4-1 創建"dedup_in"?????????????? ??????????????????? 圖1.4.2 上傳"file*.txt"
?
??? ?然后在本地建立兩個txt文件,通過Eclipse上傳到"/user/hadoop/dedup_in"文件夾中,兩個txt文件的內容如"實例描述"那兩個文件一樣。如圖1.4-2所示,成功上傳之后。
???? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的兩個文件。
?
?
??? 查看兩個文件的內容如圖1.4-3所示:
?
圖1.4-3 文件"file*.txt"內容
2)查看運行結果
???? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"dedup_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖1.4-4所示。
?
圖1.4-4 運行結果
?
??? 此時,你可以對比一下和我們之前預期的結果是否一致。
2、數據排序
"數據排序"是許多實際任務執行時要完成的第一項工作,比如學生成績評比、數據建立索引等。這個實例和數據去重類似,都是先對原始數據進行初步處理,為進一步的數據操作打好基礎。下面進入這個示例。
2.1 實例描述
??? 對輸入文件中數據進行排序。輸入文件中的每行內容均為一個數字,即一個數據。要求在輸出中每行有兩個間隔的數字,其中,第一個代表原始數據在原始數據集中的位次,第二個代表原始數據。
??? 樣例輸入:
????1)file1:
?
2
32
654
32
15
756
65223
?
????2)file2:
?
5956
22
650
92
?
????3)file3:
?
26
54
6
?
??? 樣例輸出:
?
1??? 2
2??? 6
3??? 15
4??? 22
5??? 26
6??? 32
7??? 32
8??? 54
9??? 92
10??? 650
11??? 654
12??? 756
13??? 5956
14??? 65223
?
2.2 設計思路
這個實例僅僅要求對輸入數據進行排序,熟悉MapReduce過程的讀者會很快想到在MapReduce過程中就有排序,是否可以利用這個默認的排序,而不需要自己再實現具體的排序呢?答案是肯定的。
但是在使用之前首先需要了解它的默認排序規則。它是按照key值進行排序的,如果key為封裝int的IntWritable類型,那么MapReduce按照數字大小對key排序,如果key為封裝為String的Text類型,那么MapReduce按照字典順序對字符串排序。
了解了這個細節,我們就知道應該使用封裝int的IntWritable型數據結構了。也就是在map中將讀入的數據轉化成IntWritable型,然后作為key值輸出(value任意)。reduce拿到<key,value-list>之后,將輸入的key作為value輸出,并根據value-list中元素的個數決定輸出的次數。輸出的key(即代碼中的linenum)是一個全局變量,它統計當前key的位次。需要注意的是這個程序中沒有配置Combiner,也就是在MapReduce過程中不使用Combiner。這主要是因為使用map和reduce就已經能夠完成任務了。
2.3 程序代碼
??? 程序代碼如下所示:
?
package?com.hebut.mr;
?
import?java.io.IOException;
?
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.IntWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.util.GenericOptionsParser;
?
public?class?Sort {
?
????//map將輸入中的value化成IntWritable類型,作為輸出的key
????public?static?class?Map?extends
Mapper<Object,Text,IntWritable,IntWritable>{
????????private?static?IntWritable?data=new?IntWritable();
???????
????????//實現map函數
????????public?void?map(Object key,Text value,Context context)
????????????????throws?IOException,InterruptedException{
??????????? String line=value.toString();
????????????data.set(Integer.parseInt(line));
??????????? context.write(data,?new?IntWritable(1));
??????? }
???????
??? }
???
????//reduce將輸入中的key復制到輸出數據的key上,
????//然后根據輸入的value-list中元素的個數決定key的輸出次數
????//用全局linenum來代表key的位次
????public?static?class?Reduce?extends
??????????? Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
???????
????????private?static?IntWritable?linenum?=?new?IntWritable(1);
???????
????????//實現reduce函數
????????public?void?reduce(IntWritable key,Iterable<IntWritable> values,Context context)
????????????????throws?IOException,InterruptedException{
????????????for(IntWritable?val:values){
??????????????? context.write(linenum, key);
????????????????linenum?=?new?IntWritable(linenum.get()+1);
??????????? }
???????????
??????? }
?
??? }
???
????public?static?void?main(String[] args)?throws?Exception{
??????? Configuration conf =?new?Configuration();
????????//這句話很關鍵
??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");
???????
??????? String[] ioArgs=new?String[]{"sort_in","sort_out"};
???? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();
?????if?(otherArgs.length?!= 2) {
???? System.err.println("Usage: Data Sort <in> <out>");
???????? System.exit(2);
???? }
?????
???? Job job =?new?Job(conf,?"Data Sort");
???? job.setJarByClass(Sort.class);
?????
?????//設置Map和Reduce處理類
???? job.setMapperClass(Map.class);
???? job.setReducerClass(Reduce.class);
?????
?????//設置輸出類型
???? job.setOutputKeyClass(IntWritable.class);
???? job.setOutputValueClass(IntWritable.class);
?????
?????//設置輸入和輸出目錄
???? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));
???? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));
???? System.exit(job.waitForCompletion(true) ? 0 : 1);
???? }
}
?
2.4 代碼結果
1)準備測試數據
??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"sort_in"文件夾(備注:"sort_out"不需要創建。)如圖2.4-1所示,已經成功創建。
??????????????
圖2.4-1 創建"sort_in"????????????????????????????????????????????????? 圖2.4.2 上傳"file*.txt"
?
??? 然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/sort_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件一樣。如圖2.4-2所示,成功上傳之后。
??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的三個文件。
?
?
查看兩個文件的內容如圖2.4-3所示:
?
圖2.4-3 文件"file*.txt"內容
2)查看運行結果
??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"sort_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖2.4-4所示。
?
圖2.4-4 運行結果
3、平均成績
??? "平均成績"主要目的還是在重溫經典"WordCount"例子,可以說是在基礎上的微變化版,該實例主要就是實現一個計算學生平均成績的例子。
3.1 實例描述
對輸入文件中數據進行就算學生平均成績。輸入文件中的每行內容均為一個學生的姓名和他相應的成績,如果有多門學科,則每門學科為一個文件。要求在輸出中每行有兩個間隔的數據,其中,第一個代表學生的姓名,第二個代表其平均成績。
??? 樣本輸入:
????1)math:
?
張三??? 88
李四??? 99
王五??? 66
趙六??? 77
?
????2)china:
?
張三??? 78
李四??? 89
王五??? 96
趙六??? 67
?
????3)english:
?
張三??? 80
李四??? 82
王五??? 84
趙六??? 86
?
??? 樣本輸出:
?
張三??? 82
李四??? 90
王五??? 82
趙六??? 76
?
3.2 設計思路
??? 計算學生平均成績是一個仿"WordCount"例子,用來重溫一下開發MapReduce程序的流程。程序包括兩部分的內容:Map部分和Reduce部分,分別實現了map和reduce的功能。
????Map處理的是一個純文本文件,文件中存放的數據時每一行表示一個學生的姓名和他相應一科成績。Mapper處理的數據是由InputFormat分解過的數據集,其中InputFormat的作用是將數據集切割成小數據集InputSplit,每一個InputSlit將由一個Mapper負責處理。此外,InputFormat中還提供了一個RecordReader的實現,并將一個InputSplit解析成<key,value>對提供給了map函數。InputFormat的默認值是TextInputFormat,它針對文本文件,按行將文本切割成InputSlit,并用LineRecordReader將InputSplit解析成<key,value>對,key是行在文本中的位置,value是文件中的一行。
??? Map的結果會通過partion分發到Reducer,Reducer做完Reduce操作后,將通過以格式OutputFormat輸出。
??? Mapper最終處理的結果對<key,value>,會送到Reducer中進行合并,合并的時候,有相同key的鍵/值對則送到同一個Reducer上。Reducer是所有用戶定制Reducer類地基礎,它的輸入是key和這個key對應的所有value的一個迭代器,同時還有Reducer的上下文。Reduce的結果由Reducer.Context的write方法輸出到文件中。
3.3 程序代碼
??? 程序代碼如下所示:
?
package?com.hebut.mr;
?
import?java.io.IOException;
import?java.util.Iterator;
import?java.util.StringTokenizer;
?
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.IntWritable;
import?org.apache.hadoop.io.LongWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import?org.apache.hadoop.util.GenericOptionsParser;
?
public?class?Score {
?
????public?static?class?Map?extends
??????????? Mapper<LongWritable, Text, Text, IntWritable> {
?
????????//?實現map函數
????????public?void?map(LongWritable?key, Text value, Context context)
????????????????throws?IOException, InterruptedException {
????????????//?將輸入的純文本文件的數據轉化成String
??????????? String line = value.toString();
?
????????????//?將輸入的數據首先按行進行分割
????????????StringTokenizer tokenizerArticle =?new?StringTokenizer(line,?"\n");
?
????????????//?分別對每一行進行處理
????????????while?(tokenizerArticle.hasMoreElements()) {
????????????????//?每行按空格劃分
??????????????? StringTokenizer tokenizerLine =?new?StringTokenizer(tokenizerArticle.nextToken());
?
??????????????? String strName = tokenizerLine.nextToken();//?學生姓名部分
??????????????? String strScore = tokenizerLine.nextToken();//?成績部分
?
??????????????? Text name =?new?Text(strName);
????????????????int?scoreInt = Integer.parseInt(strScore);
????????????????//?輸出姓名和成績
??????????????? context.write(name,?new?IntWritable(scoreInt));
??????????? }
??????? }
?
??? }
?
????public?static?class?Reduce?extends
??????????? Reducer<Text, IntWritable, Text, IntWritable> {
????????//?實現reduce函數
????????public?void?reduce(Text key, Iterable<IntWritable> values,
??????????????? Context context)?throws?IOException, InterruptedException {
?
????????????int?sum = 0;
????????????int?count = 0;
?
??????????? Iterator<IntWritable> iterator = values.iterator();
????????????while?(iterator.hasNext()) {
??????????????? sum += iterator.next().get();//?計算總分
??????????????? count++;//?統計總的科目數
??????????? }
?
????????????int?average = (int) sum / count;//?計算平均成績
??????????? context.write(key,?new?IntWritable(average));
??????? }
?
??? }
?
????public?static?void?main(String[] args)?throws?Exception {
??????? Configuration conf =?new?Configuration();
????????//?這句話很關鍵
??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");
?
??????? String[] ioArgs =?new?String[] {?"score_in",?"score_out"?};
??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();
????????if?(otherArgs.length?!= 2) {
??????????? System.err.println("Usage: Score Average <in> <out>");
??????????? System.exit(2);
??????? }
?
??????? Job job =?new?Job(conf,?"Score Average");
??????? job.setJarByClass(Score.class);
?
????????//?設置Map、Combine和Reduce處理類
??????? job.setMapperClass(Map.class);
??????? job.setCombinerClass(Reduce.class);
??????? job.setReducerClass(Reduce.class);
?
????????//?設置輸出類型
??????? job.setOutputKeyClass(Text.class);
??????? job.setOutputValueClass(IntWritable.class);
?
????????//?將輸入的數據集分割成小數據塊splites,提供一個RecordReder的實現
??????? job.setInputFormatClass(TextInputFormat.class);
????????//?提供一個RecordWriter的實現,負責數據輸出
??????? job.setOutputFormatClass(TextOutputFormat.class);
?
????????//?設置輸入和輸出目錄
??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));
??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));
??????? System.exit(job.waitForCompletion(true) ? 0 : 1);
??? }
}
?
3.4 代碼結果
1)準備測試數據
??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"score_in"文件夾(備注:"score_out"不需要創建。)如圖3.4-1所示,已經成功創建。
?
????? ???? ?
圖3.4-1 創建"score_in"????????????????????????????????????????????????????? ?圖3.4.2 上傳三門分數
?
??? 然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/score_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件一樣。如圖3.4-2所示,成功上傳之后。
????備注:文本文件的編碼為"UTF-8",默認為"ANSI",可以另存為時選擇,不然中文會出現亂碼。
??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的三個文件。
?
?
查看三個文件的內容如圖3.4-3所示:
?
圖3.4.3 三門成績的內容
2)查看運行結果
??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"score_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖3.4-4所示。
?
圖3.4-4 運行結果
4、單表關聯
??? 前面的實例都是在數據上進行一些簡單的處理,為進一步的操作打基礎。"單表關聯"這個實例要求從給出的數據中尋找所關心的數據,它是對原始數據所包含信息的挖掘。下面進入這個實例。
4.1 實例描述
??? 實例中給出child-parent(孩子——父母)表,要求輸出grandchild-grandparent(孫子——爺奶)表。
??? 樣例輸入如下所示。
????file:
?
child??????? parent
Tom??????? Lucy
Tom??????? Jack
Jone??????? Lucy
Jone??????? Jack
Lucy??????? Mary
Lucy??????? Ben
Jack??????? Alice
Jack??????? Jesse
Terry??????? Alice
Terry??????? Jesse
Philip??????? Terry
Philip??????? Alma
Mark??????? Terry
Mark??????? Alma
?
??? 家族樹狀關系譜:
?
?
圖4.2-1 家族譜
??? 樣例輸出如下所示。
????file:
?
grandchild??????? grandparent
Tom??????????? Alice
Tom??????????? Jesse
Jone??????????? Alice
Jone??????????? Jesse
Tom??????????? Mary
Tom??????????? Ben
Jone??????????? Mary
Jone??????????? Ben
Philip?????????? ? Alice
Philip??????????? Jesse
Mark??????????? Alice
Mark??????????? Jesse
?
4.2 設計思路
?????? 分析這個實例,顯然需要進行單表連接,連接的是左表的parent列和右表的child列,且左表和右表是同一個表。
連接結果中除去連接的兩列就是所需要的結果——"grandchild--grandparent"表。要用MapReduce解決這個實例,首先應該考慮如何實現表的自連接;其次就是連接列的設置;最后是結果的整理。
????? 考慮到MapReduce的shuffle過程會將相同的key會連接在一起,所以可以將map結果的key設置成待連接的列,然后列中相同的值就自然會連接在一起了。再與最開始的分析聯系起來:
要連接的是左表的parent列和右表的child列,且左表和右表是同一個表,所以在map階段將讀入數據分割成child和parent之后,會將parent設置成key,child設置成value進行輸出,并作為左表;再將同一對child和parent中的child設置成key,parent設置成value進行輸出,作為右表。為了區分輸出中的左右表,需要在輸出的value中再加上左右表的信息,比如在value的String最開始處加上字符1表示左表,加上字符2表示右表。這樣在map的結果中就形成了左表和右表,然后在shuffle過程中完成連接。reduce接收到連接的結果,其中每個key的value-list就包含了"grandchild--grandparent"關系。取出每個key的value-list進行解析,將左表中的child放入一個數組,右表中的parent放入一個數組,然后對兩個數組求笛卡爾積就是最后的結果了。
4.3 程序代碼
??? 程序代碼如下所示。
?
package?com.hebut.mr;
?
import?java.io.IOException;
import?java.util.*;
?
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.IntWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.util.GenericOptionsParser;
?
public?class?STjoin {
?
????public?static?int?time?= 0;
?
????/*
???? * map將輸出分割child和parent,然后正序輸出一次作為右表,
???? *?反序輸出一次作為左表,需要注意的是在輸出的value中必須
???? *?加上左右表的區別標識。
???? */
????public?static?class?Map?extends?Mapper<Object, Text, Text, Text> {
?
????????//?實現map函數
????????public?void?map(Object key, Text value, Context context)
????????????????throws?IOException, InterruptedException {
??????????? String childname =?new?String();//?孩子名稱
??????????? String parentname =?new?String();//?父母名稱
??????????? String relationtype =?new?String();//?左右表標識
?
????????????//?輸入的一行預處理文本
??????????? StringTokenizer itr=new?StringTokenizer(value.toString());
??????????? String[] values=new?String[2];
????????????int?i=0;
????????????while(itr.hasMoreTokens()){
??????????????? values[i]=itr.nextToken();
??????????????? i++;
??????????? }
???????????
????????????if?(values[0].compareTo("child") != 0) {
??????????????? childname = values[0];
??????????????? parentname = values[1];
?
????????????????//?輸出左表
??????????????? relationtype =?"1";
??????????????? context.write(new?Text(values[1]),?new?Text(relationtype +
????????????????????????"+"+ childname +?"+"?+ parentname));
?
????????????????//?輸出右表
??????????????? relationtype =?"2";
??????????????? context.write(new?Text(values[0]),?new?Text(relationtype +
????????????????????????"+"+ childname +?"+"?+ parentname));
??????????? }
??????? }
?
??? }
?
????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {
?
????????//?實現reduce函數
????????public?void?reduce(Text key, Iterable<Text> values, Context context)
????????????????throws?IOException, InterruptedException {
?
????????????//?輸出表頭
????????????if?(0 ==?time) {
????????????????context.write(new?Text("grandchild"),?new?Text("grandparent"));
????????????????time++;
??????????? }
?
????????????int?grandchildnum = 0;
??????????? String[] grandchild =?new?String[10];
????????????int?grandparentnum = 0;
??????????? String[] grandparent =?new?String[10];
?
????????????Iterator?ite = values.iterator();
????????????while?(ite.hasNext()) {
??????????????? String record = ite.next().toString();
????????????????int?len = record.length();
????????????????int?i = 2;
????????????????if?(0 == len) {
????????????????????continue;
??????????????? }
?
????????????????//?取得左右表標識
????????????????char?relationtype = record.charAt(0);
????????????????//?定義孩子和父母變量
??????????????? String childname =?new?String();
??????????????? String parentname =?new?String();
?
????????????????//?獲取value-list中value的child
????????????????while?(record.charAt(i) !=?'+') {
??????????????????? childname += record.charAt(i);
??????????????????? i++;
??????????????? }
?
??????????????? i = i + 1;
?
????????????????//?獲取value-list中value的parent
????????????????while?(i < len) {
??????????????????? parentname += record.charAt(i);
??????????????????? i++;
??????????????? }
?
????????????????//?左表,取出child放入grandchildren
????????????????if?('1'?== relationtype) {
??????????????????? grandchild[grandchildnum] = childname;
??????????????????? grandchildnum++;
??????????????? }
?
????????????????//?右表,取出parent放入grandparent
????????????????if?('2'?== relationtype) {
??????????????????? grandparent[grandparentnum] = parentname;
??????????????????? grandparentnum++;
??????????????? }
??????????? }
?
????????????//?grandchild和grandparent數組求笛卡爾兒積
????????????if?(0 != grandchildnum && 0 != grandparentnum) {
????????????????for?(int?m = 0; m < grandchildnum; m++) {
????????????????????for?(int?n = 0; n < grandparentnum; n++) {
????????????????????????//?輸出結果
??????????????????????? context.write(new?Text(grandchild[m]),?new?Text(grandparent[n]));
??????????????????? }
??????????????? }
??????????? }
??????? }
??? }
?
????public?static?void?main(String[] args)?throws?Exception {
??????? Configuration conf =?new?Configuration();
????????//?這句話很關鍵
??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");
?
??????? String[] ioArgs =?new?String[] {?"STjoin_in",?"STjoin_out"?};
??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();
????????if?(otherArgs.length?!= 2) {
??????????? System.err.println("Usage: Single Table Join <in> <out>");
??????????? System.exit(2);
??????? }
?
??????? Job job =?new?Job(conf,?"Single Table Join");
??????? job.setJarByClass(STjoin.class);
?
????????//?設置Map和Reduce處理類
??????? job.setMapperClass(Map.class);
??????? job.setReducerClass(Reduce.class);
?
????????//?設置輸出類型
??????? job.setOutputKeyClass(Text.class);
??????? job.setOutputValueClass(Text.class);
?
????????//?設置輸入和輸出目錄
??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));
??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));
??????? System.exit(job.waitForCompletion(true) ? 0 : 1);
??? }
}
?
4.4 代碼結果
1)準備測試數據
??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"STjoin_in"文件夾(備注:"STjoin_out"不需要創建。)如圖4.4-1所示,已經成功創建。
?
???????????????? ?
圖4.4-1 創建"STjoin_in"?????????????????????????????????????? 圖4.4.2 上傳"child-parent"表
?
??? 然后在本地建立一個txt文件,通過Eclipse上傳到"/user/hadoop/STjoin_in"文件夾中,一個txt文件的內容如"實例描述"那個文件一樣。如圖4.4-2所示,成功上傳之后。
??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的文件,顯示其內容如圖4.4-3所示:
?
圖4.4-3 表"child-parent"內容
????2)運行詳解
????(1)Map處理:
??? map函數輸出結果如下所示。
?
child??????? parent????????????????àà??????????????????? 忽略此行
Tom??????? Lucy???????????????????àà??????????????? <Lucy,1+Tom+Lucy>
??????????????????????????????????????????? <Tom,2+Tom+Lucy >
Tom??????? Jack????????????????????àà??????????????? <Jack,1+Tom+Jack>
??????????????????????????????????????????? <Tom,2+Tom+Jack>
Jone??????? Lucy??????????????? àà??????????????? <Lucy,1+Jone+Lucy>
??????????????????????????????????????????? <Jone,2+Jone+Lucy>
Jone??????? Jack????????????????????àà??????????????? <Jack,1+Jone+Jack>
??????????????????????????????????????????? <Jone,2+Jone+Jack>
Lucy??????? Mary???????????????????àà??????????????? <Mary,1+Lucy+Mary>
??????????????????????????????????????????? <Lucy,2+Lucy+Mary>
Lucy??????? Ben????????????????????àà??????????????? <Ben,1+Lucy+Ben>
??????????????????????????????????????????? <Lucy,2+Lucy+Ben>
Jack??????? Alice????????????????????àà??????????????? <Alice,1+Jack+Alice>
??????????????????????????????????????????? <Jack,2+Jack+Alice>
Jack??????? Jesse???????????????????àà??????????????? <Jesse,1+Jack+Jesse>
??????????????????????????????????????????? <Jack,2+Jack+Jesse>
Terry??????? Alice???????????????????àà??????????????? <Alice,1+Terry+Alice>
??????????????????????????????????????????? <Terry,2+Terry+Alice>
Terry??????? Jesse??????????????????àà??????????????? <Jesse,1+Terry+Jesse>
??????????????????????????????????????????? <Terry,2+Terry+Jesse>
Philip??????? Terry??????????????????àà??????????????? <Terry,1+Philip+Terry>
??????????????????????????????????????????? <Philip,2+Philip+Terry>
Philip??????? Alma???????????????????àà??????????????? <Alma,1+Philip+Alma>
??????????????????????????????????????????? <Philip,2+Philip+Alma>
Mark??????? Terry???????????????????àà??????????????? <Terry,1+Mark+Terry>
??????????????????????????????????????????? <Mark,2+Mark+Terry>
Mark??????? Alma??????????????? àà??????????????? <Alma,1+Mark+Alma>
??????????????????????????????????????????? <Mark,2+Mark+Alma>
?
????(2)Shuffle處理
??? 在shuffle過程中完成連接。
?
| map函數輸出 | 排序結果 | shuffle連接 |
| <Lucy,1+Tom+Lucy> <Tom,2+Tom+Lucy> <Jack,1+Tom+Jack> <Tom,2+Tom+Jack> <Lucy,1+Jone+Lucy> <Jone,2+Jone+Lucy> <Jack,1+Jone+Jack> <Jone,2+Jone+Jack> <Mary,1+Lucy+Mary> <Lucy,2+Lucy+Mary> <Ben,1+Lucy+Ben> <Lucy,2+Lucy+Ben> <Alice,1+Jack+Alice> <Jack,2+Jack+Alice> <Jesse,1+Jack+Jesse> <Jack,2+Jack+Jesse> <Alice,1+Terry+Alice> <Terry,2+Terry+Alice> <Jesse,1+Terry+Jesse> <Terry,2+Terry+Jesse> <Terry,1+Philip+Terry> <Philip,2+Philip+Terry> <Alma,1+Philip+Alma> <Philip,2+Philip+Alma> <Terry,1+Mark+Terry> <Mark,2+Mark+Terry> <Alma,1+Mark+Alma> <Mark,2+Mark+Alma> | <Alice,1+Jack+Alice> <Alice,1+Terry+Alice> <Alma,1+Philip+Alma> <Alma,1+Mark+Alma> <Ben,1+Lucy+Ben> <Jack,1+Tom+Jack> <Jack,1+Jone+Jack> <Jack,2+Jack+Alice> <Jack,2+Jack+Jesse> <Jesse,1+Jack+Jesse> <Jesse,1+Terry+Jesse> <Jone,2+Jone+Lucy> <Jone,2+Jone+Jack> <Lucy,1+Tom+Lucy> <Lucy,1+Jone+Lucy> <Lucy,2+Lucy+Mary> <Lucy,2+Lucy+Ben> <Mary,1+Lucy+Mary> <Mark,2+Mark+Terry> <Mark,2+Mark+Alma> <Philip,2+Philip+Terry> <Philip,2+Philip+Alma> <Terry,2+Terry+Alice> <Terry,2+Terry+Jesse> <Terry,1+Philip+Terry> <Terry,1+Mark+Terry> <Tom,2+Tom+Lucy> <Tom,2+Tom+Jack> | <Alice,1+Jack+Alice, ??????? 1+Terry+Alice?, ??????? 1+Philip+Alma, ??????? 1+Mark+Alma > <Ben,1+Lucy+Ben> <Jack,1+Tom+Jack, ??????? 1+Jone+Jack, ??????? 2+Jack+Alice, ??????? 2+Jack+Jesse > <Jesse,1+Jack+Jesse, ??????? 1+Terry+Jesse > <Jone,2+Jone+Lucy, ??????? 2+Jone+Jack> <Lucy,1+Tom+Lucy, ??????? 1+Jone+Lucy, ??????? 2+Lucy+Mary, ??????? 2+Lucy+Ben> <Mary,1+Lucy+Mary, ??????? 2+Mark+Terry, ??????? 2+Mark+Alma> <Philip,2+Philip+Terry, ??????? 2+Philip+Alma> <Terry,2+Terry+Alice, ??????? 2+Terry+Jesse, ??????? 1+Philip+Terry, ??????? 1+Mark+Terry> <Tom,2+Tom+Lucy, ??????? 2+Tom+Jack> |
?
????(3)Reduce處理
????首先由語句"0 != grandchildnum && 0 != grandparentnum"得知,只要在"value-list"中沒有左表或者右表,則不會做處理,可以根據這條規則去除無效的shuffle連接。
?
| 無效的shuffle連接 | 有效的shuffle連接 |
| <Alice,1+Jack+Alice, ??????? 1+Terry+Alice?, ??????? 1+Philip+Alma, ??????? 1+Mark+Alma > <Ben,1+Lucy+Ben> <Jesse,1+Jack+Jesse, ??????? 1+Terry+Jesse > <Jone,2+Jone+Lucy, ??????? 2+Jone+Jack> <Mary,1+Lucy+Mary, ??????? 2+Mark+Terry, ??????? 2+Mark+Alma> <Philip,2+Philip+Terry, ??????? 2+Philip+Alma> <Tom,2+Tom+Lucy, ??????? 2+Tom+Jack> | <Jack,1+Tom+Jack, ??????? 1+Jone+Jack, ??????? 2+Jack+Alice, ??????? 2+Jack+Jesse > <Lucy,1+Tom+Lucy, ??????? 1+Jone+Lucy, ??????? 2+Lucy+Mary, ??????? 2+Lucy+Ben> <Terry,2+Terry+Alice, ??????? 2+Terry+Jesse, ??????? 1+Philip+Terry, ??????? 1+Mark+Terry> |
??? 然后根據下面語句進一步對有效的shuffle連接做處理。
?
// 左表,取出child放入grandchildren
if ('1' == relationtype) {
??? grandchild[grandchildnum] = childname;
??? grandchildnum++;
}
?
// 右表,取出parent放入grandparent
if ('2' == relationtype) {
??? grandparent[grandparentnum] = parentname;
??? grandparentnum++;
}
?
??? 針對一條數據進行分析:
?
<Jack,1+Tom+Jack,
??????? 1+Jone+Jack,
??????? 2+Jack+Alice,
??????? 2+Jack+Jesse >
?
????分析結果:左表用"字符1"表示,右表用"字符2"表示,上面的<key,value-list>中的"key"表示左表與右表的連接鍵。而"value-list"表示以"key"連接的左表與右表的相關數據。
??? 根據上面針對左表與右表不同的處理規則,取得兩個數組的數據如下所示:
?
| grandchild | Tom、Jone(grandchild[grandchildnum] = childname;) |
| grandparent | Alice、Jesse(grandparent[grandparentnum] = parentname;) |
????
??? 然后根據下面語句進行處理。
?
for (int m = 0; m < grandchildnum; m++) {
??? for (int n = 0; n < grandparentnum; n++) {
??????? context.write(new Text(grandchild[m]), new Text(grandparent[n]));
??? }
}
?
??
?
處理結果如下面所示:
?
| Tom??????? Jesse Tom??????? Alice Jone??????? Jesse Jone??????? Alice? |
??? 其他的有效shuffle連接處理都是如此。
3)查看運行結果
??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"STjoin_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖4.4-4所示。
?
圖4.4-4 運行結果
5、多表關聯
????多表關聯和單表關聯類似,它也是通過對原始數據進行一定的處理,從其中挖掘出關心的信息。下面進入這個實例。
5.1 實例描述
??? 輸入是兩個文件,一個代表工廠表,包含工廠名列和地址編號列;另一個代表地址表,包含地址名列和地址編號列。要求從輸入數據中找出工廠名和地址名的對應關系,輸出"工廠名——地址名"表。
??? 樣例輸入如下所示。
????1)factory:
?
factoryname??????????????? addressed
Beijing Red Star??????????????? 1
Shenzhen Thunder??????????? 3
Guangzhou Honda??????????? 2
Beijing Rising?????????????????? 1
Guangzhou Development Bank??????2
Tencent??????????????? 3
Back of Beijing??????????????? 1
?
????2)address:
?
addressID??? addressname
1??????? Beijing
2??????? Guangzhou
3??????? Shenzhen
4??????? Xian
?
??? 樣例輸出如下所示。
?
factoryname??????????????????? addressname
Back of Beijing??????????????????? ? Beijing
Beijing Red Star??????????????????? Beijing
Beijing Rising??????????????????? Beijing
Guangzhou Development Bank??????????Guangzhou
Guangzhou Honda??????????????? Guangzhou
Shenzhen Thunder??????????????? Shenzhen
Tencent??????????????????? Shenzhen
?
5.2 設計思路
??? 多表關聯和單表關聯相似,都類似于數據庫中的自然連接。相比單表關聯,多表關聯的左右表和連接列更加清楚。所以可以采用和單表關聯的相同的處理方式,map識別出輸入的行屬于哪個表之后,對其進行分割,將連接的列值保存在key中,另一列和左右表標識保存在value中,然后輸出。reduce拿到連接結果之后,解析value內容,根據標志將左右表內容分開存放,然后求笛卡爾積,最后直接輸出。
??? 這個實例的具體分析參考單表關聯實例。下面給出代碼。
5.3 程序代碼
??? 程序代碼如下所示:
?
package?com.hebut.mr;
?
import?java.io.IOException;
import?java.util.*;
?
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.IntWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.util.GenericOptionsParser;
?
public?class?MTjoin {
?
????public?static?int?time?= 0;
?
????/*
???? *?在map中先區分輸入行屬于左表還是右表,然后對兩列值進行分割,
???? *?保存連接列在key值,剩余列和左右表標志在value中,最后輸出
???? */
????public?static?class?Map?extends?Mapper<Object, Text, Text, Text> {
?
????????//?實現map函數
????????public?void?map(Object key, Text value, Context context)
????????????????throws?IOException, InterruptedException {
??????????? String line = value.toString();//?每行文件
??????????? String relationtype =?new?String();//?左右表標識
?
????????????//?輸入文件首行,不處理
????????????if?(line.contains("factoryname") ==?true
??????????????????? || line.contains("addressed") ==?true) {
????????????????return;
??????????? }
?
????????????//?輸入的一行預處理文本
??????????? StringTokenizer itr =?new?StringTokenizer(line);
??????????? String mapkey =?new?String();
??????????? String mapvalue =?new?String();
????????????int?i = 0;
????????????while?(itr.hasMoreTokens()) {
????????????????//?先讀取一個單詞
??????????????? String token = itr.nextToken();
????????????????//?判斷該地址ID就把存到"values[0]"
????????????????if?(token.charAt(0) >=?'0'?&& token.charAt(0) <=?'9') {
??????????????????? mapkey = token;
????????????????????if?(i > 0) {
??????????????????????? relationtype =?"1";
??????????????????? }?else?{
??????????????????????? relationtype =?"2";
??????????????????? }
????????????????????continue;
??????????????? }
?
????????????????//?存工廠名
??????????????? mapvalue += token +?" ";
??????????????? i++;
??????????? }
?
????????????//?輸出左右表
??????????? context.write(new?Text(mapkey),?new?Text(relationtype +?"+"+ mapvalue));
??????? }
??? }
?
????/*
???? * reduce解析map輸出,將value中數據按照左右表分別保存,
*?然后求出笛卡爾積,并輸出。
???? */
????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {
?
????????//?實現reduce函數
????????public?void?reduce(Text key, Iterable<Text> values, Context context)
????????????????throws?IOException, InterruptedException {
?
????????????//?輸出表頭
????????????if?(0 ==?time) {
????????????????context.write(new?Text("factoryname"),?new?Text("addressname"));
????????????????time++;
??????????? }
?
????????????int?factorynum = 0;
??????????? String[] factory =?new?String[10];
????????????int?addressnum = 0;
??????????? String[]?address?=?new?String[10];
?
????????????Iterator?ite = values.iterator();
????????????while?(ite.hasNext()) {
??????????????? String record = ite.next().toString();
????????????????int?len = record.length();
????????????????int?i = 2;
????????????????if?(0 == len) {
????????????????????continue;
??????????????? }
?
????????????????//?取得左右表標識
????????????????char?relationtype = record.charAt(0);
?
????????????????//?左表
????????????????if?('1'?== relationtype) {
??????????????????? factory[factorynum] = record.substring(i);
??????????????????? factorynum++;
??????????????? }
?
????????????????//?右表
????????????????if?('2'?== relationtype) {
????????????????????address[addressnum] = record.substring(i);
??????????????????? addressnum++;
??????????????? }
??????????? }
?
????????????//?求笛卡爾積
????????????if?(0 != factorynum && 0 != addressnum) {
????????????????for?(int?m = 0; m < factorynum; m++) {
????????????????????for?(int?n = 0; n < addressnum; n++) {
????????????????????????//?輸出結果
??????????????????????? context.write(new?Text(factory[m]),
????????????????????????????????new?Text(address[n]));
??????????????????? }
??????????????? }
??????????? }
?
??????? }
??? }
?
????public?static?void?main(String[] args)?throws?Exception {
??????? Configuration conf =?new?Configuration();
????????//?這句話很關鍵
??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");
?
??????? String[] ioArgs =?new?String[] {?"MTjoin_in",?"MTjoin_out"?};
??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs).getRemainingArgs();
????????if?(otherArgs.length?!= 2) {
??????????? System.err.println("Usage: Multiple Table Join <in> <out>");
??????????? System.exit(2);
??????? }
?
??????? Job job =?new?Job(conf,?"Multiple Table Join");
??????? job.setJarByClass(MTjoin.class);
?
????????//?設置Map和Reduce處理類
??????? job.setMapperClass(Map.class);
??????? job.setReducerClass(Reduce.class);
?
????????//?設置輸出類型
??????? job.setOutputKeyClass(Text.class);
??????? job.setOutputValueClass(Text.class);
?
????????//?設置輸入和輸出目錄
??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));
??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));
??????? System.exit(job.waitForCompletion(true) ? 0 : 1);
??? }
}
?
5.4 代碼結果
1)準備測試數據
??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"MTjoin_in"文件夾(備注:"MTjoin_out"不需要創建。)如圖5.4-1所示,已經成功創建。
?
???????? ?????? ?
圖5.4-1 創建"MTjoin_in"??????????????????????????????????????????????????????????? ?圖5.4.2 上傳兩個數據表
?
??? 然后在本地建立兩個txt文件,通過Eclipse上傳到"/user/hadoop/MTjoin_in"文件夾中,兩個txt文件的內容如"實例描述"那兩個文件一樣。如圖5.4-2所示,成功上傳之后。
??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的兩個文件。
?
圖5.4.3 兩個數據表的內容
2)查看運行結果
??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"MTjoin_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖5.4-4所示。
?
圖5.4-4 運行結果
6、倒排索引
??? "倒排索引"是文檔檢索系統中最常用的數據結構,被廣泛地應用于全文搜索引擎。它主要是用來存儲某個單詞(或詞組)在一個文檔或一組文檔中的存儲位置的映射,即提供了一種根據內容來查找文檔的方式。由于不是根據文檔來確定文檔所包含的內容,而是進行相反的操作,因而稱為倒排索引(Inverted Index)。
6.1 實例描述
??? 通常情況下,倒排索引由一個單詞(或詞組)以及相關的文檔列表組成,文檔列表中的文檔或者是標識文檔的ID號,或者是指文檔所在位置的URL,如圖6.1-1所示。
?
圖6.1-1 倒排索引結構
??? 從圖6.1-1可以看出,單詞1出現在{文檔1,文檔4,文檔13,……}中,單詞2出現在{文檔3,文檔5,文檔15,……}中,而單詞3出現在{文檔1,文檔8,文檔20,……}中。在實際應用中,還需要給每個文檔添加一個權值,用來指出每個文檔與搜索內容的相關度,如圖6.1-2所示。
?
?
圖6.1-2 添加權重的倒排索引
??? 最常用的是使用詞頻作為權重,即記錄單詞在文檔中出現的次數。以英文為例,如圖6.1-3所示,索引文件中的"MapReduce"一行表示:"MapReduce"這個單詞在文本T0中出現過1次,T1中出現過1次,T2中出現過2次。當搜索條件為"MapReduce"、"is"、"Simple"時,對應的集合為:{T0,T1,T2}∩{T0,T1}∩{T0,T1}={T0,T1},即文檔T0和T1包含了所要索引的單詞,而且只有T0是連續的。
?
?
圖6.1-3 倒排索引示例
??? 更復雜的權重還可能要記錄單詞在多少個文檔中出現過,以實現TF-IDF(Term Frequency-Inverse Document Frequency)算法,或者考慮單詞在文檔中的位置信息(單詞是否出現在標題中,反映了單詞在文檔中的重要性)等。
??? 樣例輸入如下所示。
????1)file1:
?
MapReduce is simple
?
????2)file2:
?
MapReduce is powerful is simple
?
????3)file3:
?
Hello MapReduce bye MapReduce
?
??? 樣例輸出如下所示。
?
MapReduce????? file1.txt:1;file2.txt:1;file3.txt:2;
is??????? file1.txt:1;file2.txt:2;
simple??????? ? file1.txt:1;file2.txt:1;
powerful??? file2.txt:1;
Hello??????? file3.txt:1;
bye??????? ?? file3.txt:1;
?
6.2 設計思路
??? 實現"倒排索引"只要關注的信息為:單詞、文檔URL及詞頻,如圖3-11所示。但是在實現過程中,索引文件的格式與圖6.1-3會略有所不同,以避免重寫OutPutFormat類。下面根據MapReduce的處理過程給出倒排索引的設計思路。
????1)Map過程
??? 首先使用默認的TextInputFormat類對輸入文件進行處理,得到文本中每行的偏移量及其內容。顯然,Map過程首先必須分析輸入的<key,value>對,得到倒排索引中需要的三個信息:單詞、文檔URL和詞頻,如圖6.2-1所示。
?
圖6.2-1 Map過程輸入/輸出
?
這里存在兩個問題:第一,<key,value>對只能有兩個值,在不使用Hadoop自定義數據類型的情況下,需要根據情況將其中兩個值合并成一個值,作為key或value值;第二,通過一個Reduce過程無法同時完成詞頻統計和生成文檔列表,所以必須增加一個Combine過程完成詞頻統計。
??? 這里講單詞和URL組成key值(如"MapReduce:file1.txt"),將詞頻作為value,這樣做的好處是可以利用MapReduce框架自帶的Map端排序,將同一文檔的相同單詞的詞頻組成列表,傳遞給Combine過程,實現類似于WordCount的功能。
????2)Combine過程
??? 經過map方法處理后,Combine過程將key值相同的value值累加,得到一個單詞在文檔在文檔中的詞頻,如圖6.2-2所示。如果直接將圖6.2-2所示的輸出作為Reduce過程的輸入,在Shuffle過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、URL和詞頻組成)應該交由同一個Reducer處理,但當前的key值無法保證這一點,所以必須修改key值和value值。這次將單詞作為key值,URL和詞頻組成value值(如"file1.txt:1")。這樣做的好處是可以利用MapReduce框架默認的HashPartitioner類完成Shuffle過程,將相同單詞的所有記錄發送給同一個Reducer進行處理。
?
?
圖6.2-2 Combine過程輸入/輸出
????3)Reduce過程
??? 經過上述兩個過程后,Reduce過程只需將相同key值的value值組合成倒排索引文件所需的格式即可,剩下的事情就可以直接交給MapReduce框架進行處理了。如圖6.2-3所示。索引文件的內容除分隔符外與圖6.1-3解釋相同。
????4)需要解決的問題
??? 本實例設計的倒排索引在文件數目上沒有限制,但是單詞文件不宜過大(具體值與默認HDFS塊大小及相關配置有關),要保證每個文件對應一個split。否則,由于Reduce過程沒有進一步統計詞頻,最終結果可能會出現詞頻未統計完全的單詞。可以通過重寫InputFormat類將每個文件為一個split,避免上述情況。或者執行兩次MapReduce,第一次MapReduce用于統計詞頻,第二次MapReduce用于生成倒排索引。除此之外,還可以利用復合鍵值對等實現包含更多信息的倒排索引。
?
?
圖6.2-3 Reduce過程輸入/輸出
6.3 程序代碼
程序代碼如下所示:
?
package?com.hebut.mr;
?
import?java.io.IOException;
import?java.util.StringTokenizer;
?
import?org.apache.hadoop.conf.Configuration;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.IntWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.mapreduce.Job;
import?org.apache.hadoop.mapreduce.Mapper;
import?org.apache.hadoop.mapreduce.Reducer;
import?org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import?org.apache.hadoop.mapreduce.lib.input.FileSplit;
import?org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import?org.apache.hadoop.util.GenericOptionsParser;
?
public?class?InvertedIndex {
?
????public?static?class?Map?extends?Mapper<Object, Text, Text, Text> {
?
????????private?Text?keyInfo?=?new?Text();?//?存儲單詞和URL組合
????????private?Text?valueInfo?=?new?Text();?//?存儲詞頻
????????private?FileSplit?split;?//?存儲Split對象
?
????????//?實現map函數
????????public?void?map(Object key, Text value, Context context)
????????????????throws?IOException, InterruptedException {
?
????????????//?獲得<key,value>對所屬的FileSplit對象
????????????split?= (FileSplit) context.getInputSplit();
?
??????????? StringTokenizer itr =?new?StringTokenizer(value.toString());
?
????????????while?(itr.hasMoreTokens()) {
????????????????// key值由單詞和URL組成,如"MapReduce:file1.txt"
????????????????//?獲取文件的完整路徑
????????????????//?keyInfo.set(itr.nextToken()+":"+split.getPath().toString());
????????????????//?這里為了好看,只獲取文件的名稱。
????????????????int?splitIndex =?split.getPath().toString().indexOf("file");
????????????????keyInfo.set(itr.nextToken() +?":"
??????????????????? +?split.getPath().toString().substring(splitIndex));
????????????????//?詞頻初始化為1
????????????????valueInfo.set("1");
?
??????????????? context.write(keyInfo,?valueInfo);
??????????? }
??????? }
??? }
?
????public?static?class?Combine?extends?Reducer<Text, Text, Text, Text> {
?
????????private?Text?info?=?new?Text();
?
????????//?實現reduce函數
????????public?void?reduce(Text key, Iterable<Text> values, Context context)
????????????????throws?IOException, InterruptedException {
?
????????????//?統計詞頻
????????????int?sum = 0;
????????????for?(Text value : values) {
??????????????? sum += Integer.parseInt(value.toString());
??????????? }
?
????????????int?splitIndex = key.toString().indexOf(":");
????????????//?重新設置value值由URL和詞頻組成
????????????info.set(key.toString().substring(splitIndex + 1) +?":"?+ sum);
????????????//?重新設置key值為單詞
??????????? key.set(key.toString().substring(0, splitIndex));
?
??????????? context.write(key,?info);
??????? }
??? }
?
????public?static?class?Reduce?extends?Reducer<Text, Text, Text, Text> {
?
????????private?Text?result?=?new?Text();
?
????????//?實現reduce函數
????????public?void?reduce(Text key, Iterable<Text> values, Context context)
????????????????throws?IOException, InterruptedException {
?
????????????//?生成文檔列表
??????????? String fileList =?new?String();
????????????for?(Text value : values) {
??????????????? fileList += value.toString() +?";";
??????????? }
?
????????????result.set(fileList);
?
??????????? context.write(key,?result);
??????? }
??? }
?
????public?static?void?main(String[] args)?throws?Exception {
??????? Configuration conf =?new?Configuration();
????????//?這句話很關鍵
??????? conf.set("mapred.job.tracker",?"192.168.1.2:9001");
?
??????? String[] ioArgs =?new?String[] {?"index_in",?"index_out"?};
??????? String[] otherArgs =?new?GenericOptionsParser(conf, ioArgs)
??????????????? .getRemainingArgs();
????????if?(otherArgs.length?!= 2) {
??????????? System.err.println("Usage: Inverted Index <in> <out>");
??????????? System.exit(2);
??????? }
?
??????? Job job =?new?Job(conf,?"Inverted Index");
??????? job.setJarByClass(InvertedIndex.class);
?
????????//?設置Map、Combine和Reduce處理類
??????? job.setMapperClass(Map.class);
??????? job.setCombinerClass(Combine.class);
??????? job.setReducerClass(Reduce.class);
?
????????//?設置Map輸出類型
??????? job.setMapOutputKeyClass(Text.class);
??????? job.setMapOutputValueClass(Text.class);
?
????????//?設置Reduce輸出類型
??????? job.setOutputKeyClass(Text.class);
??????? job.setOutputValueClass(Text.class);
?
????????//?設置輸入和輸出目錄
??????? FileInputFormat.addInputPath(job,?new?Path(otherArgs[0]));
??????? FileOutputFormat.setOutputPath(job,?new?Path(otherArgs[1]));
??????? System.exit(job.waitForCompletion(true) ? 0 : 1);
??? }
}
?
6.4 代碼結果
1)準備測試數據
??? 通過Eclipse下面的"DFS Locations"在"/user/hadoop"目錄下創建輸入文件"index_in"文件夾(備注:"index_out"不需要創建。)如圖6.4-1所示,已經成功創建。
?
????????????????
圖6.4-1 創建"index_in"???????????????????????????????????????????? 圖6.4.2 上傳"file*.txt"
?
??? 然后在本地建立三個txt文件,通過Eclipse上傳到"/user/hadoop/index_in"文件夾中,三個txt文件的內容如"實例描述"那三個文件一樣。如圖6.4-2所示,成功上傳之后。
??? 從SecureCRT遠處查看"Master.Hadoop"的也能證實我們上傳的三個文件。
?
圖6.4.3 三個"file*.txt"的內容
2)查看運行結果
??? 這時我們右擊Eclipse的"DFS Locations"中"/user/hadoop"文件夾進行刷新,這時會發現多出一個"index_out"文件夾,且里面有3個文件,然后打開雙其"part-r-00000"文件,會在Eclipse中間把內容顯示出來。如圖6.4-4所示。
?
圖6.4-4 運行結果
?
?
文章下載地址:http://files.cnblogs.com/xia520pi/HadoopCluster_Vol.9.rar
總結
以上是生活随笔為你收集整理的hadoop 入门实例【转】的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 孕妇梦到很多乌龟是什么意思
- 下一篇: 梦到床上有青蛙是怎么回事