MapReduce Java API-使用Partitioner实现输出到多个文件
場景
MapReduce Java API-多輸入路徑方式:
https://blog.csdn.net/BADAO_LIUMANG_QIZHI/article/details/119453275
在上面的基礎上,怎樣用Partitioner的方式實現將學生的成績數據
分段輸出到不同的文件。
例如分為三個成績段:
小于60分
大于等于60分小于等于80分
大于80分
Partitioner
1、Partion發生在Map階段的最后,會先調用job.setPartitionerClass對這個List進行分區,
每個分區映射到一個Reducer。每個分區內又調用job.setSortComparatorClass設置的key
比較函數類排序。
2、 Partitioner的作用是對Mapper產生的中間結果進行分片,以便將同一個分組的數據交給同一個Reducer處理,
它直接影響Reducer階段的復雜均衡。
3、Partitioner創建流程
① 先分析一下具體的業務邏輯,確定大概有多少個分區
② 首先書寫一個類,它要繼承org.apache.hadoop.mapreduce.Partitioner這個類
③ 重寫public int getPartition這個方法,根據具體邏輯,讀數據庫或者配置返回相同的數字
④ 在main方法中設置Partioner的類,job.setPartitionerClass(DataPartitioner.class);
⑤ 設置Reducer的數量,job.setNumReduceTasks(6);
注:
博客:
https://blog.csdn.net/badao_liumang_qizhi
關注公眾號
霸道的程序猿
獲取編程相關電子書、教程推送與免費下載。
實現
1、首先新建數據集score.txt,用來進行分段輸出。
1、自定義分區函數類
通過成績判斷,用return的值為0、1、2代表三個分區。
package com.badao.muloutput;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner;public class StudentPartitioner extends Partitioner<IntWritable, Text> {@Overridepublic int getPartition(IntWritable intWritable, Text text, int i) {//學生成績int scoreInt = intWritable.get();//默認指定分區0if(i==0){return 0;}if(scoreInt < 60){return 0;}else if(scoreInt<=80){return 1;}else{return 2;}} }3、定義Mapper類
package com.badao.muloutput;import org.apache.commons.lang.StringUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException;public class MulOutputMapper extends Mapper<LongWritable,Text,IntWritable,Text> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String[] studentArr = value.toString().split(" ");if(StringUtils.isNotBlank(studentArr[1])){IntWritable pKey = new IntWritable(Integer.parseInt(studentArr[1].trim()));context.write(pKey,value);}} }4、定義Reduce類
package com.badao.muloutput;import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class MulOutputReducer extends Reducer<IntWritable,Text,NullWritable,Text> {@Overridepublic void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {for(Text value:values){context.write(NullWritable.get(),value);}} }5、新建Job類
package com.badao.muloutput;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;import java.io.IOException;public class MulOutputJob {public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {wordCountLocal();}public static void wordCountLocal()throws IOException, ClassNotFoundException, InterruptedException{Configuration conf = new Configuration();System.setProperty("HADOOP_USER_NAME","root");conf.set("fs.defaultFS","hdfs://192.168.148.128:9000");//實例化一個作業,word count是作業的名字Job job = Job.getInstance(conf, "muloutput");//指定通過哪個類找到對應的jar包job.setJarByClass(MulOutputJob.class);//為job設置Mapper類job.setMapperClass(MulOutputMapper.class);//為job設置reduce類job.setReducerClass(MulOutputReducer.class);//設置Partitioner類job.setPartitionerClass(StudentPartitioner.class);//設置reduce的個數為3job.setNumReduceTasks(3);//mapper輸出格式job.setMapOutputKeyClass(IntWritable.class);job.setMapOutputValueClass(Text.class);//reduce輸出格式job.setOutputKeyClass(NullWritable.class);job.setOutputValueClass(Text.class);//為job設置輸入路徑,輸入路徑是存在的文件夾/文件FileInputFormat.addInputPath(job,new Path("/score.txt"));//為job設置輸出路徑FileOutputFormat.setOutputPath(job,new Path("/muloutput8"));job.waitForCompletion(true);}}6、將數據集上傳到HDFS指定的目錄下,運行job查看輸出結果
?
注意事項
這里要注意坑點,因為這里在分解數據時是按照一個空格來拆分的,所以數據集中
每個key和value之間只能有一個空格。
并且不要再數據集的最后面添加多余的換行,不然會導致不能正常輸出數據。
比如這里查看數據時發現多了個換行
?
然后找不到不出統計數據的原因,就在代碼中將每步的結果輸出下
如果是上面多了換行的話,下面輸出key-value時就會有異常數據,都跟上面這樣是正常的。
總結
以上是生活随笔為你收集整理的MapReduce Java API-使用Partitioner实现输出到多个文件的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MapReduce Java API-多
- 下一篇: Three.js中使用材质覆盖属性