使用Hadoop Streaming 完成MapReduce(Python代码)
一 Map和Reduce?
?
首先看下MR的工作原理
?
MapReduce的好處是它可以把在內存中不能完成的事轉變成可以在硬盤上高效完成。
Map--‐Reduce 對于集群的好處:
1,在多節點上冗余地存儲數據,以保證數據的持續性和一直可取性
2, 將計算移向數據端,以最大程度減少數據移動
3,簡單的程序模型隱藏所有的復雜度
Map,Reduce一般的流程:
Map階段:
a, 逐個文件逐行掃描
b, 掃描的同時抽取出我們感興趣的內容 (Keys)
Group by key
排序和洗牌
(Group by key階段會自動的運行,不需要自己寫)
Reduce階段:
a, 聚合 、 總結 、 過濾或轉換
b, 寫入結果
?
二? Hadoop Streaming原理
Hadoop 不僅可以使用Java進行MapReduce的編寫,也通過Hadoop Streaming的方式提供了其他語言編寫MR的接口。更重要的是,使用python來編寫MR,比使用親兒子Java編寫MR要更簡單和方便……所以在一些不非常復雜的任務中使用python來編寫MR比起使用Java,是更加劃算的。
Hadoop streaming是Hadoop的一個工具, 它幫助用戶創建和運行一類特殊的map/reduce作業, 這些特殊的map/reduce作業是由一些可執行文件或腳本文件充當mapper或者reducer。
比如可以使用python語言來寫map-reduce使用“Hadoop Streaming”來完成傳統mapreduce的功能。
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -input myInputDirs \ -output myOutputDir \ -mapper mapper.py \ -reducer reducer.py
上述代碼通過參數input,output,mapper,reducer來定義輸入數據,輸出數據,mapper文件,reducer文件。
在上面的代碼中,mapper和reducer都是可執行文件,它們從標準輸入讀入數據(一行一行讀), 并把計算結果發給標準輸出。Streaming工具會創建一個Map/Reduce作業, 并把它發送給合適的集群,同時監視這個作業的整個執行過程。
如果一個可執行文件被用于mapper,則在mapper初始化時, 每一個mapper任務會把這個可執行文件作為一個單獨的進程啟動。 mapper任務運行時,它把輸入切分成行并把每一行提供給可執行文件進程的標準輸入。 同時,mapper收集可執行文件進程標準輸出的內容,并把收到的每一行內容轉化成key/value對,作為mapper的輸出。 默認情況下,一行中第一個tab之前的部分作為key,之后的(不包括tab)作為value。 如果沒有tab,整行作為key值,value值為null。
如果一個可執行文件被用于reducer,每個reducer任務會把這個可執行文件作為一個單獨的進程啟動。 Reducer任務運行時,它把輸入切分成行并把每一行提供給可執行文件進程的標準輸入。 同時,reducer收集可執行文件進程標準輸出的內容,并把每一行內容轉化成key/value對,作為reducer的輸出。 默認情況下,一行中第一個tab之前的部分作為key,之后的(不包括tab)作為value。
?
三 詞頻統計的例子?
?
Python實現Wordcount:
1. mapper.py
[root@vm wordcount]# vim mapper.py
寫入
2. reducer.py
[root@vm wordcount]# vim reducer.py
寫入
3. 準備一個測試文件test.txt
[root@vm wordcount]# vim test.txt
寫入
4. 本地測試
[root@vm wordcount]# cat test.txt |python mapper.py |sort|python reducer.py
a 4 is 4 test 4 this 4
[root@vm wordcount]#
5. 集群運行
集群運行前要將本地的測試文件上傳到hdfs
[root@vm wordcount]# hadoop fs -mkdir /user/root/wordcount
[root@vm wordcount]# hadoop fs -put test.txt /user/root/wordcount/
[root@vm wordcount]# hadoop fs -ls /user/root/wordcount/
Found 1 items
-rw-r--r--?? 3 root root???????? 60 2018-05-14 09:58 /user/root/wordcount/test.txt
[root@vm wordcount]#
?
運行mapreduce
[root@vm wordcount]# hadoop jar /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/hadoop-streaming-2.6.0-cdh5.13.1.jar -D mapred.reduce.tasks=1 -mapper "python mapper.py" -reducer "python reducer.py" -file mapper.py -file reducer.py -input /user/root/wordcount/test.txt -output /user/root/wordcount/out
?
命令行查看結果
[root@vm wordcount]# hadoop fs -cat /user/root/wordcount/out/part-00000 a 4 is 4 test 4 this 4 [root@vm wordcount]#四 使用第三方的Python庫
$HADOOP_HOME/bin/hadoop streaming -D mapred.job.priority='VERY_HIGH' -D mared.job.map.capacity=500 -D mapred.reduce.tasks=0 -D mapred.map.tasks=500 -input myInputDirs(你得HDFS路徑) -output myOutputDir(你的HDFS路徑) -mapper "python? yourpythonfile.py" -reducer "python? yourpythonfile.py" -file yourpythonfile.py(需要幾個就添加幾個-file) -cacheArchive "/xx/xx/xx/myvp.tar.gz#myvp"(此處是一個HDFS路徑,稍后用到)
?
使用第三方庫
需要使用第三方庫如bs4,numpy等時,需要用到虛擬環境virtualenv
virtualenv的使用
安裝
pip install virtualenv
新建虛擬環境
virtualenv myvp
使得虛擬環境的路徑為相對路徑
virtualenv --relocatable myvp
激活虛擬環境
source myvp/bin/activate
如果想退出,可以使用下面的命令
deactivate
激活后直接安裝各種需要的包
pip install XXX
壓縮環境包
tar -czf myvp.tar.gz myvp
在mapreduce上使用
在上面的腳本中可以看到使用了-catchArchive,但是路徑是HDFS的路徑,因此需要提前將本地的myvp.tai.gz包上傳到HDFS上。
同時#后面的myvp是文件的文件夾,解壓后還有一個myvp(因為壓縮的時候把文件夾本身也壓縮進去了),所有map中使用的時候的路徑就是myvp/myvp/bin/…
在map的python腳本中加入如下的代碼,會把第三方庫加入到python 路徑
import sys
sys.path.append("myvp/myvp/lib/python2.7")
?
?
?
?
?
參考:
https://blog.csdn.net/wawa8899/article/details/80305720
https://blog.csdn.net/wh357589873/article/details/70049088 ?
總結
以上是生活随笔為你收集整理的使用Hadoop Streaming 完成MapReduce(Python代码)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 40+个精选的VSCode前端插件,总有
- 下一篇: users的权限_user是啥