简单的MapReduce实践
簡單的MapReduce實踐
文章目錄
- 簡單的MapReduce實踐
- 操作環境
- 實現文件合并和去重操作
- 新建項目
- 新建Java程序
- 打包程序
- 運行程序
- 實現文件的倒排索引
- 第一步,Map
- 第二步,Combiner
- 第三步,Reduce
- 配置參數
- 總體代碼
- 參考文章
操作環境
- 操作系統:Ubuntu 16.04
- JDK 版本:1.8
- Hadoop 版本:Hadoop 3.1.3
- Java IDE:Eclipse
我的 Hadoop安裝目錄是“/usr/local/hadoop”,環境變量 HDAOOP_HOME也是這個目錄,在博客中看到“/usr/local/hadoop”這樣的目錄或環境變量 HADOOP_HOME,請記得轉換為自己的 Hadoop安裝目錄。
實現文件合并和去重操作
對于兩個輸入文件,即文件 A 和文件 B,請編寫 MapReduce 程序,對兩個文件進行合并, 并剔除其中重復的內容,得到一個新的輸出文件 C。
輸入文件A 樣例
hadoop spark flink storm s4 pig hive hbase spark sql輸入文件B 樣例
model view controller hadoop spark合并去重之后的輸出文件C 樣例如下
controller flink hadoop hbase hive model pig s4 spark sql storm view我們先在 home目錄下建立兩個文件 A.txt、B.txt,并把樣例內容輸入進去。在之后的步驟中我們再上傳這兩個文件到 HDFS中。
新建項目
我們啟動 Eclipse,在菜單欄選擇 “File”->“New”->“Java Project”,創建一個新的 Java項目。
在“Project name”中輸入工程名稱,這里我們就叫“MapReduce_Practice”。勾選“Use defaul locationt”,讓工程文件保存在我們設置的 Eclipse的工作區里。JRE部分選擇“Use a project specific JRE”,使用我們自己安裝的 JDK版本。然后點擊“next”,進入下一步。
我們需要為項目導入必要的 JAR包,這些 JAR包中包含了可以訪問 MapReduce的 Java API。JAR包的位置在“Hadoop安裝目錄/share/hadoop”目錄下。比如我的是在“/usr/local/hadoop/share/hadoop”目錄下,下面的操作中請選擇到自己配置的 hadoop目錄下導入 JAR包。
我們點擊標題欄的“Libraries”,點擊“Add Externtal JARs”
在新的彈窗中,我們通過選擇上面的文件目錄,進入“/usr/local/hadoop/share/hadoop”目錄,記住是進入自己的Hadoop安裝目錄。
我們需要向 Java工程中添加以下 JAR包:
- “/usr/local/hadoop/share/hadoop/common”目錄下的所有 JAR包,即 hadoop-common-3.1.3.jar、hadoop-common-3.1.3-tests.jar、haoop-nfs-3.1.3.jar和、haoop-kms-3.1.3.jar,不包括 jdiff、lib、sources、webapps四個文件夾。
- “/usr/local/hadoop/share/hadoop/common/lib”目錄下的所有 JAR包
- “/usr/local/hadoop/share/hadoop/mapreduce”目錄下的所有 JAR包。同樣地,不包括 jdiff、lib、sources、webapps四個文件夾。
- “/usr/local/hadoop/share/hadoop/mapreduce/lib”目錄下的所有 JAR包
我們分四次操作,把需要的 JAR包全部導入進來(lib目錄下的 JAR包太多,我們可以使用 Ctrl+A快捷鍵進行全選)。所需的 JAR包全部添加完畢以后,我們點擊右下角的“Finish”,完成 Java工程的創建。
新建Java程序
我們開始新建一個 Java程序,在 Eclipse界面左側找到我們剛才創建的項目,點擊鼠標右鍵,選擇“New”->“Class”。
在“Package”中填入包名,這里我們填“test”。在“Name”中輸入程序的名字,這里我們就叫“Merge”。其他的設置都保持默認,點擊“finish”。
界面如下
接下來,我們就開始編寫實現文件合并和去重操作的 MapReduce程序了。
這個程序比較簡單,就分為 Map和 Reduce兩步。
- 在 Map中,直接記錄每一個單詞即可,將輸入中的 value復制到輸出數據的 key上。
- 在 Reduce中更簡單,直接根據 key來劃分的,相同的 key放在一起,將輸入中的 key復制到輸出數據的 key上,寫一次 context即可。
打包程序
下面我們需要把剛才編寫的 Java程序打成 JAR包,部署到 Hadoop平臺上去運行。
我們在 Hadoop目錄下,使用命令行創建一個 myapp目錄,用來存放打好的 JAR包。
cd $HADOOP_HOME mkdir myapp我們回到 Eclipse,在左側的“Package Explorer”面板中,找到我們的工程“MapReduce_Practice”,點擊鼠標右鍵,在彈出的選項中選擇“Export”。
在彈出的窗口中選擇“Java”->“Runnable JAR file”,點擊“Next”。
將下列三項配置完成,然后點擊“Finish”。
- “Launch configuration”用于設置 JAR包被部署時運行的主類,我們需要在下拉列表中選擇剛才編寫的“Merge”。
- “Export destination”用于設置 JAR包保存的位置,這里我們直接就設置為剛才新建的 myapp目錄,即“/usr/local/hadoop/myapp/Merge.jar”。
- “Library handling”用于設置打包的方式,我們選擇“Extract required libraries into generated JAR”。
如果想知道打包方式之間的具體區別,請參考這篇博客 eclipse 導出可運行jar包時三種Library handling的區別,這里我們記住選擇“Extract required libraries into generated JAR”就可以了。
點擊“Finish”之后,系統會彈出警告,忽略掉即可,直接點擊下方的“OK”按鈕,將程序進行打包。
打包完成后,會彈出一個警告窗口進行提示,點擊“OK”即可。
到這一步,我們已經成功把 Java程序打包為 JAR包并放置在了指定目錄,我們可以在終端中進行查看。
cd /usr/local/hadoop/myapp ll運行程序
運行 JAR包之前,我們需要做三步準備工作。
第一步,打開 Terminal終端,用命令行啟動 Hadoop進程
cd $HADOOP_HOME ./sbin/start-dfs.sh第二步,刪除用戶目錄下之前存在的 input、output文件夾(如果沒有這兩個文件夾則跳過這一步)
cd $HADOOP_HOME ./bin/hdfs dfs -rm -r input ./bin/hdfs dfs -rm -r output第三步,在用戶目錄下建立 input文件夾,并將之前創建的 A.txt、B.txt文件上傳到 input文件夾中
cd $HADOOP_HOME ./bin/hdfs dfs -mkdir input ./bin/hdfs dfs -put ~/A.txt input ./bin/hdfs dfs -put ~/B.txt input準備工作完成后,我們就可以使用 hadoop jar來運行 JAR包了
cd $HADOOP_HOME ./bin/hadoop jar ./myapp/Merge.jar input output記住,這里我們不需要建立 output文件夾,hadoop運行過程中會自動建立的。
運行結束后,輸入文件的合并和去重結果就寫入 output文件夾中了,我們可以輸入命令查看結果
./bin/hdfs dfs -cat output/*
可以看到,文件的合并和去重操作順利完成。由于 Hadoop的設定,如果要再次運行 Merge.jar程序,必須先刪除 output文件夾。
實現文件的倒排索引
編寫 MapReduce 程序,實現對多個輸入文件的內容建立倒排索引,輸出單詞到文檔的映射關系及單詞在該文檔中的出現次數。
我們建立一個新的 class,名稱為 ReverseIndex,包名還是 test。這里我們主要講解代碼,具體的操作細節自行參考上面的文件合并。輸入數據仍然采用 input文件夾中的 A.txt、B.txt。
這個程序比文件合并稍微復雜一些。需要記錄文件名,還要統計單詞在文件中出現的次數。我們分為三步進行,Map-Combiner-Reduce。
第一步,Map
因為需要輸出單詞到文檔的映射關系及單詞在該文檔中的出現次數,所以我們需要獲取文件的名稱。在 Map中,通過 FileSplit獲取文件的完整路徑,切割掉最后一個“/”之前的字符,就得到了文件的名稱。然后,用“–”作為連接符,將單詞和文件名放在一起作為key值,value值就填為1。
public static class myMap extends Mapper<Object, Text, Text, Text>{public void map(Object key, Text value, Context context) throws IOException,InterruptedException{FileSplit fileSplit = (FileSplit)context.getInputSplit();String filePath = fileSplit.getPath().toString();String fileName = filePath.substring(filePath.lastIndexOf("/")+1);context.write(new Text(value+"--"+fileName), new Text("1"));} }第二步,Combiner
在 Combiner中進行預處理,統計單詞在文件中出現的次數。將單詞和文件名拆分開,word[0]是單詞,word[1]是文件名。設置key值為單詞,設置value值為文件名加次數。
public static class myCombiner extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{int sum = 0;for(Text v:values) {sum++;}String[] word = key.toString().split("--");context.write(new Text(word[0]), new Text(word[1]+" show: "+sum+" times"));} }第三步,Reduce
MapReduce固定的輸出格式中會在 value的開頭加 tab制表符,我們需要調整一下輸出格式。在 Reduce中,給 value加入適當的回車符和制表符。之前經過了 myCombiner類的預處理,可以直接輸出了。key值就是單詞,value值是文件名加單詞次數。
public static class myReduce extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{StringBuilder wordIndex = new StringBuilder();for(Text v:values) {wordIndex.append(v.toString()).append("\n\t");}context.write(new Text(key.toString()+"\n"), new Text(wordIndex.toString()));} }配置參數
在主函數中配置必要的參數。指定 Conf配置項,設置 Job的使用類和輸出類型,設置文件的輸入輸出路徑,并使用 try-catch語句來處理異常。
這里我們有一個優化,如果文件路徑下存在 output文件夾則自動刪除,這樣避免了每次手動刪除 output文件夾的麻煩。
//指定Conf配置項 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://localhost:9000"); System.setProperty("HDAOOP_USER_NAME", "hadoop")try {//設置JobJob job = Job.getInstance(conf, "ReverseIndex");job.setJarByClass(ReverseIndex.class);job.setMapperClass(myMap.class);job.setCombinerClass(myCombiner.class);job.setReducerClass(myReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//設置文件路徑Path input = new Path("/user/hadoop/input");Path output = new Path("/user/hadoop/output");//自動刪除output文件夾FileSystem fs = FileSystem.get(conf);if(fs.exists(output)) {fs.delete(output, true);}FileInputFormat.addInputPath(job, input);FileOutputFormat.setOutputPath(job, output);System.exit(job.waitForCompletion(true)?0:1);} catch (Exception e) {e.printStackTrace();System.exit(1); }總體代碼
package test;import java.io.IOException;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class ReverseIndex {public static class myMap extends Mapper<Object, Text, Text, Text>{public void map(Object key, Text value, Context context) throws IOException,InterruptedException{FileSplit fileSplit = (FileSplit)context.getInputSplit();String filePath = fileSplit.getPath().toString();String fileName = filePath.substring(filePath.lastIndexOf("/")+1);context.write(new Text(value+"--"+fileName), new Text("1"));}}public static class myCombiner extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{int sum = 0;for(Text v:values) {sum++;}String[] word = key.toString().split("--");context.write(new Text(word[0]), new Text(word[1]+" show: "+sum+" times"));}}public static class myReduce extends Reducer<Text, Text, Text, Text>{public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException{StringBuilder wordIndex = new StringBuilder();for(Text v:values) {wordIndex.append(v.toString()).append("\n\t");}context.write(new Text(key.toString()+"\n"), new Text(wordIndex.toString()));}}public static void main(String[] args) {Configuration conf = new Configuration();conf.set("fs.defaultFS", "hdfs://localhost:9000");System.setProperty("HDAOOP_USER_NAME", "hadoop")try {Job job = Job.getInstance(conf, "ReverseIndex");job.setJarByClass(ReverseIndex.class);job.setMapperClass(myMap.class);job.setCombinerClass(myCombiner.class);job.setReducerClass(myReduce.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);Path input = new Path("/user/hadoop/input");Path output = new Path("/user/hadoop/output");FileSystem fs = FileSystem.get(conf);if(fs.exists(output)) {fs.delete(output, true);}FileInputFormat.addInputPath(job, input);FileOutputFormat.setOutputPath(job, output);System.exit(job.waitForCompletion(true)?0:1);} catch (Exception e) {e.printStackTrace();System.exit(1);}} }最終執行結果如下
參考文章
MapReduce編程實踐(Hadoop3.1.3)
簡單的HDFS操作
MapReduce編程(二) 文件合并和去重
hadoop < MapReduce 編寫程序 實現倒排索引>
總結
以上是生活随笔為你收集整理的简单的MapReduce实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第一章-大数据概述
- 下一篇: 第二章-大数据处理框Hadoop