MAPREDUCE的实战案例
reduce端join算法實現(xiàn)
?
1、需求:
?
訂單數(shù)據(jù)表t_order:
?
| id | date | pid | amount |
| 1001 | 20150710 | P0001 | 2 |
| 1002 | 20150710 | P0001 | 3 |
| 1002 | 20150710 | P0002 | 3 |
?
?
?
商品信息表t_product
?
| id | pname | category_id | price |
| P0001 | 小米5 | 1000 | 2 |
| P0002 | 錘子T1 | 1000 | 3 |
假如數(shù)據(jù)量巨大,兩表的數(shù)據(jù)是以文件的形式存儲在HDFS中,需要用mapreduce程序來實現(xiàn)一下SQL查詢運算:?
select ?a.id,a.date,b.name,b.category_id,b.price from t_order a join t_product b on a.pid = b.id
2、實現(xiàn)機制:
通過將關(guān)聯(lián)的條件作為map輸出的key,將兩表滿足join條件的數(shù)據(jù)并攜帶數(shù)據(jù)所來源的文件信息,發(fā)往同一個reduce task,在reduce中進行數(shù)據(jù)的串聯(lián)
?
public class OrderJoin {static class OrderJoinMapper extends Mapper<LongWritable, Text, Text, OrderJoinBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 拿到一行數(shù)據(jù),并且要分辨出這行數(shù)據(jù)所屬的文件String line = value.toString();String[] fields = line.split("\t");// 拿到itemidString itemid = fields[0];// 獲取到這一行所在的文件名(通過inpusplit)String name = "你拿到的文件名";// 根據(jù)文件名,切分出各字段(如果是a,切分出兩個字段,如果是b,切分出3個字段) OrderJoinBean bean = new OrderJoinBean();bean.set(null, null, null, null, null);context.write(new Text(itemid), bean);}}static class OrderJoinReducer extends Reducer<Text, OrderJoinBean, OrderJoinBean, NullWritable> {@Overrideprotected void reduce(Text key, Iterable<OrderJoinBean> beans, Context context) throws IOException, InterruptedException {//拿到的key是某一個itemid,比如1000//拿到的beans是來自于兩類文件的bean// {1000,amount} {1000,amount} {1000,amount} --- {1000,price,name}//將來自于b文件的bean里面的字段,跟來自于a的所有bean進行字段拼接并輸出 }} }缺點:這種方式中,join的操作是在reduce階段完成,reduce端的處理壓力太大,map節(jié)點的運算負載則很低,資源利用率不高,且在reduce階段極易產(chǎn)生數(shù)據(jù)傾斜
?
解決方案: map端join實現(xiàn)方式
?
?
1、原理闡述
?
適用于關(guān)聯(lián)表中有小表的情形;
?
可以將小表分發(fā)到所有的map節(jié)點,這樣,map節(jié)點就可以在本地對自己所讀到的大表數(shù)據(jù)進行join并輸出最終結(jié)果,可以大大提高join操作的并發(fā)度,加快處理速度
?
2、實現(xiàn)示例
?
--先在mapper類中預先定義好小表,進行join
?
--引入實際場景中的解決方案:一次加載數(shù)據(jù)庫或者用distributedcache
?
?
public class TestDistributedCache {static class TestDistributedCacheMapper extends Mapper<LongWritable, Text, Text, Text>{FileReader in = null;BufferedReader reader = null;HashMap<String,String> b_tab = new HashMap<String, String>();String localpath =null;String uirpath = null;//是在map任務(wù)初始化的時候調(diào)用一次 @Overrideprotected void setup(Context context) throws IOException, InterruptedException {//通過這幾句代碼可以獲取到cache file的本地絕對路徑,測試驗證用Path[] files = context.getLocalCacheFiles();localpath = files[0].toString();URI[] cacheFiles = context.getCacheFiles();//緩存文件的用法——直接用本地IO來讀取//這里讀的數(shù)據(jù)是map task所在機器本地工作目錄中的一個小文件in = new FileReader("b.txt");reader =new BufferedReader(in);String line =null;while(null!=(line=reader.readLine())){String[] fields = line.split(",");b_tab.put(fields[0],fields[1]);}IOUtils.closeStream(reader);IOUtils.closeStream(in);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//這里讀的是這個map task所負責的那一個切片數(shù)據(jù)(在hdfs上)String[] fields = value.toString().split("\t");String a_itemid = fields[0];String a_amount = fields[1];String b_name = b_tab.get(a_itemid);// 輸出結(jié)果 1001 98.9 banancontext.write(new Text(a_itemid), new Text(a_amount + "\t" + ":" + localpath + "\t" +b_name ));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(TestDistributedCache.class);job.setMapperClass(TestDistributedCacheMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);//這里是我們正常的需要處理的數(shù)據(jù)所在路徑FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//不需要reducerjob.setNumReduceTasks(0);//分發(fā)一個文件到task進程的工作目錄job.addCacheFile(new URI("hdfs://hadoop-server01:9000/cachefile/b.txt"));//分發(fā)一個歸檔文件到task進程的工作目錄 // job.addArchiveToClassPath(archive);//分發(fā)jar包到task節(jié)點的classpath下 // job.addFileToClassPath(jarfile); job.waitForCompletion(true);} }?
?
web日志預處理
1、需求:
對web訪問日志中的各字段識別切分
去除日志中不合法的記錄
根據(jù)KPI統(tǒng)計需求,生成各類訪問請求過濾數(shù)據(jù)
?
2、實現(xiàn)代碼:
a) 定義一個bean,用來記錄日志數(shù)據(jù)中的各數(shù)據(jù)字段
?
public class WebLogBean {private String remote_addr;// 記錄客戶端的ip地址private String remote_user;// 記錄客戶端用戶名稱,忽略屬性"-"private String time_local;// 記錄訪問時間與時區(qū)private String request;// 記錄請求的url與http協(xié)議private String status;// 記錄請求狀態(tài);成功是200private String body_bytes_sent;// 記錄發(fā)送給客戶端文件主體內(nèi)容大小private String http_referer;// 用來記錄從那個頁面鏈接訪問過來的private String http_user_agent;// 記錄客戶瀏覽器的相關(guān)信息private boolean valid = true;// 判斷數(shù)據(jù)是否合法public String getRemote_addr() {return remote_addr;}public void setRemote_addr(String remote_addr) {this.remote_addr = remote_addr;}public String getRemote_user() {return remote_user;}public void setRemote_user(String remote_user) {this.remote_user = remote_user;}public String getTime_local() {return time_local;}public void setTime_local(String time_local) {this.time_local = time_local;}public String getRequest() {return request;}public void setRequest(String request) {this.request = request;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}public String getBody_bytes_sent() {return body_bytes_sent;}public void setBody_bytes_sent(String body_bytes_sent) {this.body_bytes_sent = body_bytes_sent;}public String getHttp_referer() {return http_referer;}public void setHttp_referer(String http_referer) {this.http_referer = http_referer;}public String getHttp_user_agent() {return http_user_agent;}public void setHttp_user_agent(String http_user_agent) {this.http_user_agent = http_user_agent;}public boolean isValid() {return valid;}public void setValid(boolean valid) {this.valid = valid;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append(this.valid);sb.append("\001").append(this.remote_addr);sb.append("\001").append(this.remote_user);sb.append("\001").append(this.time_local);sb.append("\001").append(this.request);sb.append("\001").append(this.status);sb.append("\001").append(this.body_bytes_sent);sb.append("\001").append(this.http_referer);sb.append("\001").append(this.http_user_agent);return sb.toString(); } }?
?
?
b)定義一個parser用來解析過濾web訪問日志原始記錄
?
public class WebLogParser {public static WebLogBean parser(String line) {WebLogBean webLogBean = new WebLogBean();String[] arr = line.split(" ");if (arr.length > 11) {webLogBean.setRemote_addr(arr[0]);webLogBean.setRemote_user(arr[1]);webLogBean.setTime_local(arr[3].substring(1));webLogBean.setRequest(arr[6]);webLogBean.setStatus(arr[8]);webLogBean.setBody_bytes_sent(arr[9]);webLogBean.setHttp_referer(arr[10]);if (arr.length > 12) {webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);} else {webLogBean.setHttp_user_agent(arr[11]);}if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP錯誤webLogBean.setValid(false);}} else {webLogBean.setValid(false);}return webLogBean;}public static String parserTime(String time) {time.replace("/", "-");return time;} }?
c) mapreduce程序
?
public class WeblogPreProcess {static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {Text k = new Text();NullWritable v = NullWritable.get();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();WebLogBean webLogBean = WebLogParser.parser(line);if (!webLogBean.isValid())return;k.set(webLogBean.toString());context.write(k, v);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(WeblogPreProcess.class);job.setMapperClass(WeblogPreProcessMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);} }?
?
?
?
流量統(tǒng)計相關(guān)需求
1、對流量日志中的用戶統(tǒng)計總上、下行流量技術(shù)點: 自定義javaBean用來在mapreduce中充當value
注意: javaBean要實現(xiàn)Writable接口,實現(xiàn)兩個方法
//序列化,將對象的字段信息寫入輸出流 @Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upflow);out.writeLong(downflow);out.writeLong(sumflow);}//反序列化,從輸入流中讀取各個字段信息 @Overridepublic void readFields(DataInput in) throws IOException {upflow = in.readLong();downflow = in.readLong();sumflow = in.readLong();}1、統(tǒng)計流量且按照流量大小倒序排序
技術(shù)點:這種需求,用一個mapreduce -job 不好實現(xiàn),需要兩個mapreduce -job
第一個job負責流量統(tǒng)計,跟上題相同
第二個job讀入第一個job的輸出,然后做排序
要將flowBean作為map的key輸出,這樣mapreduce就會自動排序?此時,flowBean要實現(xiàn)接口WritableComparable? ?要實現(xiàn)其中的compareTo()方法,方法中,我們可以定義倒序比較的邏輯
1、統(tǒng)計流量且按照手機號的歸屬地,將結(jié)果數(shù)據(jù)輸出到不同的省份文件中技術(shù)點:自定義Partitioner
@Overridepublic int getPartition(Text key, FlowBean value, int numPartitions) {String prefix = key.toString().substring(0,3);Integer partNum = pmap.get(prefix);return (partNum==null?4:partNum);}自定義partition后,要根據(jù)自定義partitioner的邏輯設(shè)置相應數(shù)量的reduce task
| job.setNumReduceTasks(5); |
?
注意:如果reduceTask的數(shù)量>= getPartition的結(jié)果數(shù) ?,則會多產(chǎn)生幾個空的輸出文件part-r-000xx
如果 ????1<reduceTask的數(shù)量<getPartition的結(jié)果數(shù) ,則有一部分分區(qū)數(shù)據(jù)無處安放,會Exception!!!
如果 reduceTask的數(shù)量=1,則不管mapTask端輸出多少個分區(qū)文件,最終結(jié)果都交給這一個reduceTask,最終也就只會產(chǎn)生一個結(jié)果文件 part-r-00000
?
社交粉絲數(shù)據(jù)分析
以下是qq的好友列表數(shù)據(jù),冒號前是一個用,冒號后是該用戶的所有好友(數(shù)據(jù)中的好友關(guān)系是單向的)
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J
?
求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰?
解題思路:
求出哪些人兩兩之間有共同好友,及他倆的共同好友都有誰?
解題思路:
| 第一步 ? map 讀一行 ??A:B,C,D,F,E,O 輸出 ???<B,A><C,A><D,A><F,A><E,A><O,A> 在讀一行 ??B:A,C,E,K 輸出 ??<A,B><C,B><E,B><K,B> ? ? REDUCE 拿到的數(shù)據(jù)比如<C,A><C,B><C,E><C,F><C,G>...... 輸出: ? <A-B,C> <A-E,C> <A-F,C> <A-G,C> <B-E,C> <B-F,C>..... ? ? ? 第二步 map 讀入一行<A-B,C> 直接輸出<A-B,C> ? reduce 讀入數(shù)據(jù) ?<A-B,C><A-B,F><A-B,G>....... 輸出: A-B ?C,F,G,..... |
?
package cn.itcast.bigdata.mr.fensi;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class SharedFriendsStepOne {static class SharedFriendsStepOneMapper extends Mapper<LongWritable, Text, Text, Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// A:B,C,D,F,E,OString line = value.toString();String[] person_friends = line.split(":");String person = person_friends[0];String friends = person_friends[1];for (String friend : friends.split(",")) {// 輸出<好友,人>context.write(new Text(friend), new Text(person));}}}static class SharedFriendsStepOneReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text friend, Iterable<Text> persons, Context context) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text person : persons) {sb.append(person).append(",");}context.write(friend, new Text(sb.toString()));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SharedFriendsStepOne.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapperClass(SharedFriendsStepOneMapper.class);job.setReducerClass(SharedFriendsStepOneReducer.class);FileInputFormat.setInputPaths(job, new Path("D:/srcdata/friends"));FileOutputFormat.setOutputPath(job, new Path("D:/temp/out"));job.waitForCompletion(true);}} package cn.itcast.bigdata.mr.fensi;import java.io.IOException; import java.util.Arrays;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class SharedFriendsStepTwo {static class SharedFriendsStepTwoMapper extends Mapper<LongWritable, Text, Text, Text> {// 拿到的數(shù)據(jù)是上一個步驟的輸出結(jié)果// A I,K,C,B,G,F,H,O,D,// 友 人,人,人 @Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] friend_persons = line.split("\t");String friend = friend_persons[0];String[] persons = friend_persons[1].split(",");Arrays.sort(persons);for (int i = 0; i < persons.length - 1; i++) {for (int j = i + 1; j < persons.length; j++) {// 發(fā)出 <人-人,好友> ,這樣,相同的“人-人”對的所有好友就會到同1個reduce中去context.write(new Text(persons[i] + "-" + persons[j]), new Text(friend));}}}}static class SharedFriendsStepTwoReducer extends Reducer<Text, Text, Text, Text> {@Overrideprotected void reduce(Text person_person, Iterable<Text> friends, Context context) throws IOException, InterruptedException {StringBuffer sb = new StringBuffer();for (Text friend : friends) {sb.append(friend).append(" ");}context.write(person_person, new Text(sb.toString()));}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SharedFriendsStepTwo.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setMapperClass(SharedFriendsStepTwoMapper.class);job.setReducerClass(SharedFriendsStepTwoReducer.class);FileInputFormat.setInputPaths(job, new Path("D:/temp/out/part-r-00000"));FileOutputFormat.setOutputPath(job, new Path("D:/temp/out2"));job.waitForCompletion(true);}}?
?
?
?
轉(zhuǎn)載于:https://www.cnblogs.com/duan2/p/7538049.html
總結(jié)
以上是生活随笔為你收集整理的MAPREDUCE的实战案例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: USACO-Section1.4 Wor
- 下一篇: 第1次作业:谈谈我的看法与感想