ChainMapper和ChainReducer
生活随笔
收集整理的這篇文章主要介紹了
ChainMapper和ChainReducer
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
The ChainMapper class allows multiple Mapper classes to be used within a single Map task.
import?java.io.IOException;
import?java.util.*;
import?java.lang.String;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.conf.*;
import?org.apache.hadoop.io.*;
import?org.apache.hadoop.mapred.*;
import?org.apache.hadoop.util.*;
import?org.apache.hadoop.mapred.lib.*;
public?class?WordCount
{
????public?static?class?Map00?extends?MapReduceBase?implements?Mapper
????{
????????public?void?map(Text?key,?Text?value,?OutputCollector?output,?Reporter?reporter)?throws?IOException
????????{
????????????Text?ft?=?new?Text(“100″);
????????????if(!key.equals(ft))
????????????{
????????????????output.collect(key,?value);
????????????}
????????}
????}
????public?static?class?Map01?extends?MapReduceBase?implements?Mapper
????{
????????public?void?map(Text?key,?Text?value,?OutputCollector?output,?Reporter?reporter)?throws?IOException
????????{
????????????Text?ft?=?new?Text(“101″);
????????????if(!key.equals(ft))
????????????{
????????????????output.collect(key,?value);
????????????}
????????}
????}
????public?static?class?Reduce?extends?MapReduceBase?implements?Reducer
????{
????????public?void?reduce(Text?key,?Iterator?values,?OutputCollector?output,?Reporter?reporter)?throws?IOException
????????{
????????????while(values.hasNext())
????????????{
????????????????output.collect(key,?values.next());
????????????}
????????}
????}
????public?static?void?main(String[]?args)?throws?Exception
????{
????????JobConf?conf?=?new?JobConf(WordCount.class);
????????conf.setJobName(“wordcount00″);
????????conf.setInputFormat(KeyValueTextInputFormat.class);
????????conf.setOutputFormat(TextOutputFormat.class);
????????ChainMapper?cm?=?new?ChainMapper();
????????JobConf?mapAConf?=?new?JobConf(false);
????????cm.addMapper(conf,?Map00.class,?Text.class,?Text.class,?Text.class,?Text.class,?true,?mapAConf);
????????JobConf?mapBConf?=?new?JobConf(false);
????????cm.addMapper(conf,?Map01.class,?Text.class,?Text.class,?Text.class,?Text.class,?true,?mapBConf);
????????conf.setReducerClass(Reduce.class);
????????conf00.setOutputKeyClass(Text.class);
????????conf00.setOutputValueClass(Text.class);
????????FileInputFormat.setInputPaths(conf,?new?Path(args[0]));
????????FileOutputFormat.setOutputPath(conf,?new?Path(args[1]));
????????JobClient.runJob(conf);
????}
}?
import?java.io.DataInput;
import?java.io.DataOutput;
import?java.io.IOException;
import?java.text.SimpleDateFormat;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.LongWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.io.WritableComparable;
import?org.apache.hadoop.mapred.FileInputFormat;
import?org.apache.hadoop.mapred.FileOutputFormat;
import?org.apache.hadoop.mapred.JobConf;
import?org.apache.hadoop.mapred.MapReduceBase;
import?org.apache.hadoop.mapred.Mapper;
import?org.apache.hadoop.mapred.OutputCollector;
import?org.apache.hadoop.mapred.Reporter;
import?org.apache.hadoop.mapred.TextInputFormat;
import?org.apache.hadoop.mapred.TextOutputFormat;
import?org.apache.hadoop.mapred.jobcontrol.Job;
import?org.apache.hadoop.mapred.jobcontrol.JobControl;
import?org.apache.hadoop.mapred.lib.ChainMapper;
import?com.oncedq.code.util.DateUtil;
public?class?ProcessSample?{
????public?static?class?ExtractMappper?extends?MapReduceBase?implements
????????????Mapper<LongWritable,?Text,?LongWritable,?Conn1>?{
????????@Override
????????public?void?map(LongWritable?arg0,?Text?arg1,
????????????????OutputCollector<LongWritable,?Conn1>?arg2,?Reporter?arg3)
????????????????throws?IOException?{
????????????String?line?=?arg1.toString();
????????????String[]?strs?=?line.split(";");
????????????Conn1?conn1?=?new?Conn1();
????????????conn1.orderKey?=?Long.parseLong(strs[0]);
????????????conn1.customer?=?Long.parseLong(strs[1]);
????????????conn1.state?=?strs[2];
????????????conn1.price?=?Double.parseDouble(strs[3]);
????????????conn1.orderDate?=?DateUtil.getDateFromString(strs[4],?"yyyy-MM-dd");
????????????LongWritable?lw?=?new?LongWritable(conn1.orderKey);
????????????arg2.collect(lw,?conn1);
????????}
????}
????private?static?class?Conn1?implements?WritableComparable<Conn1>?{
????????public?long?orderKey;
????????public?long?customer;
????????public?String?state;
????????public?double?price;
????????public?java.util.Date?orderDate;
????????@Override
????????public?void?readFields(DataInput?in)?throws?IOException?{
????????????orderKey?=?in.readLong();
????????????customer?=?in.readLong();
????????????state?=?Text.readString(in);
????????????price?=?in.readDouble();
????????????orderDate?=?DateUtil.getDateFromString(Text.readString(in),
????????????????????"yyyy-MM-dd");
????????}
????????@Override
????????public?void?write(DataOutput?out)?throws?IOException?{
????????????out.writeLong(orderKey);
????????????out.writeLong(customer);
????????????Text.writeString(out,?state);
????????????out.writeDouble(price);
????????????Text.writeString(out,?DateUtil.getDateStr(orderDate,?"yyyy-MM-dd"));
????????}
????????@Override
????????public?int?compareTo(Conn1?arg0)?{
????????????//?TODO?Auto-generated?method?stub
????????????return?0;
????????}
????}
????public?static?class?Filter1Mapper?extends?MapReduceBase?implements
????????????Mapper<LongWritable,?Conn1,?LongWritable,?Conn2>?{
????????@Override
????????public?void?map(LongWritable?inKey,?Conn1?c2,
????????????????OutputCollector<LongWritable,?Conn2>?collector,?Reporter?report)
????????????????throws?IOException?{
????????????if?(c2.state.equals("F"))?{
????????????????Conn2?inValue?=?new?Conn2();
????????????????inValue.customer?=?c2.customer;
????????????????inValue.orderDate?=?c2.orderDate;
????????????????inValue.orderKey?=?c2.orderKey;
????????????????inValue.price?=?c2.price;
????????????????inValue.state?=?c2.state;
????????????????collector.collect(inKey,?inValue);
????????????}
????????}
????}
????private?static?class?Conn2?implements?WritableComparable<Conn1>?{
????????public?long?orderKey;
????????public?long?customer;
????????public?String?state;
????????public?double?price;
????????public?java.util.Date?orderDate;
????????@Override
????????public?void?readFields(DataInput?in)?throws?IOException?{
????????????orderKey?=?in.readLong();
????????????customer?=?in.readLong();
????????????state?=?Text.readString(in);
????????????price?=?in.readDouble();
????????????orderDate?=?DateUtil.getDateFromString(Text.readString(in),
????????????????????"yyyy-MM-dd");
????????}
????????@Override
????????public?void?write(DataOutput?out)?throws?IOException?{
????????????out.writeLong(orderKey);
????????????out.writeLong(customer);
????????????Text.writeString(out,?state);
????????????out.writeDouble(price);
????????????Text.writeString(out,?DateUtil.getDateStr(orderDate,?"yyyy-MM-dd"));
????????}
????????@Override
????????public?int?compareTo(Conn1?arg0)?{
????????????//?TODO?Auto-generated?method?stub
????????????return?0;
????????}
????}
????public?static?class?RegexMapper?extends?MapReduceBase?implements
????????????Mapper<LongWritable,?Conn2,?LongWritable,?Conn3>?{
????????@Override
????????public?void?map(LongWritable?inKey,?Conn2?c3,
????????????????OutputCollector<LongWritable,?Conn3>?collector,?Reporter?report)
????????????????throws?IOException?{
????????????c3.state?=?c3.state.replaceAll("F",?"Find");
????????????Conn3?c2?=?new?Conn3();
????????????c2.customer?=?c3.customer;
????????????c2.orderDate?=?c3.orderDate;
????????????c2.orderKey?=?c3.orderKey;
????????????c2.price?=?c3.price;
????????????c2.state?=?c3.state;
????????????collector.collect(inKey,?c2);
????????}
????}
????private?static?class?Conn3?implements?WritableComparable<Conn1>?{
????????public?long?orderKey;
????????public?long?customer;
????????public?String?state;
????????public?double?price;
????????public?java.util.Date?orderDate;
????????@Override
????????public?void?readFields(DataInput?in)?throws?IOException?{
????????????orderKey?=?in.readLong();
????????????customer?=?in.readLong();
????????????state?=?Text.readString(in);
????????????price?=?in.readDouble();
????????????orderDate?=?DateUtil.getDateFromString(Text.readString(in),
????????????????????"yyyy-MM-dd");
????????}
????????@Override
????????public?void?write(DataOutput?out)?throws?IOException?{
????????????out.writeLong(orderKey);
????????????out.writeLong(customer);
????????????Text.writeString(out,?state);
????????????out.writeDouble(price);
????????????Text.writeString(out,?DateUtil.getDateStr(orderDate,?"yyyy-MM-dd"));
????????}
????????@Override
????????public?int?compareTo(Conn1?arg0)?{
????????????//?TODO?Auto-generated?method?stub
????????????return?0;
????????}
????}
????public?static?class?LoadMapper?extends?MapReduceBase?implements
????????????Mapper<LongWritable,?Conn3,?LongWritable,?Conn3>?{
????????@Override
????????public?void?map(LongWritable?arg0,?Conn3?arg1,
????????????????OutputCollector<LongWritable,?Conn3>?arg2,?Reporter?arg3)
????????????????throws?IOException?{
????????????arg2.collect(arg0,?arg1);
????????}
????}
????public?static?void?main(String[]?args)?{
????????JobConf?job?=?new?JobConf(ProcessSample.class);
????????job.setJobName("ProcessSample");
????????job.setNumReduceTasks(0);
????????job.setInputFormat(TextInputFormat.class);
????????job.setOutputFormat(TextOutputFormat.class);
????????JobConf?mapper1?=?new?JobConf();
????????JobConf?mapper2?=?new?JobConf();
????????JobConf?mapper3?=?new?JobConf();
????????JobConf?mapper4?=?new?JobConf();
????????ChainMapper?cm?=?new?ChainMapper();
????????cm.addMapper(job,?ExtractMappper.class,?LongWritable.class,?Text.class,
????????????????LongWritable.class,?Conn1.class,?true,?mapper1);
????????cm.addMapper(job,?Filter1Mapper.class,?LongWritable.class,?Conn1.class,
????????????????LongWritable.class,?Conn2.class,?true,?mapper2);
????????cm.addMapper(job,?RegexMapper.class,?LongWritable.class,?Conn2.class,
????????????????LongWritable.class,?Conn3.class,?true,?mapper3);
????????cm.addMapper(job,?LoadMapper.class,?LongWritable.class,?Conn3.class,
????????????????LongWritable.class,?Conn3.class,?true,?mapper4);
????????FileInputFormat.setInputPaths(job,?new?Path("orderData"));
????????FileOutputFormat.setOutputPath(job,?new?Path("orderDataOutput"));
????????Job?job1;
????????try?{
????????????job1?=?new?Job(job);
????????????JobControl?jc?=?new?JobControl("test");
????????????jc.addJob(job1);
????????????jc.run();
????????}?catch?(IOException?e)?{
????????????//?TODO?Auto-generated?catch?block
????????????e.printStackTrace();
????????}
????}
}
The ChainReducer class allows multiple Mapper classes to be chained after a Reducer within the Reducer task.
?
http://www.oratea.net/?p=371
通過ChainMapper可以將多個map類合并成一個map任務。
下面這個例子沒什么實際意義,但是很好地演示了ChainMapper的作用。
源文件
100 tom 90
101 mary 85
102 kate 60
map00的結果,過濾掉100的記錄
101 mary 85
102 kate 60
map01的結果,過濾掉101的記錄
102 kate 60
reduce結果
102 kate 60
?package?org.myorg;
import?java.io.IOException;
import?java.util.*;
import?java.lang.String;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.conf.*;
import?org.apache.hadoop.io.*;
import?org.apache.hadoop.mapred.*;
import?org.apache.hadoop.util.*;
import?org.apache.hadoop.mapred.lib.*;
public?class?WordCount
{
????public?static?class?Map00?extends?MapReduceBase?implements?Mapper
????{
????????public?void?map(Text?key,?Text?value,?OutputCollector?output,?Reporter?reporter)?throws?IOException
????????{
????????????Text?ft?=?new?Text(“100″);
????????????if(!key.equals(ft))
????????????{
????????????????output.collect(key,?value);
????????????}
????????}
????}
????public?static?class?Map01?extends?MapReduceBase?implements?Mapper
????{
????????public?void?map(Text?key,?Text?value,?OutputCollector?output,?Reporter?reporter)?throws?IOException
????????{
????????????Text?ft?=?new?Text(“101″);
????????????if(!key.equals(ft))
????????????{
????????????????output.collect(key,?value);
????????????}
????????}
????}
????public?static?class?Reduce?extends?MapReduceBase?implements?Reducer
????{
????????public?void?reduce(Text?key,?Iterator?values,?OutputCollector?output,?Reporter?reporter)?throws?IOException
????????{
????????????while(values.hasNext())
????????????{
????????????????output.collect(key,?values.next());
????????????}
????????}
????}
????public?static?void?main(String[]?args)?throws?Exception
????{
????????JobConf?conf?=?new?JobConf(WordCount.class);
????????conf.setJobName(“wordcount00″);
????????conf.setInputFormat(KeyValueTextInputFormat.class);
????????conf.setOutputFormat(TextOutputFormat.class);
????????ChainMapper?cm?=?new?ChainMapper();
????????JobConf?mapAConf?=?new?JobConf(false);
????????cm.addMapper(conf,?Map00.class,?Text.class,?Text.class,?Text.class,?Text.class,?true,?mapAConf);
????????JobConf?mapBConf?=?new?JobConf(false);
????????cm.addMapper(conf,?Map01.class,?Text.class,?Text.class,?Text.class,?Text.class,?true,?mapBConf);
????????conf.setReducerClass(Reduce.class);
????????conf00.setOutputKeyClass(Text.class);
????????conf00.setOutputValueClass(Text.class);
????????FileInputFormat.setInputPaths(conf,?new?Path(args[0]));
????????FileOutputFormat.setOutputPath(conf,?new?Path(args[1]));
????????JobClient.runJob(conf);
????}
}?
?
另外一個例子,代碼很多,其實很簡單,Conn幾個類都是相同的
http://yixiaohuamax.iteye.com/blog/684244
package?com.oncedq.code;import?java.io.DataInput;
import?java.io.DataOutput;
import?java.io.IOException;
import?java.text.SimpleDateFormat;
import?org.apache.hadoop.fs.Path;
import?org.apache.hadoop.io.LongWritable;
import?org.apache.hadoop.io.Text;
import?org.apache.hadoop.io.WritableComparable;
import?org.apache.hadoop.mapred.FileInputFormat;
import?org.apache.hadoop.mapred.FileOutputFormat;
import?org.apache.hadoop.mapred.JobConf;
import?org.apache.hadoop.mapred.MapReduceBase;
import?org.apache.hadoop.mapred.Mapper;
import?org.apache.hadoop.mapred.OutputCollector;
import?org.apache.hadoop.mapred.Reporter;
import?org.apache.hadoop.mapred.TextInputFormat;
import?org.apache.hadoop.mapred.TextOutputFormat;
import?org.apache.hadoop.mapred.jobcontrol.Job;
import?org.apache.hadoop.mapred.jobcontrol.JobControl;
import?org.apache.hadoop.mapred.lib.ChainMapper;
import?com.oncedq.code.util.DateUtil;
public?class?ProcessSample?{
????public?static?class?ExtractMappper?extends?MapReduceBase?implements
????????????Mapper<LongWritable,?Text,?LongWritable,?Conn1>?{
????????@Override
????????public?void?map(LongWritable?arg0,?Text?arg1,
????????????????OutputCollector<LongWritable,?Conn1>?arg2,?Reporter?arg3)
????????????????throws?IOException?{
????????????String?line?=?arg1.toString();
????????????String[]?strs?=?line.split(";");
????????????Conn1?conn1?=?new?Conn1();
????????????conn1.orderKey?=?Long.parseLong(strs[0]);
????????????conn1.customer?=?Long.parseLong(strs[1]);
????????????conn1.state?=?strs[2];
????????????conn1.price?=?Double.parseDouble(strs[3]);
????????????conn1.orderDate?=?DateUtil.getDateFromString(strs[4],?"yyyy-MM-dd");
????????????LongWritable?lw?=?new?LongWritable(conn1.orderKey);
????????????arg2.collect(lw,?conn1);
????????}
????}
????private?static?class?Conn1?implements?WritableComparable<Conn1>?{
????????public?long?orderKey;
????????public?long?customer;
????????public?String?state;
????????public?double?price;
????????public?java.util.Date?orderDate;
????????@Override
????????public?void?readFields(DataInput?in)?throws?IOException?{
????????????orderKey?=?in.readLong();
????????????customer?=?in.readLong();
????????????state?=?Text.readString(in);
????????????price?=?in.readDouble();
????????????orderDate?=?DateUtil.getDateFromString(Text.readString(in),
????????????????????"yyyy-MM-dd");
????????}
????????@Override
????????public?void?write(DataOutput?out)?throws?IOException?{
????????????out.writeLong(orderKey);
????????????out.writeLong(customer);
????????????Text.writeString(out,?state);
????????????out.writeDouble(price);
????????????Text.writeString(out,?DateUtil.getDateStr(orderDate,?"yyyy-MM-dd"));
????????}
????????@Override
????????public?int?compareTo(Conn1?arg0)?{
????????????//?TODO?Auto-generated?method?stub
????????????return?0;
????????}
????}
????public?static?class?Filter1Mapper?extends?MapReduceBase?implements
????????????Mapper<LongWritable,?Conn1,?LongWritable,?Conn2>?{
????????@Override
????????public?void?map(LongWritable?inKey,?Conn1?c2,
????????????????OutputCollector<LongWritable,?Conn2>?collector,?Reporter?report)
????????????????throws?IOException?{
????????????if?(c2.state.equals("F"))?{
????????????????Conn2?inValue?=?new?Conn2();
????????????????inValue.customer?=?c2.customer;
????????????????inValue.orderDate?=?c2.orderDate;
????????????????inValue.orderKey?=?c2.orderKey;
????????????????inValue.price?=?c2.price;
????????????????inValue.state?=?c2.state;
????????????????collector.collect(inKey,?inValue);
????????????}
????????}
????}
????private?static?class?Conn2?implements?WritableComparable<Conn1>?{
????????public?long?orderKey;
????????public?long?customer;
????????public?String?state;
????????public?double?price;
????????public?java.util.Date?orderDate;
????????@Override
????????public?void?readFields(DataInput?in)?throws?IOException?{
????????????orderKey?=?in.readLong();
????????????customer?=?in.readLong();
????????????state?=?Text.readString(in);
????????????price?=?in.readDouble();
????????????orderDate?=?DateUtil.getDateFromString(Text.readString(in),
????????????????????"yyyy-MM-dd");
????????}
????????@Override
????????public?void?write(DataOutput?out)?throws?IOException?{
????????????out.writeLong(orderKey);
????????????out.writeLong(customer);
????????????Text.writeString(out,?state);
????????????out.writeDouble(price);
????????????Text.writeString(out,?DateUtil.getDateStr(orderDate,?"yyyy-MM-dd"));
????????}
????????@Override
????????public?int?compareTo(Conn1?arg0)?{
????????????//?TODO?Auto-generated?method?stub
????????????return?0;
????????}
????}
????public?static?class?RegexMapper?extends?MapReduceBase?implements
????????????Mapper<LongWritable,?Conn2,?LongWritable,?Conn3>?{
????????@Override
????????public?void?map(LongWritable?inKey,?Conn2?c3,
????????????????OutputCollector<LongWritable,?Conn3>?collector,?Reporter?report)
????????????????throws?IOException?{
????????????c3.state?=?c3.state.replaceAll("F",?"Find");
????????????Conn3?c2?=?new?Conn3();
????????????c2.customer?=?c3.customer;
????????????c2.orderDate?=?c3.orderDate;
????????????c2.orderKey?=?c3.orderKey;
????????????c2.price?=?c3.price;
????????????c2.state?=?c3.state;
????????????collector.collect(inKey,?c2);
????????}
????}
????private?static?class?Conn3?implements?WritableComparable<Conn1>?{
????????public?long?orderKey;
????????public?long?customer;
????????public?String?state;
????????public?double?price;
????????public?java.util.Date?orderDate;
????????@Override
????????public?void?readFields(DataInput?in)?throws?IOException?{
????????????orderKey?=?in.readLong();
????????????customer?=?in.readLong();
????????????state?=?Text.readString(in);
????????????price?=?in.readDouble();
????????????orderDate?=?DateUtil.getDateFromString(Text.readString(in),
????????????????????"yyyy-MM-dd");
????????}
????????@Override
????????public?void?write(DataOutput?out)?throws?IOException?{
????????????out.writeLong(orderKey);
????????????out.writeLong(customer);
????????????Text.writeString(out,?state);
????????????out.writeDouble(price);
????????????Text.writeString(out,?DateUtil.getDateStr(orderDate,?"yyyy-MM-dd"));
????????}
????????@Override
????????public?int?compareTo(Conn1?arg0)?{
????????????//?TODO?Auto-generated?method?stub
????????????return?0;
????????}
????}
????public?static?class?LoadMapper?extends?MapReduceBase?implements
????????????Mapper<LongWritable,?Conn3,?LongWritable,?Conn3>?{
????????@Override
????????public?void?map(LongWritable?arg0,?Conn3?arg1,
????????????????OutputCollector<LongWritable,?Conn3>?arg2,?Reporter?arg3)
????????????????throws?IOException?{
????????????arg2.collect(arg0,?arg1);
????????}
????}
????public?static?void?main(String[]?args)?{
????????JobConf?job?=?new?JobConf(ProcessSample.class);
????????job.setJobName("ProcessSample");
????????job.setNumReduceTasks(0);
????????job.setInputFormat(TextInputFormat.class);
????????job.setOutputFormat(TextOutputFormat.class);
????????JobConf?mapper1?=?new?JobConf();
????????JobConf?mapper2?=?new?JobConf();
????????JobConf?mapper3?=?new?JobConf();
????????JobConf?mapper4?=?new?JobConf();
????????ChainMapper?cm?=?new?ChainMapper();
????????cm.addMapper(job,?ExtractMappper.class,?LongWritable.class,?Text.class,
????????????????LongWritable.class,?Conn1.class,?true,?mapper1);
????????cm.addMapper(job,?Filter1Mapper.class,?LongWritable.class,?Conn1.class,
????????????????LongWritable.class,?Conn2.class,?true,?mapper2);
????????cm.addMapper(job,?RegexMapper.class,?LongWritable.class,?Conn2.class,
????????????????LongWritable.class,?Conn3.class,?true,?mapper3);
????????cm.addMapper(job,?LoadMapper.class,?LongWritable.class,?Conn3.class,
????????????????LongWritable.class,?Conn3.class,?true,?mapper4);
????????FileInputFormat.setInputPaths(job,?new?Path("orderData"));
????????FileOutputFormat.setOutputPath(job,?new?Path("orderDataOutput"));
????????Job?job1;
????????try?{
????????????job1?=?new?Job(job);
????????????JobControl?jc?=?new?JobControl("test");
????????????jc.addJob(job1);
????????????jc.run();
????????}?catch?(IOException?e)?{
????????????//?TODO?Auto-generated?catch?block
????????????e.printStackTrace();
????????}
????}
}
總結
以上是生活随笔為你收集整理的ChainMapper和ChainReducer的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Android用户界面设计:框架布局
- 下一篇: [C#] 接收和发送UDP数据