性能提升约7倍!Apache Flink 与 Apache Hive 的集成
導讀:隨著 Flink 在流式計算的應用場景逐漸成熟和流行,如果 Flink 能同時把批量計算的應用場景處理好,就能減少用戶在使用 Flink 時開發和維護的成本,并且能夠豐富 Flink 的生態。SQL 是批計算中比較常用的工具,所以 Flink 針對于批計算也以 SQL 為主要接口。本次分享主要介紹 Flink 對批處理的設計與 Hive 的集成。主要分為下面三點展開:
設計架構
首先和大家分享一下 Flink 批處理的設計架構。
1. 背景
Flink 提升批處理的主要原因是為了減少客戶的維護成本和更新成本和更好的完善 Flink 生態環境。SQL 是批計算場景中一個非常重要的工具,所以希望以 SQL 作為在批計算場景的主要接口,為此我們著重優化了 Flink SQL 的功能。當前 Flink SQL 主要有下面幾點需要優化:
- 需要完整的元數據管理體制。
- 缺少對 DDL(數據定義語言 DDL 用來創建數據庫中的各種對象,如表、視圖、索引、同義詞、聚簇等)的支持。
- 與外部系統進行對接不是很方便,尤其是 Hive, 因為 Hive 是大數據領域最早的 SQL 引擎,所以 Hive 的用戶基礎非常廣泛,新的一些 SQL 工具,如 Spark SQL、Impala 都提供了與 Hive 對接的功能,這樣用戶才能更好地將其應用從 Hive 遷移過來,所以與 Hive 對接對 Flink SQL 而言也十分重要。
2. 目標
所以我們要完成以下目標:
- 定義統一的 Catalog 接口,這個是 Flink SQL 更方便與外部對接的前提條件。如果大家用過 Flink 的 TableSource 和 TableSink 來對接外部的系統的表,會發現不管是通過寫程序還是配置 yaml 文件會跟傳統的 SQL 使用方式會有些不同。所以我們肯定不希望 Hive 的用戶遷移 Flink SQL 需要通過定義 TableSouces 和 TableSink 的方式來與 Hive 進行交互。因此我們提供了一套新的 Catalog 接口以一種更接近傳統 SQL 的方式與 Hive 進行交互。
- 提供基于內存和可持久化的實現。基于內存就是 Flink 原有的方式,用戶所有的元數據的生命周期是跟他的 Session(會話)綁定的,Session(會話)結束之后所有的元數據都沒有了。因為要跟 Hive 交互所以肯定還要提供一個持久化的 Catalog。
- 支持 Hive 的互操作。有了 Catalog 之后用戶就可以通過 Catalog 訪問 Hive 的元數據,提供 Data Connector 讓用戶能通過 Flink 讀寫 Hive 的實際數據,實現 Flink 與 Hive 的交互。
- 支持 Flink 作為 Hive 的計算引擎(長期目標),像 Hive On Spark,Hive On Tez。
3. 全新設計的 Catalog API(FlIP-30)
用戶通過 SQL Client 或者 Table API 提交請求,Flink 會創建 TableEnvironment, TableEnvironment 會創建 CatalogManager 加載并配置 Catalog 實例,并且 Catalog 支持多種元數據類型 table、database、function、view、partition 等,在 1.9.0 的版本當中 Catalog 會有兩個實現:
- 一個是基于內存的 GenericinMemoryCatalog。
- 另一是 HiveCatalog,HiveCatalog 通過 HiveShim 與 Hive Metasotre 交互來操作 Hive 元數據,HiveShim 的作用是處理 Hive 在大版本中 Hive Metastore 不兼容的問題。
從這種實現的方式可以看出,用戶可以創建多個 Catalog,也可以訪問多個 Hive Metastore,來達到跨 Catalog 查詢的操作。
4. 讀寫 Hive 數據
有了元數據之后我們就可以實現 Flink SQL 的 Data Connector 來真正的讀寫 Hive 實際數據。Flink SQL 寫入的數據必須要兼容 Hive 的數據格式,也就是 Hive 可以正常讀取 Flink 寫入的數據,反過來也是一樣的。為了實現這一點我們大量復用 Hive 原有的 Input/Output Format、SerDe 等 API,一是為了減少代碼冗余,二是盡可能的保持兼容性。
在 Data Connect 中讀取 Hive 表數據具體實現類為:HiveTableSource、HiveTableInputFormat。寫 Hive 表的具體實現類為:HiveTableSink、HiveTableOutputFormat。
項目進展
其次和大家分享 Flink 1.9.0 的現狀和 1.10.0 中的新特性還有未來工作。
1. Flink 1.9.0 的現狀
Flink SQL 作為 1.9.0 版本中作為試用功能發布的,它的功能還不是很完善:
- 支持的數據類型還不全。(1.9.0 中帶參數的數據類型基本上都不支持:如 DECIMAL,CHAR 等)
- 對分區表的支持不完善,只能讀取分區表,不能寫分區表。
- 不支持表的 INSERT OVERWRITE。
2. Flink 1.10.0 中的新特性
Flink SQL 在 1.10.0 版本里我們做了比較多的進一步開發,與 Hive 集成的功能更加完整。
- 支持讀寫靜態分區和動態分區表。
- 在表級別和分區級別都支持 INSERT OVERWRITE。
- 支持了更多地數據類型。(除 UNION 類型都支持)
- 支持更多地 DDL。(CREATE TABLE/DATABASE)
- 支持在 Flink 中調用 Hive 的內置函數。(Hive 大約 200 多個內置函數)
- 支持了更多的 Hive 版本。(Hive 的 1.0.0~3.1.1)
- 做了很多性能優化如,Project/Predicate Pushdown,向量的讀取 ORC 數據等。
3. Module 接口
為了能讓用戶調用 Flink SQL 中調用 Hive 的內置函數,我們在 Flink 1.10 當中引入了一個 Module 接口。這個 Module 是為了讓用戶能夠方便的把外部系統的內置函數接入到系統當中。
- 使用方式和 Catalog 類似,用戶可以通過 Table API 或 Yaml 文件來配置 Module。
- Module 可以同時加載多個,Flink 解析函數的時候通過 Module 的加載順序在多個 Module 中查找函數的解析。也就是如果兩個 Module 包含名字相同的 Function,先加載的 Module 會提供 Function 的定義。
- 目前 Module 有兩個實現,CoreModule 提供了 Flink 原生的內置函數,HiveModule 提供了 Hive 的內置函數。
4. 未來工作
未來的工作主要是先做功能的補全,其中包括:
- View 的支持(有可能在 1.11 中完成)。
- 持續改進 SQL CLI 的易用性,現在支持翻頁顯示查詢結果,后續支持滾動顯示。并支持 Hive 的 -e -f 這種非交互式的使用方式。
- 支持所有的 Hive 常用 DDL,例如 CREATE TABLE AS。
- 兼容 Hive 的語法,讓原來在 Hive 上的工程在 Flink 的順滑的遷移過來。
- 支持 SQL CLI 的遠程模式,類似 HiveServer2 的遠程連接模式。
- 支持流式的寫入 Hive 數據。
性能測試
下面是 Flink 在批處理作業下與 HiveMR 對比測試的測試環境和結果。
1. 測試環境
首先我們的測試環境使用了 21 個節點的物理機群,一個 Master 節點和 20 個 Slave 節點。節點的硬件配置是 32 核,64 個線程,256 內存,網絡做了端口聚合,每個機器是 12 塊的 HDD 硬盤。
2. 測試工具
測試工具使用了 Hortonworks 的 hive-testbench,github 中一個開源的工具。我們使用這個工具生成了 10TB 的 TPC-DS 測試數據集,然后分別通過 Flink SQL 和 Hive 對該數據集進行 TPC-DS 的測試。
一方面我們對比了 Flink 和 Hive 的性能,另一方面我們驗證了 Flink SQL 能夠很好的訪問 Hive 的數據。測試用到了 Hive 版本是 3.1.1,Flink 用到的是 Master 分支代碼。
3. 測試結果
測試結果 Flink SQL 對比 Hive On MapReduce 取得了大約 7 倍的性能提升。這得益于 Flink SQL 所做的一系列優化,比如在調度方面的優化,以及執行計劃的優化等。總體來說如果用的是 Hive On MapReduce,遷移到 Flink SQL 會有很大性能的提升。
附最新性能對比詳情及思路解析:Flink 1.10 和 Hive 3.0 性能對比
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的性能提升约7倍!Apache Flink 与 Apache Hive 的集成的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 技术运维的经营大法——对话阿里云MVP熊
- 下一篇: 掌门1对1微服务体系Solar|阿里巴巴