Two ways to aggregate telecom information logs with MapReduce
Data preparation:
Sample records:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 視頻網站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站點統計 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站點統計 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 綜合門戶 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 綜合門戶 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985079 13823070001 20-7C-8F-70-68-1F:CMCC 120.196.100.99 6 3 360 180 200
1363157985069 13600217502 00-1F-64-E2-E8-B1:CMCC 120.196.100.55 18 138 1080 186852 200

Problem: for each phone number, compute the total number of upstream and downstream packets and the total upstream and downstream traffic.
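Both jobs below assume that, after splitting a record on a single whitespace character, the phone number is at index 1 and the four traffic columns are at indices 6 to 9 (the raw log is tab separated, with an empty column where the site-category is missing, as the commented-out string in the test class at the end of the post shows). A quick standalone check of that assumption on the first sample record; plain Java, no Hadoop dependency, and the class name is only illustrative:

public class FieldIndexCheck {
    public static void main(String[] args) {
        // First sample record, tab-separated; note the empty column after the host name.
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82"
                + "\ti02.c.aliimg.com\t\t24\t27\t2481\t24681\t200";
        String[] fields = line.split("\\s");   // single-whitespace split keeps the empty column
        System.out.println("phone       = " + fields[1]); // 13726230503
        System.out.println("upPackNo    = " + fields[6]); // 24
        System.out.println("downPackNo  = " + fields[7]); // 27
        System.out.println("upPayLoad   = " + fields[8]); // 2481
        System.out.println("downPayLoad = " + fields[9]); // 24681
    }
}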
1. Using Hadoop serialization (a custom Writable); a partitioner on the phone number is sketched after the listing
package Hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

public class TrafficCountApp {

    // Custom Writable carrying the four traffic columns of one record.
    public static class TrafficWritable implements Writable {
        private String phoneNo;
        private int upPackNo;
        private int downPackNo;
        private int upPayLoad;
        private int downPayLoad;

        public TrafficWritable() {
        }

        public TrafficWritable(String[] split) {
            this(split[1], Integer.parseInt(split[6]), Integer.parseInt(split[7]),
                    Integer.parseInt(split[8]), Integer.parseInt(split[9]));
        }

        public TrafficWritable(String phoneNo, int upPackNo, int downPackNo, int upPayLoad, int downPayLoad) {
            this.phoneNo = phoneNo;
            this.upPackNo = upPackNo;
            this.downPackNo = downPackNo;
            this.upPayLoad = upPayLoad;
            this.downPayLoad = downPayLoad;
        }

        public String getPhoneNo() { return phoneNo; }
        public void setPhoneNo(String phoneNo) { this.phoneNo = phoneNo; }
        public int getUpPackNo() { return upPackNo; }
        public void setUpPackNo(int upPackNo) { this.upPackNo = upPackNo; }
        public int getDownPackNo() { return downPackNo; }
        public void setDownPackNo(int downPackNo) { this.downPackNo = downPackNo; }
        public int getUpPayLoad() { return upPayLoad; }
        public void setUpPayLoad(int upPayLoad) { this.upPayLoad = upPayLoad; }
        public int getDownPayLoad() { return downPayLoad; }
        public void setDownPayLoad(int downPayLoad) { this.downPayLoad = downPayLoad; }

        // Serialization: fields must be written and read in the same order.
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(phoneNo);
            out.writeInt(upPackNo);
            out.writeInt(downPackNo);
            out.writeInt(upPayLoad);
            out.writeInt(downPayLoad);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            phoneNo = in.readUTF();
            upPackNo = in.readInt();
            downPackNo = in.readInt();
            upPayLoad = in.readInt();
            downPayLoad = in.readInt();
        }

        @Override
        public String toString() {
            return upPackNo + "\t" + downPackNo + "\t" + upPayLoad + "\t" + downPayLoad;
        }
    }

    // 1. Custom mapper: key = phone number, value = the traffic columns of that record.
    public static class TrafficCountMapper extends Mapper<LongWritable, Text, Text, TrafficWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println(value.toString());        // debug output
            String[] split = value.toString().split("\\s");
            System.out.println(Arrays.toString(split));  // debug output
            context.write(new Text(split[1]), new TrafficWritable(split));
        }
    }

    // 2. Custom reducer: sums the four columns for each phone number.
    public static class TrafficCountReducer extends Reducer<Text, TrafficWritable, Text, TrafficWritable> {
        @Override
        protected void reduce(Text key, Iterable<TrafficWritable> values, Context context) throws IOException, InterruptedException {
            int upPackNo = 0;
            int downPackNo = 0;
            int upPayLoad = 0;
            int downPayLoad = 0;
            for (TrafficWritable tw : values) {
                upPackNo += tw.getUpPackNo();
                downPackNo += tw.getDownPackNo();
                upPayLoad += tw.getUpPayLoad();
                downPayLoad += tw.getDownPayLoad();
            }
            TrafficWritable v3 = new TrafficWritable(key.toString(), upPackNo, downPackNo, upPayLoad, downPayLoad);
            context.write(key, v3);
        }
    }

    // 3. Driver.
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // The next line is important: it lets Hadoop locate the jar containing this class.
        job.setJarByClass(TrafficCountApp.class);
        // Custom mapper and reducer.
        job.setMapperClass(TrafficCountMapper.class);
        job.setReducerClass(TrafficCountReducer.class);
        // Mapper output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TrafficWritable.class);
        // Reducer output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TrafficWritable.class);
        // HDFS input and output paths.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job.
        job.waitForCompletion(true);
    }
}
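The heading mentions partitioning on whether the phone number matches, but no Partitioner appears in the listing above. A minimal sketch of what such a class could look like, assuming a hypothetical rule that routes numbers starting with "13" to the first reducer (the class name PhonePartitioner and the prefix rule are illustrative, not from the original post):

package Hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative partitioner: phone numbers starting with "13" go to partition 0,
// everything else goes to the last partition.
public class PhonePartitioner extends Partitioner<Text, TrafficCountApp.TrafficWritable> {
    @Override
    public int getPartition(Text key, TrafficCountApp.TrafficWritable value, int numPartitions) {
        // key is the phone number emitted by the mapper
        if (key.toString().startsWith("13")) {
            return 0;
        }
        return numPartitions - 1;
    }
}

It would be registered in the driver with job.setPartitionerClass(PhonePartitioner.class), together with more than one reduce task (e.g. job.setNumReduceTasks(2)) so the split actually produces separate output files. The job itself can then be submitted in the usual way, for example hadoop jar traffic-count.jar Hadoop.TrafficCountApp /input/dianxin.log /output/traffic (the jar and path names here are placeholders).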
2. Using Text
Note: Text is Hadoop's string type, the Writable counterpart of java.lang.String.
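As a minimal illustration (not part of the original post), a Text object can be reused and converted back to a plain String, which is exactly how the mapper below reuses its t1 and v1 fields:

import org.apache.hadoop.io.Text;

public class TextDemo {
    public static void main(String[] args) {
        Text phone = new Text();
        phone.set("13726230503");        // reuse the same object instead of allocating one per record
        String plain = phone.toString(); // convert back to a java.lang.String
        System.out.println(plain);       // prints 13726230503
    }
}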
package com.henu;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author George
 * @description Without a custom Writable: the four columns travel as one tab-joined Text value.
 */
public class DianXinCount {

    public static class DianXinMap extends Mapper<LongWritable, Text, Text, Text> {
        Text t1 = new Text();
        Text v1 = new Text();
        String str1 = "";

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] strings = line.split("\\s");
            // Join the four traffic columns into one tab-separated string.
            str1 = strings[6] + "\t" + strings[7] + "\t" + strings[8] + "\t" + strings[9];
            t1.set(strings[1]);   // phone number as the key
            v1.set(str1);
            context.write(t1, v1);
        }
    }

    public static class DianXinReduce extends Reducer<Text, Text, Text, Text> {
        int upPackNo = 0;
        int downPackNo = 0;
        int upPayLoad = 0;
        int downPayLoad = 0;
        Text v2 = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text s : values) {
                // Original note: "\t" not recognized at first (cf. splitting on "\\s+").
                String[] strings = s.toString().split("\t");
                upPackNo += Integer.parseInt(strings[0]);
                downPackNo += Integer.parseInt(strings[1]);
                upPayLoad += Integer.parseInt(strings[2]);
                downPayLoad += Integer.parseInt(strings[3]);
                v2.set(upPackNo + "\t" + downPackNo + "\t" + upPayLoad + "\t" + downPayLoad);
            }
            // Reset the instance-level counters before the next key is processed.
            upPackNo = 0;
            downPackNo = 0;
            upPayLoad = 0;
            downPayLoad = 0;
            context.write(key, v2);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(DianXinCount.class);
        job.setMapperClass(DianXinMap.class);
        job.setReducerClass(DianXinReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

Note: throughout the process, and especially for the string splitting, it helps to verify assumptions with a small test class:
package com.henu;

/**
 * @author George
 * @description Quick check of how String.split behaves on tab-separated input.
 */
public class Test {
    public static void main(String[] args) {
        String s = "aaa" + "\t" + "bbb" + "\t" + "ccc" + "\t" + "aaa";
        String[] split = s.split("\t");
        System.out.println(split.length);
        for (String s1 : split) {
            System.out.println(s1);
        }
        System.out.println("----------------");
        System.out.println(split[1]);
        System.out.println(split[2]);

        /*
        String str = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82\ti02.c.aliimg.com\t\t24\t27\t2481\t24681\t200";
        String[] strings = str.split("\\s");
        for (String s : strings) {
            System.out.println(s);
        }
        System.out.println("-------------");
        System.out.println(strings[6]);
        System.out.println(strings[1]);
        */
    }
}

Results:
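The result screenshots from the original post are not reproduced here, but the output format follows directly from the reducer: each line is the phone number followed by the four aggregated columns (upPackNo, downPackNo, upPayLoad, downPayLoad), separated by tabs. As a hand-computed example from the sample data, 13560439658 appears twice (18 15 1116 954 and 15 9 918 4938), so its line in the output is:

13560439658    33    24    2034    5892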
Summary
Both approaches produce the same per-phone totals: upstream/downstream packet counts and upstream/downstream traffic keyed by phone number. The custom Writable version keeps the four counters as typed fields and handles its own serialization, while the Text-only version avoids the extra class by joining the columns into a tab-separated string in the mapper and splitting them again in the reducer.