Writing a Hadoop MapReduce Task in Java
Although the Hadoop framework itself is written in Java, MapReduce jobs can be written in many different languages. In this post I show how to create a MapReduce job in Java, based on a Maven project, just like any other Java project.
- Preparing the sample input
Let's start with a fictitious business case. In this case we want a CSV file with English words from a dictionary, to which the translations in other languages are added, separated by the '|' symbol. This example is based on this post. So the job will read dictionaries of different languages and match each English word with a translation in another language. The input dictionaries for the job are taken from here. I downloaded files for a couple of languages and combined them into one file (Hadoop is better at handling one large file than several small ones). My example file can be found here.
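To make the data flow concrete, here is a small sketch of how one dictionary line is transformed, for a word that occurs on a single input line. The sample words and the helper class are mine, not part of the job: the input line is a tab-separated key/value pair whose value holds comma-separated translations, and the job's output joins every translation behind the English word with '|'.

```java
// Hypothetical illustration of the per-line transformation the job performs.
// Input line (KeyValueTextInputFormat splits key and value on the tab):
//   "house<TAB>huis,woning"
// Resulting output line:
//   "house<TAB>|huis|woning"
public class LineTransformSketch {

    public static String transform(String line) {
        // Split into the English word (key) and its comma-separated translations (value)
        String[] kv = line.split("\t", 2);
        StringBuilder sb = new StringBuilder(kv[0]).append('\t');
        // Prefix each translation with '|', as the reducer does
        for (String translation : kv[1].split(",")) {
            sb.append('|').append(translation);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(transform("house\thuis,woning"));
    }
}
```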
- Creating the Java MapReduce project
The next step is to create the Java code for the MapReduce job. As I said before, I use a Maven project for this, so I created a new empty Maven project in my IDE, IntelliJ. I modified the default pom to add the necessary plugins and dependencies:
The dependencies I added:
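A sketch of what that dependency section can look like; the `hadoop-core` artifact matches the classes used below, but the version number here is an assumption and must be chosen to match your cluster:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <!-- Assumed version; use the Hadoop version of your (EMR) cluster -->
        <version>1.0.3</version>
        <!-- The framework is already available on the cluster -->
        <scope>provided</scope>
    </dependency>
</dependencies>
```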
The Hadoop dependency is necessary to use the Hadoop classes in my MapReduce job. Since I want to run the job on AWS EMR, I make sure I have a matching Hadoop version. Furthermore, since the Hadoop framework will be available on the Hadoop cluster, the scope of the dependency can be set to 'provided'.
Besides the dependencies, I added the following two plugins to the pom.xml:
```xml
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
            <archive>
                <manifest>
                    <addClasspath>true</addClasspath>
                    <mainClass>net.pascalalma.hadoop.Dictionary</mainClass>
                </manifest>
            </archive>
        </configuration>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>1.6</source>
            <target>1.6</target>
        </configuration>
    </plugin>
</plugins>
```

The first plugin is used to create an executable jar of our project. This makes running the JAR on the Hadoop cluster easier, since we don't have to state the main class.
The second plugin is necessary to make the created JAR compatible with the instances of the AWS EMR cluster. That AWS cluster comes with JDK 1.6. If you omit this plugin, the cluster will fail (I got a message like 'Unsupported major.minor version 51.0'). I will show in a later post how to set up this AWS EMR cluster.
That is the basic project, just like a regular Java project. Let's implement the MapReduce job next.
- Implementing the MapReduce classes
I have described the functionality we want to perform in the first step. To achieve this, I created three Java classes in the Hadoop project. The first class is the 'Mapper':
```java
package net.pascalalma.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 16-07-13
 * Time: 12:07
 */
public class WordMapper extends Mapper<Text, Text, Text, Text> {

    private Text word = new Text();

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString(), ",");
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(key, word);
        }
    }
}
```

This class isn't very complicated. It just receives a row from the input file and creates a map of it, in which every key has a single value (and multiple identical keys are allowed at this stage).
The next class is the 'Reducer', which reduces the map to the desired output:
```java
package net.pascalalma.hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 17-07-13
 * Time: 19:50
 */
public class AllTranslationsReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        String translations = "";
        for (Text val : values) {
            translations += "|" + val.toString();
        }
        result.set(translations);
        context.write(key, result);
    }
}
```

The reduce step collects all values for a given key and separates them from each other with the '|' symbol.
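A small aside: building the translations string with `+=` creates a new String object on every iteration. A StringBuilder performs the same joining more efficiently. A sketch of just that joining logic, extracted into a helper class of my own naming:

```java
import java.util.Arrays;

// Sketch: the same '|'-joining the reducer performs, but with a
// StringBuilder instead of repeated String concatenation. This is an
// efficiency tweak, not part of the original job code.
public class TranslationJoiner {

    public static String join(Iterable<String> values) {
        StringBuilder sb = new StringBuilder();
        for (String value : values) {
            sb.append('|').append(value);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("huis", "maison", "casa")));
        // prints |huis|maison|casa
    }
}
```

Inside the reducer this would replace the `translations` accumulation; the emitted output stays byte-for-byte the same.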
The final class that remains puts it all together and makes it a runnable job:
```java
package net.pascalalma.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Created with IntelliJ IDEA.
 * User: pascal
 * Date: 16-07-13
 * Time: 12:07
 */
public class Dictionary {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "dictionary");
        job.setJarByClass(Dictionary.class);
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(AllTranslationsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```

In this main method we put together a Job and run it. Please note that I simply expect args[0] and args[1] to be the name of the input file and the (non-existing) output directory. I didn't add any checks for this. This is my 'Run Configuration' in IntelliJ:
Just make sure the output directory doesn't exist at the moment you run the class. The logging output created by the job looks like this:
```
2013-08-15 21:37:00.595 java[73982:1c03] Unable to load realm info from SCDynamicStore
aug 15, 2013 9:37:01 PM org.apache.hadoop.util.NativeCodeLoader <clinit>
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
WARNING: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
aug 15, 2013 9:37:01 PM org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
WARNING: Snappy native library not loaded
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: io.sort.mb = 100
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: data buffer = 79691776/99614720
aug 15, 2013 9:37:01 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
INFO: record buffer = 262144/327680
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
INFO: Starting flush of map output
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
INFO: Finished spill 0
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:02 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 0% reduce 0%
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_m_000000_0' done.
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Task initialize
INFO: Using ResourceCalculatorPlugin : null
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Merging 1 sorted segments
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.Merger$MergeQueue merge
INFO: Down to the last merge-pass, with 1 segments left of total size: 524410 bytes
aug 15, 2013 9:37:04 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task done
INFO: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO:
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.Task commit
INFO: Task attempt_local_0001_r_000000_0 is allowed to commit now
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_000000_0' to /Users/pascal/output
aug 15, 2013 9:37:05 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 0%
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
INFO: reduce > reduce
aug 15, 2013 9:37:07 PM org.apache.hadoop.mapred.Task sendDone
INFO: Task 'attempt_local_0001_r_000000_0' done.
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 100%
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Job complete: job_local_0001
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Counters: 17
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: File Output Format Counters
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Bytes Written=423039
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: FileSystemCounters
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: FILE_BYTES_READ=1464626
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: FILE_BYTES_WRITTEN=1537251
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: File Input Format Counters
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Bytes Read=469941
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map-Reduce Framework
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Reduce input groups=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map output materialized bytes=524414
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Combine output records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map input records=20487
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Reduce shuffle bytes=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Reduce output records=11820
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Spilled Records=43234
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map output bytes=481174
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Total committed heap usage (bytes)=362676224
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Combine input records=0
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Map output records=21617
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: SPLIT_RAW_BYTES=108
aug 15, 2013 9:37:08 PM org.apache.hadoop.mapred.Counters log
INFO: Reduce input records=21617

Process finished with exit code 0
```

The output file created by this job can be found in the supplied output directory, as you can see in the following screenshot:
As you can see, we can run this main method in an IDE (or from the command line), but I would like to perform some unit tests on the Mapper and Reducer first before going there. I will show how to do that in another post.
翻譯自: https://www.javacodegeeks.com/2013/08/writing-a-hadoop-mapreduce-task-in-java.html