【Hadoop版】K-Shingle+最小Hash签名+LSH算法+LSH族
將單機版的代碼轉化為可以在Hadoop上運行的MapReduce版本需要適應兩個方面。
1.MR模型,即將單機版的一個程序切分成兩個步驟。
2.Hadoop本身的IO特性。
由于在單機版的時候,文件讀取采用了bufferreader類進行,但是hadoop中必須使用hadoop自己的讀寫方式即將文件默認以鍵值對方式輸入,key是行再文本內的偏移量,value是文件中的一行。
這個特性破壞了單機版中,直接讀取整個文檔然后進行處理的情況。每個map方法執行時只能讀一行,無法使用ShingleSet類。如果map中收集所有行,在cleanup中對整個匯集起來的文檔 進行處理并不可行。因為map用于收集文檔的所有行則無法產生key-value輸出,而reduce必須接收來自map的輸出。
于是采用了預處理文本,將其所有換行取掉,變為一行。(據說Hadoop一行最多只能讀1024Kb,經測試這是錯誤的)。
1.下邊是對文本進行預處理的代碼。
import java.io.BufferedReader; import java.io.FileOutputStream; import java.io.FileReader; import java.io.IOException; import java.io.PrintWriter; import java.util.regex.Matcher;import java.util.regex.Pattern;public class RemoveLineBreak {BufferedReader inputStream;PrintWriter outputStream;public void replaceBlank() throws IOException {inputStream =new BufferedReader(new FileReader("源文件路徑"));outputStream =new PrintWriter(new FileOutputStream("輸出文件路徑",true));String line=inputStream.readLine();Pattern p = Pattern.compile("\\s+|\t|\r|\n");while(line!=null){Matcher m = p.matcher(line);line = m.replaceAll(" ");outputStream.print(line);line=inputStream.readLine();}inputStream.close();outputStream.close();}public static void main(String[] agrs) throws IOException{RemoveLineBreak test=new RemoveLineBreak();test.replaceBlank();}}2.在Hadoop程序中進行重要處理的一個類 import java.util.ArrayList; import java.io.*;//只能處理a-z+空格 public class HadoopShingleSet {int k;int signatureNumber;int times;int [] randomArray;int [] randomArrayForLSH;ArrayList <String> array=new ArrayList <String>();//存儲所有的Shingles的集合,這些Shingles是無序的/**數組中的值是哈希桶的編號,即各個Shingles對應的桶號,和Shingles在array中的順序相同但也是無序的,可以看做矩陣的行號,* * 但這些行號并沒有按從小到大排序* * 從array變到resultOfHashToShingle的過程采用了相同的哈希函數** 如果是多篇文檔的話,各自的resultOfHashToShingle數組中存儲的桶號并不相同,也沒有按照桶號的順序來存儲,僅僅存儲了文* * 檔的shingles都被哈希到了哪些桶* */long [] resultOfHashToShingle;long [] signature;//這個數組用于存儲文本的簽名矩陣int bandNumber;int [] bucketNumber;//這個數組用于存儲簽名被哈希到的桶號int [] bucketNumberANDOR;//這個數組用于存儲簽名被哈希到的桶號BufferedReader inputStream;public HadoopShingleSet (int k,int signatureNumber,int bandNumber,int times, int[] randomArray,int [] randomArrayForLSH){this.k=k; this.signatureNumber=signatureNumber;this.bandNumber=bandNumber;this.times=times;this.randomArray=randomArray;this.randomArrayForLSH=randomArrayForLSH;}public void createShingleSet(String line) {if(!(line.length()<k)){//這塊的處理有點粗糙,行過短的被忽略,并且是先讀入行再進行去除制表符、回車等字符int start =0;String tmp=null;do{tmp=line.substring(start,start+(k-1));start++;if(!array.contains(tmp)){array.add(tmp);//如果文檔長度不同的話,自己所包含的shingle種類大小可能也不同}}while(!(start>line.length()-(k-1)));}}public void hashToShingle(){resultOfHashToShingle=new long[array.size()];for(int i=0;i<array.size();i++){String tmp=array.get(i);long sum=0;//設為longfor(int t=0;t<k-4;t++){char[] chartmp=tmp.substring(t,t+4).toCharArray();//將九位字符串中的連續四位以字符數組的形式存儲//將字符串轉化為32位整數。這里的強制類型轉換將char轉為int,再將double轉long時,由于double此時為整數且不大于long最大值,所以轉換無損long inttmp=(long)((int)chartmp[0]*Math.pow(128,3)+(int)chartmp[1]*Math.pow(128,2)+(int)chartmp[2]*Math.pow(128,1)+(int)chartmp[3]*Math.pow(128,0));sum+=inttmp;}long hashResult=(sum%(long)Math.pow(2,32));//java中模運算的操作數范圍大;將字符串哈希到2^32個桶中,而int占-2^31到+2^31。但桶數目小于27^9//hashResult的結果是0-2^32-1resultOfHashToShingle[i]=hashResult;}}/**對所有的桶重新進行大量哈希,每個哈希取最小的桶號* * 強制沒和哈希函數的結果共有27^9個桶(每個哈希函數的桶數目可以不一樣嗎?)因為27^9中字符串* */public void produceSignature(){signature=new long[signatureNumber];for(int i=0;i<signatureNumber;i++){long min=(long)Math.pow(27, k);//一個哈希函數將resultOfHashToShingle中的桶號在重新排序到27^k個桶中,找出最小的桶號即為簽名存儲進signature即可for(int t=0;t<resultOfHashToShingle.length;t++){long tmp=(resultOfHashToShingle[t]*randomArray[2*i]+randomArray[2*i+1])%(long)Math.pow(27, k);//結果是0-27^kif(tmp<min) min=tmp;}signature[i]=min;}} public void localitySensitiveHahing(){int rows=signatureNumber/bandNumber;//因為有bandNumber個行條,所以使得哈希函數也有bandNumber*time個桶。同一個行條必須使用同一個哈希函數。//這里不同行條使用了不同的hash函數//所以,第i個行條的哈希值=[(行條內簽名之和)*randomArray[row*i]+randomArray(row*i+1)]%(bandNumber*time)//對一個文檔的簽名向量的每個行條使用一個哈希函數,并存入了數組bucketNumber,對每篇文檔的簽明進行了bandNumber次hashbucketNumber=new int[bandNumber];for(int i=0;i<bandNumber;i++){int begin=i*rows;int end=(i+1)*rows;long sum=0;for(int t=begin;t<end;t++) sum+=signature[t];//將本文檔的第i行條的哈希值(即被哈希到的桶號)放入bucketNumber[i],如果兩個文檔的bucketNumber[i]相等,這說明這兩個文檔的第i個行條完全一樣//每個行條一組桶。bucketNumber[i]=(int)((sum*randomArray[rows*i]+randomArray[rows*i+1])%(bandNumber*times));}//與構造+或構造,選用的hash函數并不一定要是在局部敏感哈希中使用過的哈希函數。所以在再這里再構造4*4*bucketNumber個哈希函數對文檔進行重新處理//也就是對每個行條是用來16個hash函數//每個行條使用不同的hash函數,并將結果存入數組,每篇文檔進行了4*4*bandNumber次哈希。bucketNumberANDOR=new int[4*4*bandNumber];for(int i=0;i<bandNumber;i++){int begin=i*rows;int end=(i+1)*rows;long sum=0;for(int t=begin;t<end;t++) sum+=signature[t];for(int k=0;k<(4*4);k+=2) bucketNumberANDOR[(4*4)*i+k]=(int)((sum*randomArrayForLSH[(4*4)*i+k]+randomArrayForLSH[(4*4)*i+k+1])%(bandNumber*times));}}public void run() {this.hashToShingle();this.produceSignature();this.localitySensitiveHahing();}}
3.Hadoop代碼主類 import java.io.IOException; import java.util.Random; import java.util.Scanner; import java.util.StringTokenizer; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.ArrayList; import java.lang.Iterable; import java.util.Iterator;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class FindSimilar {public static class TokenizerMapper extends Mapper<Object, Text, Text, Text>{private Text outPutKey = new Text();private Text outPutValue = new Text();int signatureNumber=100; int bandNumber=1;int times=100; int[] randomArray;int[] randomArrayForLSH;HadoopShingleSet test;protected void setup(Context context) throws IOException, InterruptedException { this.bandNumber=Integer.parseInt(context.getConfiguration().get("bandNumber"));this.times=Integer.parseInt(context.getConfiguration().get("times"));this.signatureNumber=Integer.parseInt(context.getConfiguration().get("signatureNumber"));String tmprandomArray=context.getConfiguration().get("randomArray");String testLength=new String(tmprandomArray);int length_randomArray=0;int begin=0;int end;do{ end=testLength.indexOf('d');testLength=testLength.substring(end+1, testLength.length()); length_randomArray++;}while(!testLength.equals(""));int i=0;begin=0;this.randomArray=new int [length_randomArray];do{ end=tmprandomArray.indexOf('d');String tmp=tmprandomArray.substring(begin, end);tmprandomArray=tmprandomArray.substring(end+1, tmprandomArray.length());int number=Integer.parseInt(tmp);this.randomArray[i]=number;i++;}while(!tmprandomArray.equals(""));String tmprandomArrayForLSH=context.getConfiguration().get("randomArrayForLSH");testLength=new String(tmprandomArrayForLSH);int length_randomArrayForLSH=0;begin=0;do{ end=testLength.indexOf('d');testLength=testLength.substring(end+1, testLength.length()); length_randomArrayForLSH++;}while(!testLength.equals(""));i=0;begin=0;this.randomArrayForLSH=new int [length_randomArrayForLSH];do{ end=tmprandomArrayForLSH.indexOf('d');String tmp=tmprandomArrayForLSH.substring(begin, end);tmprandomArrayForLSH=tmprandomArrayForLSH.substring(end+1, tmprandomArrayForLSH.length());int number=Integer.parseInt(tmp);this.randomArrayForLSH[i]=number;i++;}while(!tmprandomArrayForLSH.equals(""));test=new HadoopShingleSet (5,signatureNumber,bandNumber,times,this.randomArray,this.randomArrayForLSH);}public void map(Object key, Text value, Context context ) throws IOException, InterruptedException {System.out.println(value.toString());System.out.println("輸出一行了******************************************");//如果整篇文檔都不存在換行,那么就可以做到一行相當于一整片文檔test.createShingleSet(value.toString());test.run();InputSplit inputSplit = context.getInputSplit();String fileName = ((FileSplit) inputSplit).getPath().toString();for(int i=0;i<test.bucketNumber.length;i++){String tmp=i+"d"+test.bucketNumber[i];outPutKey.set(i+"d"+test.bucketNumber[i]);outPutValue.set(fileName);context.write(outPutKey,outPutValue);}}}public static class TokenizerReducer extends Reducer<Text,Text,Text,Text> {private Text outPutKey = new Text();private Text outPutValue = new Text();public void reduce(Text key,Iterable<Text> values, Context context ) throws IOException, InterruptedException {//必須聲明在reduce方法,否則可能多個key使用同一個arrayArrayList <Text> array=new ArrayList <Text>();//System.out.println("當key等于"+key.toString()+"時:");Iterator <Text> iterator=values.iterator();int k=0; while(iterator.hasNext()) { Text tmp=new Text(iterator.next().toString());// System.out.println("輸出從迭代器獲取的值"+tmp);array.add(k,tmp);// System.out.println("輸出array中這次存儲的值"+array.get(k));k++;}//System.out.println("接下來遍歷array數組:");//for(int i=0;i<array.size();i++){// System.out.println("i等于"+i+"時");// System.out.println(array.get(i).toString());// }for(int i=0;i<array.size();i++){outPutKey.set("鍵值為: "+key.toString()+" "+array.get(i).toString());for(int t=i+1;t<array.size();t++){outPutValue.set(" "+array.get(t).toString());context.write(outPutKey,outPutValue);} }}}public static void main(String[] args) throws Exception {int signatureNumber=100;int bandNumber=1;int times=100;double Jaccard;int [] randomArray;int [] randomArrayForLSH;Scanner keyboard=new Scanner(System.in);//產生最小哈希簽名的哈希函數數目強制設初始化為100個(100對隨機數),即每個文本有100個簽名,下邊進行重新賦值。System.out.println("請問您希望將使用多少個Hash函數用于為文檔產生簽名?");signatureNumber=keyboard.nextInt();randomArray=new int [signatureNumber*2];Random random = new Random();for(int i=0;i<signatureNumber;i++){int tmp=(int)Math.pow(signatureNumber,0.5);randomArray[2*i]=(Math.abs(random.nextInt())%tmp)+1;//隨機數在0-(tmp-1),改為1-tmprandomArray[2*i+1]=(Math.abs(random.nextInt())%tmp)+1;}//根據簽名向量的長度以及預期的相似度來確定行條的數目,對double進行了運算,可能產生誤差System.out.println("請問您希望將相似度為多少的文檔在LSH過程中盡可能成為后選對?");Jaccard=keyboard.nextDouble();System.out.println("請問您希望在LSH過程中哈希桶的數目是行條數的幾倍?");times=keyboard.nextInt();keyboard.close();double difference=Math.abs(Math.pow(1.0/1.0,1.0/100.0)-Jaccard); for(int i=2;i<=signatureNumber;i++){if(signatureNumber%i==0){double tmp=Math.abs(Math.pow((double)1/(double)i,(double)i/(double)signatureNumber)-Jaccard);System.out.printf("行條=%4d時 ",i);System.out.printf("差值為%8f",tmp);if(tmp<difference) {difference=tmp;bandNumber=i;System.out.println(" 行條被改變");}else{System.out.println(" 行條未改變");}}}System.out.println("簽名矩陣被分為了"+bandNumber+"個行條");randomArrayForLSH=new int [4*4*bandNumber];for(int i=0;i<(4*4*bandNumber);i++){int tmp=(int)Math.pow(4*4*bandNumber,0.5 );randomArrayForLSH[i]=(Math.abs(random.nextInt())%tmp)+1;//隨機數在0-(tmp-1),改為1-tmp}Configuration conf = new Configuration();String bandNumber_String=bandNumber+"";String signatureNumber_String=signatureNumber+"";String times_String= times+"";String randomArray_String="";for(int i=0;i<randomArray.length;i++){randomArray_String+=randomArray[i];randomArray_String+="d";}String randomArrayForLSH_String="";for(int i=0;i<randomArrayForLSH.length;i++){randomArrayForLSH_String+=randomArrayForLSH[i];randomArrayForLSH_String+="d";}conf.set("bandNumber",bandNumber_String);conf.set("signatureNumber",signatureNumber_String);conf.set("times",times_String);conf.set("randomArray",randomArray_String);conf.set("randomArrayForLSH",randomArrayForLSH_String);String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();if (otherArgs.length < 2) {System.err.println("Usage: wordcount <in> [<in>...] <out>");System.exit(2);}Job job = new Job(conf, " FindSimilar");job.setJarByClass( FindSimilar.class);job.setMapperClass(TokenizerMapper.class);job.setReducerClass(TokenizerReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);for (int i = 0; i < otherArgs.length - 1; ++i) {FileInputFormat.addInputPath(job, new Path(otherArgs[i]));}FileOutputFormat.setOutputPath(job,new Path(otherArgs[otherArgs.length - 1]));System.exit(job.waitForCompletion(true) ? 0 : 1);} }
處理所有文檔的hash函數需要一樣。這里使用conf傳遞在main中生成的hash函數的系數。將其轉換為字符串再傳遞給每個Map task。
至此程序將給出所有可能的文檔對。(重復的未被剔除)
在Mapper類的setup()方法為每個task聲明一個HadoopShingleSet對象。當輸入的每篇文檔小于一個block大小時,每篇文檔將有一個HadoopShingleSet對象。但當某篇文檔大于一個block塊大小時,這篇文檔將有可能被劃分到多個InputSplit,這樣每個InputSplit對應一個Mapper,也就可能產生多個HadoopShingleSet對象,從而出錯。
.................................................................
至于具體的劃分策略,FileInputFormat默認為文件在HDFS上的每一個Block生成一個對應的FileSplit。那么自然,FileSplit.start就是對應Block在文件中的Offset、FileSplit.length就是對應Block的Length、FileSplit.hosts就是對應Block的Location。
但是可以設置“mapred.min.split.size”參數,使得Split的大小大于一個Block,這時候FileInputFormat會將連續的若干個Block分在一個Split中、也可能會將一個Block分別劃在不同的Split中(但是前提是一個Split必須在一個文件中)。
總結
以上是生活随笔為你收集整理的【Hadoop版】K-Shingle+最小Hash签名+LSH算法+LSH族的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: opencv学习4——图像缩放
- 下一篇: Qt5学习之路及嵌入式开发教程1:信号槽