idea下mapreduce的wordcount
生活随笔
收集整理的這篇文章主要介紹了
idea下mapreduce的wordcount
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
idea下mapreduce的wordcount
pom.xml
<?xml version="1.0" encoding="UTF-8"?> ? <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion> ?<groupId>com.henu</groupId><artifactId>henu</artifactId><version>1.0-SNAPSHOT</version> ?<name>henu</name><!-- FIXME change it to the project's website --><url>http://www.example.com</url> ?<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target></properties> ?<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.2</version></dependency></dependencies> ?<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target><encoding>utf-8</encoding></configuration></plugin></plugins></build> </project>WordCount
package com.henu; ? import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; ? import java.io.IOException; ? /*** @author George* @description**/ public class WC { ?public static class WCMapper extends Mapper<LongWritable, Text,Text, IntWritable>{Text k1 = new Text();IntWritable v1 = new IntWritable(1); ?@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] strings = line.split("\\s+");for (String s : strings) {k1.set(s);context.write(k1,v1);}}} ?public static class WCReducer extends Reducer<Text, IntWritable,Text, IntWritable> {int count;IntWritable v2 = new IntWritable(); ?@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {count = 0;for (IntWritable value : values) {count += value.get();}v2.set(count);context.write(key,v2);}} ?public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { ?Configuration conf = new Configuration();Job job = Job.getInstance(conf); ?job.setJarByClass(WC.class); ?job.setMapperClass(WCMapper.class);job.setReducerClass(WCReducer.class); ?job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class); ?job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); ?FileInputFormat.setInputPaths(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1])); ?job.waitForCompletion(true);} ? }進(jìn)行分區(qū):
package com.henu; ? import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; ? import java.io.IOException; ? /*** @author George* @description**/ public class WC { ?public static class WCMapper extends Mapper<LongWritable, Text,Text, IntWritable>{Text k1 = new Text();IntWritable v1 = new IntWritable(1); ?@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] strings = line.split("\\s+");for (String s : strings) {k1.set(s);context.write(k1,v1);}}} ?public static class WCReducer extends Reducer<Text, IntWritable,Text, IntWritable> {int count;IntWritable v2 = new IntWritable(); ?@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {count = 0;for (IntWritable value : values) {count += value.get();}v2.set(count);context.write(key,v2);}} ?public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { ?Configuration conf = new Configuration();Job job = Job.getInstance(conf); ?job.setJarByClass(WC.class); ?job.setMapperClass(WCMapper.class);job.setReducerClass(WCReducer.class); ?job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class); ?job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class); ?//map階段設(shè)置分區(qū)job.setPartitionerClass(MyPartitoner.class);job.setNumReduceTasks(2); ?FileInputFormat.setInputPaths(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1])); ?job.waitForCompletion(true);} ?private static class MyPartitoner extends Partitioner<Text,IntWritable> {@Overridepublic int getPartition(Text text, IntWritable intWritable, int i) {String kStr = text.toString();return kStr.equalsIgnoreCase("hello")?0:1;}} }發(fā)送到linux上運(yùn)行:
yarn jar henu-1.0-SNAPSHOT.jar com.henu.WC /hello /abc??
?
總結(jié)
以上是生活随笔為你收集整理的idea下mapreduce的wordcount的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: hadoop HA启动时 两个namen
- 下一篇: HDFS的Shell客户端操作