Mapreduce的排序、全排序以及二次排序
生活随笔
收集整理的這篇文章主要介紹了
Mapreduce的排序、全排序以及二次排序
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
一:背景
Hadoop中雖然有自動排序和分組,由于自帶的排序是按照Key進行排序的,有些時候,我們希望同時對Key和Value進行排序。自帶的排序功能就無法滿足我們了,還好Hadoop提供了一些組件可以讓開發人員進行二次排序。
?
二:技術實現
我們先來看案例需求
#需求1: 首先按照第一列數字升序排列,當第一列數字相同時,第二列數字也升序排列(列之間用制表符\t隔開)
?
3 3
3 2
3 1
2 2
2 1
1 1
MapReduce計算之后的結果應該是:
?
?
1 1
2 1
2 2
3 1
3 2
3 3
#需求2:第一列不相等時,第一列按降序排列,當第一列相等時,第二列按升序排列
?
?
3 3
3 2
3 1
2 2
2 1
1 1
MapReduce計算之后的結果應該是:
3 1
3 2
3 3
2 1
2 2
1 1
下面是實現代碼,實現兩種需求的關鍵是compareTo()方法的實現不同:
?
public class SecondSortTest {// 定義輸入路徑private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";// 定義輸出路徑private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";public static void main(String[] args) {try {// 創建配置信息Configuration conf = new Configuration();/**********************************************///對Map端輸出進行壓縮//conf.setBoolean("mapred.compress.map.output", true);//設置map端輸出使用的壓縮類//conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);//對reduce端輸出進行壓縮//conf.setBoolean("mapred.output.compress", true);//設置reduce端輸出使用的壓縮類//conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);// 添加配置文件(我們可以在編程的時候動態配置信息,而不需要手動去改變集群)/** conf.addResource("classpath://hadoop/core-site.xml");* conf.addResource("classpath://hadoop/hdfs-site.xml");* conf.addResource("classpath://hadoop/hdfs-site.xml");*/// 創建文件系統FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);// 如果輸出目錄存在,我們就刪除if (fileSystem.exists(new Path(OUT_PATH))) {fileSystem.delete(new Path(OUT_PATH), true);}// 創建任務Job job = new Job(conf, SecondSortTest.class.getName());//1.1 設置輸入目錄和設置輸入數據格式化的類FileInputFormat.setInputPaths(job, INPUT_PATH);job.setInputFormatClass(TextInputFormat.class);//1.2 設置自定義Mapper類和設置map函數輸出數據的key和value的類型job.setMapperClass(MySecondSortMapper.class);job.setMapOutputKeyClass(CombineKey.class);job.setMapOutputValueClass(LongWritable.class);//1.3 設置分區和reduce數量(reduce的數量,和分區的數量對應,因為分區為一個,所以reduce的數量也是一個)job.setPartitionerClass(HashPartitioner.class);job.setNumReduceTasks(1);//1.4 排序、分組//1.5 歸約//2.1 Shuffle把數據從Map端拷貝到Reduce端。//2.2 指定Reducer類和輸出key和value的類型job.setReducerClass(MySecondSortReducer.class);job.setOutputKeyClass(LongWritable.class);job.setOutputValueClass(LongWritable.class);//2.3 指定輸出的路徑和設置輸出的格式化類FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));job.setOutputFormatClass(TextOutputFormat.class);// 提交作業 退出System.exit(job.waitForCompletion(true) ? 0 : 1);} catch (Exception e) {e.printStackTrace();}}public static class MySecondSortMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable>{//定義聯合的keyprivate CombineKey combineKey = new CombineKey();protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,InterruptedException {//對輸入的value進行切分String[] splits = value.toString().split("\t");//設置聯合的keycombineKey.setComKey(Long.parseLong(splits[0]));combineKey.setComVal(Long.parseLong(splits[1]));//通過context寫出去context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));}}public static class MySecondSortReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable>{@Overrideprotected void reduce(CombineKey combineKey, Iterable<LongWritable> values, Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context)throws IOException, InterruptedException {//因為輸入的CombineKey已經排好序了,所有我們只要獲取其中的兩個成員變量寫出去就可以了。values在這個例子中沒有什么作用context.write(new LongWritable(combineKey.getComKey()), new LongWritable(combineKey.getComVal()));}}}/*** 重新組合成一個key,實現二次排序* @author 廖*民* time : 2015年1月18日下午7:27:52* @version*/class CombineKey implements WritableComparable<CombineKey>{public long comKey;public long comVal;//必須提供無參構造函數,否則hadoop反射機制會出錯public CombineKey() {}//有參構造函數public CombineKey(long comKey, long comVal) {this.comKey = comKey;this.comVal = comVal;}public long getComKey() {return comKey;}public void setComKey(long comKey) {this.comKey = comKey;}public long getComVal() {return comVal;}public void setComVal(long comVal) {this.comVal = comVal;}public void write(DataOutput out) throws IOException {out.writeLong(comKey);out.writeLong(comVal);}public void readFields(DataInput in) throws IOException {this.comKey = in.readLong();this.comVal = in.readLong();}/*** 這個方法一定要實現* java里面排序默認是小的放在前面,即返回負數的放在前面,這樣就是所謂的升序排列* 我們在下面的方法中直接返回一個差值,也就相當于會升序排列。* 如果我們要實現降序排列,那么我們就可以返回一個正數*//*public int compareTo(CombineKey o) {//第一列不相同時按升序排列,當第一列相同時第二列按升序排列long minus = this.comKey - o.comKey;//如果第一個值不相等時,我們就先對第一列進行排序if (minus != 0){return (int) minus;}//如果第一列相等時,我們就對第二列進行排序return (int) (this.comVal - o.comVal);}*//*** 為了實現第一列不同時按降序排序,第一列相同時第二列按升序排列* 第一列:降序,當第一列相同時,第二列:升序* 為了實現降序,*/public int compareTo(CombineKey o) {//如果a-b<0即,a小于b,按這樣 的思路應該是升序排列,我們可以返回一個相反數使其降序long tmp = this.comKey - o.comKey;//如果第一個值不相等時,我們就先對第一列進行排序if (tmp != 0){return (int) (-tmp);}//如果第一列相等時,我們就對第二列進行升序排列return (int) (this.comVal - o.comVal);}@Overridepublic int hashCode() {final int prime = 31;int result = 1;result = prime * result + (int) (comKey ^ (comKey >>> 32));return result;}@Overridepublic boolean equals(Object obj) {if (this == obj)return true;if (obj == null)return false;if (getClass() != obj.getClass())return false;CombineKey other = (CombineKey) obj;if (comKey != other.comKey)return false;return true;}}?
總結
以上是生活随笔為你收集整理的Mapreduce的排序、全排序以及二次排序的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 飞翔的小鸟小游戏
- 下一篇: Jeston TX2安装Ubuntu系统