A MapReduce case study: counting word occurrences in text, running MapReduce locally, using a Combiner and related concepts, traffic-statistics examples with traffic totals and traffic sorting, and a custom Partitioner
Project structure:
The code used throughout this case study is as follows.
The code of WordCountMapper:
package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper here comes from hadoop-mapreduce-client-core-2.8.0.jar.
 * Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN   : the type of the key the framework reads; with the default InputFormat the key is
 *           the byte offset of the start of a line, so the key type is Long
 * VALUEIN : the type of the value the framework reads; with the default InputFormat the value
 *           is the content of one line, so the value type is String
 * KEYOUT  : the key type returned by the user-defined logic, decided by the business logic;
 *           in this wordcount program the output key is a word, so it is String
 * VALUEOUT: the value type returned by the user-defined logic, decided by the business logic;
 *           in this wordcount program the output value is a word count, so it is Integer
 *
 * However, JDK types such as String and Long serialize inefficiently. To speed up
 * serialization, Hadoop defines its own serialization framework, so any data in a Hadoop
 * program that must be serialized (written to disk or sent over the network) has to use types
 * that implement Hadoop's serialization framework:
 *
 * Long    ----> LongWritable
 * String  ----> Text
 * Integer ----> IntWritable
 * Null    ----> NullWritable
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * This is the user-defined business-logic method called by MapTask, the main driver
     * program of the MapReduce framework. MapTask drives the InputFormat to read data as
     * (KEYIN, VALUEIN) pairs and calls this map method once for every KV pair it reads.
     * With the default InputFormat, the key is the starting offset of a line and the value
     * is the line's content.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
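The comments above note that Hadoop replaces JDK types with its own Writable types (Text, LongWritable, ...) for faster serialization. As a rough illustration of the underlying contract, here is a sketch using plain java.io only, with no Hadoop dependency; the `Record` class and `roundTrip` helper are hypothetical names, not part of the project. The key rule: `readFields` must mirror `write` field by field, in the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-java.io sketch of the Writable contract that types like Text and
// LongWritable implement: write fields to a binary stream, then read them
// back in exactly the same order.
public class WritableRoundTripDemo {

    static class Record {
        String word;
        long count;

        void write(DataOutput out) throws IOException {
            out.writeUTF(word);
            out.writeLong(count);
        }

        void readFields(DataInput in) throws IOException {
            word = in.readUTF();     // must mirror the write order exactly
            count = in.readLong();
        }
    }

    // Serialize a record to bytes and deserialize it into a fresh instance.
    public static Record roundTrip(Record r) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            r.write(new DataOutputStream(buf));
            Record back = new Record();
            back.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
            return back;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Record r = new Record();
        r.word = "hadoop";
        r.count = 3;
        Record back = roundTrip(r);
        System.out.println(back.word + " " + back.count);
    }
}
```

If `readFields` read the long before the UTF string, deserialization would produce garbage or throw, which is why the FlowBean example later in this article stresses keeping the two methods in the same field order.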
The code of WordCountReducer:
package cn.toto.bigdata.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * ReduceTask calls the reduce method we wrote. Each reduce task receives a share of the
     * data output by all map tasks of the previous (map) phase
     * (key.hashCode() % numReduceTasks == this reduce task's number).
     * When the reduce task processes the KV pairs it received, it calls our reduce method
     * like this:
     *   - first it groups all received KV pairs by key (keys that compare equal form a group)
     *   - then, for each group, it passes the key of the group's first KV pair as the reduce
     *     method's key parameter, and all the group's values as an iterator in the values
     *     parameter
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new IntWritable(count));
    }
}
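The comment above says each reduce task receives the keys whose `key.hashCode() % numReduceTasks` equals its own task number. A minimal plain-Java sketch of that default routing follows (Hadoop's HashPartitioner additionally masks the sign bit so that negative hash codes still yield a valid partition index; `DefaultPartitionDemo` is a hypothetical name):

```java
// Sketch of the default hash partitioning that routes each map-output key
// to a reduce task: (hash & Integer.MAX_VALUE) % numReduceTasks.
// The mask keeps the result non-negative even when hashCode() is negative.
public class DefaultPartitionDemo {

    public static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        // The same key always lands on the same reduce task, which is why
        // all (word, 1) pairs for one word meet in a single reducer.
        int p = partitionFor("hadoop", 3);
        System.out.println("partition for 'hadoop' with 3 reducers: " + p);
    }
}
```

This determinism is what makes the per-key grouping in the reducer possible; the custom Partitioner at the end of this article simply replaces this hash rule with an explicit lookup table.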
The code of WordCountDriver:
package cn.toto.bigdata.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * This class is the client used to set the many parameters the wordcount job needs at runtime:
 * for example, which components to use as the data reader and the result writer,
 * which class holds the map-phase business logic and which the reduce-phase logic,
 * the path of the jar containing the wordcount job,
 * ...
 *
 * Preparation before running:
 * 1. Export the current project as wordcount.jar
 * 2. Prepare /home/toto/software/wordcount/input/a.txt, with contents like:
 *        The true
 *        nobility is
 *        in being
 *        superior to
 *        your previous
 *        self guess
 * 3. Upload /home/toto/software/wordcount to HDFS with: hadoop fs -put wordcount /wordcount
 *
 * Then run:
 *     hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 * which is equivalent to:
 *     java -cp wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
 * except that `hadoop jar` also puts the Hadoop classpath jars on the application classpath
 * before running cn.toto.bigdata.mr.wc.WordCountDriver.
 *
 * Finally, view the result with: hadoop fs -cat /wordcount/output/part-r-00000
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop:9000");

        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");*/

        Job job = Job.getInstance(conf);
        // Tell the framework where the jar containing our program lives
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(WordCountDriver.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Tell the framework the output types of our mapper and reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Tell the framework which format components to use for reading input and writing
        // output. TextInputFormat is the built-in input component for reading text files.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework the path of the files to process
        FileInputFormat.setInputPaths(job, new Path("/wordcount/input/"));

        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("/wordcount/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
Preparation before running:
1. Export the current project as wordcount.jar
2. Prepare /home/toto/software/wordcount/input/a.txt, with contents like:
       The true
       nobility is
       in being
       superior to
       your previous
       self guess
3. Upload /home/toto/software/wordcount to HDFS with: hadoop fs -put wordcount /wordcount

Finally, run:
    hadoop jar wordcount.jar cn.toto.bigdata.mr.wc.WordCountDriver
The result looks like this:
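Before packaging and deploying, the map → group-by-key → reduce flow above can be sanity-checked with a small in-memory simulation. This is a sketch in plain Java with no Hadoop dependency; `WordCountSim` is a hypothetical helper, not part of the project:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory simulation of the WordCount map -> group-by-key -> reduce
// pipeline, mirroring WordCountMapper and WordCountReducer without Hadoop.
public class WordCountSim {

    public static Map<String, Integer> wordCount(List<String> lines) {
        // "Map phase": emit a (word, 1) pair for every word of every line
        List<Map.Entry<String, Integer>> kvs = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                kvs.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        // "Shuffle + reduce phase": group pairs by key and sum the values
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, Integer> kv : kvs) {
            counts.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                wordCount(Arrays.asList("The true", "nobility is", "is true"));
        System.out.println(counts);
    }
}
```

The intermediate list of (word, 1) pairs plays the role of the map output, and the `merge` loop plays the role of the framework's grouping plus our reducer's summing loop.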
B: Running WordCount locally with a Combiner (the main change is in WordCountDriver). The code is as follows:
package cn.toto.bigdata.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Same client class as above, changed to run locally: fs.defaultFS is no longer set,
 * the input/output paths point at the local e: drive, and a Combiner is configured.
 */
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://hadoop:9000");
        /*conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "mini1");*/

        Job job = Job.getInstance(conf);
        // Tell the framework where the jar containing our program lives
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(WordCountDriver.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // Use a Combiner. The benefit is that partial sums are computed inside each map
        // task, and only the summed results are passed to the reducer, which then sums
        // again. This reduces the reducer's work: every map task does its own local
        // aggregation, spreading the load and improving efficiency. Because a combiner
        // follows the same programming contract as a reducer (it extends Reducer), in
        // this scenario we can simply set the combiner to WordCountReducer.
        job.setCombinerClass(WordCountReducer.class);

        // Tell the framework the output types of our mapper and reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Tell the framework which format components to use for reading input and writing
        // output. TextInputFormat is the built-in input component for reading text files.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Tell the framework the path of the files to process
        FileInputFormat.setInputPaths(job, new Path("e:/wordcount/input/"));

        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("e:/wordcount/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Preparation:
Prepare e:/wordcount/input/a.txt on the E: drive, with the following contents:

The true nobility is in being superior to your previous self guess
No great discovery was ever made without a bold
Knowledge will give you power but character respect
The sun is just rising in the morning of another day
I I figure life is a gift and I don't intend on wasting

Right-click the code above and run it, then open E:\wordcount\output\part-r-00000 to see the result, which contains:
After all the steps above, the program is complete.

Summary:
3. The Combiner in MapReduce
(1) A combiner is a component of an MR program besides the Mapper and the Reducer.
(2) The parent class of a combiner component is Reducer.
(3) The difference between a combiner and a reducer is where they run:
A combiner runs on the node of each individual map task;
a reducer receives the output of all mappers globally.
(4) The point of a combiner is to locally aggregate the output of each map task, reducing the amount of data sent over the network.
Implementation steps:
    1. Define a combiner class that extends Reducer and override its reduce method
    2. Set it on the job: job.setCombinerClass(CustomCombiner.class)
(5) A combiner may only be used when it cannot affect the final business logic.
Also, the combiner's output KV types must match the reducer's input KV types.
Use combiners with great care:
during a MapReduce job a combiner may or may not be invoked, and may be invoked once or several times.
So the rule for combiners is: the result must be the same whether or not the combiner runs.
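This rule can be made concrete with a small plain-Java sketch (no Hadoop dependency; the numbers are made up): summation gives the same result whether or not partial aggregation runs first, while averaging does not, which is why a sum reducer like WordCountReducer doubles as a combiner but an averaging reducer must not.

```java
import java.util.Arrays;

// Why a combiner is safe for SUM but not for AVERAGE: applying the
// operation to partial groups and then to the partial results must give
// the same answer as one global pass over all values.
public class CombinerSafetyDemo {

    public static long sum(long[] xs) {
        return Arrays.stream(xs).sum();
    }

    public static double avg(long[] xs) {
        return Arrays.stream(xs).average().orElse(0);
    }

    public static void main(String[] args) {
        long[] split1 = {1, 2, 3};   // pretend: output of map task 1
        long[] split2 = {10};        // pretend: output of map task 2

        // SUM: combine-then-reduce equals the global reduce -> combiner OK
        long combined = sum(new long[]{sum(split1), sum(split2)});
        long global = sum(new long[]{1, 2, 3, 10});
        System.out.println("sum: " + combined + " vs " + global);

        // AVERAGE: averaging the per-split averages is wrong -> combiner unsafe
        double avgOfAvgs = avg(new long[]{(long) avg(split1), (long) avg(split2)});
        double globalAvg = avg(new long[]{1, 2, 3, 10});
        System.out.println("avg: " + avgOfAvgs + " vs " + globalAvg);
    }
}
```

Here the sums agree (16 either way), but the average of the per-split averages is 6.0 while the true average is 4.0 — exactly the kind of silent corruption the rule above guards against, since the framework gives no guarantee about how many times the combiner runs.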
===============================================================================
Traffic statistics and a custom class implementing serialization:
package cn.toto.bigdata.mr.wc.flowsum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * For a custom class to be used by MapReduce it must be serializable:
 * here we implement WritableComparable.
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNbr;
    private long upFlow;
    private long dFlow;
    private long sumFlow;

    /**
     * The serialization framework calls the no-arg constructor when it creates an
     * instance during deserialization.
     */
    public FlowBean() {
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNbr);
        out.writeLong(upFlow);
        out.writeLong(dFlow);
        out.writeLong(sumFlow);
    }

    /**
     * Deserialization. Note: fields must be read back in exactly the same order
     * they were written.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.phoneNbr = in.readUTF();
        this.upFlow = in.readLong();
        this.dFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    @Override
    public int compareTo(FlowBean o) {
        // Descending by total flow. Long.compare avoids the overflow that casting
        // the difference (o.getSumFlow() - this.sumFlow) to int could cause.
        return Long.compare(o.getSumFlow(), this.sumFlow);
    }

    public void set(long upFlow, long dFlow) {
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public void set(String phoneNbr, long upFlow, long dFlow) {
        this.phoneNbr = phoneNbr;
        this.upFlow = upFlow;
        this.dFlow = dFlow;
        this.sumFlow = upFlow + dFlow;
    }

    public long getUpFlow() { return upFlow; }
    public void setUpFlow(long upFlow) { this.upFlow = upFlow; }
    public long getdFlow() { return dFlow; }
    public void setdFlow(long dFlow) { this.dFlow = dFlow; }
    public long getSumFlow() { return sumFlow; }
    public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }
    public String getPhoneNbr() { return phoneNbr; }
    public void setPhoneNbr(String phoneNbr) { this.phoneNbr = phoneNbr; }

    @Override
    public String toString() {
        return "FlowBean [phoneNbr=" + phoneNbr + ", upFlow=" + upFlow
                + ", dFlow=" + dFlow + ", sumFlow=" + sumFlow + "]";
    }
}

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSum {

    // Our own objects may be passed as KV, but they must implement Hadoop's
    // serialization mechanism (implements Writable)
    public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line we just read into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNbr);
            v.set(upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        FlowBean v = new FlowBean();

        /**
         * The key this reduce method receives is the first phone number of a group
         * <phoneA, bean><phoneA, bean><phoneA, bean>; the values parameter is an
         * iterator over all beans of that group.
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upFlowCount = 0;
            long dFlowCount = 0;
            for (FlowBean bean : values) {
                upFlowCount += bean.getUpFlow();
                dFlowCount += bean.getdFlow();
            }
            v.set(upFlowCount, dFlowCount);
            context.write(key, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        /* conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1"); */
        Job job = Job.getInstance(conf);
        // Tell the framework where the jar containing our program lives
        // job.setJar("c:/wordcount.jar");
        job.setJarByClass(FlowSum.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);

        // Tell the framework the output types of our mapper and reducer.
        /* job.setMapOutputKeyClass(Text.class);
         * job.setMapOutputValueClass(FlowBean.class); */
        // If the map output types match the final output types, these two lines suffice:
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Text input/output are the framework defaults, so these two lines can be omitted:
        /* job.setInputFormatClass(TextInputFormat.class);
         * job.setOutputFormatClass(TextOutputFormat.class); */

        // Tell the framework the path of the files to process
        FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/input/"));
        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/output/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Sums the traffic and sorts it in descending order.
 * Precondition: the input is the already-summed result file.
 */
public class FlowSumSort {

    public static class FlowSumSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
        FlowBean k = new FlowBean();
        Text v = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\t");
            String phoneNbr = fields[0];
            long upFlowSum = Long.parseLong(fields[1]);
            long dFlowSum = Long.parseLong(fields[2]);
            k.set(upFlowSum, dFlowSum);
            v.set(phoneNbr);
            context.write(k, v);
        }
    }

    public static class FlowSumSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
        @Override
        protected void reduce(FlowBean bean, Iterable<Text> phoneNbrs, Context context)
                throws IOException, InterruptedException {
            context.write(phoneNbrs.iterator().next(), bean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumSort.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumSortMapper.class);
        job.setReducerClass(FlowSumSortReducer.class);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // Tell the framework the output types of our mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Tell the framework the path of the files to process
        // (note: this is the output directory of the FlowSum job)
        FileInputFormat.setInputPaths(job, new Path("E:/learnTempFolder/flow/output/"));
        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("E:/learnTempFolder/flow/sortout/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Simulating the runtime environment:
1. Set the environment variable HADOOP_HOME=E:\learnTempFolder\hadoop-2.7.3
2. Download (e.g. from a CSDN resource) the Windows-10-compatible E:\learnTempFolder\hadoop-2.7.3\bin\winutils.exe and E:\learnTempFolder\hadoop-2.7.3\bin\hadoop.dll
It looks like this:
3. Prepare the data to process:
The data file HTTP_20130313143750.dat contains records such as:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200

4. First run FlowSum (right-click → run as a Java application). It generates E:\learnTempFolder\flow\output\part-r-00000 with the following contents:
13480253104	180	180	360
13502468823	7335	110349	117684
13560436666	1116	954	2070
13560439658	2034	5892	7926
13602846565	1938	2910	4848
13660577991	6960	690	7650
13719199419	240	0	240
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	120	240
13826544101	264	0	264
13922314466	3008	3720	6728
13925057413	11058	48243	59301
13926251106	240	0	240
13926435656	132	1512	1644
15013685858	3659	3538	7197
15920133257	3156	2936	6092
15989002119	1938	180	2118
18211575961	1527	2106	3633
18320173382	9531	2412	11943
84138413	4116	1432	5548

5. Then run FlowSumSort (note: do not delete the part-r-00000 above). The file it produces contains:

13502468823	7335	110349	117684
13925057413	11058	48243	59301
13726230503	2481	24681	27162
18320173382	9531	2412	11943
13560439658	2034	5892	7926
13660577991	6960	690	7650
15013685858	3659	3538	7197
13922314466	3008	3720	6728
15920133257	3156	2936	6092
84138413	4116	1432	5548
13602846565	1938	2910	4848
18211575961	1527	2106	3633
15989002119	1938	180	2118
13560436666	1116	954	2070
13926435656	132	1512	1644
13480253104	180	180	360
13826544101	264	0	264
13719199419	240	0	240

Of course, we can also sum and sort in a single job and write the result to a chosen output directory. The code is as follows:
package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OneStepSumSort {

    public static class OneStepSumSortMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line we just read into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNbr);
            v.set(phoneNbr, upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class OneStepSumSortReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        // Buffers all per-phone totals, sorted by FlowBean.compareTo (descending sumFlow).
        // Caveat: a TreeMap keeps only one entry per "equal" key, so two phones with the
        // same sumFlow would collide; compareTo would need a tie-breaker to avoid this.
        TreeMap<FlowBean, Text> treeMap = new TreeMap<FlowBean, Text>();

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upCount = 0;
            long dCount = 0;
            for (FlowBean bean : values) {
                upCount += bean.getUpFlow();
                dCount += bean.getdFlow();
            }
            FlowBean sumBean = new FlowBean();
            sumBean.set(key.toString(), upCount, dCount);
            Text text = new Text(key.toString());
            treeMap.put(sumBean, text);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // cleanup runs once after all reduce calls: emit the buffered, sorted results
            Set<Entry<FlowBean, Text>> entrySet = treeMap.entrySet();
            for (Entry<FlowBean, Text> ent : entrySet) {
                context.write(ent.getValue(), ent.getKey());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(OneStepSumSort.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(OneStepSumSortMapper.class);
        job.setReducerClass(OneStepSumSortReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Tell the framework the output types of our mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Tell the framework the path of the files to process
        FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));
        // Tell the framework where to write the result
        FileOutputFormat.setOutputPath(job, new Path("E:/flow/sortout/"));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

Open the "E:/flow/sortout/" directory to view the result:
That is:

13502468823	FlowBean [phoneNbr=13502468823, upFlow=7335, dFlow=110349, sumFlow=117684]
13925057413	FlowBean [phoneNbr=13925057413, upFlow=11058, dFlow=48243, sumFlow=59301]
13726238888	FlowBean [phoneNbr=13726230503, upFlow=2481, dFlow=24681, sumFlow=27162]
18320173382	FlowBean [phoneNbr=18320173382, upFlow=9531, dFlow=2412, sumFlow=11943]
13560439658	FlowBean [phoneNbr=13560439658, upFlow=2034, dFlow=5892, sumFlow=7926]
13660577991	FlowBean [phoneNbr=13660577991, upFlow=6960, dFlow=690, sumFlow=7650]
15013685858	FlowBean [phoneNbr=15013685858, upFlow=3659, dFlow=3538, sumFlow=7197]
13922314466	FlowBean [phoneNbr=13922314466, upFlow=3008, dFlow=3720, sumFlow=6728]
15920133257	FlowBean [phoneNbr=15920133257, upFlow=3156, dFlow=2936, sumFlow=6092]
84138413	FlowBean [phoneNbr=84138413, upFlow=4116, dFlow=1432, sumFlow=5548]
13602846565	FlowBean [phoneNbr=13602846565, upFlow=1938, dFlow=2910, sumFlow=4848]
18211575961	FlowBean [phoneNbr=18211575961, upFlow=1527, dFlow=2106, sumFlow=3633]
15989002119	FlowBean [phoneNbr=15989002119, upFlow=1938, dFlow=180, sumFlow=2118]
13560436666	FlowBean [phoneNbr=13560436666, upFlow=1116, dFlow=954, sumFlow=2070]
13926435656	FlowBean [phoneNbr=13926435656, upFlow=132, dFlow=1512, sumFlow=1644]
13480253104	FlowBean [phoneNbr=13480253104, upFlow=180, dFlow=180, sumFlow=360]
13826544101	FlowBean [phoneNbr=13826544101, upFlow=264, dFlow=0, sumFlow=264]
13926251106	FlowBean [phoneNbr=13719199419, upFlow=240, dFlow=0, sumFlow=240]

6. Assign different phone numbers to different partitions, so that different numbers end up in different output files. The method is as follows:
A: Below is the custom partitioner. The code:
package cn.toto.bigdata.mr.wc.flowsum;

import java.util.HashMap;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * A custom partitioner must extend Partitioner.
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    private static HashMap<String, Integer> provinceMap = new HashMap<String, Integer>();

    static {
        provinceMap.put("138", 0);
        provinceMap.put("139", 1);
        provinceMap.put("136", 2);
        provinceMap.put("137", 3);
        provinceMap.put("135", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        return 5;
    }
}

B: Test the custom partitioner:

package cn.toto.bigdata.mr.wc.flowsum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumProvince {

    public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        Text k = new Text();
        FlowBean v = new FlowBean();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split the line we just read into fields
            String line = value.toString();
            String[] fields = StringUtils.split(line, "\t");
            // Extract the fields the business logic needs
            String phoneNbr = fields[1];
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long dFlow = Long.parseLong(fields[fields.length - 2]);
            k.set(phoneNbr);
            v.set(phoneNbr, upFlow, dFlow);
            context.write(k, v);
        }
    }

    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            long upCount = 0;
            long dCount = 0;
            for (FlowBean bean : values) {
                upCount += bean.getUpFlow();
                dCount += bean.getdFlow();
            }
            FlowBean sumBean = new FlowBean();
            sumBean.set(key.toString(), upCount, dCount);
            context.write(key, sumBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(FlowSumProvince.class);

        // Tell the framework which mapper and reducer classes our program uses
        job.setMapperClass(FlowSumProvinceMapper.class);
        job.setReducerClass(FlowSumProvinceReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Tell the framework the output types of our mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Use our custom partitioner in the shuffle so records are partitioned by phone
        // prefix. Note: the custom partitioner uses 5 mapped partitions plus a default
        // one, so we must not configure fewer than 6 reduce tasks.
        job.setPartitionerClass(ProvincePartitioner.class);
        // Set the number of reduce tasks
        job.setNumReduceTasks(6);

        // Tell the framework the path of the files to process
        FileInputFormat.setInputPaths(job, new Path("E:/flow/input/"));

        // Tell the framework where to write the result,
        // deleting the output directory first if it already exists
        Path out = new Path("E:/flow/provinceout/");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out)) {
            fs.delete(out, true);
        }
        FileOutputFormat.setOutputPath(job, out);

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

C: Preparation for running: the data file.
The file contents are as follows:
(The input is the same HTTP_20130313143750.dat records listed earlier.)

The result after running is:
Contents of part-r-00001:
...and so on for the remaining partitions.
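The prefix-to-partition routing used above can be exercised without Hadoop. Below is a plain-Java sketch mirroring ProvincePartitioner's lookup table; `ProvincePartitionDemo` is a hypothetical name, not part of the project:

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the ProvincePartitioner routing: the first three
// digits of the phone number choose the partition, and any unmapped
// prefix falls through to the default partition 5.
public class ProvincePartitionDemo {

    static final Map<String, Integer> PROVINCE_MAP = new HashMap<>();
    static {
        PROVINCE_MAP.put("138", 0);
        PROVINCE_MAP.put("139", 1);
        PROVINCE_MAP.put("136", 2);
        PROVINCE_MAP.put("137", 3);
        PROVINCE_MAP.put("135", 4);
    }

    public static int getPartition(String phoneNbr) {
        Integer code = PROVINCE_MAP.get(phoneNbr.substring(0, 3));
        return code != null ? code : 5;   // default bucket for unknown prefixes
    }

    public static void main(String[] args) {
        System.out.println("13826544101 -> " + getPartition("13826544101")); // 138
        System.out.println("13926435656 -> " + getPartition("13926435656")); // 139
        System.out.println("18211575961 -> " + getPartition("18211575961")); // unmapped
    }
}
```

Because every record with the same prefix gets the same partition number, and each partition number corresponds to one reduce task, each prefix's totals end up in their own part-r-0000N file — which is exactly what the six output files above show.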
Summary