Secondary Sort and the Overall MapReduce Flow
I. Secondary Sort
Requirement: given an order data file containing order id, product id, and product price, sort by order id ascending and by product price descending within each order. The number of output files should equal the number of distinct order ids, and each output file should contain only one record: the most expensive product of that order.
Approach:
1. Create an OrderBean class that implements the WritableComparable interface;
2. Write a custom Mapper class: decide the input/output types and implement the map logic;
3. Write a custom Partitioner that returns a different partition for each order id;
4. Write a custom Reducer class;
5. Write the grouping (secondary-sort) class OrderGroupingComparator extending WritableComparator: define a no-arg constructor and override the compare method;
6. Write the Driver class.
The code is as follows:
// In a real project each public class below lives in its own .java file;
// they are shown together here, with the shared imports listed once.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author: PrincessHug
 * @date: 2019/3/25, 21:42
 * @Blog: https://www.cnblogs.com/HelloBigTable/
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private int orderId;
    private double orderPrice;

    public OrderBean() {
    }

    public OrderBean(int orderId, double orderPrice) {
        this.orderId = orderId;
        this.orderPrice = orderPrice;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getOrderPrice() {
        return orderPrice;
    }

    public void setOrderPrice(double orderPrice) {
        this.orderPrice = orderPrice;
    }

    @Override
    public String toString() {
        return orderId + "\t" + orderPrice;
    }

    @Override
    public int compareTo(OrderBean o) {
        // Sort by orderId ascending; within the same order, by price descending
        int rs;
        if (this.orderId > o.getOrderId()) {
            rs = 1;
        } else if (this.orderId < o.getOrderId()) {
            rs = -1;
        } else {
            rs = (this.orderPrice > o.getOrderPrice()) ? -1 : 1;
        }
        return rs;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(orderPrice);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        orderId = in.readInt();
        orderPrice = in.readDouble();
    }
}

public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();
        // Split the fields
        String[] fields = line.split("\t");
        // Wrap the order id and price in an OrderBean
        int orderId = Integer.parseInt(fields[0]);
        double orderPrice = Double.parseDouble(fields[2]);
        OrderBean orderBean = new OrderBean(orderId, orderPrice);
        // Emit the record
        context.write(orderBean, NullWritable.get());
    }
}

public class OrderPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int i) {
        // The parameter i is the number of reduce tasks
        return (orderBean.getOrderId() & Integer.MAX_VALUE) % i;
    }
}

public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // Each group arrives sorted by price descending, so the key of the
        // group is the most expensive record of that order
        context.write(key, NullWritable.get());
    }
}

public class OrderGroupingComparator extends WritableComparator {
    // Must call the superclass constructor via super so the comparator
    // knows it compares OrderBean instances
    protected OrderGroupingComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group only by orderId, so all records of one order reach one reduce() call
        OrderBean aBean = (OrderBean) a;
        OrderBean bBean = (OrderBean) b;
        int rs;
        if (aBean.getOrderId() > bBean.getOrderId()) {
            rs = 1;
        } else if (aBean.getOrderId() < bBean.getOrderId()) {
            rs = -1;
        } else {
            rs = 0;
        }
        return rs;
    }
}

public class OrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Configuration and Job object
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // Jar class
        job.setJarByClass(OrderBean.class);
        // Mapper and Reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // Mapper output types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // Reducer output types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Grouping (secondary-sort) comparator
        job.setGroupingComparatorClass(OrderGroupingComparator.class);
        // Partitioner class
        job.setPartitionerClass(OrderPartitioner.class);
        // Number of reduce tasks
        job.setNumReduceTasks(3);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\mapreduce\\order\\in"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\mapreduce\\order\\out"));
        // Submit the job
        if (job.waitForCompletion(true)) {
            System.out.println("Job succeeded!");
        } else {
            System.out.println("Job failed!");
        }
    }
}
Since I have typed this code many times, it does not carry many comments; please bear with me!
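To see what the sort plus grouping comparator achieve together, here is a minimal plain-Java sketch (no Hadoop dependencies) of the same logic: records are ordered by orderId ascending and price descending, records with the same orderId form one group, and only the first (most expensive) record of each group is kept. The class name and the sample records are hypothetical illustration data, not from the original post.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupingSketch {
    record Order(int orderId, double price) {}

    // Mimics the MapReduce sort + grouping comparator: sort by orderId asc,
    // price desc, then keep only the first record of each orderId group.
    static Map<Integer, Double> maxPricePerOrder(List<Order> records) {
        List<Order> sorted = new ArrayList<>(records);
        sorted.sort(Comparator.comparingInt(Order::orderId)
                .thenComparing(Comparator.comparingDouble(Order::price).reversed()));
        Map<Integer, Double> result = new LinkedHashMap<>();
        for (Order o : sorted) {
            result.putIfAbsent(o.orderId(), o.price()); // first record in the group wins
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical sample input: (orderId, price)
        List<Order> records = List.of(
                new Order(1, 222.8), new Order(2, 522.8),
                new Order(1, 25.0), new Order(2, 122.4),
                new Order(3, 222.8));
        maxPricePerOrder(records).forEach((id, price) ->
                System.out.println(id + "\t" + price));
    }
}
```

In the real job this filtering is implicit: because the reduce input is already sorted and grouped, the reducer only has to write the group's key once.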
II. The Overall MapReduce Flow
1. Suppose there is a 200 MB text file; the client first submits the data to be processed;
2. The client submits the split information to the YARN platform, and 2 map tasks are computed to be needed;
3. By default, TextInputFormat (an implementation of FileInputFormat) reads the file data into each map task;
4. The map task runs its business logic and writes its output, via the Context, into the circular buffer;
5. The circular buffer is simply a region allocated in memory; when the data in it reaches 80% of its default 100 MB size, a spill to disk is triggered;
6. The spilled data is partitioned and sorted, possibly over multiple spills (this is the shuffle mechanism, explained in detail in the next post);
7. The partitioned, sorted spill data can optionally be merged by a Combiner, and is then written to local disk;
8. After all map tasks have completely finished, the reduce tasks start reading the map outputs from disk, merging the files and merge-sorting them (this is where the secondary sort described above takes place);
9. The Reducer reads one group of records per call, then writes the results to the output file using the default TextOutputFormat.
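The numbers in steps 2 and 5 follow from two Hadoop defaults: a 128 MB split size (so a 200 MB file yields 2 splits, hence 2 map tasks) and a 100 MB circular buffer that spills at 80%. The sketch below is a standalone approximation of that arithmetic, including FileInputFormat's 1.1 "slop" factor for the last split; it is illustrative, not Hadoop's actual code.

```java
public class MrSizing {
    static final long MB = 1024 * 1024;

    // Approximates FileInputFormat's split rule: keep carving off
    // splitSize-byte splits while the remainder exceeds 1.1 * splitSize,
    // then put whatever is left into one final split.
    static int splitCount(long fileBytes, long splitSize) {
        int splits = 0;
        long remaining = fileBytes;
        while ((double) remaining / splitSize > 1.1) {
            splits++;
            remaining -= splitSize;
        }
        if (remaining > 0) splits++;
        return splits;
    }

    public static void main(String[] args) {
        // 200 MB file with the default 128 MB split size -> 2 map tasks
        System.out.println(splitCount(200 * MB, 128 * MB));
        // Spill threshold: 100 MB buffer * 0.8 -> spilling starts at 80 MB
        System.out.println((long) (100 * MB * 0.8) / MB + " MB");
    }
}
```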
Reposted from: https://www.cnblogs.com/HelloBigTable/p/10617937.html