Relational Algebra Operations in MapReduce
The common relational algebra operations — selection, projection, union, intersection, difference, and natural join — can all be parallelized quite easily with the MapReduce framework. The sample relation R used below:
| NAME | SEX | AGE |
| ---- | --- | --- |
| 小明 | 男 | 25 |
| 小紅 | 女 | 18 |
| 小張 | 男 | 22 |
| 小米 | 女 | 23 |
| 小麗 | 女 | 21 |
| 小王 | 男 | 19 |
| 小美 | 女 | 25 |
| 小朱 | 女 | 26 |
Selection
Store relation R's data in a file named relationR, then upload it to the /data directory in HDFS, as shown in Listing 1-1.
Listing 1-1
root@lejian:/data# cat relationR
小明 男 25
小紅 女 18
小張 男 22
小米 女 23
小麗 女 21
小王 男 19
小美 女 25
小朱 女 26
root@lejian:/data# hadoop fs -put relationR /data
root@lejian:/data# hadoop fs -ls -R /data
-rw-r--r--   1 root supergroup        112 2017-01-07 15:03 /data/relationR
To apply a selection condition C to relation R — here, selecting the records whose SEX is 女 (female) — the Map phase only needs to test each input record and emit those that satisfy the condition, using the record as the key and NullWritable as the value. The Reduce phase has no extra work to do. The condition is supplied through the configuration file in Listing 1-2; Listings 1-3 to 1-5 give the Person helper class, the Mapper, and the driver.
Listing 1-2
<?xml version="1.0"?>
<configuration>
    <property>
        <name>sex</name>
        <value>女</value>
    </property>
</configuration>
Listing 1-3
package com.hadoop.mapreduce;

// Plain value class representing one record of relation R.
public class Person {

    private String name;
    private String sex;
    private int age;

    // Parses one space-separated line: name sex age
    public Person(String line) {
        String[] lines = line.split(" ");
        this.name = lines[0];
        this.sex = lines[1];
        this.age = Integer.parseInt(lines[2]);
    }

    public String getName() {
        return name;
    }

    public String getSex() {
        return sex;
    }

    public int getAge() {
        return age;
    }

    // Returns the value of the named column; any column other than
    // "name" and "sex" is treated as "age".
    public String getVal(String col) {
        if ("name".equals(col)) {
            return name;
        }
        if ("sex".equals(col)) {
            return sex;
        }
        return age + "";
    }

    @Override
    public String toString() {
        return name + " " + sex + " " + age;
    }
}
Listing 1-4
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SelectionMap extends Mapper<LongWritable, Text, Text, NullWritable> {

    private String sex = "";
    private Text val = new Text();

    // Reads the selection condition (the sex to keep) from the job
    // configuration loaded in the driver.
    @Override
    protected void setup(Context context) throws java.io.IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        sex = conf.get("sex");
    }

    // Emits only the records that satisfy the condition, as (record, null).
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        Person person = new Person(value.toString());
        if (sex.equals(person.getVal("sex"))) {
            val.set(person.toString());
            context.write(val, NullWritable.get());
        }
    }
}
Listing 1-5
package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Selection {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length != 2) {
            throw new RuntimeException("Usage: <input path> <output path>");
        }
        Configuration conf = new Configuration();
        // conf.xml (Listing 1-2) supplies the selection condition.
        conf.addResource("conf.xml");
        Job job = Job.getInstance(conf);
        job.setJobName("Selection");
        job.setJarByClass(Selection.class); // lets the cluster locate the job jar
        job.setMapperClass(SelectionMap.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
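One optional refinement, not in the original post: since the reduce phase does no work for selection, the job can be made map-only, which skips the sort and shuffle entirely. A minimal sketch of the change to the driver in Listing 1-5:

```java
// Optional addition inside main() of Listing 1-5, before waitForCompletion:
// with zero reduce tasks each mapper writes its output straight to HDFS
// (as part-m-* files) and the shuffle/sort phase is skipped.
job.setNumReduceTasks(0);
```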
Running Listing 1-5 produces the output shown in Listing 1-6.
Listing 1-6
root@lejian:/data# hadoop jar selection.jar com.hadoop.mapreduce.Selection /data /output
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-07 15:05 /output/_SUCCESS
-rw-r--r--   1 root supergroup         70 2017-01-07 15:05 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
小麗 女 21
小朱 女 26
小米 女 23
小紅 女 18
小美 女 25
Projection
For example, to apply a projection to relation R that extracts every value of the AGE attribute, the Map phase emits each record's AGE with a NullWritable value, and the Reduce phase simply writes out the keys. Note that because the framework groups identical keys before each reduce call, this projection also removes duplicates.
Listing 1-7
package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ProjectionMap extends Mapper<LongWritable, Text, IntWritable, NullWritable> {

    private IntWritable age = new IntWritable();

    // Emits only the AGE column of each record; the shuffle then groups
    // identical ages together.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        Person person = new Person(value.toString());
        age.set(person.getAge());
        context.write(age, NullWritable.get());
    }
}
Listing 1-8
package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class ProjectionReduce extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {

    // Each distinct age arrives here exactly once, so writing out the key
    // yields the deduplicated projection.
    @Override
    protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
Listing 1-9
package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Projection {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length != 2) {
            throw new RuntimeException("Usage: <input path> <output path>");
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("Projection");
        job.setJarByClass(Projection.class); // lets the cluster locate the job jar
        job.setMapperClass(ProjectionMap.class);
        job.setReducerClass(ProjectionReduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
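A further optional tweak, assuming duplicate ages are common in the input: because ProjectionReduce just writes each distinct key once, and its input and output types match, it also satisfies the combiner contract, so it can be reused to deduplicate ages map-side and shrink the shuffled data. A sketch of the one-line addition to the driver in Listing 1-9:

```java
// Optional addition inside main() of Listing 1-9: running the reducer class
// as a combiner removes duplicate ages within each map task's output; the
// final result is unchanged because the reducer deduplicates again globally.
job.setCombinerClass(ProjectionReduce.class);
```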
Running Listing 1-9 produces the output shown in Listing 1-10.
Listing 1-10
root@lejian:/data# hadoop jar projection.jar com.hadoop.mapreduce.Projection /data /output
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-07 15:52 /output/_SUCCESS
-rw-r--r--   1 root supergroup         21 2017-01-07 15:52 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
18
19
21
22
23
25
26
Intersection
If relations A and B share the same schema and we want their intersection, the Map phase emits (r, 1) for every record r of A and of B, and the Reduce phase sums the counts per record and outputs a record exactly when its count is 2. We again use the Person class, this time as the map output key. For identical records from A and B to reach the same reduce call, the key must be a WritableComparable: the shuffle sorts and groups keys with the key's compareTo method, and the default HashPartitioner picks the target reduce task from the key's hashCode, so records that are equal must both compare as equal and produce the same hash code.
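To make the routing concrete, here is a simplified sketch of what Hadoop's default HashPartitioner does with the key's hashCode (the helper class and sample data here are illustrative additions, not part of the original post):

```java
// Simplified model of org.apache.hadoop.mapreduce.lib.partition.HashPartitioner:
// equal records must yield equal hashCode() values, otherwise the "same"
// record read from relation A and from relation B could be routed to
// different reducers and never be counted together.
public class PartitionSketch {

    static int partitionFor(Person key, int numReduceTasks) {
        // mask off the sign bit, then spread keys across the reduce tasks
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        Person fromA = new Person("Lily female 22"); // record as read from relation A
        Person fromB = new Person("Lily female 22"); // identical record from relation B
        // Prints the same partition number twice, because hashCode()
        // depends only on the record's fields.
        System.out.println(partitionFor(fromA, 4));
        System.out.println(partitionFor(fromB, 4));
    }
}
```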
Listing 1-11 shows the Person class of Listing 1-3 reworked to meet these requirements.
Listing 1-11
package com.hadoop.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Person as a MapReduce key: WritableComparable supplies serialization
// (write/readFields) plus the ordering the shuffle sorts and groups by.
public class Person implements WritableComparable<Person> {

    private String name;
    private String sex;
    private int age;

    // No-argument constructor required by Hadoop for deserialization.
    public Person() {
    }

    public Person(String line) {
        String[] lines = line.split(" ");
        this.name = lines[0];
        this.sex = lines[1];
        this.age = Integer.parseInt(lines[2]);
    }

    public String getName() {
        return name;
    }

    public String getSex() {
        return sex;
    }

    public int getAge() {
        return age;
    }

    public String getVal(String col) {
        if ("name".equals(col)) {
            return name;
        }
        if ("sex".equals(col)) {
            return sex;
        }
        return age + "";
    }

    @Override
    public String toString() {
        return name + " " + sex + " " + age;
    }

    // The default HashPartitioner routes keys by hashCode, so equal records
    // must hash identically.
    @Override
    public int hashCode() {
        int res = 20;
        res = name.hashCode() + 10 * res;
        res = sex.hashCode() + 10 * res;
        res = age + 10 * res;
        return res;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(sex);
        out.writeInt(age);
    }

    // Must read the fields in exactly the order write() wrote them.
    @Override
    public void readFields(DataInput in) throws IOException {
        name = in.readUTF();
        sex = in.readUTF();
        age = in.readInt();
    }

    // Compare field by field rather than by hashCode: two distinct records
    // whose hash codes happened to collide would otherwise be grouped into
    // the same reduce call.
    @Override
    public int compareTo(Person o) {
        int cmp = name.compareTo(o.name);
        if (cmp != 0) {
            return cmp;
        }
        cmp = sex.compareTo(o.sex);
        if (cmp != 0) {
            return cmp;
        }
        return Integer.compare(age, o.age);
    }
}
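A quick way to check the Writable contract is a local round trip through the serialization methods; this small test (my addition, not from the original post) verifies that readFields restores exactly what write produced:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical round-trip check: write() and readFields() must agree on
// field order, since the shuffle deserializes keys from exactly these bytes.
public class PersonRoundTrip {

    public static void main(String[] args) throws IOException {
        Person original = new Person("Lily female 22");

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes)); // serialize

        Person copy = new Person(); // no-arg constructor, then deserialize
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // compareTo == 0 confirms the copy is field-for-field identical
        System.out.println(original + " == " + copy + " ? " + (original.compareTo(copy) == 0));
    }
}
```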
Upload relations A and B to the /data directory in HDFS, as shown in Listing 1-12.
Listing 1-12
root@lejian:/data# cat relationA
Tom male 21
Amy female 19
Daivd male 16
Lily female 22
Lucy female 20
John male 19
Rose female 19
Jojo female 26
root@lejian:/data# cat relationB
Daivd male 16
Jack male 15
Lily female 22
Lucy female 20
Tom male 25
root@lejian:/data# hadoop fs -put relation* /data
root@lejian:/data# hadoop fs -ls -R /data
-rw-r--r--   1 root supergroup        113 2017-01-07 20:48 /data/relationA
-rw-r--r--   1 root supergroup         69 2017-01-07 20:48 /data/relationB
Listing 1-13
package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class IntersectionMap extends Mapper<LongWritable, Text, Person, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);

    // Emits (record, 1) for every record of both relations; identical
    // records from A and B meet at the same reduce call.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        Person person = new Person(value.toString());
        context.write(person, ONE);
    }
}
Listing 1-14
package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class IntersectionReduce extends Reducer<Person, IntWritable, Person, NullWritable> {

    // A count of 2 means the record occurred in both relations; this assumes
    // each relation contains no duplicate records (set semantics).
    @Override
    protected void reduce(Person key, Iterable<IntWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        int count = 0;
        for (IntWritable val : values) {
            count += val.get();
        }
        if (count == 2) {
            context.write(key, NullWritable.get());
        }
    }
}
Listing 1-15
package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Intersection {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length != 2) {
            throw new RuntimeException("Usage: <input path> <output path>");
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("Intersection");
        job.setJarByClass(Intersection.class);
        job.setMapperClass(IntersectionMap.class);
        job.setMapOutputKeyClass(Person.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(IntersectionReduce.class);
        job.setOutputKeyClass(Person.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Running Listing 1-15 produces the output shown in Listing 1-16.
Listing 1-16
root@lejian:/data# hadoop jar intersection.jar com.hadoop.mapreduce.Intersection /data /output
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-07 20:30 /output/_SUCCESS
-rw-r--r--   1 root supergroup         44 2017-01-07 20:30 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
Daivd male 16
Lily female 22
Lucy female 20
Difference
To compute the difference A − B — the records that appear in relation A but not in relation B — the Map phase emits, for every record r, a pair tagging it with the relation it came from: (r, A) or (r, B). The Reduce phase then inspects the tags gathered for each record and outputs the record only when its tags name relation A alone.
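As a plain-Java reference point for the semantics (no Hadoop involved; the rows are those shown in Listing 1-17), the whole job computes nothing more than this set difference:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Local sketch of A - B over the sample data; the MapReduce job distributes
// exactly this computation by gathering identical records on one reducer.
public class DifferenceSketch {

    public static void main(String[] args) {
        Set<String> a = new LinkedHashSet<String>(Arrays.asList(
                "Tom male 21", "Amy female 19", "Daivd male 16", "Lily female 22",
                "Lucy female 20", "John male 19", "Rose female 19", "Jojo female 26"));
        Set<String> b = new HashSet<String>(Arrays.asList(
                "Daivd male 16", "Jack male 15", "Lily female 22",
                "Lucy female 20", "Tom male 25"));

        a.removeAll(b); // keep the records of A that never occur in B
        for (String record : a) {
            System.out.println(record);
        }
    }
}
```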
First, display the contents of relationA and relationB under the HDFS /data directory, as shown in Listing 1-17.
Listing 1-17
root@lejian:/data# hadoop fs -ls -R /data
-rw-r--r--   1 root supergroup        113 2017-01-07 20:48 /data/relationA
-rw-r--r--   1 root supergroup         69 2017-01-07 20:48 /data/relationB
root@lejian:/data# hadoop fs -cat /data/relationA
Tom male 21
Amy female 19
Daivd male 16
Lily female 22
Lucy female 20
John male 19
Rose female 19
Jojo female 26
root@lejian:/data# hadoop fs -cat /data/relationB
Daivd male 16
Jack male 15
Lily female 22
Lucy female 20
Tom male 25
Listing 1-18
package com.hadoop.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class DifferenceMap extends Mapper<LongWritable, Text, Person, Text> {

    private Text relationName = new Text();

    // Each input split comes from a single file, so the file name tells us
    // which relation this mapper is reading.
    @Override
    protected void setup(Context context) throws java.io.IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        relationName.set(fileSplit.getPath().getName());
    }

    // Emits (record, relation name) so the reducer can tell where each copy
    // of a record came from.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        Person person = new Person(value.toString());
        context.write(person, relationName);
    }
}
Listing 1-19
package com.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DifferenceReduce extends Reducer<Person, Text, Person, NullWritable> {

    // Name of the relation being subtracted (e.g. "relationB"), passed in
    // through the job configuration.
    private String remove = "";

    @Override
    protected void setup(Context context) throws java.io.IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        remove = conf.get("remove");
    }

    // A record is kept only if none of its tags name the subtracted
    // relation, i.e. it occurs exclusively in relation A.
    @Override
    protected void reduce(Person key, Iterable<Text> values, Context context)
            throws java.io.IOException, InterruptedException {
        for (Text val : values) {
            if (remove.equals(val.toString())) {
                return;
            }
        }
        context.write(key, NullWritable.get());
    }
}
Listing 1-20
package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Difference {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length != 3) {
            throw new RuntimeException("Usage: <input path> <output path> <relation to subtract>");
        }
        Configuration conf = new Configuration();
        conf.set("remove", args[2]); // file name of the relation to subtract
        Job job = Job.getInstance(conf);
        job.setJobName("Difference");
        job.setJarByClass(Difference.class);
        job.setMapperClass(DifferenceMap.class);
        job.setMapOutputKeyClass(Person.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(DifferenceReduce.class);
        job.setOutputKeyClass(Person.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Running Listing 1-20 produces the output shown in Listing 1-21.
Listing 1-21
root@lejian:/data# hadoop jar difference.jar com.hadoop.mapreduce.Difference /data /output relationB
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-08 08:59 /output/_SUCCESS
-rw-r--r--   1 root supergroup         69 2017-01-08 08:59 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
Tom male 21
Amy female 19
John male 19
Jojo female 26
Rose female 19
Natural Join
As Listing 1-22 shows, the student relation's columns are id, name, sex, and age, while the grade relation's columns are id, subject, and score; we want the natural join of student and grade on id. In the Map phase, every record r from student and grade is emitted with its id as the key and the record itself, tagged with its source file, as the value. In the Reduce phase, the records gathered for one key are separated by origin (student or grade), their cartesian product is formed, and the result is written out.
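Before the distributed version, the grouping logic can be seen in miniature in plain Java (an illustrative sketch using a subset of Listing 1-22's rows, not part of the original post); each loop iteration corresponds to what one reduce() call does for one id:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Local model of the reduce-side join: group grade rows by id, then pair
// each student row with all of its grade rows (a per-key cartesian product).
public class JoinSketch {

    public static void main(String[] args) {
        Map<Integer, String> students = new LinkedHashMap<Integer, String>();
        students.put(1, "Amy female 18");
        students.put(2, "Tom male 19"); // remaining students omitted for brevity

        Map<Integer, List<String>> grades = new HashMap<Integer, List<String>>();
        grades.put(1, Arrays.asList("Math 89"));
        grades.put(2, Arrays.asList("Math 75", "English 80"));

        for (Map.Entry<Integer, String> s : students.entrySet()) {
            List<String> subjectScores = grades.get(s.getKey());
            if (subjectScores == null) {
                continue; // a student with no grades produces no joined rows
            }
            for (String g : subjectScores) {
                System.out.println(s.getKey() + " " + s.getValue() + " " + g);
            }
        }
    }
}
```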
In Listing 1-22, the student and grade relations are stored in the /data directory of HDFS.
Listing 1-22
root@lejian:/data# cat student
1 Amy female 18
2 Tom male 19
3 Sam male 21
4 John male 19
5 Lily female 21
6 Rose female 20
root@lejian:/data# cat grade
1 Math 89
2 Math 75
4 English 85
3 English 95
5 Math 91
5 English 88
6 Math 78
6 English 99
2 English 80
root@lejian:/data# hadoop fs -put student /data
root@lejian:/data# hadoop fs -put grade /data
root@lejian:/data# hadoop fs -ls -R /data
-rw-r--r--   1 root supergroup        105 2017-01-08 09:59 /data/grade
-rw-r--r--   1 root supergroup         93 2017-01-08 09:59 /data/student
Listing 1-23
package com.hadoop.mapreduce;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class NaturalJoinMap extends Mapper<LongWritable, Text, IntWritable, Text> {

    private String fileName = "";
    private Text val = new Text();
    private IntWritable stuKey = new IntWritable();

    // Records which file (student or grade) this mapper is reading.
    @Override
    protected void setup(Context context) throws java.io.IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        fileName = fileSplit.getPath().getName();
    }

    // Emits (id, "fileName record") so the reducer can join the two
    // relations on id and still tell the records apart by origin.
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String[] arr = value.toString().split(" ");
        stuKey.set(Integer.parseInt(arr[0]));
        val.set(fileName + " " + value.toString());
        context.write(stuKey, val);
    }
}
Listing 1-24
package com.hadoop.mapreduce;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class NaturalJoinReduce extends Reducer<IntWritable, Text, Text, NullWritable> {

    private Text student = new Text();
    private Text value = new Text();

    // All records sharing one id arrive in a single call; the student row is
    // remembered, every grade row is buffered, and their cartesian product
    // is written out. The grade rows for one id are held in memory, which is
    // fine here but worth remembering for heavily skewed keys.
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws java.io.IOException, InterruptedException {
        List<String> grades = new ArrayList<String>();
        for (Text val : values) {
            if (val.toString().contains("student")) {
                student.set(studentStr(val.toString()));
            } else {
                grades.add(gradeStr(val.toString()));
            }
        }
        for (String grade : grades) {
            value.set(student.toString() + grade);
            context.write(value, NullWritable.get());
        }
    }

    // Strips the leading file-name tag, keeping "id name sex age ".
    private String studentStr(String line) {
        String[] arr = line.split(" ");
        StringBuilder str = new StringBuilder();
        for (int i = 1; i < arr.length; i++) {
            str.append(arr[i] + " ");
        }
        return str.toString();
    }

    // Strips the file-name tag and the duplicate id, keeping "subject score ".
    private String gradeStr(String line) {
        String[] arr = line.split(" ");
        StringBuilder str = new StringBuilder();
        for (int i = 2; i < arr.length; i++) {
            str.append(arr[i] + " ");
        }
        return str.toString();
    }
}
Listing 1-25
package com.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NaturalJoin {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        if (args == null || args.length != 2) {
            throw new RuntimeException("Usage: <input path> <output path>");
        }
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("NaturalJoin");
        job.setJarByClass(NaturalJoin.class);
        job.setMapperClass(NaturalJoinMap.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(NaturalJoinReduce.class);
        job.setOutputKeyClass(Text.class); // the reducer emits Text keys, not IntWritable
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPaths(job, args[0]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Running Listing 1-25 produces the output shown in Listing 1-26.
Listing 1-26
root@lejian:/data# hadoop jar naturalJoin.jar com.hadoop.mapreduce.NaturalJoin /data /output
…………
root@lejian:/data# hadoop fs -ls -R /output
-rw-r--r--   1 root supergroup          0 2017-01-08 11:19 /output/_SUCCESS
-rw-r--r--   1 root supergroup        237 2017-01-08 11:19 /output/part-r-00000
root@lejian:/data# hadoop fs -cat /output/part-r-00000
1 Amy female 18 Math 89
2 Tom male 19 English 80
2 Tom male 19 Math 75
3 Sam male 21 English 95
4 John male 19 English 85
5 Lily female 21 English 88
5 Lily female 21 Math 91
6 Rose female 20 English 99
6 Rose female 20 Math 78
Reposted from: https://www.cnblogs.com/baoliyan/p/6259278.html