python读取hive方案分析
python讀取hive方案對比
引言
最近接到一項任務–開發(fā)python工具,方便從HDFS讀取文件和Hive表數(shù)據(jù)。當前網(wǎng)上的方案大多是通過第三方python包實現(xiàn),只需導入指定pypi包即可完成,這種方案雖然在功能上具有可行性,但是當數(shù)據(jù)量級增大時,讀取數(shù)據(jù)效率低下,無法滿足業(yè)務場景需要,為此需調(diào)研其他方案實現(xiàn)python讀取Hive表功能。
方案分析
方案一(pyhive):
目前實驗場景下常見的方案使用pyhive,pyhive通過與HiveServer2通訊來操作Hive數(shù)據(jù)。當hiveserver2服務啟動后,會開啟10000的端口,對外提供服務,此時pyhive客戶端通過JDBC連接hiveserver2進行Hive sql操作。
Pyhive Client通過JDBC與hiveserver2建立通信,hiveserver2服務端發(fā)送HQL語句到Driver端,Driver端將HQL發(fā)送至Compiler組件進行語法樹解析,此時需在metastore獲取HQL相關(guān)的database和table等信息,在對HQL完成解析后,Compiler組件發(fā)送執(zhí)行計劃至Driver端等待處理,Driver端發(fā)送執(zhí)行計劃至Executor端,再由Executor端發(fā)送MapReduce任務至Hadoop集群執(zhí)行Job,Job完成后最終將HQL查詢數(shù)據(jù)發(fā)送Driver端,再由hive server2返回數(shù)據(jù)至pyhive Client。
python讀取hive表的Demo:
from pyhive import hivedef read_jdbc(host, port, database: str, table: str, query_sql: str) -> DataFrame:# 1、連接hive服務端hive.Connection(host=host, port=10000, database=database)cursor = conn.cursor()logger.info('connect hive successfully.')# 2、執(zhí)行hive sqlcursor.execute(query_sql)logger.info('query hive table successfully.')# 3、返回pandas.dataframetable_len = len(table) + 1columns = [col[0] for col in cursor.description]col = list(map(lambda x: x[table_len:], columns))result = cursor.fetchall()return pd.DataFrame(result, columns=col)方案二(impyla):
目前還有用戶通過impyla訪問hive表,impyla通過與HiveServer2通訊來操作Hive數(shù)據(jù)。當hiveserver2服務啟動后,會開啟10000的端口,對外提供服務,此時impyla客戶端通過JDBC連接hiveserver2進行Hive sql操作。impyla與hive通信方式和大體相同,具體流程可以參考方案一流程圖。
python讀取hive表的Demo:
from impala.dbapi import connectdef read_jdbc(host, port, database: str, table: str, query_sql: str) -> DataFrame:# 1、連接hive服務端conn = connect(host=host, port=10000, database="test", auth_mechanism='PLAIN')cursor = conn.cursor()# 2、執(zhí)行hive sqlcursor.execute(query_sql)logger.info('query hive table successfully.')# 3、返回pandas.dataframetable_len = len(table) + 1columns = [col[0] for col in cursor.description]col = list(map(lambda x: x[table_len:], columns))result = cursor.fetchall()return pd.DataFrame(result, columns=col)方案三(pyarrow+thrift):
從方案一流程圖中可以了解到上述兩種方案都JDBC和服務端建立連接,客戶端和hiveserver2建立通信后,解析Hive sql并執(zhí)行MapReduce的方式訪問Hive數(shù)據(jù)文件,當Hive數(shù)據(jù)量增大時,對數(shù)據(jù)進行MapReduce操作和數(shù)據(jù)之間的網(wǎng)絡(luò)傳輸會使得讀取數(shù)據(jù)面臨延遲高,效率低等問題。
分析上述方案我們可知,在Hadoop集群進行Mapreduce,查詢后結(jié)果數(shù)據(jù)經(jīng)Driver、Executor和hiveserver2才可返回至Client,在數(shù)據(jù)量級增大的情況下,這些步驟無疑會成為制約python訪問hive的效率的因素,為了解決上述問題,我們采用直接讀取Hdfs存儲文件的方式獲取Hive數(shù)據(jù)的方式,規(guī)避上述問題。
- hive metastore中存儲Hive創(chuàng)建的database、table、表的字段、存儲位置等元信息,在讀取HDFS文件之前,首先需通過thrift協(xié)議和hive metastore服務端建立連接,獲取元數(shù)據(jù)信息;
- 為了解決數(shù)據(jù)快速增長和復雜化的情況下,大數(shù)據(jù)分析性能低下的問題,Apache Arrow應運而生,在讀取HDFS文件時采用pyarrow讀取hive數(shù)據(jù)文件的方式。
為了在本地生成hive metastore服務端文件,首先在hive源碼中下載hive_metastore.thrift文件,在thrift源碼中下載fb303.thrift文件,其次執(zhí)行以下命令。
執(zhí)行后可以得到以下目錄文件
python向hive表中寫入數(shù)據(jù)和讀取hive表的Demo:
from hive_service import ThriftHive from hive_service.ttypes import HiveServerException from thrift import Thrift from thrift.transport import TSocket from thrift.transport import TTransport from thrift.protocol import TBinaryProtocol import subprocessfrom pyarrow.parquet import ParquetDataSet import pyarrow.parquet as pq import pyarrow as pa from libraries.hive_metastore.ThriftHiveMetastore import Clientdef connect_hive() -> Client:"""通過thrift連接hive metastore服務端"""transport = TSocket.TSocket(host, int(port))transport = TTransport.TBufferedTransport(transport)transport.open()protocol = TBinaryProtocol.TBinaryProtocol(transport)return ThriftHiveMetastore.Client(protocol)def write_table(client: Client, database: str, table: str, dataframe: DataFrame, partitions: list = None):"""提供給用戶將dataFrame寫入hive表中的方式Examples:client = connect_hive(host, port)df = pd.DataFrame({'index': [1, 2, 3],'name': ['xiaoming', 'xiaowang', 'xiaozhang'],'prt_dt': ['2020', '2019', '2020']})partition_cols = ['prt_dt']write_table(client, database, table, df, partition_cols)Args:client(Client):hive客戶端,通過thrift協(xié)議訪問hive metastoredatabase(str):數(shù)據(jù)庫table(str):表名dataframe(pandas.DataFrame):pandas.DataFramepartitions(list):分區(qū)信息raise:HiveDatabaseNOTEXIST:Hive庫不存在時拋出異常HiveTableNOTEXIST:Hive表不存在時拋出異常"""# 1、連接hive服務端client = connect_hive(host, port)# 2、檢查數(shù)據(jù)庫是否存在,如果不存在則拋出異常databases = client.get_all_databases()if database not in databases:raise HiveDatabaseNOTEXIST('Hive database is not exist.')# 3、創(chuàng)建hive表,如果表名重復則拋出異常tables = client.get_all_tables(database)if table not in tables:raise HiveTableNOTEXIST('Hive table is not exist.')# 4、將pandas中字段int64類型轉(zhuǎn)為intcolumns = dataframe.columnsint64_fields = {}float64_fields = {}for field in columns:if pd.api.types.is_int64_dtype(dataframe[field]):int64_fields[field] = 'int32'if pd.api.types.is_float_dtype(dataframe[field]):float64_fields[field] = 'float32'transfer_fields = dict(int64_fields, **float64_fields)transfer_df = dataframe.astype(transfer_fields)# 5、將dataframe寫入hive表中table_hdfs_path = client.get_table(database, table).sd.locationtable = pa.Table.from_pandas(transfer_df)pq.write_to_dataset(table=table, root_path=table_hdfs_path, partition_cols=partitions)# 6、寫入分區(qū)表時需刷新元數(shù)據(jù)信息(msck repair table ***)shell = "hive -e 'msck repair table {}' ".format('train_data.telecom_train')subprocess.Popen(shell,shell=True)def read_table(data_source: DataSource, database: str, table: str, partitions: list = None) -> DataFrame:"""提供給用戶根據(jù)hive庫名和表名訪問數(shù)據(jù)的方式-->dataframe(thrift、urllib、pyarrow、pyhdfs)Examples:client = connect_hive(host, port)read_table(client,'test','test')Args:client(Client):hive客戶端,通過thrift協(xié)議訪問hive metastoredatabase(str):hive庫名table(str):hive表名partitions(list):hive表分區(qū)(用戶需按照分區(qū)目錄填寫),如果查詢所有數(shù)據(jù),則無需填寫分區(qū)Return:pandas.dataframe"""# 1、連接hive服務端client = connect_hive(host, port)# 2、查詢hive表元數(shù)據(jù)table = client.get_table(database, table)table_hdfs_path = table.sd.locationlogging.info('table_hdfs_path:' + table_hdfs_path)print(table_hdfs_path)# 3、判斷hive是否為分區(qū)表,當用戶沒有輸入partitions時需查找所有分區(qū)數(shù)據(jù)if partitions is not None:table_hdfs_path = [table_hdfs_path + constant.FILE_SEPARATION + x for x in partitions][0]dataframe = pq.ParquetDataset(table_hdfs_path).read().to_pandas()# pyarrow訪問分區(qū)目錄時,dataframe不含分區(qū)列,因此需添加分區(qū)列信息for partition in partitions:index = partition.find('=')field = partition[:index]field_value = partition[index + 1:]dataframe[field] = field_valueelse:dataframe = pq.ParquetDataset(table_hdfs_path).read().to_pandas()return dataframe方案對比
為了驗證分析三種方案在讀取數(shù)據(jù)性能的差異,我們設(shè)置了對比實驗,準備27維數(shù)據(jù),在數(shù)據(jù)量不斷遞增情況下執(zhí)行SELECT查詢語句,我們可以得到如下折線圖。
方式在讀取效率上優(yōu)于pyarrow+thrift方案,此后,隨著數(shù)據(jù)量級不斷增大,pyarrow+thrift方案較其他兩種方案有明顯優(yōu)勢。在線下測試中我們發(fā)現(xiàn),讀取百萬級數(shù)據(jù)時,pyhive和impyla需要大約4分鐘,而pyarrow+thrift只需20s。
結(jié)論
上一章節(jié)中,三種方案在讀取同一數(shù)據(jù)時性能上的差異,可以清楚知道數(shù)據(jù)量在3w左右時,三種方案在讀取數(shù)據(jù)性能上的表現(xiàn)相差不大,但當數(shù)據(jù)量級不斷增大時,通過pyarrow+thrift方案在讀取性能上明顯優(yōu)于前兩種方案。因此,在萬級數(shù)據(jù)以上推薦使用pyarrow+thrift方式訪問Hive數(shù)據(jù),可以極大提高python讀取hive數(shù)據(jù)的效率。
總結(jié)
以上是生活随笔為你收集整理的python读取hive方案分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安装pyhive包
- 下一篇: 数据库和SQL基本知识点