mahout贝叶斯算法开发思路(拓展篇)1
首先說明一點,此篇blog解決的問題是就下面的數據如何應用mahout中的貝葉斯算法?(這個問題是在上篇(。。。完結篇)blog最后留的問題,如果想直接使用該工具,可以在mahout貝葉斯算法拓展下載):
?
0.2 0.3 0.4:1 0.32 0.43 0.45:1 0.23 0.33 0.54:1 2.4 2.5 2.6:2 2.3 2.2 2.1:2 5.4 7.2 7.2:3 5.6 7 6:3 5.8 7.1 6.3:3 6 6 5.4:3 11 12 13:4
前篇blog上面的數據在最后的空格使用冒號代替(因為樣本向量和標識的解析需要不同的解析符號,同一個的話解析就會出問題)。關于上面的數據其實就是說樣本[0.2,0.3,0.4]被貼上了標簽1,其他依次類推,然后這個作為訓練數據訓練貝葉斯模型,最后通過上面的數據進行分類建議模型的準確度。
?
處理的過程大概可以分為7個步驟:1.轉換原始數據到貝葉斯算法可以使用的數據格式;2. 把所有的標識轉換為數值型格式;3.對原始數據進行處理獲得貝葉斯模型的屬性參數值1;4.對原始數據進行處理獲得貝葉斯模型的屬性參數值2;5.根據3、4的結果把貝葉斯模型寫入文件;6.對原始數據進行自分類;7.根據6的結果對貝葉斯模型進行評價。
下面分別介紹:
1. 數據格式轉換:
代碼如下:
?
package mahout.fansy.bayes.transform;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.mahout.common.AbstractJob; import org.apache.mahout.common.HadoopUtil; import org.apache.mahout.math.NamedVector; import org.apache.mahout.math.RandomAccessSparseVector; import org.apache.mahout.math.Vector; import org.apache.mahout.math.VectorWritable;public class TFText2VectorWritable extends AbstractJob {/*** 處理把* [2.1,3.2,1.2:a* 2.1,3.2,1.3:b]* 這樣的數據轉換為 key:new Text(a),value:new VectorWritable(2.1,3.2,1.2:a) 的序列數據* @param args* @throws Exception */public static void main(String[] args) throws Exception {ToolRunner.run(new Configuration(), new TFText2VectorWritable(),args);}@Overridepublic int run(String[] args) throws Exception {addInputOption();addOutputOption();// 增加向量之間的分隔符,默認為逗號;addOption("splitCharacterVector","scv", "Vector split character,default is ','", ",");// 增加向量和標示的分隔符,默認為冒號;addOption("splitCharacterLabel","scl", "Vector and Label split character,default is ':'", ":");if (parseArguments(args) == null) {return -1;}Path input = getInputPath();Path output = getOutputPath();String scv=getOption("splitCharacterVector");String scl=getOption("splitCharacterLabel");Configuration conf=getConf();// FileSystem.get(output.toUri(), conf).deleteOnExit(output);//如果輸出存在,刪除輸出HadoopUtil.delete(conf, output);conf.set("SCV", scv);conf.set("SCL", scl);Job job=new Job(conf);job.setJobName("transform text to vector by input:"+input.getName());job.setJarByClass(TFText2VectorWritable.class); job.setInputFormatClass(TextInputFormat.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);job.setMapperClass(TFMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(VectorWritable.class);job.setNumReduceTasks(0);job.setOutputKeyClass(Text.class);job.setOutputValueClass(VectorWritable.class);TextInputFormat.setInputPaths(job, input);SequenceFileOutputFormat.setOutputPath(job, output);if(job.waitForCompletion(true)){return 0;}return -1;}public static class TFMapper extends Mapper<LongWritable,Text,Text,VectorWritable>{private String SCV;private String SCL;/*** 初始化分隔符參數 */@Overridepublic void setup(Context ctx){SCV=ctx.getConfiguration().get("SCV");SCL=ctx.getConfiguration().get("SCL");}/*** 解析字符串,并輸出* @throws InterruptedException * @throws IOException */@Overridepublic void map(LongWritable key,Text value,Context ctx) throws IOException, InterruptedException{String[] valueStr=value.toString().split(SCL);if(valueStr.length!=2){return; // 沒有兩個說明解析錯誤,退出}String name=valueStr[1];String[] vector=valueStr[0].split(SCV);Vector v=new RandomAccessSparseVector(vector.length);for(int i=0;i<vector.length;i++){double item=0;try{item=Double.parseDouble(vector[i]);}catch(Exception e){return; // 如果不可以轉換,說明輸入數據有問題}v.setQuick(i, item);}NamedVector nv=new NamedVector(v,name);VectorWritable vw=new VectorWritable(nv);ctx.write(new Text(name), vw);}} }上面的代碼只使用了Mapper對數據進行處理即可,把原始數據的Text格式使用分隔符進行解析輸出<Text,VectorWritable>對應<標識,樣本向量>,貝葉斯算法處理的數據格式是VectorWritable的,所以要進行轉換。其中的解析符號是根據傳入的參數進行設置的。如果要單獨運行該類,傳入的參數如下:
?
?
usage: <command> [Generic Options] [Job-Specific Options] Generic Options:-archives <paths> comma separated archives to be unarchivedon the compute machines.-conf <configuration file> specify an application configuration file-D <property=value> use value for given property-files <paths> comma separated files to be copied to themap reduce cluster-fs <local|namenode:port> specify a namenode-jt <local|jobtracker:port> specify a job tracker-libjars <paths> comma separated jar files to include inthe classpath.-tokenCacheFile <tokensFile> name of the file with the tokens Job-Specific Options: --input (-i) input Path to job input directory. --output (-o) output The directory pathname for output. --splitCharacterVector (-scv) splitCharacterVector Vector split character,default is ',' --splitCharacterLabel (-scl) splitCharacterLabel Vector and Label split character,default is ':' --help (-h) Print out help --tempDir tempDir Intermediate output directory --startPhase startPhase First phase to run --endPhase endPhase Last phase to run其中-scv和-scl參數是自己加的,其他參考mahout中的AbstractJob的默認設置;
?
2.轉換標識
這一步的主要操作是把輸入文件的所有標識全部讀取出來,然后進行轉換,轉換為數值型,代碼如下:
?
package mahout.fansy.bayes;import java.io.IOException; import java.util.Collection; import java.util.HashSet;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; import org.apache.mahout.common.Pair; import org.apache.mahout.common.iterator.sequencefile.PathFilters; import org.apache.mahout.common.iterator.sequencefile.PathType; import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;import com.google.common.io.Closeables;public class WriteIndexLabel {/*** @param args* @throws IOException */public static void main(String[] args) throws IOException {String inputPath="hdfs://ubuntu:9000/user/mahout/output_bayes/part-m-00000";String labPath="hdfs://ubuntu:9000/user/mahout/output_bayes/index.bin";Configuration conf=new Configuration();conf.set("mapred.job.tracker", "ubuntu:9001");long t=writeLabelIndex(inputPath,labPath,conf);System.out.println(t);}/*** 從輸入文件中讀出全部標識,并加以轉換,然后寫入文件* @param inputPath* @param labPath* @param conf* @return* @throws IOException*/public static long writeLabelIndex(String inputPath,String labPath,Configuration conf) throws IOException{long labelSize=0;Path p=new Path(inputPath);Path lPath=new Path(labPath);SequenceFileDirIterable<Text, IntWritable> iterable =new SequenceFileDirIterable<Text, IntWritable>(p, PathType.LIST, PathFilters.logsCRCFilter(), conf);labelSize = writeLabel(conf, lPath, iterable);return labelSize;}/*** 把數字和標識的映射寫入文件* @param conf* @param indexPath* @param labels* @return* @throws IOException*/public static long writeLabel(Configuration conf,Path indexPath,Iterable<Pair<Text,IntWritable>> labels) throws IOException{FileSystem fs = FileSystem.get(indexPath.toUri(), conf);SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, indexPath, Text.class, IntWritable.class);Collection<String> seen = new HashSet<String>();int i = 0;try {for (Object label : labels) {String theLabel = ((Pair<?,?>) label).getFirst().toString();if (!seen.contains(theLabel)) {writer.append(new Text(theLabel), new IntWritable(i++));seen.add(theLabel);}}} finally {Closeables.closeQuietly(writer);}System.out.println("labels number is : "+i);return i;} }
這一步要返回一個參數,即標識的一共個數,用于后面的處理需要。
?
3. 獲得貝葉斯模型屬性值1:
這個相當于 TrainNaiveBayesJob的第一個prepareJob,本來是可以直接使用mahout中的mapper和reducer的,但是其中mapper關于key的解析和我使用的不同,所以解析也不同,所以這一步驟的mapper可以認為就是TrainNaiveBayesJob中第一個prepareJob的mapper,只是做了很少的修改。此步驟的代碼如下:
?
package mahout.fansy.bayes;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.mahout.classifier.naivebayes.BayesUtils; import org.apache.mahout.common.AbstractJob; import org.apache.mahout.common.HadoopUtil; import org.apache.mahout.common.mapreduce.VectorSumReducer; import org.apache.mahout.math.VectorWritable; import org.apache.mahout.math.map.OpenObjectIntHashMap; /*** 貝葉斯算法第一個job任務相當于 TrainNaiveBayesJob的第一個prepareJob* 只用修改Mapper即可,Reducer還用原來的* @author Administrator**/ public class BayesJob1 extends AbstractJob {/*** @param args* @throws Exception */public static void main(String[] args) throws Exception {ToolRunner.run(new Configuration(), new BayesJob1(),args);}@Overridepublic int run(String[] args) throws Exception {addInputOption();addOutputOption();addOption("labelIndex","li", "The path to store the label index in");if (parseArguments(args) == null) {return -1;}Path input = getInputPath();Path output = getOutputPath();String labelPath=getOption("labelIndex");Configuration conf=getConf();HadoopUtil.cacheFiles(new Path(labelPath), getConf());HadoopUtil.delete(conf, output);Job job=new Job(conf);job.setJobName("job1 get scoreFetureAndLabel by input:"+input.getName());job.setJarByClass(BayesJob1.class); job.setInputFormatClass(SequenceFileInputFormat.class);job.setOutputFormatClass(SequenceFileOutputFormat.class);job.setMapperClass(BJMapper.class);job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(VectorWritable.class);job.setCombinerClass(VectorSumReducer.class);job.setReducerClass(VectorSumReducer.class);job.setOutputKeyClass(IntWritable.class);job.setOutputValueClass(VectorWritable.class);SequenceFileInputFormat.setInputPaths(job, input);SequenceFileOutputFormat.setOutputPath(job, output);if(job.waitForCompletion(true)){return 0;}return -1;}/*** 自定義Mapper,只是解析的地方有改動而已* @author Administrator**/public static class BJMapper extends Mapper<Text, VectorWritable, IntWritable, VectorWritable>{public enum Counter { SKIPPED_INSTANCES }private OpenObjectIntHashMap<String> labelIndex;@Overrideprotected void setup(Context ctx) throws IOException, InterruptedException {super.setup(ctx);labelIndex = BayesUtils.readIndexFromCache(ctx.getConfiguration()); //}@Overrideprotected void map(Text labelText, VectorWritable instance, Context ctx) throws IOException, InterruptedException {String label = labelText.toString(); if (labelIndex.containsKey(label)) {ctx.write(new IntWritable(labelIndex.get(label)), instance);} else {ctx.getCounter(Counter.SKIPPED_INSTANCES).increment(1);}}}}如果要單獨使用此類,可以參考下面的調用方式:
?
?
usage: <command> [Generic Options] [Job-Specific Options] Generic Options:-archives <paths> comma separated archives to be unarchivedon the compute machines.-conf <configuration file> specify an application configuration file-D <property=value> use value for given property-files <paths> comma separated files to be copied to themap reduce cluster-fs <local|namenode:port> specify a namenode-jt <local|jobtracker:port> specify a job tracker-libjars <paths> comma separated jar files to include inthe classpath.-tokenCacheFile <tokensFile> name of the file with the tokens Job-Specific Options: --input (-i) input Path to job input directory. --output (-o) output The directory pathname for output. --labelIndex (-li) labelIndex The path to store the label index in --help (-h) Print out help --tempDir tempDir Intermediate output directory --startPhase startPhase First phase to run --endPhase endPhase Last phase to run其中的-li參數是自己加的,其實就是第2步驟中求得的標識的總個數,其他參考AbstractJob默認參數。
?
?
分享,成長,快樂
轉載請注明blog地址:http://blog.csdn.net/fansy1990
轉載于:https://www.cnblogs.com/pangblog/p/3323104.html
總結
以上是生活随笔為你收集整理的mahout贝叶斯算法开发思路(拓展篇)1的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql指定时间_MySQL查询指定时
- 下一篇: Docker中安装Jenkins实时发布