python还是hadoop_使用Python和Hadoop Streaming编写MapReduce
最近有個需求,就是對視頻日志中的部分URL提取出來,并隨機挑選五條。由于線上日志比較大,而且需要每天執行一次,如果單純的用python即便是多線程性能也會大大折扣。于是考慮到用hadoop的MR去實現。
準備工作:
1、hadoop集群;
2、mapper和reducer劃分;
初步把提取url的操作劃分為mapper中執行,隨機挑選5條由reducer去執行。
首先看下MR的工作原理:
上圖是MR的workflow,在介紹Hadoop Streaming的時候,可以拿來做參照。
Hadoop 和 MapReduce已經如日中天。Hadoop 不僅可以使用Java進行MapReduce的編寫,也通過Hadoop Streaming的方式提供了其他語言編寫MR的接口。更重要的是,使用python來編寫MR,比使用親兒子Java編寫MR要更簡單和方便……所以在一些不非常復雜的任務中使用python來編寫MR比起使用Java,是更加劃算的。Hadoop Streaming:
Hadoop Streaming提供了一個便于進行MapReduce編程的工具包,使用它可以基于一些可執行命令、腳本語言或其他編程語言來實現Mapper和 Reducer,從而充分利用Hadoop并行計算框架的優勢和能力。Hadoop Streaming比較獨特的一點是利用的UNIX標準輸入輸出stdin和stdout,所以只要能處理stdin和stdout的編程語言都能夠使用Hadoop Streaming來進行MR的編寫。甚至,wc、awk這些linux自帶的能處理標準輸入輸出的程序,也能被用來編寫Hadoop Streaming。怎么工作的呢:Hadoop Streaming 提供了一個hadoop-streaming.jar,默認處于$HADOOP_HOME目錄下。如果不在可以自己搜索下,然后使用Hadoop 執行該jar,傳入MR job 的參數們開始MapReduce。一個最基本的使用Hadoop Streaming來執行MR的命令行如下:
/home/xitong/software/hadoop-0.20.2.1U29/bin/hadoop ?jar \
/home/xitong/software/hadoop-0.20.2.1U29/contrib/streaming/hadoop-streaming.jar \
-input /user/eng-test/vidio ?-output /user/eng-test/vidiooutnew ?\
-mapper ?/home/eng-test/donhiyue/mapper.py ?\
-reducer /home/eng-test/donhiyue/reducer.py
這時候我們執行下上面的命令,這個時候會報錯:1、sh: /home/eng-test/donhiyue/mapper.py: 沒有那個文件或目錄
這里因為map是需要分發給下面的各個slave去執行的,所以有個文件拷貝的動作,這里加上-file就可以了,于是命令變為如下:
/home/xitong/software/hadoop-0.20.2.1U29/bin/hadoop jar /home/xitong/software/hadoop-0.20.2.1U29/contrib/streaming/hadoop-streaming.jar
-input /user/eng-test/vidio ?-output /user/eng-test/vidiooutnew3
-file/home/eng-test/dongshiyue/mapper.py
-file/home/eng-test/dongiyue/reducer.py
-mapper ?/home/eng-test/dongiyue/mapper.py
-reducer /home/eng-test/dongshiyue/reducer.py2、報各種命令錯誤如import失敗等,是因為-mapper 應該用Python執行
/home/xitong/software/hadoop-0.20.2.1U29/bin/hadoop jar /home/xitong/software/hadoop-0.20.2.1U29/contrib/streaming/hadoop-streaming.jar
-input /user/eng-test/vidio ?-output /user/eng-test/vidiooutnew3
-file/home/eng-test/dongshiyue/mapper.py
-file/home/eng-test/dongiyue/reducer.py
-mapper ?“python /home/eng-test/dongiyue/mapper.py"
-reducer ?"python /home/eng-test/dongshiyue/reducer.py
總結下流程:
Hadoop Streaming的工作流程大概如下:
hadoop-streaming.jar向Hadoop集群注冊一個新的job,傳入input path和output path等
開始mapper時,Hadoop Streaming會將輸入文件按行傳入stdin
我們自己編寫的mapper需要讀取stdin每一行,對其進行處理
mapper處理完畢的中間結果也要寫入stdout,在Python中print語句默認即輸出到stdout,當然若是不放心,也可以手動指定輸出流。對于輸出到stdout中的每一行,hadoop將默認以’\t’作為分隔符切割成k-v的格式。
mapper處理結束后,Hadoop 自動進行partition、sort、group,準備進行reduce任務
Hadoop Streaming將中間結果按行傳給reducer
我們編寫的reducer需要讀取stdin的每一行,對其進行處理
處理結束之后仍然輸出到stdout中
Hadoop 轉存到output path中
結束
最后:
我們一般在本地調試的話,用這條命令就可以了:
cat part-00001 |python mapper.py |sort|python reducer.py
part-00001為要執行的文件。調試通過后再去集群上執行;
map代碼示例:
點擊(此處)折疊或打開
import json
import sys
import pattern
def map():
patterns = {}
urls = []
for line in sys.stdin:
line = line.strip()
#print line
json_obj = json.loads(line)
play_url = json_obj['play_url'].strip()
if play_url != "":
urlpatter = pattern.hand_url(play_url)
if urlpatter not in patterns.keys():
patterns[urlpatter] = [play_url]
else:
patterns[urlpatter].append(play_url)
for i in patterns.keys():
print "%s\t%s"%(i,patterns[i])
if __name__ == '__main__':
map()
reduce代碼示例(只是示例,暫未做隨機挑選5條的功能):
點擊(此處)折疊或打開
import sys
def reducer():
for line in sys.stdin:
#print line
key,value = line.rstrip('\n').split('\t')
print key,value
if __name__ == "__main__":
reducer()
總結
以上是生活随笔為你收集整理的python还是hadoop_使用Python和Hadoop Streaming编写MapReduce的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python单例_Python单例模式
- 下一篇: python 培训 儿童_Python编