2021年大数据Hadoop(二十二):MapReduce的自定义分组
全網最詳細的Hadoop文章系列,強烈建議收藏加關注!
后面更新文章都會列出歷史文章目錄,幫助大家回顧知識重點。
目錄
本系列歷史文章
前言
MapReduce的自定義分組
需求
分析
實現
第一步:定義OrderBean
第二步:自定義分區
第三步:自定義groupingComparator
第四步:程序main函數入口
本系列歷史文章
2021年大數據Hadoop(三十):Hadoop3.x的介紹
2021年大數據Hadoop(二十九):關于YARN常用參數設置
2021年大數據Hadoop(二十八):YARN的調度器Scheduler
2021年大數據Hadoop(二十七):YARN運行流程
2021年大數據Hadoop(二十六):YARN三大組件介紹
2021年大數據Hadoop(二十五):YARN通俗介紹和基本架構
2021年大數據Hadoop(二十四):MapReduce高階訓練
2021年大數據Hadoop(二十三):MapReduce的運行機制詳解
2021年大數據Hadoop(二十二):MapReduce的自定義分組
2021年大數據Hadoop(二十一):MapReuce的Combineer
2021年大數據Hadoop(二十):MapReduce的排序和序列化
2021年大數據Hadoop(十九):???????MapReduce分區???????
2021年大數據Hadoop(十八):MapReduce程序運行模式和深入解析
2021年大數據Hadoop(十七):MapReduce編程規范及示例編寫
2021年大數據Hadoop(十六):MapReduce計算模型介紹
2021年大數據Hadoop(十五):Hadoop的聯邦機制 Federation
2021年大數據Hadoop(十四):HDFS的高可用機制
2021年大數據Hadoop(十三):HDFS意想不到的其他功能
2021年大數據Hadoop(十二):HDFS的API操作
2021年大數據Hadoop(十一):HDFS的元數據輔助管理
2021年大數據Hadoop(十):HDFS的數據讀寫流程
2021年大數據Hadoop(九):HDFS的高級使用命令
2021年大數據Hadoop(八):HDFS的Shell命令行使用
2021年大數據Hadoop(七):HDFS分布式文件系統簡介
2021年大數據Hadoop(六):全網最詳細的Hadoop集群搭建
2021年大數據Hadoop(五):Hadoop架構
2021年大數據Hadoop(四):Hadoop發行版公司
2021年大數據Hadoop(三):Hadoop國內外應用
2021年大數據Hadoop(二):Hadoop發展簡史和特性優點
2021年大數據Hadoop(一):Hadoop介紹
前言
2021大數據領域優質創作博客,帶你從入門到精通,該博客每天更新,逐漸完善大數據各個知識體系的文章,幫助大家更高效學習。
有對大數據感興趣的可以關注微信公眾號:三幫大數據
MapReduce的自定義分組
GroupingComparator是mapreduce當中reduce端的一個功能組件,主要的作用是決定哪些數據作為一組,調用一次reduce的邏輯,默認是每個不同的key,作為多個不同的組,每個組調用一次reduce邏輯,我們可以自定義GroupingComparator實現不同的key作為同一個組,調用一次reduce邏輯
???????需求
有如下訂單數據
| 訂單id | 商品id | 成交金額 |
| Order_0000001 | Pdt_01 | 222.8 |
| Order_0000001 | Pdt_05 | 25.8 |
| Order_0000002 | Pdt_03 | 522.8 |
| Order_0000002 | Pdt_04 | 122.4 |
| Order_0000002 | Pdt_05 | 722.4 |
| Order_0000003 | Pdt_01 | 222.8 |
現在需要求出每一個訂單中成交金額最大的一筆交易
???????分析
1、利用“訂單id和成交金額”作為key,可以將map階段讀取到的所有訂單數據按照id分區,按照金額排序,發送到reduce
2、在reduce端利用groupingcomparator將訂單id相同的kv聚合成組,然后取第一個即是最大值
實現
???????
第一步:定義OrderBean
定義一個OrderBean,里面定義兩個字段,第一個字段是我們的orderId,第二個字段是我們的金額(注意金額一定要使用Double或者DoubleWritable類型,否則沒法按照金額順序排序)
public?class?OrderBean implements?WritableComparable<OrderBean>?{private?String orderId;private?Double price;@Overridepublic?int?compareTo(OrderBean o)?{//比較訂單id的排序順序int?i =?this.orderId.compareTo(o.orderId);if(i==0){//如果訂單id相同,則比較金額,金額大的排在前面i =?-?this.price.compareTo(o.price);}return?i;}@Overridepublic?void?write(DataOutput out)?throws?IOException {out.writeUTF(orderId);out.writeDouble(price);}@Overridepublic?void?readFields(DataInput in)?throws?IOException {this.orderId =??in.readUTF();this.price =?in.readDouble();}public?OrderBean()?{}public?OrderBean(String orderId,?Double price)?{this.orderId =?orderId;this.price =?price;}public?String getOrderId()?{return?orderId;}public?void?setOrderId(String orderId)?{this.orderId =?orderId;}public?Double getPrice()?{return?price;}public?void?setPrice(Double price)?{this.price =?price;}@Overridepublic?String toString()?{return??orderId +"\t"+price;}}
???????第二步:自定義分區
自定義分區,按照訂單id進行分區,把所有訂單id相同的數據,都發送到同一個reduce中去
public?class?OrderPartition extends?Partitioner<OrderBean,NullWritable>?{@Overridepublic?int?getPartition(OrderBean orderBean,?NullWritable nullWritable,?int?i)?{//自定義分區,將相同訂單id的數據發送到同一個reduce里面去return??(orderBean.getOrderId().hashCode()?&?Integer.MAX_VALUE)%i;}}
???????第三步:自定義groupingComparator
按照我們自己的邏輯進行分組,通過比較相同的訂單id,將相同的訂單id放到一個組里面去,進過分組之后當中的數據,已經全部是排好序的數據,我們只需要取前topN即可
/*1: 繼承WriteableComparator2: 調用父類的有參構造3: 指定分組的規則(重寫方法)*/import?org.apache.hadoop.io.WritableComparable;import?org.apache.hadoop.io.WritableComparator;// 1: 繼承WriteableComparatorpublic?class?OrderGroupComparator extends?WritableComparator {// 2: 調用父類的有參構造public?OrderGroupComparator()?{super(OrderBean.class,true);}//3: 指定分組的規則(重寫方法)@Overridepublic?int?compare(WritableComparable a,?WritableComparable b)?{//3.1 對形參做強制類型轉換OrderBean first =?(OrderBean)a;OrderBean second =?(OrderBean)b;//3.2 指定分組規則return?first.getOrderId().compareTo(second.getOrderId());}}
???????第四步:程序main函數入口
public?class?GroupingRunner {public?static?void?main(String[]?args)?throws?IOException,?ClassNotFoundException,?InterruptedException {//1、創建建一個job任務對象Configuration configuration =?new?Configuration();Job job =?Job.getInstance(configuration,?"grouping_demo");//2、指定job所在的jar包job.setJarByClass(GroupingRunner.class);//3、指定源文件的讀取方式類和源文件的讀取路徑job.setInputFormatClass(TextInputFormat.class);?//按照行讀取//TextInputFormat.addInputPath(job, new Path("hdfs://node1:8020/input/wordcount")); //只需要指定源文件所在的目錄即可TextInputFormat.addInputPath(job,?new?Path("file:///E:\\input\\grouping_demo"));?//只需要指定源文件所在的目錄即可//4、指定自定義的Mapper類和K2、V2類型job.setMapperClass(GroupingMapper.class);?//指定Mapper類job.setMapOutputKeyClass(OrderBean.class);?//K2類型job.setMapOutputValueClass(Text.class);//V2類型//5、指定自定義分區類(如果有的話)job.setPartitionerClass(MyPartitioner.class);//6、指定自定義分組類(如果有的話)job.setGroupingComparatorClass(GroupingComparator.class);//7、指定自定義Combiner類(如果有的話)//job.setCombinerClass(MyCombiner.class);//設置ReduceTask個數job.setNumReduceTasks(3);//8、指定自定義的Reducer類和K3、V3的數據類型job.setReducerClass(GroupingReducer.class);?//指定Reducer類job.setOutputKeyClass(Text.class);?//K3類型job.setOutputValueClass(NullWritable.class);??//V3類型//9、指定輸出方式類和結果輸出路徑job.setOutputFormatClass(TextOutputFormat.class);//TextOutputFormat.setOutputPath(job, new ?Path("hdfs://node1:8020/output/wordcount")); //目標目錄不能存在,否則報錯TextOutputFormat.setOutputPath(job,?new??Path("file:///E:\\output\\grouping_demo"));?//目標目錄不能存在,否則報錯//10、將job提交到yarn集群boolean?bl =?job.waitForCompletion(true);?//true表示可以看到任務的執行進度//11.退出執行進程System.exit(bl?0:1);}}
- 📢博客主頁:https://lansonli.blog.csdn.net
- 📢歡迎點贊 👍 收藏 ?留言 📝 如有錯誤敬請指正!
- 📢本文由 Lansonli 原創,首發于 CSDN博客🙉
- 📢大數據系列文章會每天更新,停下休息的時候不要忘了別人還在奔跑,希望大家抓緊時間學習,全力奔赴更美好的生活?
總結
以上是生活随笔為你收集整理的2021年大数据Hadoop(二十二):MapReduce的自定义分组的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Hadoop(十五):H
- 下一篇: 2021年大数据Hadoop(二十五):