A small hadoop streaming programming demo (Python version)
Our big data team works on data quality evaluation. The automated QA and monitoring platform is built with Django, and the MR jobs are written in Python as well. (We later hit an ORC compression issue we could not solve from Python, so that part is being rewritten in Java.)
Here is an example of writing an MR job in Python.
To quote a line: Hadoop Streaming is a programming tool shipped with Hadoop that lets users use any executable or script as the Mapper and Reducer.
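The contract behind this is simple: the mapper reads raw records from stdin and writes tab-separated key/value lines to stdout; the framework sorts by key and streams those lines into the reducer's stdin. As a quick illustration only (not part of the original job), a minimal word-count pair would look like this:

# wc_mapper.py: emit one "<word> TAB 1" line per word read from stdin
import sys
for line in sys.stdin:
    for word in line.strip().split():
        print '%s\t%s' % (word, 1)

# wc_reducer.py: stdin arrives sorted by key, so each word's lines are contiguous
import sys
from itertools import groupby
from operator import itemgetter
pairs = (line.rstrip('\n').split('\t', 1) for line in sys.stdin)
for word, group in groupby(pairs, itemgetter(0)):
    print '%s\t%d' % (word, sum(int(count) for _, count in group))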
1. First, some background: our data lives in Hive. The Hive table definition is shown below.
We will parse the table metadata and merge it with the raw data on HDFS to make processing easier. The partition keys here are year/month/day.
hive (gulfstream_ods)> desc g_order;
OK
col_name                data_type       comment
order_id                bigint          order id
driver_id               bigint          driver id, 0 before a driver takes the order
driver_phone            string          driver phone number
passenger_id            bigint          passenger id
passenger_phone         string          passenger phone number
car_id                  int             pickup vehicle id
area                    int             city id
district                string          city district code
type                    int             order type, 0 = real-time, 1 = reservation
current_lng             decimal(19,6)   passenger longitude when the order was placed
current_lat             decimal(19,6)   passenger latitude when the order was placed
starting_name           string          origin name
starting_lng            decimal(19,6)   origin longitude
starting_lat            decimal(19,6)   origin latitude
dest_name               string          destination name
dest_lng                decimal(19,6)   destination longitude
dest_lat                decimal(19,6)   destination latitude
driver_start_distance   int             road distance from driver to origin, in meters
start_dest_distance     int             road distance from origin to destination, in meters
departure_time          string          departure time (reservation time for reservation orders, order-placing time for real-time orders)
strive_time             string          time the order was successfully taken
consult_time            string          negotiation time
arrive_time             string          time the driver tapped "I have arrived"
setoncar_time           string          boarding time (unused for now)
begin_charge_time       string          time the driver tapped "start charging"
finish_time             string          completion time
year                    string
month                   string
day                     string

# Partition Information
# col_name              data_type       comment
year                    string
month                   string
day                     string
2. Parsing the metadata
This is the metadata-parsing step. Afterwards we serialize the metadata and store it in the file desc.gulfstream_ods.g_order, which is uploaded to the Hadoop cluster together with the MR scripts.
import subprocess
from subprocess import Popen

def desc_table(db, table):
    # run "hive -e desc db.table" and capture its output
    process = Popen('hive -e "desc %s.%s"' % (db, table),
                    shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = process.communicate()
    is_column = True
    structure_list = list()
    column_list = list()
    for line in stdout.split('\n'):
        value_list = list()
        if not line or len(line.split()) < 2:
            break
        if is_column:
            # the first line carries the header (col_name, data_type, comment)
            column_list = line.split()
            is_column = False
            continue
        else:
            value_list = line.split()
        structure_dict = dict(zip(column_list, value_list))
        structure_list.append(structure_dict)
    return structure_list
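The snippet above stops at parsing; the post only says the result is then serialized into desc.gulfstream_ods.g_order. A minimal sketch of that save step could look like the following (dump_desc_file is a name of my own, not from the original code; it uses JSON because the mapper below reads the file back with a JSON deserializer):

import json

def dump_desc_file(db, table):
    # write the parsed metadata to the desc file that is shipped with the MR job
    structure_list = desc_table(db, table)
    with open('desc.%s.%s' % (db, table), 'w') as fw:
        fw.write(json.dumps(structure_list))

dump_desc_file('gulfstream_ods', 'g_order')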
3. Below is the hadoop streaming submit script.
#!/bin/bash
source /etc/profile
source ~/.bash_profile
# hadoop home directory
echo "HADOOP_HOME: "$HADOOP_HOME
HADOOP="$HADOOP_HOME/bin/hadoop"
DB=$1
TABLE=$2
YEAR=$3
MONTH=$4
DAY=$5
echo $DB--$TABLE--$YEAR--$MONTH--$DAY
if [ "$DB" = "gulfstream_ods" ]
then
DB_NAME="gulfstream"
else
DB_NAME=$DB
fi
TABLE_NAME=$TABLE
# input path
input_path="/user/xiaoju/data/bi/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY/*"
# marker file name
input_mark="_SUCCESS"
echo $input_path
# output path
output_path="/user/bigdata-t/QA/yangfan/$DB_NAME/$TABLE_NAME/$YEAR/$MONTH/$DAY"
output_mark="_SUCCESS"
echo $output_path
# performance constraint parameters
capacity_mapper=500
capacity_reducer=200
map_num=10
reducer_num=10
queue_name="root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop"
# job name
job_name="DW_Monitor_${DB_NAME}_${TABLE_NAME}_${YEAR}${MONTH}${DAY}"
mapper="python mapper.py $DB $TABLE_NAME"
reducer="python reducer.py"
$HADOOP fs -rmr $output_path
$HADOOP jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.7.2.jar \
-jobconf mapred.job.name="$job_name" \
-jobconf mapred.job.queue.name=$queue_name \
-jobconf mapred.map.tasks=$map_num \
-jobconf mapred.reduce.tasks=$reducer_num \
-jobconf mapred.map.capacity=$capacity_mapper \
-jobconf mapred.reduce.capacity=$capacity_reducer \
-input $input_path \
-output $output_path \
-file ./mapper.py \
-file ./reducer.py \
-file ./utils.py \
-file ./"desc.${DB}.${TABLE_NAME}" \
-mapper "$mapper" \
-reducer "$reducer"
if [ $? -ne 0 ]; then
    echo "$DB_NAME $TABLE_NAME $YEAR $MONTH $DAY run failed"
    exit 1
fi
$HADOOP fs -touchz "${output_path}/$output_mark"
rm -rf ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
$HADOOP fs -get $output_path/part-00000 ./${DB_NAME}.${TABLE_NAME}.${YEAR}-${MONTH}-${DAY}
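Assuming the script above is saved as run_mr.sh (the file name is my own; the post does not give one) and mapper.py, reducer.py, utils.py and the desc file sit in the same directory, a run for one partition day would look like:

sh run_mr.sh gulfstream_ods g_order 2016 12 01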
4. Here is a more advanced take on WordCount: the first feature counts orders per area/district, the second counts orders per hour of the day.
mapper script
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')

# match the fields against the metadata and return an iterator of dicts
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # the last 3 columns (year/month/day) are partition keys and not needed
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item

def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
    return [column.get('col_name') for column in structure_list]

# map entry point
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)

def mapper_plugin_1(item, mapper_result):
    # in practice the key could be different app keys; it routes records to reducers,
    # records with the same route key end up on the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    mapper_output(key, {'area': area, 'district': district, 'order_id': order_id, 'count': 1})

def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        mapper_output(key, {'order_id': order_id, 'strive_time': strive_time, 'count': 1, 'day_hour': day_hour})
    except Exception, ex:
        pass

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None

def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)

if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)
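Each line this mapper emits is "route key + tab + JSON payload". With made-up field values (purely illustrative), the intermediate output looks roughly like:

route1	{"count": 1, "order_id": "1001", "district": "010", "area": "1"}
route2	{"count": 1, "order_id": "1001", "strive_time": "2016-12-01 08:01:02", "day_hour": "2016-12-01 08"}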
reducer script
#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter

def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)

def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route1':
            reducer_plugin_1(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])
        if route_key == 'route2':
            reducer_plugin_2(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])

def reducer_plugin_1(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('area') or not data.get('district') or not data.get('count'):
            continue
        key = '_'.join([data.get('area'), data.get('district')])
        reducer_result[route_key].setdefault(key, 0)
        reducer_result[route_key][key] += int(data.get('count'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])

def reducer_plugin_2(route_key, group, reducer_result):
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if not data.get('order_id') or not data.get('strive_time') or not data.get('count') or not data.get('day_hour'):
            continue
        key = data.get('day_hour')
        reducer_result[route_key].setdefault(key, {})
        reducer_result[route_key][key].setdefault('count', 0)
        reducer_result[route_key][key].setdefault('order_list', [])
        reducer_result[route_key][key]['count'] += int(data.get('count'))
        if len(reducer_result[route_key][key]['order_list']) < 100:
            reducer_result[route_key][key]['order_list'].append(data.get('order_id'))
    # print >> sys.stderr, '%s' % json.dumps(reducer_result[route_key])

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]

def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)

if __name__ == '__main__':
    main()
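Because both scripts only talk to stdin/stdout, the whole pipeline can be smoke-tested on a dev machine before submitting to the cluster by simulating the shuffle with sort (sample.txt stands for a few '||'-separated records pulled from HDFS, and the desc.gulfstream_ods.g_order file must be in the current directory):

cat sample.txt | python mapper.py gulfstream_ods g_order | sort -k1,1 | python reducer.py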
5. The previous version suffered from slow reduces, for two reasons. First, because of how the route keys are chosen, every record with the same route goes to the same reducer, so a single reducer carries most of the load and performance drops. Second, the cluster runs on virtual machines, which are slow to begin with. The improved version below addresses this by doing a first round of aggregation inside the mapper, which relieves the computational pressure on the reducer.
mapper script
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys
import json
import pickle
reload(sys)
sys.setdefaultencoding('utf-8')

# match the fields against the metadata and return an iterator of dicts
def read_from_input(file, separator, columns):
    for line in file:
        if line is None or line == '':
            continue
        data_list = mapper_input(line, separator)
        if not data_list:
            continue
        item = None
        # the last 3 columns (year/month/day) are partition keys and not needed
        if len(data_list) == len(columns) - 3:
            item = dict(zip(columns, data_list))
        elif len(data_list) == len(columns):
            item = dict(zip(columns, data_list))
        if not item:
            continue
        yield item

def index_columns(db, table):
    with open('desc.%s.%s' % (db, table), 'r') as fr:
        structure_list = deserialize(fr.read())
    return [column.get('col_name') for column in structure_list]

# map entry point: aggregate locally first, then emit only the partial totals
def main(separator, columns):
    items = read_from_input(sys.stdin, separator, columns)
    mapper_result = {}
    for item in items:
        mapper_plugin_1(item, mapper_result)
        mapper_plugin_2(item, mapper_result)
    for route_key, route_value in mapper_result.iteritems():
        for key, value in route_value.iteritems():
            ret_dict = dict()
            ret_dict['route_key'] = route_key
            ret_dict['key'] = key
            ret_dict.update(value)
            mapper_output('route_total', ret_dict)

def mapper_plugin_1(item, mapper_result):
    # in practice the key could be different app keys; it routes records to reducers,
    # records with the same route key end up on the same reducer
    key = 'route1'
    area = item.get('area')
    district = item.get('district')
    order_id = item.get('order_id')
    if not area or not district or not order_id:
        return
    try:
        # partial totals per area_district
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault('_'.join([area, district]), {})
        mapper_result[key]['_'.join([area, district])].setdefault('count', 0)
        mapper_result[key]['_'.join([area, district])].setdefault('order_id', [])
        mapper_result[key]['_'.join([area, district])]['count'] += 1
        if len(mapper_result[key]['_'.join([area, district])]['order_id']) < 10:
            mapper_result[key]['_'.join([area, district])]['order_id'].append(order_id)
    except Exception, ex:
        pass

def mapper_plugin_2(item, mapper_result):
    key = 'route2'
    strive_time = item.get('strive_time')
    order_id = item.get('order_id')
    if not strive_time or not order_id:
        return
    try:
        day_hour = strive_time.split(':')[0]
        # partial totals per hour of day
        mapper_result.setdefault(key, {})
        mapper_result[key].setdefault(day_hour, {})
        mapper_result[key][day_hour].setdefault('count', 0)
        mapper_result[key][day_hour].setdefault('order_id', [])
        mapper_result[key][day_hour]['count'] += 1
        if len(mapper_result[key][day_hour]['order_id']) < 10:
            mapper_result[key][day_hour]['order_id'].append(order_id)
    except Exception, ex:
        pass

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def mapper_input(line, separator='\t'):
    try:
        return line.split(separator)
    except Exception, ex:
        return None

def mapper_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s%s%s' % (key, separator, data)
    # print >> sys.stderr, '%s%s%s' % (key, separator, data)

if __name__ == '__main__':
    db = sys.argv[1]
    table = sys.argv[2]
    columns = index_columns(db, table)
    main('||', columns)

reducer script
#!/usr/bin/env python
# vim: set fileencoding=utf-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
import json
import pickle
from itertools import groupby
from operator import itemgetter

def read_from_mapper(file, separator):
    for line in file:
        yield reducer_input(line)

def main(separator='\t'):
    reducer_result = {}
    line_list = read_from_mapper(sys.stdin, separator)
    for route_key, group in groupby(line_list, itemgetter(0)):
        if route_key is None:
            continue
        reducer_result.setdefault(route_key, {})
        if route_key == 'route_total':
            reducer_total(route_key, group, reducer_result)
            reducer_output(route_key, reducer_result[route_key])

def reducer_total(route_key, group, reducer_result):
    # merge the partial totals emitted by the mappers
    for _, data in group:
        if data is None or len(data) == 0:
            continue
        if data.get('route_key') == 'route1':
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        elif data.get('route_key') == 'route2':
            reducer_result[route_key].setdefault(data.get('key'), {})
            reducer_result[route_key][data.get('key')].setdefault('count', 0)
            reducer_result[route_key][data.get('key')].setdefault('order_id', [])
            reducer_result[route_key][data.get('key')]['count'] += data.get('count')
            for order_id in data.get('order_id'):
                if len(reducer_result[route_key][data.get('key')]['order_id']) <= 10:
                    reducer_result[route_key][data.get('key')]['order_id'].append(order_id)
        else:
            pass

def serialize(data, type='json'):
    if type == 'json':
        try:
            return json.dumps(data)
        except Exception, ex:
            return ''
    elif type == 'pickle':
        try:
            return pickle.dumps(data)
        except Exception, ex:
            return ''
    else:
        return ''

def deserialize(data, type='json'):
    if type == 'json':
        try:
            return json.loads(data)
        except Exception, ex:
            return []
    elif type == 'pickle':
        try:
            return pickle.loads(data)
        except Exception, ex:
            return []
    else:
        return []

def reducer_input(data, separator='\t'):
    data_list = data.strip().split(separator, 2)
    key = data_list[0]
    data = deserialize(data_list[1])
    return [key, data]

def reducer_output(key, data, separator='\t'):
    key = str(key)
    data = serialize(data)
    print '%s\t%s' % (key, data)
    # print >> sys.stderr, '%s\t%s' % (key, data)

if __name__ == '__main__':
    main()
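As a side note, another way to take load off the reducers with Hadoop Streaming is a map-side combiner: the streaming jar accepts a -combiner argument analogous to -mapper and -reducer. The lines below are only a sketch of what could be added to the submit script; combiner.py is a hypothetical script that would have to read mapper output from stdin and emit lines in exactly the same key/JSON format, which is not what this post does (the post aggregates inside the mapper instead).

-combiner "python combiner.py" \
-file ./combiner.py \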
Problems encountered:
1. "The DiskSpace quota of /user/bigdata/qa is exceeded"
This error appeared after the reducer finished. It means the disk quota of that HDFS path is used up; freeing space under the directory resolves it.
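The quota and remaining space of an HDFS directory can be checked with fs -count -q, and space is usually reclaimed by deleting old job outputs (the paths below are just examples):

hadoop fs -count -q /user/bigdata/qa
hadoop fs -rm -r /user/bigdata/qa/<old_output_dir>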
Reposted from: https://www.cnblogs.com/kangoroo/p/6151104.html