Hadoop Study Notes (14): Getting Started with MapReduce Development (2): The MapReduce API and MapReduce Examples
4. The MapReduce API
- A MapReduce program generally consists of a Mapper, a Reducer, and a main function.
- The Mapper generally performs the key-value mapping step;
- The Reducer generally performs the key-value aggregation step;
- The main function assembles the Mapper and Reducer and supplies the necessary configuration;
- More advanced programs also set input/output file formats and configure a Combiner or Partitioner to optimize the job.
4.1 MapReduce program module: the main function
4.2 MapReduce program module: Mapper
- org.apache.hadoop.mapreduce.Mapper
4.3 MapReduce program module: Reducer
- org.apache.hadoop.mapreduce.Reducer
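The division of labor among these modules can be sketched with a minimal in-memory analogy in plain Java (no Hadoop dependencies; this is a simplification of what the framework actually does, not Hadoop's implementation): the "map" step emits (key, value) pairs, the framework groups values that share a key, and the "reduce" step aggregates each group.

```java
import java.util.*;

public class MiniMapReduce {
    // Word count in miniature: the map phase emits (word, 1) per word,
    // the "shuffle" groups values by key, and the reduce phase sums
    // each group, like WordCount's IntSumReducer does.
    static Map<String, Integer> wordCount(List<String> lines) {
        // map + shuffle: group every emitted 1 under its word
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                grouped.computeIfAbsent(word, k -> new ArrayList<>()).add(1);
            }
        }
        // reduce: sum the grouped values for each key
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) sum += v;
            counts.put(e.getKey(), sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("a b a", "b c"))); // prints {a=2, b=2, c=1}
    }
}
```

In real Hadoop the grouping and sorting happen across machines between the map and reduce phases; the programmer writes only the two phase bodies plus the job wiring in main.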
5. MapReduce Examples
5.1 Workflow (Mapper, Reducer, main, package and run)
5.2 Example 1: counting accesses by date
1. Starting from the WordCount program, modify the Mapper;
(Create a new Java class and copy the code from steps 1–3 below into it.)
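The modified Mapper itself is not shown above; its job is to emit (date, 1) for each log line instead of (word, 1). Assuming each line of email_log_with_date.txt is tab-separated with the date in the second column (a hypothetical layout; check the actual file), the key-extraction logic looks like this in plain Java:

```java
public class DateKey {
    // Hypothetical line layout: "email_address \t date".
    // Adjust the index to match the real column order of the data file.
    static String dateOf(String line) {
        String[] data = line.split("\t", -1);
        return data[1]; // the date column becomes the map output key
    }

    public static void main(String[] args) {
        // Inside the Hadoop Mapper this key would be emitted as
        // context.write(new Text(dateOf(line)), new IntWritable(1));
        System.out.println(dateOf("alice@example.com\t2021-07-01")); // prints 2021-07-01
    }
}
```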
2. Copy the Reducer as-is;
```java
public static class IntSumReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}
```

3. Copy the main function and make the corresponding changes;
```java
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length < 2) {
    System.err.println("Usage: wordcount <in> [<in>...] <out>");
    System.exit(2);
  }
  Job job = Job.getInstance(conf, "word count");
  job.setJarByClass(CountByDate.class);   // our main class is CountByDate
  job.setMapperClass(SpiltMapper.class);  // mapper: changed to SpiltMapper
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  for (int i = 0; i < otherArgs.length - 1; ++i) {
    FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  }
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
}
```

4. Compile and package into a jar
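Note that the main function above reuses IntSumReducer as the Combiner (job.setCombinerClass). That is safe here only because integer addition is associative and commutative, so pre-summing partial groups on the map side yields the same final totals as reducing everything at once. A quick plain-Java check of that property:

```java
public class CombinerCheck {
    // Sum a group of values, like IntSumReducer does for one key.
    static int sum(int... vals) {
        int s = 0;
        for (int v : vals) s += v;
        return s;
    }

    public static void main(String[] args) {
        // Reducing all five 1s at once...
        int direct = sum(1, 1, 1, 1, 1);
        // ...equals reducing map-side partial sums first (what a Combiner does).
        int combined = sum(sum(1, 1), sum(1, 1, 1));
        System.out.println(direct == combined); // prints true
    }
}
```

For non-associative aggregations (e.g. averages), the Reducer cannot be reused as the Combiner unchanged.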
Errors that came up during the build, and how they were resolved:
Done.
5/6. Upload the jar and the data
Link for the email_log_with_date.txt data file: https://pan.baidu.com/s/1HfwHCfmvVdQpuL-MPtpAng
Extraction code: cgnb
Upload the data file (make sure HDFS is running):
Upload complete (check in a browser at master:50070).
7. Run the program
(Make sure YARN is running.)
After the upload completes:
(master:8088)
8. View the results
(master:50070)
5.3 Example 2: sorting users by access count
The Mapper, Reducer, and main programs:
SortByCountFirst.Mapper
SortByCountSecond.Mapper
```java
package demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class SortByCountSecond {
  // 1. Modified Mapper
  public static class SpiltMapper
      extends Mapper<Object, Text, IntWritable, Text> {
    private IntWritable count = new IntWritable(1);
    private Text word = new Text();

    // value: email_address \t count
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] data = value.toString().split("\t", -1);
      word.set(data[0]);
      count.set(Integer.parseInt(data[1]));
      context.write(count, word);
    }
  }

  // 2. Reducer copied as-is, no changes needed
  public static class ReverseReducer
      extends Reducer<IntWritable, Text, Text, IntWritable> {
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      for (Text val : values) {
        context.write(val, key);
      }
    }
  }

  // 3. main function copied and adjusted accordingly
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: demo.SortByCountFirst <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "sort by count first");
    job.setJarByClass(SortByCountSecond.class); // the main class is SortByCountSecond
    job.setMapperClass(SpiltMapper.class);      // mapper: changed to SpiltMapper
    // job.setCombinerClass(IntSumReducer.class); // no combiner for this job
    job.setReducerClass(ReverseReducer.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
```

Then package and upload.
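The trick in SortByCountSecond is that the map output key is the count, not the user: MapReduce sorts map output by key before it reaches the Reducer, so ReverseReducer receives (count, user) pairs in ascending count order and simply flips each pair back to (user, count). A plain-Java sketch of that key-sorting behavior (a simulation, not Hadoop itself):

```java
import java.util.*;

public class SortByKeySketch {
    // Simulate the shuffle: pairs keyed by count reach the reducer sorted
    // by key ascending, like Hadoop's default IntWritable ordering.
    static List<String> usersByCount(Map<String, Integer> counts) {
        TreeMap<Integer, List<String>> byCount = new TreeMap<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            byCount.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
        }
        // The "reducer" walks keys in sorted order, emitting users.
        List<String> out = new ArrayList<>();
        for (List<String> users : byCount.values()) out.addAll(users);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("carol@x.com", 5);
        counts.put("alice@x.com", 2);
        counts.put("bob@x.com", 9);
        System.out.println(usersByCount(counts)); // prints [alice@x.com, carol@x.com, bob@x.com]
    }
}
```

With a single reducer the final output is globally sorted; with multiple reducers each output file is only sorted within itself unless a total-order partitioner is used.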
```shell
yarn jar sortbycount.jar demo.SortByCountFirst -Dmapreduce.job.queuename=prod email_log_with_date.txt sortbycountfirst_output00
yarn jar sortbycount.jar demo.SortByCountSecond -Dmapreduce.job.queuename=prod sortbycountfirst_output00 sortbycountsecond_output00
```

Note that the second job reads the first job's output rather than the raw log, since its Mapper expects lines of the form `email_address \t count`.