MapReduce的几个企业级经典面试案例
MapReduce的幾個企業級經典面試案例
一、官方統計案例:
- 要求:統計一下單詞出現的次數
測試數據:
zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin編寫代碼:
-
mapper類
/*** @author 17616*/ public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 首先獲取一行數據String line = value.toString ();// 將行內的單詞進行切分,使用一個數組進行保存,切分數據時根據源數據得知可以使用空格的方式切分。String[] arr = line.split (" ");for (String str : arr) {context.write (new Text (str), new LongWritable (1));}} } -
reducer類
/*** @author 17616*/ public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {@Overridepublic void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {// 定義變量記錄單詞出現的次數long sum = 0;for (LongWritable val : values) {// 記錄總次數sum += val.get ();}// 輸出數據,key就是單詞,value就是在map階段這個單詞出現的總次數context.write (key, new LongWritable (sum));} } -
Driver類
/*** @author 17616* 官方案例,計算統計*/ public class WordCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 獲取當前的默認配置Configuration conf = new Configuration ();// 獲取代表當前mr作業的job對象Job job = Job.getInstance (conf);// 指定一下當前程序的入口類job.setJarByClass (WordCountDriver.class);//指定當前Mapper、Reducer任務的類job.setMapperClass (WordCountMapper.class);job.setReducerClass (WordCountReducer.class);//設置Mapper的結果類型job.setMapOutputKeyClass (Text.class);job.setMapOutputValueClass (LongWritable.class);// 設置Reducer的結果類型job.setOutputKeyClass (Text.class);job.setOutputValueClass (LongWritable.class);//設置待分析的文件夾路徑(linux的路徑地址)FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/mapreduce"));FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/mapreduce"));if (!job.waitForCompletion (true)) {return;}} }
運行結果:
zhangqin 20 zhangrui 20 zhangyong 20二、計算平均值:
-
要求:計算一下數據的平均值
-
測試數據:
tom 69 tom 84 tom 68 jary 89 jary 90 jary 81 jary 35 alex 23 alex 100 alex 230 -
編寫代碼:
-
mapper類
/*** @Author zhangyong* @Date 2020/4/3 23:43* @Version 1.0*/ public class AverageMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//獲取每行的數據內容String line = value.toString ();//按照空格去切會獲取到多個數據,所以用數組的方式存儲String[] data = line.split (" ");String name = data[0];//Integer做一個數據類型的強制轉換。int score = Integer.parseInt (data[1]);//輸出數據context.write (new Text (name), new IntWritable (score));} } -
reducer類
/*** @Author zhangyong* @Date 2020/4/3 23:43* @Version 1.0*/ public class AverageReducer extends Reducer<Text, IntWritable, Text, IntWritable> {@Overridepublic void reduce(Text name, Iterable<IntWritable> scores, Context context) throws IOException, InterruptedException {int i = 0;int score = 0;for (IntWritable data : scores) {score = score + data.get ();i++;}int average = score / i;context.write (name, new IntWritable (average));} } -
Driver類
/*** @Author zhangyong* @Date 2020/4/3 23:41* @Version 1.0* 計算平均值*/ public class AverageDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration ();Job job = Job.getInstance (conf);//驅動類,入口類job.setJarByClass (AverageDriver.class);//設置Mapper和Reducer的類job.setMapperClass (AverageMapper.class);job.setReducerClass (AverageReducer.class);//設置Mapper的結果類型job.setMapOutputKeyClass (Text.class);job.setMapOutputValueClass (IntWritable.class);//設置Reduce的結果類型job.setOutputKeyClass (Text.class);job.setOutputValueClass (IntWritable.class);//設置待分析的文件夾路徑(linux的路徑地址)FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/average"));FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/average"));//提交到jobjob.waitForCompletion (true);} }
-
-
運行結果:
alex 117 jary 73 tom 73
三、求溫度最高值:
- 要求:求出以下各年份數據中的最高溫度
測試數據:
2329999919500515070000 9909999919500515120022 9909999919500515180011 9509999919490324120111 6509999919490324180078 9909999919370515070001 9909999919370515120002 9909999919450515180001 6509999919450324120002 8509999919450324180078編寫代碼:
-
mapper類
/*** @author 17616*/ public class HeightMapper extends Mapper<LongWritable, Text, Text, LongWritable> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//獲取一段數據String line = value.toString ();//獲取年份String year = line.substring (8, 12);//獲取溫度(強制轉換一下)int t = Integer.parseInt (line.substring (18, 22));context.write (new Text (year),new LongWritable (t));} } -
reducer類
/*** @author 17616*/ public class HeightReducer extends Reducer<Text, LongWritable, Text, LongWritable> {@Overridepublic void reduce(Text year, Iterable<LongWritable> t, Context context) throws IOException, InterruptedException {long max = 0;for (LongWritable data : t) {if (max < data.get ()) {max = data.get ();}}context.write (year, new LongWritable (max));} } -
Driver類
/*** @author 17616* -求最大值*/ public class HeightDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 獲取當前的默認配置Configuration conf = new Configuration ();// 獲取代表當前mr作業的job對象Job job = Job.getInstance (conf);// 指定一下當前程序的入口類job.setJarByClass (HeightDriver.class);//指定當前Mapper、Reducer任務的類job.setMapperClass (HeightMapper.class);job.setReducerClass (HeightReducer.class);//設置Mapper的結果類型job.setMapOutputKeyClass (Text.class);job.setMapOutputValueClass (LongWritable.class);// 設置Reducer的結果類型job.setOutputKeyClass (Text.class);job.setOutputValueClass (LongWritable.class);//設置待分析的文件夾路徑(linux的路徑地址)FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/wendu/"));FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/wendu"));job.waitForCompletion (true);} }
運行結果:
1937 2 1945 78 1949 111 1950 22四、數據去重:
- 要求:去重一下ip地址
測試數據:
192.168.234.21 192.168.234.22 192.168.234.21 192.168.234.21 192.168.234.23 192.168.234.21 192.168.234.21 192.168.234.21 192.168.234.25 192.168.234.21 192.168.234.21 192.168.234.26 192.168.234.21 192.168.234.27 192.168.234.21 192.168.234.27 192.168.234.21 192.168.234.29 192.168.234.21 192.168.234.26 192.168.234.21 192.168.234.25 192.168.234.25 192.168.234.25 192.168.234.21 192.168.234.22 192.168.234.21編寫代碼:
-
mapper類
/*** @Author zhangyong* @Date 2020/4/7 19:53* @Version 1.0*/ public class DisMapper extends Mapper<LongWritable,Text,Text,NullWritable> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {/*** 其中value只是一個變量,此處被當做key進行輸出*/context.write (value,NullWritable.get ());} } -
reducer類
/*** @Author zhangyong* @Date 2020/4/7 21:21* @Version 1.0*/ public class DisReducer extends Reducer<Text, NullWritable, Text, NullWritable> {@Overridepublic void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write (key, NullWritable.get ());}} -
Driver類
/*** @Author zhangyong* @Date 2020/4/7 21:32* @Version 1.0* 數據去重*/ public class DisDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration ();Job job = Job.getInstance (conf);//設置Drive類job.setJarByClass (DisReducer.class);//設置Mapper、Reduce類job.setMapperClass (DisMapper.class);job.setReducerClass (DisReducer.class);//Mapper的輸出job.setMapOutputKeyClass (Text.class);job.setMapOutputValueClass (NullWritable.class);//地址FileInputFormat.setInputPaths (job,new Path ("hdfs://anshun115:9000/distinct"));FileOutputFormat.setOutputPath (job,new Path ("hdfs://anshun115:9000/result/distinct"));job.waitForCompletion (true);} }
運行結果:
192.168.234.21 192.168.234.22 192.168.234.23 192.168.234.25 192.168.234.26 192.168.234.27 192.168.234.29五、流量統計:
- 要求:統計一下手機號碼使用的流量數
測試數據:
13901000123 zs bj 343 13202111011 ww sh 456 13901000123 zs bj 1024 13207551234 ls sz 758編寫代碼:
-
Bean類
/*** @Author zhangyong* @Date 2020/4/10 8:01* @Version 1.0*/ public class FlowBean implements Writable {private String phone;private String name;private String addr;private long flow;/*** 序列化** @param dataOutput* @throws IOException*/@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF (phone);dataOutput.writeUTF (name);dataOutput.writeUTF (addr);dataOutput.writeLong (flow);}/*** 反序列化** @param dataInput* @throws IOException*/@Overridepublic void readFields(DataInput dataInput) throws IOException {this.phone = dataInput.readUTF ();this.name = dataInput.readUTF ();this.addr = dataInput.readUTF ();this.flow = dataInput.readLong ();}public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddr() {return addr;}public void setAddr(String addr) {this.addr = addr;}public long getFlow() {return flow;}public void setFlow(long flow) {this.flow = flow;}@Overridepublic String toString() {return "FlowBean{" +"phone='" + phone + '\'' +", name='" + name + '\'' +", addr='" + addr + '\'' +", flow=" + flow +'}';} } -
mapper類
/*** @Author zhangyong* @Date 2020/4/10 8:10* @Version 1.0*/ public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {@Overridepublic void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//獲取行String line = value.toString ();/*** [13901000123,zk,bj,343]* phone = 13901000123;* name = zk;* addr = bj;* flow = 343;*/String[] info = line.split (" ");FlowBean flowBean = new FlowBean ();flowBean.setPhone (info[0]);flowBean.setName (info[1]);flowBean.setAddr (info[2]);flowBean.setFlow (Integer.parseInt (info[3]));context.write (new Text (flowBean.getName ()), flowBean);} } -
reducer類
/*** @Author zhangyong* @Date 2020/4/10 8:23* @Version 1.0*/ public class FlowReducer extends Reducer<Text, FlowBean, FlowBean, NullWritable> {@Overridepublic void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {FlowBean result = new FlowBean ();for (FlowBean value : values) {result.setPhone (value.getPhone ());result.setName (value.getName ());result.setAddr (value.getAddr ());result.setFlow (result.getFlow () + value.getFlow ());}context.write (result, NullWritable.get ());} } -
Driver類
/*** @Author zhangyong* @Date 2020/4/10 8:28* @Version 1.0* 流量統計*/ public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration ();Job job = Job.getInstance (conf);// 加載主類job.setJarByClass (FlowDriver.class);//加載mapper、reduce類job.setMapperClass (FlowMapper.class);job.setReducerClass (FlowReducer.class);//設置map的的key、valuejob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//設置輸出的的key、valuejob.setOutputKeyClass(FlowBean.class);job.setOutputValueClass (NullWritable.class);//設置路徑(傳輸、結果)FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/flow"));FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/flow"));job.waitForCompletion (true);} }
運行結果:
FlowBean{phone='13207551234', name='ls', addr='sz', flow=758} FlowBean{phone='13202111011', name='ww', addr='sh', flow=456} FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}六、電影的排行榜:
- 要求:按照降序排列一下電影的熱度
測試數據:
中國機長 72 機械師2 83 奇異博士 87 流浪地球 79 復仇者聯盟4:終局之戰 94 驚奇隊長 68 蜘蛛俠:英雄遠征 80 長城 56 奪路而逃 69 神奇動物在哪里 57 驢得水 59 我不是潘金蓮 55 速度與激情:特別行動 77 哪吒之魔童降世 96 捉迷藏 78 上海堡壘 9 葉問4 75 勇士之門 35 羅曼蒂克消亡史 67 阿麗塔:戰斗天使 89編寫代碼:
-
Bean類
/*** @Author zhangyong* @Date 2020/4/13 8:42* @Version 1.0*/ public class MovieBean implements WritableComparable<MovieBean> {private String name;private int hot;/*** 排序方法** @param o* @return*/@Overridepublic int compareTo(MovieBean o) {return o.hot - this.hot;}/*** 序列化** @param dataOutput* @throws IOException*/@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF (name);dataOutput.writeInt (hot);}/*** 反序列化** @param dataInput* @throws IOException*/@Overridepublic void readFields(DataInput dataInput) throws IOException {this.name = dataInput.readUTF ();this.hot = dataInput.readInt ();}public void setName(String name) {this.name = name;}public int getHot() {return hot;}public void setHot(int hot) {this.hot = hot;}@Overridepublic String toString() {return "MovieBean{" +"name='" + name + '\'' +", hot=" + hot +'}';} } -
mapper類
/*** @Author zhangyong* @Date 2020/4/13 8:52* @Version 1.0*/ public class MovieMapper extends Mapper<LongWritable, Text, MovieBean, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//獲取一行String line = value.toString ();//截取數據String[] split = line.split (" ");//封裝對象MovieBean movieBean = new MovieBean ();movieBean.setName (split[0]);movieBean.setHot (Integer.parseInt (split[1]));//輸出context.write (movieBean, NullWritable.get ());} } -
reducer類
無 -
Driver類
/*** @Author zhangyong* @Date 2020/4/13 9:19* @Version 1.0*/ public class MovieDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration ();Job job = Job.getInstance (conf);job.setJarByClass (MovieDriver.class);job.setMapperClass (MovieMapper.class);//加載map輸出類型和value的輸出類型job.setMapOutputKeyClass (MovieBean.class);job.setMapOutputValueClass (NullWritable.class);FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/sort"));FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/sort"));job.waitForCompletion (true);} }
運行結果:
MovieBean{name='哪吒之魔童降世', hot=96} MovieBean{name='復仇者聯盟4:終局之戰', hot=94} MovieBean{name='阿麗塔:戰斗天使', hot=89} MovieBean{name='奇異博士', hot=87} MovieBean{name='機械師2', hot=83} MovieBean{name='蜘蛛俠:英雄遠征', hot=80} MovieBean{name='流浪地球', hot=79} MovieBean{name='捉迷藏', hot=78} MovieBean{name='速度與激情:特別行動', hot=77} MovieBean{name='葉問4', hot=75} MovieBean{name='中國機長', hot=72} MovieBean{name='奪路而逃', hot=69} MovieBean{name='驚奇隊長', hot=68} MovieBean{name='羅曼蒂克消亡史', hot=67} MovieBean{name='驢得水', hot=59} MovieBean{name='神奇動物在哪里', hot=57} MovieBean{name='長城', hot=56} MovieBean{name='我不是潘金蓮', hot=55} MovieBean{name='勇士之門', hot=35} MovieBean{name='上海堡壘', hot=9}七、多個文件統計成績:
- 要求:根據三張表統計每個同學的各課成績的總和
測試數據:
chinese.txt
1 alex 89 2 alex 73 3 alex 67 1 romeo 49 2 romeo 83 3 romeo 27 1 lee 77 2 lee 66 3 lee 89english.txt
1 alex 55 2 alex 69 3 alex 75 1 romeo 44 2 romeo 64 3 romeo 86 1 lee 76 2 lee 84 3 lee 93math.txt
1 alex 85 2 alex 59 3 alex 95 1 romeo 74 2 romeo 67 3 romeo 96 1 lee 45 2 lee 76 3 lee 67編寫代碼:
-
Bean類
/*** @Author zhangyong* @Date 2020/4/10 10:00* @Version 1.0*/ public class ScoreBean implements Writable {private String name;private int chinese;private int math;private int english;@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF(name);dataOutput.writeInt(chinese);dataOutput.writeInt(math);dataOutput.writeInt(english);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.name = dataInput.readUTF();this.chinese = dataInput.readInt();this.math = dataInput.readInt();this.english = dataInput.readInt();}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getChinese() {return chinese;}public void setChinese(int chinese) {this.chinese = chinese;}public int getMath() {return math;}public void setMath(int math) {this.math = math;}public int getEnglish() {return english;}public void setEnglish(int english) {this.english = english;}@Overridepublic String toString() {return "StuScore{" +"name='" + name + '\'' +", chinese=" + chinese +", math=" + math +", english=" + english +'}';} } -
mapper類
/*** @Author zhangyong* @Date 2020/4/10 10:03* @Version 1.0*/ public class ScoreMapper extends Mapper<LongWritable, Text, Text, ScoreBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 獲取一行數據String line = value.toString();String[] data = line.split(" ");ScoreBean ss = new ScoreBean();ss.setName(data[1]);/*** 注意,此處導包的時候不要導錯,應該導入的是org.apache.hadoop.mapreduce.lib.input.FileSplit;* 通過獲取當前map階段的MapTask處理的切片信息來獲取文件名。*/FileSplit split = (FileSplit) context.getInputSplit();if (split.getPath().getName().equals("chinese.txt")) {ss.setChinese(Integer.parseInt(data[2]));} else if (split.getPath().getName().equals("math.txt")) {ss.setMath(Integer.parseInt(data[2]));} else if (split.getPath().getName().equals("english.txt")) {ss.setEnglish(Integer.parseInt(data[2]));}context.write(new Text(ss.getName()), ss);} } -
reducer類
/*** @Author zhangyong* @Date 2020/4/10 10:05* @Version 1.0*/ public class ScoreReducer extends Reducer<Text, ScoreBean, Text, ScoreBean> {@Overrideprotected void reduce(Text key, Iterable<ScoreBean> values, Context context) throws IOException, InterruptedException {ScoreBean resultScore = new ScoreBean();// 此處key.toSting中只有name一個值,因為在map階段的輸出key只有nameresultScore.setName(key.toString());for (ScoreBean value : values) {// result.setFlow(result.getFlow() + value.getFlow());// 語文成績分數resultScore.setChinese(resultScore.getChinese() + value.getChinese());// 數學成績分resultScore.setMath(resultScore.getMath() + value.getMath());// 英語成績分resultScore.setEnglish(resultScore.getEnglish() + value.getEnglish());}context.write(key, resultScore);} } -
Driver類
/*** @Author zhangyong* @Date 2020/4/10 10:10* @Version 1.0* 統計成績*/ public class ScoreDriver {public static void main(String[] args) throws Exception {Configuration conf = new Configuration ();Job job = Job.getInstance (conf);//加載Drive類job.setJarByClass (ScoreDriver.class);//加載Mapper、Reducer類job.setMapperClass (ScoreMapper.class);job.setReducerClass (ScoreReducer.class);//加載map輸出類型和value的輸出類型job.setMapOutputKeyClass (Text.class);job.setMapOutputValueClass (ScoreBean.class);job.setOutputKeyClass (Text.class);job.setOutputValueClass (ScoreBean.class);FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/score"));FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/score"));job.waitForCompletion (true);} }
運行結果:
alex StuScore{name='alex', chinese=229, math=239, english=199} lee StuScore{name='lee', chinese=232, math=188, english=253} romeo StuScore{name='romeo', chinese=159, math=237, english=194}八、Job鏈處理數據:
- 要求:統計每個同學的總分
測試數據:
1|zhang 100 2|wang 200 3|zhang 150 4|lisi 190 5|wang 50 6|zhang 80 7|lisi 50編寫代碼:
-
Bean類
/*** @Author zhangyong* @Date 2020/4/14 15:06* @Version 1.0*/ public class CountBean implements WritableComparable<CountBean> {private String name;private int count;@Overridepublic int compareTo(CountBean o) {return o.count - this.count;}/*** 序列化** @param dataOutput* @throws IOException*/@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF (name);dataOutput.writeInt (count);}/*** 反序列化** @param dataInput* @throws IOException*/@Overridepublic void readFields(DataInput dataInput) throws IOException {this.name = dataInput.readUTF ();this.count = dataInput.readInt ();}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getCount() {return count;}public void setCount(int count) {this.count = count;}@Overridepublic String toString() {return "CountBean{" +"name='" + name + '\'' +", count=" + count +'}';} } -
mapper類
OneCountMapper類
/*** @Author zhangyong* @Date 2020/4/14 8:54* @Version 1.0*/ public class OneCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString ();/*** 通過 | 進行切分時會得到一個數組,數組中的0號下標為序號,1號下標中有name和profit的數據。* 再通過切分1號下標中的數據時可以獲取到name和profit的數據。*/String name = line.split ("\\|")[1].split (" ")[0];int count = Integer.parseInt (line.split ("\\|")[1].split (" ")[1]);context.write (new Text (name),new IntWritable (count));} }TwoCountMapper類
/*** @Author zhangyong* @Date 2020/4/14 15:27* @Version 1.0*/ public class TwoCountMapper extends Mapper<LongWritable, Text, CountBean, NullWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString ();/*** 因為第二個Mapper任務要讀取的數據內容是第一個MR任務的結果文件,通常MR任務的結果文件是以TAB的方式來展示數據的。* 所以當第二個Mapper任務要執行切分時,所使用的分隔符應該是\t——制表符。*/String name = line.split ("\t")[0];int count = Integer.parseInt (line.split ("\t")[1]);CountBean bean = new CountBean ();bean.setName (name);bean.setCount (count);context.write (bean, NullWritable.get ());} } -
reducer類
/*** @Author zhangyong* @Date 2020/4/14 8:54* @Version 1.0*/ public class OneCountReducer extends Reducer<Text , IntWritable, Text, IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {//統計利潤int sum = 0;for (IntWritable value : values) {sum+=value.get ();}context.write (key,new IntWritable (sum));} } -
Driver類
/*** @Author zhangyong* @Date 2020/4/14 8:54* @Version 1.0* job鏈操作數據*/ public class CountDriver {public static void main(String[] args) throws Exception {Configuration conf = new Configuration ();Job job = Job.getInstance (conf);job.setJarByClass (CountDriver.class);job.setMapperClass (OneCountMapper.class);job.setReducerClass (OneCountReducer.class);job.setMapOutputKeyClass (Text.class);job.setMapOutputValueClass (IntWritable.class);job.setOutputKeyClass (Text.class);job.setOutputValueClass (IntWritable.class);FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/count"));FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/count"));if (job.waitForCompletion (true)) {// 設置第二個Job任務Job job2 = Job.getInstance (conf);// 設置第二個Job任務的Mapperjob2.setMapperClass (TwoCountMapper.class);job2.setMapOutputKeyClass (CountBean.class);job2.setMapOutputValueClass (NullWritable.class);/*** 設置第二個Job任務是輸入輸出路徑。* 此處的輸入路徑是第一個job任務的輸出路徑* 注意設置路徑時,里面傳入的job應該是當前的job任務,如下所示,應該是job2。* 如果寫成前面的job任務名稱,在運行時則會爆出錯誤,提示路徑不存在。*/FileInputFormat.setInputPaths (job2, new Path ("hdfs://anshun115:9000/result/count"));FileOutputFormat.setOutputPath (job2, new Path ("hdfs://anshun115:9000/result/count2"));// 此處提交任務時,注意用的是job2。job2.waitForCompletion (true);}}}
運行結果:
count.txt
lisi 240 wang 250 zhang 330count2.txt
CountBean{name='zhang', count=330} CountBean{name='wang', count=250} CountBean{name='lisi', count=240}九、簡單分區案例:
-
要求:分區顯示手機使用流量的總和
-
測試數據:
13901000123 zs bj 343 13202111011 ww sh 456 13901000123 zs bj 1024 13207551234 ls sz 758
2. 編寫代碼:- Partitioner類```java/**
 * Routes each record to a reduce partition by its address field:
 * "bj" -> 0, "sh" -> 1, anything else -> 2.
 *
 * Fix: the extracted source had a garbled parameter declaration
 * ("intnumPartitioner"), which is not valid Java — restored to
 * "int numPartitions". The dispatch also now uses a null-safe switch
 * instead of addr.equals(...) chains.
 *
 * @Author zhangyong
 * @Date 2020/4/10 10:42
 * @Version 1.0
 */
public class AddPartitioner extends Partitioner<Text, PartFlowBean> {

    @Override
    public int getPartition(Text text, PartFlowBean flowBean, int numPartitions) {
        String addr = flowBean.getAddr();
        // Null or unknown addresses fall through to the default partition.
        if ("bj".equals(addr)) {
            return 0;
        } else if ("sh".equals(addr)) {
            return 1;
        } else {
            return 2;
        }
    }
}
-
Bean類
/*** @Author zhangyong* @Date 2020/4/10 10:01* @Version 1.0*/ public class PartFlowBean implements Writable {private String phone;private String name;private String addr;private long flow;/*** 序列化** @param dataOutput* @throws IOException*/@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeUTF (phone);dataOutput.writeUTF (name);dataOutput.writeUTF (addr);dataOutput.writeLong (flow);}/*** 反序列化** @param dataInput* @throws IOException*/@Overridepublic void readFields(DataInput dataInput) throws IOException {this.phone = dataInput.readUTF ();this.name = dataInput.readUTF ();this.addr = dataInput.readUTF ();this.flow = dataInput.readLong ();}public String getPhone() {return phone;}public void setPhone(String phone) {this.phone = phone;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddr() {return addr;}public void setAddr(String addr) {this.addr = addr;}public long getFlow() {return flow;}public void setFlow(long flow) {this.flow = flow;}@Overridepublic String toString() {return "FlowBean{" +"phone='" + phone + '\'' +", name='" + name + '\'' +", addr='" + addr + '\'' +", flow=" + flow +'}';} } -
mapper類
/*** @Author zhangyong* @Date 2020/4/10 10:51* @Version 1.0*/ public class PartFlowMapper extends Mapper<LongWritable, Text, Text, PartFlowBean> {@Overridepublic void map(LongWritable key, Text value, Context context) throwsIOException, InterruptedException {String line = value.toString ();/**[13901000123,zk,bj,343]phone = 13901000123;name = zk;addr = bj;flow = 343;*/String[] info = line.split (" ");PartFlowBean flowBean = new PartFlowBean ();flowBean.setPhone (info[0]);flowBean.setName (info[1]);flowBean.setAddr (info[2]);flowBean.setFlow (Integer.parseInt (info[3]));context.write (new Text (flowBean.getName ()), flowBean);} } -
reducer類
/*** @Author zhangyong* @Date 2020/4/10 10:23* @Version 1.0*/ public class PartFlowReducer extends Reducer<Text, PartFlowBean, PartFlowBean,NullWritable> {@Overridepublic void reduce(Text key, Iterable<PartFlowBean> values, Contextcontext) throws IOException, InterruptedException {PartFlowBean result = new PartFlowBean ();for (PartFlowBean value : values) {result.setPhone (value.getPhone ());result.setPhone (value.getPhone ());result.setName (value.getName ());result.setAddr (value.getAddr ());result.setFlow (result.getFlow () + value.getFlow ());}context.write (result, NullWritable.get ());} } -
Driver類
/*** @Author zhangyong* @Date 2020/4/11 11:17* @Version 1.0* 分區案例*/ public class PartFlowDriver {public static void main(String[] args) throws Exception {Configuration conf = new Configuration ();Job job = Job.getInstance (conf);job.setJarByClass (PartFlowDriver.class);job.setMapperClass (PartFlowMapper.class);job.setReducerClass (PartFlowReducer.class);/*** 下面的兩個類如果不寫的話,那么就不會生效。*/// 設置分區類job.setPartitionerClass (AddPartitioner.class);// 設置分區數量job.setNumReduceTasks (3);job.setMapOutputKeyClass (Text.class);job.setMapOutputValueClass (PartFlowBean.class);job.setOutputKeyClass (PartFlowBean.class);job.setOutputValueClass (NullWritable.class);FileInputFormat.setInputPaths (job, new Path ("hdfs://anshun115:9000/partition"));FileOutputFormat.setOutputPath (job, new Path ("hdfs://anshun115:9000/result/partition"));job.waitForCompletion (true);} }
運行結果:
part-r-00000
FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}part-r-00001
FlowBean{phone='13202111011', name='ww', addr='sh', flow=456}part-r-00002
FlowBean{phone='13207551234', name='ls', addr='sz', flow=758}十、分區并全排序:
- 要求:把以下數據按照兩位數、三位數、四位數以上進行分區,并且按照大小排序
測試數據:
82 239 231 23 22 213 123 232 124 213 3434 232 4546 565 123 231 231 2334 231 1123 5656 657 12313 4324 213 123 2 232 32 343 123 4535 12321 3442 453 1233 342 453 1231 322 452 232 343 455 3123 3434 3242編寫代碼:
-
Partitioner類
/*** @Author zhangyong* @Date 2020/4/14 9:39* @Version 1.0* 全排序* 將上述文件內容按照數字位數分別寫入三個文件,如下* 0-99的寫入到文件1* 100-999寫入到文件2* 1000-其他數據寫入到文件3*/ public class AutoPartitioner extends Partitioner<IntWritable, IntWritable> {@Overridepublic int getPartition(IntWritable key, IntWritable value, int numPartitions) {String num = String.valueOf (key.get ());if (num.matches ("[0-9][0-9]") || num.matches ("[0-9]")) {return 0;} else if (num.matches ("[0-9][0-9][0-9]")) {return 1;} else {return 2;}} } -
mapper類
/*** @Author zhangyong* @Date 2020/4/14 9:44* @Version 1.0*/ public class NumSortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString ();String[] data = line.split (" ");for (String num : data) {context.write (new IntWritable (Integer.parseInt (num)), new IntWritable (1));}} } -
reducer類
/*** @Author zhangyong* @Date 2020/4/14 9:39* @Version 1.0*/ public class NumSortReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {@Overrideprotected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int result = 0;for (IntWritable count : values) {result = result + count.get ();}context.write (key, new IntWritable (result));} } -
Driver類
/**
 * Driver for the partition-and-sort job: three output files require the
 * custom partitioner plus a matching reduce-task count.
 *
 * @Author zhangyong
 * @Date 2020/4/14 9:39
 * @Version 1.0
 */
public class NumSortDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(NumSortDriver.class);
        job.setMapperClass(NumSortMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Three result files -> custom partitioner AND three reduce tasks.
        job.setPartitionerClass(AutoPartitioner.class);
        job.setNumReduceTasks(3);

        job.setReducerClass(NumSortReducer.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/numcount/"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/numcount"));

        job.waitForCompletion(true);
    }
}
運行結果:
part-r-00000
2 1 22 1 23 1 32 1 82 1part-r-00001
123 4 124 1 213 3 231 4 232 4 239 1 322 1 342 1 343 2 452 1 453 2 455 1 565 1 657 1part-r-00002
1123 1 1231 1 1233 1 2334 1 3123 1 3242 1 3434 2 3442 1 4324 1 4535 1 4546 1 5656 1 12313 1 12321 1十一、Combine提高運行效率:
-
要求:使用Combine類統計一下單詞出現的次數
-
測試數據:
zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin zhangyong zhangrui zhangqin -
編寫代碼:
-
mapper類
/*** @Author zhangyong* @Date 2020/4/15 7:30* @Version 1.0*/ public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString ();String[] words = line.split (" ");for (String word : words) {context.write (new Text (word), new IntWritable (1));}} } -
Combine類
/*** @Author zhangyong* @Date 2020/4/15 7:34* @Version 1.0*/ public class WcCombine extends Reducer<Text,IntWritable,Text,IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count=0;for (IntWritable value : values) {count+=value.get ();}context.write (key,new IntWritable (count));} } -
reducer類
/*** @Author zhangyong* @Date 2020/4/15 7:34* @Version 1.0*/ public class WcReducer extends Reducer<Text,IntWritable,Text,IntWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int count = 0;for (IntWritable value : values) {count+=value.get ();System.err.println(key + ":" + value);}context.write (key,new IntWritable (count));} } -
Driver類
/**
 * Driver for the combiner word-count job.
 *
 * @Author zhangyong
 * @Date 2020/4/15 7:42
 * @Version 1.0
 */
public class WcDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WcDriver.class);
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Combiner: optional map-side pre-aggregation. It runs once per
        // MapTask to lighten the reduce phase; it only reduces intermediate
        // data volume and never changes the final result (summation is
        // associative and commutative). Without this call no combine occurs.
        job.setCombinerClass(WcCombine.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://anshun115:9000/mapreduce"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://anshun115:9000/result/wccombine"));

        job.waitForCompletion(true);
    }
}
-
-
運行結果:
zhangqin 20 zhangrui 20 zhangyong 20
十二、推薦認識好友:
- 要求:找出以下朋友的潛在朋友(一度、二度朋友關係鏈)
測試數據:
tom rose tom jim tom smith tom lucy rose tom rose lucy rose smith jim tom jim lucy jim smith smith jim smith tom smith rose編寫代碼:
-
第一個mapper類
/*** @Author 張勇* @Site www.gz708090.com* @Version 1.0* @Date 2020-04-17 12:08*/ public class OneFriendMapper extends Mapper<LongWritable, Text, Text, Text> {/*** 輸入的key和value是根據文件內容來確定。* 輸出的key和value是因為在業務邏輯中設定的輸出是name-friend好友關系。*/@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 獲取每行的數據String line = value.toString();// 獲取姓名String name = line.split(" ")[0];// 獲取好友String friend = line.split(" ")[1];context.write(new Text(name), new Text(friend));} } -
第一個reducer類
/*** @Author 張勇* @Site www.gz708090.com* @Version 1.0* @Date 2020-04-17 12:28*/ public class OneFriendReducer extends Reducer<Text, Text, Text, IntWritable> {/*** 輸入key和value要和mapper的輸出保持一致。* Text和IntWritable:* 如果是好友-1,如果不是好友就用-2。*/@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {ArrayList<String> friendList = new ArrayList<>();//處理好友關系for (Text value : values) {friendList.add(value.toString());if (key.toString().compareTo(value.toString()) < 0) {context.write(new Text(key + "-" + value), new IntWritable(1));} else {context.write(new Text(value + "-" + key), new IntWritable(1));}}// 處理可能相識的好友。for (int i = 0; i < friendList.size(); i++) {for (int j = 0; j < friendList.size(); j++) {String friend1 = friendList.get(i);String friend2 = friendList.get(j);if (friend1.compareTo(friend2) < 0) {context.write(new Text(friend1 + "-" + friend2), new IntWritable(2));}}}} } -
第二個mapper類
/*** @Author 張勇* @Site www.gz708090.com* @Version 1.0* @Date 2020-04-17 12:32*/ public class TwoFriendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 獲取一行數據String line = value.toString();// 獲取朋友關系的信息String friendInfo = line.split("\t")[0];// 獲取朋友關系的深度int deep = Integer.parseInt(line.split("\t")[1]);context.write(new Text(friendInfo), new IntWritable(deep));} } -
第二個reducer類
/*** @Author 張勇* @Site www.gz708090.com* @Version 1.0* @Date 2020-04-17 12:34*/ public class TwoFriendReducer extends Reducer<Text, IntWritable, Text, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {Boolean flag = true;/*** 設定好友關系為true的時候進行輸出* 因為題目要求是輸出可能相識的好友。所以為true的代碼應該是2* 也就是好友關系為1的時候設置變量為false*/for (IntWritable value : values) {if (value.get() == 1) {flag = false;}}if (flag) {context.write(key, NullWritable.get());}} } -
Driver類
/*** @Author 張勇* @Site www.gz708090.com* @Version 1.0* @Date 2020-04-17 12:36*/ public class FriendDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();//設置第一輪MapReduce的相應處理類與輸入輸出Job job1 = Job.getInstance(conf);job1.setJarByClass(FriendDriver.class);job1.setMapperClass(OneFriendMapper.class);job1.setReducerClass(OneFriendReducer.class);job1.setMapOutputKeyClass(Text.class);job1.setMapOutputValueClass(Text.class);job1.setOutputKeyClass(Text.class);job1.setOutputValueClass(IntWritable.class);//設置路徑(傳輸、結果)FileInputFormat.setInputPaths(job1, new Path("hdfs://anshun115:9000/friend"));FileOutputFormat.setOutputPath(job1, new Path("hdfs://anshun115:9000/result/friend"));//如果第一輪MapReduce完成再做這里的代碼if (job1.waitForCompletion(true)) {Job job2 = Job.getInstance(conf);// 設置第二個Job任務的Mapperjob2.setMapperClass(TwoFriendMapper.class);job2.setMapOutputKeyClass(Text.class);job2.setMapOutputValueClass(IntWritable.class);// 設置第二個Job任務的Reducerjob2.setReducerClass(TwoFriendReducer.class);job2.setOutputKeyClass(Text.class);job2.setOutputValueClass(NullWritable.class);/*** 設置第二個Job任務是輸入輸出路徑。* 此處的輸入路徑是第一個job任務的輸出路徑* 注意設置路徑時,里面傳入的job應該是當前的job任務,如下所示,應該是job2。* 如果寫成前面的job任務名稱,在運行時則會爆出錯誤,提示路徑不存在。*/FileInputFormat.setInputPaths(job2, new Path("hdfs://anshun115:9000/result/friend"));FileOutputFormat.setOutputPath(job2, new Path("hdfs://anshun115:9000/result/friend2"));// 此處提交任務時,注意用的是job2。job2.waitForCompletion(true);}} }
運行結果:
friend
jim-smith 1 jim-lucy 1 jim-tom 1 smith-tom 2 lucy-smith 2 lucy-tom 2 rose-smith 1 lucy-rose 1 rose-tom 1 smith-tom 2 lucy-smith 2 lucy-tom 2 rose-smith 1 smith-tom 1 jim-smith 1 rose-tom 2 jim-rose 2 jim-tom 2 lucy-tom 1 smith-tom 1 jim-tom 1 rose-tom 1 lucy-smith 2 lucy-rose 2 jim-lucy 2 jim-smith 2 jim-rose 2 rose-smith 2friend2
jim-rose lucy-smith總結
以上是生活随笔為你收集整理的MapReduce的几个企业级经典面试案例的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 摘抄佛言警句
- 下一篇: 无尘车间净化装修方案