腾讯基于 Flink SQL 的功能扩展与深度优化实践
整理:戴季國(Flink 社區志愿者)
校對:苗文婷(Flink 社區志愿者)
摘要:本文由騰訊高級工程師杜立分享,主要介紹騰訊實時計算平臺針對 Flink SQL 所做的優化,內容包括:
一、背景及現狀
1. 三種模式的分析
Flink 作業目前有三種創建方式:JAR 模式、畫布模式和 SQL 模式。不同的提交作業的方式針對的人群也是不一樣的。
■ Jar 模式
Jar 模式基于 DataStream/DataSet API 開發,主要針對的是底層的開發人員。
- 優點:
· 功能靈活多變,因為它底層的 DataStream/DataSet API 是 Flink 的原生 API,你可以用它們開發任何你想要的算子功能或者 DAG 圖;
· 性能優化方便,可以非常有針對性的去優化每一個算子的性能。
- 缺點:
· 依賴更新繁瑣,無論擴展作業邏輯或是 Flink 版本的升級,都要去更新作業的代碼以及依賴版本;
· 學習門檻較高。
■ 畫布模式
所謂的畫布模式,一般來講會提供一個可視化的拖拉拽界面,讓用戶通過界面化的方式去進行拖拉拽操作,以完成 Flink 作業的編輯。它面向一些小白用戶。
- 優點:
· 操作便捷,畫布上可以很方便地定義 Flink 的作業所包含的各種算子;
· 功能較全,它基于 Table API 開發,功能覆蓋比較完整;
· 易于理解,DAG 圖比較直觀,用戶能夠非常容易的去理解整個作業的運行流程。
- 缺點:
· 配置復雜:每一個算子都需要去逐個的去配置,如果整個 DAG 圖非常復雜,相應的配置工作也會非常大;
· 邏輯重用困難:如果作業非常的多,不同的作業之間想去共享 DAG 邏輯的話非常困難。
■ SQL 模式
SQL 語言已經存在了很長時間了,它有自己的一套標準,主要面向數據分析人員。只要遵循既有的 SQL 標準,數據分析人員就可以在不同的平臺和計算引擎之間進行切換。
- 優點:
· 清晰簡潔,易于理解和閱讀;
· 與計算引擎解耦,SQL 與計算引擎及其版本是解耦的,在不同的計算引擎之間遷移業務邏輯不需要或極少需要去更改整段 SQL。同時,如果想升級 Flink 版本,也是不需要去更改 SQL;
· 邏輯重用方便,可以通過 create view 的方式去重用我們的 SQL 邏輯。
- 缺點:
· 語法不統一,比如說流與維表 Join,Flink 1.9 之前使用 Lateral Table Join 語法,但是在 1.9 之后,更改成了 PERIOD FOR SYSTEM_TIME 語法,這種語法遵循了 SQL ANSI 2011 標準。語法的變動使得用戶有一定的學習成本;
· 功能覆蓋不全:Flink SQL 這個模塊存在的時間不是很長,導致它的功能的一個覆蓋不是很全。
· 性能調優困難:一段 SQL 的執行效率主要由幾個部分來決定,一個就是 SQL 本身所表達的業務邏輯;另一部分是翻譯 SQL 所產生的執行計劃的一個優化;第三部分的話,在產生最優的邏輯執行計劃之后,翻譯成本地的 native code 的時候方案也決定了 SQL 的執行效率;對于用戶來講的,他們所能優化的內容可能只局限于 SQL 所表達的業務邏輯。
· 問題定位困難:SQL 是一個完整的執行流程,如果我們發現某些數據不對,想針對性地去排查到底是哪個算子出了問題,是比較的困難的。一般來講,我們想定位 Flink SQL 的問題,只能先不斷的精簡我們的整個 SQL 邏輯,然后不斷地去嘗試輸出,這個成本是非常高的。騰訊實時計算平臺后期會針對這個問題,增加 trace 日志和 metrics 信息,輸出到產品側以幫助用戶定位 Flink SQL 使用上的問題。
2. 騰訊實時計算平臺目前的工作
■ 擴展語法
定義了 window table-valued function 語法,以幫助用戶實現基于窗口的流 Join 和交并差操作。另外,實現了自己的流與維表 Join 的語法。
■ 新增功能
新增的一些功能,包括兩個新的 Window 的類型,Incremental Window(增量窗口)和 Ehanced Tumble Window(增強窗口)。實現了 Eventtime Field 與 Table Source 的解耦,很多時候 Eventtime Field 并不能通過 Table Source 字段定義出來,比如 Table Source 是一個子查詢或者某個時間字段是由函數轉換得出,想要用這些中間生成的時間字段作為 Eventtime Field 目前是做不到的,我們目前的方案是,讓用戶可以選擇物理表中任意的時間字段來定義 Window 的時間屬性并輸出 WaterMark。
■ 性能調優
- 回撤流優化;
- 內聯 UDF,如果相同的 UDF 既出現在 LogicalProject 中,又出現在 Where 條件中,那么 UDF 會進行多次調用。將邏輯執行計劃中重復調用的 UDF 提取出來,將該 UDF 的執行結果進行緩存,避免多次調用;
■ Bucket Join
流表維表 Join 中存在數據冷啟動問題,如果 Flink 任務在啟動時大量加載外部數據,很容易造成反壓。可以在啟動時利用 State Processor API 等手段將全部數據預加載到內存中。但這種方案存在一種問題,維表數據加載到所有的 subtask 里面會造成較大的內存消耗。因此我們的解決方案是,在維表的定義中指定一個 bucket 信息,流與維表進行 Join 的時候會基于 bucket 信息去加載維表中對應分片的數據,同時在翻譯執行計劃的時候流表拿到 bucket 信息,以保證流與維表的數據都會基于同一個 bucket 信息進行 Join。這種方式能大大減少全量維表數據預加載帶來的內存消耗問題。
二、 窗口功能擴展
騰訊實時計算平臺基于現有 Flink SQL 語法進行了一些擴展,并另外定義了兩種新的 Window 類型。
1. 新的窗口操作
現有如下需求,需要在兩條流上針對某個時間窗口做 Join 操作或者交并差操作。
使用 Flink SQL 基于某個 Window 去做雙流 Join,現有的方案有兩種,第一種方案就是先做 Join 再做 Group By,第二種就是 Interval Join。首先來分析一下第一種方案能否滿足需求。
■ 1.1 先 Join 再開窗
先 Join 再開窗的邏輯如上圖所示,根據邏輯執行計劃可以看到 Join 節點在 Window Aggregate 節點之下,所以會先進行流與流的 Join,Join 完了之后再去做Window Aggregate。
圖中右側的流程圖也可以看出,首先兩邊的流會做一個 Connect,然后基于 Join Key 做 Keyby 操作,以此保證兩條流中擁有相同 Join Key 的數據能夠 Shuffle 到同一個 task 上。左流會將數據存到自己的狀態中,同時會去右流的狀態中進行 Match,如果能 Match 上會將 Match 后的結果輸出到下游。這種方案存在以下兩個問題:
■ 1.2 Interval Join
Interval Join 相對于前面一種寫法,好處就是不存在狀態無法清理的問題,因為在掃描左右兩條流的數據時可以基于某一確定的窗口,過了窗口時間后,狀態是可以被清理掉的。
但是這種方案相對于第一種方案而言,數據準確性可能會更差一點,因為它對于窗口的劃分不是基于一個確定窗口,而是基于數據進行驅動,即當前數據可以 Join 的另一條流上的數據的范圍是基于當前數據所攜帶的 Eventtime 的。這種窗口劃分的語義與我們的需求還是存在一定差距的。
想象一下現有兩條速率不一致的流,以 low 和 upper 兩條邊界來限定左流可以 Join 的右流的數據范圍,在如此死板的范圍約束下,右流總會存在一些有效數據落在時間窗口 [left + low, left + upper] 之外,導致計算不夠準確。因此,最好還是按照窗口對齊的方式來劃分時間窗口,讓兩條流中 Eventtime 相同的數據落在相同的時間窗口。■ 1.3 Windowing Table-Valued Function
騰訊擴展出了 Windowing Table-Valued Function 語法,該語法可以滿足“在兩條流上針對某個時間窗口做 Join 操作或者交并差操作”的需求。在 SQL 2016 標準中就有關于這一語法的描述,同時該語法在 Calcite1.23 里面就已存在。
Windowing Table-Valued Function 語法中的 Source 可以把它整個的語義描述清楚,From 子句里面包含了 Window 定義所需要的所有信息,包括 Table Source、Eventtime Field、Window Size 等等。
從上圖的邏輯計劃可以看出,該語法在 LogiclTableScan 上加了一個叫 LogicalTableFunctionScan 的節點。另外,LogicalProject 節點(輸出節點)多了兩個字段叫作 WindowStart 和 WindowEnd,基于這兩個字段可以把數據歸納到一個確定的窗口。基于以上原理,Windowing Table-Valued Function 語法可以做到下面這些事情:
- 在單流上面,可以像現有的 Group Window 語法一樣去劃分出一個時間窗口。寫法如上圖,Window 信息全部放到 From 子句中,然后再進行 Group By。這種寫法應該更符合大眾對于時間窗口的理解,比當前 Flink SQL 中的 Group Window 的寫法更加直觀一點。我們在翻譯單流上的 Windowing Table-Valued Function 語法時做了一個討巧,即在實現這段 SQL 的物理翻譯時,并沒有去翻譯成具體的 DataStream API,而是將其邏輯執行計劃直接變換到現在的 Group Window 的邏輯執行計劃,也就是說共用了底層物理執行計劃的代碼,只是做了一個邏輯執行計劃的等價。
另外,可以對 Window 里面的數據做一些 Sort 或者 TopN 的一些輸出,因為 Windowing Table-Valued Function 語法已經提前把數據劃分進了一個個確定的窗口。如上圖所示,首先在 From 子句里面把窗口劃分好,然后 Order By 和 Limit 緊接其后,直接表達了排序和 TopN 語義。
- 在雙流上面,可以滿足“在兩條流上針對某個時間窗口做 Join 操作或者交并差操作”的原始需求。語法如上圖,首先把兩個窗口的 Window Table 構造好,然后利用 Join 關鍵字進行 Join 操作即可;交并差操作也一樣,與傳統數據庫 SQL 的交并差操作無二。
■ 1.4 實現細節
下面簡單介紹一下我們在實現 Windowing Table-Valued Function 語法時的一些細節。
1.4.1 窗口的傳播
原始的邏輯計劃翻譯方式,先基于 LogicalTableScan,然后再翻譯到 Windowing Table-Valued Function,最后再翻譯到 OrderBy Limit 子句。整個過程會存儲很多次狀態,對于性能來講會是比較大的一個消耗,因此做了如下優化,把多個 Logical Relnode 合并在一起去翻譯,這樣可以減少中間環節代碼的產生,從而提高性能。
1.4.2 時間屬性字段
可以看到 Windowing Table-Valued Function 的語法:
SELECT * FROM TABLE(TUMBLE(TABLE <data>, DESCRIPTOR(<timecol>), <size> [, <offset>]))table 不僅僅可以是一張表,還可以是一個子查詢。所以如果定義 Eventtime Field 的時候,把時間屬性和 Table Source 綁定,且 Table Source 恰好是一個子查詢,此時就無法滿足我們的需求。所以我們在實現語法的時候,把時間屬性字段跟 Table Source 解耦,反之,用戶使用物理表中的任意一個時間字段來作為時間屬性,從而產生 watermark。
1.4.3 時間水印
Watermark 的使用邏輯與在其他語法中一樣,兩條流的所有的 Input Task 的最小時間水印,決定窗口的時間水印,以此來觸發窗口計算。
1.4.4 使用約束
目前 Windowing Table-Valued Function 的使用存在一些約束。首先,兩條流的窗口類型必須是一致的,而且窗口大小也是一樣的。然后,目前還沒有實現 Session Window 相關的功能。
2. 新的窗口類型
接下來的介紹擴展出兩個新的窗口類型。
■ 2.1 Incremental Window
有如下需求,用戶希望能夠繪制一天內的 pv/uv 曲線,即在一天內或一個大的窗口內,輸出多次結果,而非等窗口結束之后統一輸出一次結果。針對該需求, 我們擴展出了 Incremental Window。
2.1.1 多次觸發
基于 Tumble Window,自定義了 Incremental Trigger。該觸發器確保,不僅僅是在 Windows 結束之后才去觸發窗口計算,而是每個 SQL 中所定義的 Interval 周期都會觸發一次窗口計算。
如上圖中的 SQL 案例,總的窗口大小是一秒,且每 0.2 秒觸發一次,所以在窗口內會觸發 5 次窗口計算。且下一次的輸出結果是基于上一次結果進行累計計算。
2.1.2 Lazy Trigger
針對 Incremental Window 做了一個名為 Lazy Trigger 的優化。在實際的生產過程中,一個窗口相同 Key 值在多次觸發窗口計算后輸出的結果是一樣的。對于下游來講,對于這種數據是沒必要去重復接收的。因此,如果配置了 Lazy Trigger 的話,且在同一個窗口的同一個 Key 下,下一次輸出的值跟上一次的是一模一樣的,下游就不會接收到這次的更新數據,由此減少下游的存儲壓力和并發壓力。
■ 2.2 Enhanced Tumble Window
有如下需求,用戶希望在 Tumble Window 觸發之后,不去丟棄遲到的數據,而是再次觸發窗口計算。如果使用 DataStream API,使用 SideOutput 就可以完成需求。但是對于 SQL,目前是沒辦法做到的。因此,擴展了現有的 Tumble Window,把遲到的數據也收集起來,同時遲到的數據并不是每來一條就重新觸發窗口計算并向下游輸出,而是會重新定義一個 Trigger,Trigger 的時間間隔使用 SQL 中定義的窗口大小,以此減少向下游發送數據的頻率。
同時,側輸出流在累計數據的時候也會使用 Window 的邏輯再做一次聚合。這里需要注意,如果下游是類似于HBase這樣的數據源,對于相同的 Window 相同的 Key,前一條正常被窗口觸發的數據會被遲到的數據覆蓋掉。理論上,遲到的數據跟正常窗口觸發的數據的重要性是一樣的,不能相互覆蓋。最后,下游會將收到的同一個窗口同一個 Key 下的正常數據和延遲數據再做一次二次聚合。
三、回撤流優化
接下來介紹一下在回撤流上所做的一些優化。
1. 流表二義性
回顧一下關于在 Flink SQL 中關于回撤流的一些概念。
首先介紹一下持續查詢(Continuous Query),相對于批處理一次執行輸出一次結果的特點,流的聚合是上游來一條數據,下游的話就會接收一條更新的數據,即結果是不斷被上游的數據所更新的。因此,對于同一個 Key 下游能夠接收到多條更新結果。
2. 回撤流
以上圖的 SQL 為例,當第二條 Java 到達聚合算子時,會去更新第一條 Java 所產生的狀態并把結果發送到下游。如果下游對于多次更新的結果不做任何處理,就會產生錯誤的結果。針對這種場景,Flink SQL 引入了回撤流的概念。
所謂回撤流的話,就是在原始數據前加了一個標識位,以 True/False 進行標識。如果標識位是 False,就表示這是一條回撤消息,它通知下游對這條數據做 Delete 操作;如果標識位是 True,下游直接會做 Insert 操作。
■ 2.1 什么時候產生回撤流
目前,Flink SQL 里面產生回撤流有以下四種場景:
- Aggregate Without Window(不帶 Window 的聚合場景)
- Rank
- Over Window
- Left/Right/Full Outer Join
解釋一下 Outer Join 為什么會產生回撤。以 Left Outer Join 為例,且假設左流的數據比右流的數據先到,左流的數據會去掃描右流數據的狀態,如果找不到可以 Join 的數據,左流并不知道右流中是確實不存在這條數據還是說右流中的相應數據遲到了。為了滿足 Outer join 的語義的話,左邊流數據還是會產生一條 Join 數據發送到下游,類似于 MySQL Left Join,左流的字段以正常的表字段值填充,右流的相應字段以 Null 填充,然后輸出到下游,如下圖所示:
(圖片來源于云棲社區)
后期如果右流的相應數據到達,會去掃描左流的狀態再次進行 Join,此時,為了保證語義的正確性,需要把前面已經輸出到下游的這條特殊的數據進行回撤,同時會把最新 Join 上的數據輸出到下游。注意,對于相同的 Key,如果產生了一次回撤,是不會再產生第二次回撤的,因為如果后期再有該 Key 的數據到達,是可以 Join 上另一條流上相應的數據的。
■ 2.2 如何處理回撤消息
下面介紹 Flink 中處理回撤消息的邏輯。
對于中間計算節點,通過上圖中的 4 個標志位來控制,這些標識位表示當前節點是產生 Update 信息還是產生 Retract 信息,以及當前節點是否會消費這個 Retract 信息。這 4 個標識位能夠決定整個關于 Retract 的產生和處理的邏輯。
對于 Sink 節點,目前 Flink 中有三種 sink 類型,AppendStreamTableSink、RetractStreamTableSink 和 UpsertStreamTableSink。AppendStreamTableSink 接收的上游數據是一條 Retract 信息的話會直接報錯的,因為它只能描述 Append-Only 語義;RetractStreamTableSink 則可以處理 Retract 信息,如果上游算子發送一個 Retract 信息過來,它會對消息做 Delete 操作,如果上游算子發送的是正常的更新信息,它會對消息做 Insert 操作;UpsertStreamTableSink 可以理解為對于RetractStreamTableSink 做了一些性能的優化。如果 Sink 數據源支持冪等操作,或者支持按照某 key 做 Update 操作,UpsertStreamTableSink 會在 SQL 翻譯的時候把上游 Upsert Key 傳到 Table Sink 里面,然后基于該 Key 去做 Update 操作。
■ 2.3 相關優化
我們基于回撤流做以下優化。
2.3.1 中間節點的優化
產生回撤信息最根本的一個原因是不斷地向下游多次發送更新結果,因此,為了減少更新的頻率并降低并發,可以把更新結果累計一部分之后再發送出去。如上圖所示:
- 第一個場景是一個嵌套 AGG 的場景(例如兩次 Count操作),在第一層 Group By 嘗試將更新結果發送到下游時候會先做一個 Cache,從而減少向下游發送數據頻率。當達到了 Cache 的觸發條件時,再把更新結果發送到下游。
- 第二個場景是 Outer Join,前面提到,Outer Join 產生回撤消息是因為左右兩邊數據的速率不匹配。以 Left Outer Join 為例,可以把左流的數據進行 Cache。左流數據到達時會去右流的狀態里面查找,如果能找到可以與之 Join的數據則不作緩存;如果找不到相應數據,則對這條 Key 的數據先做緩存,當到達某些觸發條件時,再去右流狀態中查找一次,如果仍然找不到相應數據,再去向下游發送一條包含 Null 值的 Join 數據,之后右流相應數據到達就會將 Cache 中該 Key 對應的緩存清空,并向下游發送一條回撤消息。
以此來減小向下游發送回撤消息的頻率。
2.3.2 Sink 節點的優化
針對 Sink 節點做了一些優化,在 AGG 節點和 Sink 節點之間做了一個 Cache,以此減輕 Sink 節點的壓力。當回撤消息在 Cache 中再做聚合,當達到 Cache 的觸發條件時,統一將更新后的數據發送到 Sink 節點。以下圖中的 SQL 為例:
參考優化前后的輸出結果可以看到,優化后下游接收到的數據量是有減少的,例如用戶 Sam,當回撤消息嘗試發送到下游時,先做一層 Cache,下游接收到的數據量可以減少很多。
四、未來規劃
下面介紹一下我們團隊后續的工作規劃:
- Cost-Based Optimization:現在 Flink SQL 的邏輯執行計劃的優化還是基于RBO(Rule Based Optimization)的方式。我們團隊想基于 CBO 所做一些事,主要的工作還是統計信息的收集。統計信息不僅僅來自 Flink SQL 本身,可能還會來自公司內其他產品,例如元數據,不同 Key 所對應的數據分布,或者其他數據分析結果。通過跟公司內其他產品打通,拿到最準的統計數據,產生最優的執行計劃。
- More New Features(CEP Syntax etc.):基于 Flink SQL 定義一些 CEP 的語法,以滿足用戶關于 CEP 的一些需求。
- Continuous Performance Optimization(Join Operator etc.):我們團隊在做的不僅僅是執行計劃層的優化,也在做 Join Operator 或者說數據 Shuffle 的一些細粒度的優化。
- Easier To Debug:最后是關于 Flink SQL任務的調試和定位。目前 Flink SQL在這方面是比較欠缺的,特別是線上關于數據對不齊的問題,排查起來非常的棘手。我們目前的思路是通過配置的方式,讓 SQL 在執行的過程中吐出一些 Trace 信息或者一些 Metrics 信息,然后發送到其他平臺。通過這些 Trace 信息和 Metric 信息,幫助用戶定位出問題的算子。
原文鏈接:https://developer.aliyun.com/article/781670?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的腾讯基于 Flink SQL 的功能扩展与深度优化实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ARMS为深绘智能系统保驾护航
- 下一篇: “云原生”为什么对云计算生态充满吸引力?