Hadoop倒排索引原理解析
倒排索引源于實際應用中需要根據屬性的值來查找記錄,這種索引表中的每一項都包括一個屬性值和具有該屬性值的各記錄的地址,由于不是由記錄來確定屬性值,而是由屬性值來確定記錄的位置,因而稱為倒排索引,我查閱了一些資料去學習了解它,下面是我自己對倒排索引的理解
假設我現在有兩個文件 123.txt 和 456.txt,內容分別是
123.txt
456.txt
Hello hadoop basketball good good better best nice sun moon star skyMap 階段前
在 map 階段前得到的是
1.txt
2.txt
0 Hello hadoop 14 basketball good 31 good better best nice 54 sun moon star sky前面的數字是行偏移量,作用并不大,主要是根據后面的 value 進行拆分,上面兩個內容也就是 map 階段分別從從 1.txt 和 2.txt 得到的輸入
Map 階段(重寫 map 方法)
我們將單詞及其來自的文件作為 key,單詞的數量作為 value(其實值就是1),形式自己定,我的如下
map 階段結束得到的 1.txt
map 階段結束得到的 2.txt
Hello->456.txt 1 hadoop->456.txt 1 basketball->456.txt 1 good->456.txt 1 good->456.txt 1 better->456.txt 1 best->456.txt 1 nice->456.txt 1 sun->456.txt 1 moon->456.txt 1 star->456.txt 1 sky->456.txt 1這樣設計我們就可以使用 MapReduce 框架自帶的 map 端排序,將同一單詞的 value 組成列表
如下
1.txt
2.txt
Hello->456.txt {1} basketball->456.txt {1} best->456.txt {1} better->456.txt {1} good->456.txt {1,1} hadoop->456.txt {1} moon->456.txt {1} nice->456.txt {1} sky->456.txt {1} star->456.txt {1} sun->456.txt {1}上面的內容也就是 combine 階段分別從 1.txt 和 2.txt 得到的輸入
Combine 階段
combine 階段一般來說是跟 reduce 一樣的,但這里我們需要自定義 combine 方法,這一階段我們將 key 的 value 值累加,然后把單詞設置為 key,就可以使用 MapReduce 框架默認的 Shuffle 過程,將相同單詞發送給同一個 Reducer 來處理,文件及該單詞在這一文件出現的次數設為 value
1.txt 經過 combine 階段的輸出如下
2.txt 經過 combine 階段的輸出如下
Hello 456.txt->1 basketball 456.txt->1 best 456.txt->1 better 456.txt->1 good 456.txt->2 hadoop 456.txt->1 moon 456.txt->1 nice 456.txt->1 sky 456.txt->1 star 456.txt->1 sun 456.txt->1Shuffle 階段
shuffle 階段輸出(假設只有一個分區)
這一階段的輸出也就是 Reduce 階段的輸入
Reduce 階段(重寫 reduce 方法)
Reduce 階段就容易了,輸出如下
原理搞懂之后,編寫代碼就容易多了,我們主要是對 map 和 reduce 方法重寫,combiner 類需看情況是否需要,不合并則無需指定該類,分區類也根據自己需要編寫,總的來說,MapReduce 數據格式的轉換如下
Map: (Key1, Value1) → list(Key2,Value2)
Combine: (Key2, list(Value2)) → list(Key3, Value3)
Reduce: (Key3, list(Value3)) → list(Key4, Value4)
下面附上我的源碼,我是設置為2個分區,單詞 A-M 包括小寫在分區1,N-Z 包括小寫在分區2
MyMap 類:
MyCombiner 類:
package hadoopSort;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator;public class MyCombiner extends Reducer<Text,Text,Text,Text> {private Text text = new Text();@Overridepublic void reduce(Text key,Iterable<Text> values,Reducer<Text,Text,Text,Text>.Context context)throws IOException,InterruptedException {int sum = 0;//統計數量for (Text v:values){//統計單詞在該文件出現的總次數sum += Integer.parseInt(v.toString());}//將Key以‘->’為分隔符,則第一個為單詞,第二個為單詞所在的文件String[] line = key.toString().split("->");//單詞設置為Key值key.set(line[0]);//文件名及該單詞在該文件出現次數設置為valuetext.set(line[1]+"->"+sum);context.write(key,text);} }MyPartitioner 類(分區類):
package hadoopSort;import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; import java.awt.*;public class MyPartitioner extends Partitioner<Text,Text> {@Overridepublic int getPartition(Text key, Text value, int numPart) {char firstLetter = key.toString().charAt(0);//A-M包括小寫在分區1,N-Z包括小寫在分區2,這里不考慮不合格的字符,假設都是符合要求的單詞if (firstLetter>='a'&&firstLetter<='m'||firstLetter>='A'&&firstLetter<='M'){return 0;}else {return 1;}} }MyReduce 類:
package hadoopSort;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator;public class MyReduce extends Reducer<Text,Text,Text,Text> {private Text result = new Text();@Overrideprotected void reduce(Text k2, Iterable<Text> v2, Reducer<Text,Text,Text,Text>.Context context)throws IOException,InterruptedException{String line = new String();for(Text c:v2){//將value列表里的內容連接起來line += c.toString()+",";//System.out.println(c.toString()+",");}//去掉最后一個逗號line = line.substring(0,line.length()-1);result.set(line);context.write(k2,result);} }Main 類:
package hadoopSort;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class Main {public static void main(String[] args)throws Exception{Configuration conf = new Configuration();conf.set("mapreduce.app-submission.cross-platform","true"); //跨平臺提交//輸入路徑和輸出路徑,輸出路徑必須不存在String[] filePath = new String[]{"/user/hadoop/input", "/user/hadoop/output"};if (filePath.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}//指定運行對象和job名稱Job job = Job.getInstance(conf,"word sort");//提交到集群需要指定jar包的位置,不然會報錯ClassNotFoundExceptionjob.setJar("out\\artifacts\\myMapReduce_jar\\myMapReduce.jar");job.setJarByClass(Main.class);//指定map類job.setMapperClass(MyMap.class);//指定map輸出的key和value的格式job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//指定combiner類job.setCombinerClass(MyCombiner.class);//指定reduce類job.setReducerClass(MyReduce.class);//指定reduce輸出的key和value的格式job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//設置分區類job.setPartitionerClass(MyPartitioner.class);//指定分區數job.setNumReduceTasks(2);//若輸出路徑存在則刪除,就不需要每次都手動刪除了Path outputPath = new Path(filePath[1]);outputPath.getFileSystem(conf).delete(outputPath, true);//設置輸入路徑FileInputFormat.addInputPath(job,new Path(filePath[0]));//設置輸出路徑FileOutputFormat.setOutputPath(job,new Path(filePath[1]));//等待任務完成System.exit(job.waitForCompletion(true)?0:1);} }有不對的地方歡迎指正!
總結
以上是生活随笔為你收集整理的Hadoop倒排索引原理解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 完美代码,让你的代码无懈可击
- 下一篇: 特洛伊木马 (计算机木马程序)