Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现
TopN 是統計報表和大屏非常常見的功能,主要用來實時計算排行榜。流式的 TopN 不同于批處理的 TopN,它的特點是持續的在內存中按照某個統計指標(如出現次數)計算 TopN 排行榜,然后當排行榜發生變化時,發出更新后的排行榜。本文主要講解 Flink SQL 是如何從語法和實現上設計 TopN 的。
TopN 語法
全局 TopN
用戶最關心的是如何用 SQL 寫出 TopN 的查詢。大家最熟悉的 TopN 的寫法一般是這樣的:
SELECT column_name(s) FROM table_name WHERE condition ORDER BY order_field [DESC|ASC] LIMIT number如上語法是 MySQL 的 TopN 語法,使用?ORDER BY?指定排序鍵和排序方向,使用?LIMIT?來指定選出前幾名。不同的數據庫的 TopN 語法不盡相同,比如 MS SQL Server 使用 TOP 的關鍵字,Oracle 使用 ROWNUM 的隱藏字段。不過幾家數據庫提供的 TopN 語法都是全局 TopN,也就是數據是全局進行排序的,查詢的結果只有一組排行榜。比如希望對全網商家按銷售額排序,計算出銷售額排名前十的商家。這就是全局 TopN,范例如下:
SELECT * FROM shop_sales ORDER BY sales DESC LIMIT 10分組 TopN
上文講述了全局 TopN 的語法,但是很多時候用戶希望根據不同的分組進行排序,計算出每個分組的一個排行榜。例如對全網商家根據行業按銷售額排序,計算出每個行業銷售額前十名的商家。這時候,傳統的 TopN 語法就無法表達這種需求了。有些 Stream SQL 系統為了解決這個問題,會 hack 一種新的 TopN 語法允許用戶指定分組字段。但是 Flink SQL 是基于 ANSI SQL 標準語法的,不能加入任何非標準的語法。于是我們嘗試從批處理的角度去思考這個問題,在傳統批處理中常用 ROW_NUMBER 的開窗聚合函數來解決分組 TopN 的問題。語法如下所示:
SELECT * FROM (SELECT *,ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownumFROM table_name) WHERE rownum <= N [AND conditions]參數說明:
- ROW_NUMBER(): 是一個計算行號的OVER窗口函數,行號計算從1開始。
- PARTITION BY col1[, col2..]?: 指定分區的列,可以不指定。
- ORDER BY col1 [asc|desc][, col2 [asc|desc]...]: 指定排序的列,可以多列不同排序方向。
如上語法所示,TopN 需要兩層 query,子查詢中使用ROW_NUMBER()開窗函數來為每條數據標上排名,排名的計算根據PARTITION BY和ORDER BY來指定分區列和排序列,也就是說每一條數據會計算其在所屬分區中,根據排序列排序得到的排名。在外層查詢中,對排名進行過濾,只取出排名小于 N 的,如 N=10,那么就是取 Top 10 的數據。如果沒有指定PARTITION BY那么就是一個全局 TopN 的計算,所以 ROW_NUMBER 在使用上更為靈活。
如《實時計算 Flink SQL 核心功能解密》中所述,Flink SQL 是一個流與批統一的 API,也就是說理論上一段 SQL 要既能跑在批處理模式下,也能跑在流處理模式下,且輸出的結果是一致的。那么在流處理模式下理所當然地應該支持 ROW_NUMBER 形式的 TopN 語法。例如上文說的對全網商家根據行業按銷售額排序,計算出每個行業銷售額前十名的商家,SQL 范例如下。
SELECT * FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) AS rownumFROM shop_sales) WHERE rownum <= 10TopN 實現和優化
ROW_NUMBER 方式的 TopN 語法非常靈活,能滿足全局 TopN 和分組 TopN 的需求。但是在流計算上的物理執行是一個挑戰。如上文所述的每個行業銷售額前十商家排行榜,經過 SQL 編譯后得到的抽象語法樹(AST)如下所示。
LogicalWindow 會對所有數據進行排名,也就是說每當到達一個數據,就要對歷史數據進行重排序,并輸出歷史數據的新的排名,然后 LogicalCalc 節點會根據排名進行過濾。這在性能上是非常糟糕的,因為這無限放大了流量。而我們知道,最優的流式 TopN 的計算只需要維護一個 N 元素大小的小根堆,每當有數據到達時,只需要與堆頂元素比較,如果比堆頂元素還小,則直接丟棄;如果比堆頂元素大,則更新小根堆,并輸出更新后的排行榜。也就是說我們不需要分為兩個節點進行計算,不需要將所有數據進行排序,只需要在一個節點中就可以高效地完成計算。所以我們在查詢優化器中加入了一條規則,在使用 TopN 語法時,將 LogicalWindow 和 LogicalCalc 合并成了 LogicalRank 節點。LogicalRank 在翻譯成物理執行計劃時,會使用一個經過特殊設計的 TopN 算子。
TopN 算子的實現上主要有兩個數據結構,一個是 TreeMap,另一個是 MapState。TreeMap 的作用類似于上文的小根堆,有序地存放了排名前 N 的元素。但是 TreeMap 是個內存數據結構,在 failover 后會丟失,無法保證數據的一致性。因此我們還有一個 MapState 結構,MapState 是 Flink 提供的狀態接口,用來存儲 TopN 的數據(保證數據不丟)。當有 failover 發生后,MapState 能保證狀態的恢復,而 TreeMap 會從 MapState 中重新構造出來。我們并有沒有把順序也存到狀態中去,因為順序是可以在恢復時重構的。因為每一次狀態的讀寫操作都會涉及到序列化/反序列化,往往是性能的瓶頸,所以 TreeMap 的主要作用是降低了對 MapState 狀態的讀寫操作。對大部分數據來說都是與 TreeMap 進行交互,不需要對 MapState 進行讀寫的,全是內存操作,所以 TopN 的性能是非常高的。
TopN 算子的主要處理流程是,每當有數據到達時,會與 TreeMap 的最小的元素比較,如果比它小,那么該數據就不可能是 TopN 的一員,直接丟棄即可。如果比它大,那么就會先更新 TreeMap,同時更新 MapState 中的存的數據。最后輸出更新后的排行榜。為了減少冗余數據的輸出,我們只會輸出排名發生變化的數據。例如原先的第7名上升到了第六名,那么只需要輸出新的第六名和第七名即可。
嵌套 TopN 解決熱點問題
TopN 的計算與 GroupBy 的計算類似,如果數據存在傾斜,則會有計算熱點的現象。比如全局 TopN,那么所有的數據只能匯集到一個節點進行 TopN 的計算,那么計算能力就會受限于單臺機器,無法做到水平擴展。解決思路與 GroupBy 是類似的,就是使用嵌套 TopN,或者說兩層 TopN。在原先的 TopN 前面,再加一層 TopN,用于分散熱點。例如,計算全網排名前十的商鋪,會導致單點的數據熱點,那么可以先加一層分組 TopN,組的劃分規則是根據店鋪 ID 哈希取模后分成128組(并發的倍數)。第二層 TopN 與原先的寫法一樣,沒有 PARTITION BY。第一層會計算出每一組的 TopN,而后在第二層中進行合并匯總,得到最終的全網前十。第二層雖然仍是單點,但是大量的計算量由第一層分擔了,而第一層是可以水平擴展的。使用嵌套 TopN 的優化寫法如下所示:
CREATE VIEW tmp_topn AS SELECT * FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY HASH_CODE(shop_id)%128 ORDER BY sales DESC) AS rownumFROM shop_sales) WHERE rownum <= 10SELECT * FROM (SELECT shop_id, shop_name, sales,ROW_NUMBER() OVER (ORDER BY sales DESC) AS rownumFROM tmp_topn) WHERE rownum <= 10總結
流式 TopN 不僅在語法以及算法上會遇到很多挑戰,在不同場景下的優化方案也是個非常有意思的話題。目前 Flink SQL 的 TopN 功能已經大量應用于彩票業務、阿里云的CDN項目、WAF項目等等。未來,我們除了會針對更多的場景對 TopN 進行優化,還會提供除了 ROW_NUMBER 外的?RANK、RANK_DENSE?排名函數,使得 TopN 更加靈活
總結
以上是生活随笔為你收集整理的Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cisco 热备份路由器协议HSRP笔记
- 下一篇: 骨骼动画