大数据之hadoop伪集群搭建与MapReduce编程入门
一句話介紹hadoop: Hadoop的核心由分布式文件系統HDFS與Map/Reduce計算模型組成。
(1)HDFS分布式文件系統 HDFS由三個角色構成: 1)NameNode 2)DataNode:文件存儲的基本單元,它將文件塊block存儲在本地文件系統中 3)Client:需要獲取分布式文件系統文件的應用程序 文件寫入:client向NameNode發起文件寫入請求,NameNode分配合適的DataNode地址給Client,Client將文件劃分為多個Block后,根據DataNode地址順序寫入數據 文件讀取:client向NameNode發起文件讀取請求,NameNode返回文件存儲的DataNode信息,Client讀取文件信息 一個Block會有三份備份,一份放在NameNode指定的DataNode上,一份放在與指定DataNode不在同一臺機器的DataNode上,最后一份放在與指定DataNode同一Rack的DataNode上
(2)Map/Reduce計算模型 由一個單獨運行在主節點的JobTracker和運行在每個集群從節點的TaskTracker共同構成。 一個Map/Reduce作業job通常會把輸入的數據集切分為若干獨立的數據塊,由Map任務task以完全并行的方式處理它們。框架會先對Map的輸出進行排序,然后把結果輸入給Reduce任務。 mapper->combine->partition->reduce 在hadoop上運行的作業需要指明程序的輸入/輸出位置,并通過實現合適的接口或抽象類提供Map和Reduce函數。同時還需要指定作業的其他參數,構成作業配置Job Configuration。 在Hadoop的JobClient提交作業(JAR包/可執行程序等)和配置信息給JobTracker之后,JobTracker會負責分發這些軟件和配置信息給slave及調度任務,并監控它們的執行,同時提供狀態和診斷信息給JobClient
關鍵術語: NameNode\DataNode: JobTracker\TaskTracker mapper->combine->partition->reduce
二、開發環境搭建 ubuntu 64位為例 第一步:安裝java 第二步:安裝ssh,rsync
apt-get install ssh rsync
第三步:下載hadoop穩定版本,解壓縮慣例,了解關鍵文件 bin/ ? hadoop, hdfs, mapred,yarn等可執行文件sbin/ ? start-dfs.sh start-yarn.sh stop-dfs.sh stop-yarn.sh等可執行文件etc/hadoop env和site等配置文件libexec/ hadoop-config.sh hdfs-config.sh mapred-confg.sh yarn-config.sh等用于配置的可執行文件logs/ 日志文件share 文檔與jar包,可以認真瀏覽一下這些jar包,Map-Reduce編程接口就靠它include/ 頭文件?lib/ 動態鏈接庫
第四步:hadoop配置
cd?hadoop-2.6.0/
(1)修改腳本要用的環境變量,以運行hadoopvim etc/hadoop/hadoop-env.sh?
編輯export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64?[替換JDK所在目錄]
(2)配置HDFS和MapReduce常用的I/O設置
vim etc/hadoop/core-site.xml?
編輯<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> #配置HDFS的地址及端口號 </property> </configuration>? ? ? ? ? ? ?
(3)Hadoop守護進程的配置項,包括namenode,輔助namenode和datanodevim etc/hadoop/hdfs-site.xml?
編輯
<configuration> <property> <name>dfs.replication</name> #配置備份方式,默認為3.在單機版里需要改為1 <value>1</value> </property> </configuration>
(4)? ??MapReduce守護進程的配置項,包括jobtracker和tasktracker?? ? ? ?? cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xmlvim?etc/hadoop/mapred-site.xml? ? ? ? ? ? ? ? ? 編輯
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> </configuration>
? ? ? ? ? ? ??vim etc/hadoop/yarn-site.xml
編輯<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
第五步:配置SSH服務免密碼登陸 ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsacat ~/.ssh/id_dsa.pub >>~/.ssh/authorized_keys 檢查是否配置成功
ssh localhost
第六步:啟動hadoop 1. 格式化HDFS
?bin/hdfs namenode -format
2. 啟動進程 啟動NameNode與DataNode守護進程sbin/start-dfs.sh??
啟動ResourceManager 和NodeManager守護進程
sbin/start-yarn.sh?
NameNode的web接口?http://localhost:50070/ Resourcemanager的web接口?http://localhost:8088/
第七步: 體驗執行MapReduce job, 對文件內容進行字符匹配 第1)步:創建MapReduce job執行目錄 bin/hdfs dfs -mkdir /userbin/hdfs dfs -mkdir /user/root
第2)步:拷貝hadoop配置文件到hadoop input目錄下
bin/hdfs dfs -put etc/hadoop input
很不幸,報錯了 15/05/08 13:54:30 WARN hdfs.DFSClient: DataStreamer Exception
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/xxxxx._COPYING_ could only be replicated to 0 nodes instead of minReplication (=1).??There are 0 datanode(s)?running and no node(s) are excluded in this operation.
定位原因:查看logs目錄下datanode日志【在google之前先看日志,是非常良好的習慣】 發現以下提示 ?Lock on /tmp/hadoop-root/dfs/data/in_use.lock acquired by
關閉hdfs后手動刪除該目錄 sbin/stop-dfs.shrm -rf /tmp/hadoop-root/dfs/data/* 重新啟動
sbin/start-dfs.sh
查看DataNode是否啟動
jps
輸出如下 22848 Jps20870 DataNode
20478 NameNode
31294 FsShell
19474 Elasticsearch
21294 SecondaryNameNode
確認ok了,重新copy文件
bin/hdfs dfs -put etc/hadoop input
出錯原因:反復執行了?bin/hdfs namenode -format?,但沒有手動清除文件鎖定
第3)步:運行mapreduce示例
bin/hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.0.jar grep input output 'dfs[a-z.]+' ???????
我們可以在Resourcemanager的web接口?http://localhost:8088/?看到任務運行情況 ?
第4)步:查看結果
bin/hdfs dfs -cat output/*
6 dfs.audit.logger4 dfs.class
3 dfs.server.namenode.
2 dfs.period
2 dfs.audit.log.maxfilesize
2 dfs.audit.log.maxbackupindex
1 dfsmetrics.log
1 dfsadmin
1 dfs.servers
1 dfs.replication
1 dfs.file 我們也可以把結果merge到單個文件中來看
bin/hdfs dfs -getmerge output/ output
三、MapReduce編程入門 環境IDE:eclipse 導入hadoop核心包 hadoop-core-1.2.1.jar 導出:我們自己的fat jar包
示例代碼放在我的git上 https://github.com/tanjiti/mapreduceExample HelloWorld版本的MapReduce 仍然用單詞切割的例子,單詞切割就是HelloWorld版本的MapReduce 文件結構如下 ├── bin 存放編譯后的字節文件 ├── lib 存放依賴jar包,來自hadoop安裝文件share/hadoop/ │?? ├── commons-cli-1.2.jar │?? ├── hadoop-common-2.6.0.jar │?? └── hadoop-mapreduce-client-core-2.6.0.jar ├── mymainfest 配置文件,指定classpath └── src 源文件└── mapreduceExample└── WordCount.java 我一般在eclipse里編碼(可視化便利性),然后采用命令行編譯打包(命令行高效性)
第一步:源碼編輯
vim src/mapreduceExample/WordCount.java
編輯package mapreduceExample;import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser;public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); @Override public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: mapreduceExample.WordCount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); //設置mapper類 job.setCombinerClass(IntSumReducer.class); //設置combiner類,該類的作用是合并map結果,減少網絡I/O job.setReducerClass(IntSumReducer.class);//設置reducer類 job.setOutputKeyClass(Text.class);//設置reduce結果輸出 key的類型 job.setOutputValueClass(IntWritable.class); //設置reduce結果輸出 value的類型 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//設置輸入數據文件路徑 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//設置輸出數據文件路徑 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
第二步:編譯打包編譯
?javac?-d bin/?-sourcepath src/?-cp lib/hadoop-common-2.6.0.jar:lib/hadoop-mapreduce-client-core-2.6.0.jar:lib/commons-cli-1.2.jar src/mapreduceExample/WordCount.java
編寫配置文件
vim mymainfest?
編輯Class-Path:?lib/hadoop-common-2.7.0.jar?lib/hadoop-mapreduce-client-common-2.7.0.jar?lib/commons-cli-1.2.jar注:mainfest 配置詳見 https://docs.oracle.com/javase/tutorial/deployment/jar/manifestindex.html其他重要配置項Main-Class: 指定入口class,因為一個mapreduce項目,往往會有多種入口,因此不配置該項
打包
jar cvfm mapreduceExample.jar mymainfest lib/* src/* -C bin .
查看jar包內容
jar tf mapreduceExample.jar?
META-INF/ META-INF/MANIFEST.MF lib/commons-cli-1.2.jar lib/hadoop-common-2.6.0.jar lib/hadoop-mapreduce-client-core-2.6.0.jar src/mapreduceExample/ src/mapreduceExample/WordCount.java mapreduceExample/ mapreduceExample/WordCount.class mapreduceExample/WordCount$TokenizerMapper.class mapreduceExample/WordCount$IntSumReducer.class
第三步: 運行查看Usagebin/hadoop jar /home/tanjiti/mapreduceExample/mapreduceExample.jar mapreduceExample.WordCount
顯示Usage: just4test.WordCount <in> <out>
當然,這是個很簡陋的使用說明,但起碼知道了參數是哪些,好了,正式運行
bin/hadoop jar?
/home/tanjiti/mapreduceExample/mapreduceExample.jar[jar所在路徑]?
mapreduceExample.WordCount[main?Class,如果打包jar包時在manifest文件中指定了就不需要指定該參數]?
/in[輸入文件路徑,在執行任務前必須存在]?
/out[結果輸出路徑,在執行任務前不應該存在該路徑]
先合并結果再查看bin/hdfs dfs -getmerge /out wordcount_result
tail wordcount_result
部分結果如下"/?app=vote&controller=vote&action=total&contentid=7%20and%201=2%20union%20select%20group_concat(md5(7));%23" 1
MapReduce的思想其實非常簡單??一句話來描述這個過程,就是開啟一個MapReducer Job,設置好相應的Configuration,指定輸入輸出數據源的路徑與格式,指定數據流K,V的格式(很多時候需要自定義格式,繼承Writable),指定處理過程(map,combine,partition,sort,reduce)。四、更多再實現這些基本功能后,我們下一步會考慮如何共享數據,是讀寫HDFS文件,還是采用Configuration配置,還是使用DistributedCache;如何何處理多個mapreducejob,是線性的方式,還是ControlledJob,還是ChainMapper、ChainReducer。等功能都搞定了,我們再考慮性能優化的問題,例如數據預處理(合并小文件,過濾雜音、設置InputSplit的大小),是否啟用壓縮方式,設置Map和Reduce任務的數量等job屬性,全靠實戰來填坑。
總結
以上是生活随笔為你收集整理的大数据之hadoop伪集群搭建与MapReduce编程入门的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 编程范式,程序员的编程世界观
- 下一篇: Scheme 语言概要