hive left outer join 子查询临时表_基于历史数据的用户访问次数,每天新老用户,日活,周活,月活的hive计算...
最近有一個需求,統計每天的新老用戶,日活,周活,月活。
我們每天的增量數據會加入到hive歷史數據表中,包含用戶訪問網站的一些信息,字段有很多,包括用戶唯一標識guid。
當然了日活,周活,月活就是一個count(distinct(guid))語句,非常常用的sql。
但是這里的問題是:
A:每天的新老用戶應該怎么統計呢?B:這還不簡單,判斷用戶guid是否存在與歷史庫guid中嘛?
A:歷史數據幾十個T,大概一百億行,你要每天將當日數據(2~3億行)與歷史數據幾億行進行join判斷?
B:額,這個,這個,好像不行哦!
是的,歷史數據里面是用戶網站訪問行為,同一個用戶在同一天,不同的天都有可能出現,guid在歷史表中會有多次。如果直接join,性能很差,實際上是做了很多不必要的工作。
解決方案:
維護一張用戶表,里面有4列:guid, starttime, endtime, num,分別是用戶的guid,第一次訪問時間,最后一次訪問時間,訪問天數;從某個狀態開始,歷史表中guid是唯一的;
當天數據去重后,與歷史庫join,如果guid在歷史庫出現過,則將endtime更新為當天時間,num加一;
否則,這是一個新用戶,插入歷史庫,starttime, endtime都為當天時間,num初始值為1。
維護了這么一張用戶表后,接下來就可以寫hql統計業務了,計算當天新老用戶時,只需要與這個歷史庫進行join就行了(目前為止4千萬),當日guid去重后是1千多萬,這樣就是4千萬~1千萬的join了,與開始4千萬~100億的join,性能會有巨大提升。
hive歷史表的設計與hive相關配置
可以看到這里hive歷史表history_helper需要頻繁修改,hive表支持數據修改需要在${HIVE_HOME}/conf/hive-site.xml中添加事務支持:
<property><name>hive.support.concurrency</name><value>true</value> </property> <property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value> </property> <property><name>hive.txn.manager</name><value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value> </property> <property><name>hive.compactor.initiator.on</name><value>true</value> </property> <property><name>hive.compactor.worker.threads</name><value>1</value> </property>為了提高查詢速度,hive歷史表與增量表這里都分桶,hive-xite.xml配置:
<property><name>hive.enforce.bucketing</name><value>true</value> </property>為了提高reduce并行度,也設置一下:
set mapred.reduce.tasks = 50;這個最好在hive命令行配置,表明只在當前程序使用該配置,就不要配置配置文件了。
歷史庫建表語句:
create external table if not exists hm2.history_helper (guid string,starttime string,endtime string,num int ) clustered by(guid) into 50 buckets stored as orc TBLPROPERTIES ("transactional"="true");當天增量表,保存去重后的guid,建表語句:
create table if not exists hm2.daily_helper (guid string,dt string ) clustered by(guid) into 50 buckets stored as orc TBLPROPERTIES ("transactional"="true");思路
由于這種需要寫成定時模式,所以這里用python腳本來實現,將hive查詢結果保存到本地文件result.txt,然后python讀取result.txt,連接數據庫,保存當天的查詢結果。
代碼
helper.py
#!/usr/bin/python # -*- coding:utf-8 -*-# hive更新歷史用戶表,日常查詢,保存到MySQLimport sys import datetime import commands import MySQLdb# 獲取起始中間所有日期 def getDays(starttime,endtime,regx):datestart=datetime.datetime.strptime(starttime,regx)dateend=datetime.datetime.strptime(endtime,regx)days = []while datestart<=dateend:days.append(datestart.strftime(regx))datestart+=datetime.timedelta(days=1)return days# 獲得指定時間的前 n 天的年、月、日,n取負數往前,否則往后 def getExacYes(day, regx, n):return (datetime.datetime.strptime(day,regx) + datetime.timedelta(days=n)).strftime(regx)# 獲得距離現在天數的年、月、日,n 取值正負含義同上,昨天就是getYes(regx,-1) def getYes(regx, n):now_time = datetime.datetime.now()yes_time = now_time + datetime.timedelta(days=n)yes_time_nyr = yes_time.strftime(regx)return yes_time_nyr# 執行hive命令 def execHive(cmd):print cmdres = commands.getstatusoutput(cmd)return res# 獲得當前是星期幾 def getWeek(regx):now_time = datetime.datetime.now()week = now_time.strftime(regx)return week# 格式化日期,加上雙引號 def formatDate(day):return """ + day + """# 數據保存到mysql def insertMysql(dt, path, tbName, regx):# new, dayAll, stayvalues = []with open(path) as file:line = file.readline()while line:values.append(line.strip())line = file.readline()dayAll = int(values[1])new = float(values[0])/dayAllold = 1 - new# 獲取數據庫連接conn = MySQLdb.connect("0.0.0.0", "statistic", "123456", "statistic")# 獲取游標cursor = conn.cursor()# 查詢昨天的用戶人數yesDay = getExacYes(dt, regx, -1)sql = 'select dayAll from %s where dt = %s'%(tbName, formatDate(yesDay))try:cursor.execute(sql)except Exception as e:print eyesAll = int(cursor.fetchall()[0][0])stay = float(values[2]) / yesAllprint stay# 獲取游標cursor2 = conn.cursor()sql = 'insert into %svalues("%s",%f,%f,%f,%d)'%(tbName, dt, new, old, stay, dayAll)print sqltry:cursor2.execute(sql)conn.commit()except:conn.rollback()finally:conn.close()# 初始化,刪除臨時表,并且創建 def init():# 設置分桶環境cmd = 'source /etc/profile;hive -e 'set hive.enforce.bucketing = true;set mapred.reduce.tasks = 50;''(status,result) = execHive(cmd)# 清除當天的臨時表,結果保存cmd = 'source /etc/profile;hive -e 'drop table hm2.daily_helper;''(status,result) = execHive(cmd)if status == 0:print '%s昨天臨時表刪除完畢...'%(day)else:print resultsys.exit(1)cmd = 'source /etc/profile;hive -e 'create table if not exists hm2.daily_helper(guid string,dt string)clustered by(guid) into 50 buckets stored as orc TBLPROPERTIES ("transactional"="true");''(status,result) = execHive(cmd)if status == 0:print '%s臨時表創建完畢...'%(day)else:print resultsys.exit(1)# 主函數入口 if __name__ == '__main__':regx = '%Y-%m-%d'resultPath = '/home/hadoop/statistic/flash/helper/result.txt'days = getDays('2018-07-01','2018-07-20',regx)tbName = 'statistic_flash_dailyActive_helper'for day in days:init()# 當天數據去重后保存到臨時表daily_helpercmd = 'source /etc/profile;hive -e 'insert into hm2.daily_helper select distinct(guid),dt from hm2.helper where dt = "%s" and guid is not null;''%(day)print '%s數據正在導入臨時表...'%(day)(status,result) = execHive(cmd)if status == 0:print '%s數據導入臨時表完畢...'%(day)else:print resultsys.exit(1)# guid存在則更新 endtime 與 numcmd = 'source /etc/profile;hive -e 'update hm2.history_helper set endtime = "%s",num = num + 1 where guid in (select guid from hm2.daily_helper);''%(day)print '正在更新endtime 與 num...'(status,result) = execHive(cmd)if status == 0:print '%s history_helper數據更新完畢'%(day)else :print resultsys.exit(1)# 當天新用戶cmd = 'source /etc/profile;hive -e 'select count(1) from hm2.daily_helper where guid not in (select guid from hm2.history_helper);' > %s'%(resultPath)(status,result) = execHive(cmd)if status != 0:print resultsys.exit(1)# 不存在插入cmd = 'source /etc/profile;hive -e 'insert into hm2.history_helperselect daily.guid,dt,dt,1 from hm2.daily_helper dailywhere daily.guid not in (select guid from hm2.history_helper where guid is not null);''print '正在插入數據到history_helper表...'(status,result) = execHive(cmd)if status == 0:print '%s數據插入hm2.history_helper表完成'%(day)else:print resultsys.exit(1)# 當天總人數cmd = 'source /etc/profile;hive -e 'select count(1) from hm2.daily_helper;' >> %s'%(resultPath)(status,result) = execHive(cmd)if status != 0:print resultsys.exit(1)# 次日活躍留存cmd = 'source /etc/profile;hive -e 'select count(1) from(select guid from hm2.helper where dt = "%s" group by guid) yesinner join(select guid from hm2.helper where dt = "%s" group by guid) todaywhere yes.guid = today.guid;' >> %s'%(getExacYes(day, regx, -1), day, resultPath)(status,result) = execHive(cmd)if status != 0:print resultsys.exit(1)# 結果保存到mysqlinsertMysql(day, resultPath, tbName, regx)print '=========================%s hive 查詢完畢,結果保存數據到mysql完成=============================='%(day)這是在處理歷史數據,然后就是每天定時處理了,在linux crontab里加個定時器任務就好了。
微信掃碼關注總結
以上是生活随笔為你收集整理的hive left outer join 子查询临时表_基于历史数据的用户访问次数,每天新老用户,日活,周活,月活的hive计算...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 电脑ip地址设置_路由器动态IP和静态I
- 下一篇: vue2.0 唤起百度地图app_开车选