Flink SQL 在字节跳动的优化与实践
整理 | Aven (Flink 社區志愿者)
摘要:本文由 Apache Flink Committer,字節跳動架構研發工程師李本超分享,以四個章節來介紹 Flink 在字節的應用實戰。 內容如下:
- 整體介紹
- 實踐優化
- 流批一體
- 未來規劃
一、整體介紹
2018 年 12 月 Blink 宣布開源,經歷了約一年的時間 Flink 1.9 于 2019 年 8 月 22 發布。在 Flink 1.9 發布之前字節跳動內部基于 master 分支進行內部的 SQL 平臺構建。經歷了 2~3 個月的時間字節內部在 19 年 10 月份發布了基于 Flink 1.9 的 Blink planner 構建的 Streaming SQL 平臺,并進行內部推廣。在這個過程中發現了一些比較有意思的需求場景,以及一些較為奇怪的 BUG。
基于 1.9 的 Flink SQL 擴展
雖然最新的 Flink 版本已經支持 SQL 的 DDL,但 Flink 1.9 并不支持。字節內部基于 Flink 1.9 進行了 DDL 的擴展支持以下語法:
- create table
- create view
- create function
- add resource
同時 Flink 1.9 版本不支持的 watermark 定義在 DDL 擴展后也支持了。
我們在推薦大家盡量的去用 SQL 表達作業時收到很多“SQL 無法表達復雜的業務邏輯”的反饋。時間久了發現其實很多用戶所謂的復雜業務邏輯有的是做一些外部的 RPC 調用,字節內部針對這個場景做了一個 RPC 的維表和 sink,讓用戶可以去讀寫 RPC 服務,極大的擴展了 SQL 的使用場景,包括 FaaS 其實跟 RPC 也是類似的。在字節內部添加了 Redis/Abase/Bytable/ByteSQL/RPC/FaaS 等維表的支持。
同時還實現了多個內部使用的 connectors:
RocketMQ/ClickHouse/Doris/LogHouse/Redis/Abase/Bytable/ByteSQL/RPC/Print/Metrics
并且為 connector 開發了配套的 format:PB/Binlog/Bytes。
在線的界面化 SQL 平臺
除了對 Flink 本身功能的擴展,字節內部也上線了一個 SQL 平臺,支持以下功能:
- SQL 編輯
- SQL 解析
- SQL 調試
- 自定義 UDF 和 Connector
- 版本控制
- 任務管理
二、實踐優化
除了對功能的擴展,針對 Flink 1.9 SQL 的不足之處也做了一些優化。
Window 性能優化
1、支持了 window Mini-Batch
Mini-Batch 是 Blink planner 的一個比較有特色的功能,其主要思想是積攢一批數據,再進行一次狀態訪問,達到減少訪問狀態的次數降低序列化反序列化的開銷。這個優化主要是在 RocksDB 的場景。如果是 Heap 狀態 Mini-Batch 并沒什么優化。在一些典型的業務場景中,得到的反饋是能減少 20~30% 左右的 CPU 開銷。
2、擴展 window 類型
目前 SQL 中的三種內置 window,滾動窗口、滑動窗口、session 窗口,這三種語意的窗口無法滿足一些用戶場景的需求。比如在直播的場景,分析師想統計一個主播在開播之后,每一個小時的 UV(Unique Visitor)、GMV(Gross Merchandise Volume) 等指標。自然的滾動窗口的劃分方式并不能夠滿足用戶的需求,字節內部就做了一些定制的窗口來滿足用戶的一些共性需求。
-- my_window 為自定義的窗口,滿足特定的劃分方式 SELECT room_id, COUNT(DISTINCT user_id) FROM MySource GROUP BY room_id, my_window(ts, INTERVAL '1' HOURS)3、window offset
這是一個較為通用的功能,在 Datastream API 層是支持的,但 SQL 中并沒有。這里有個比較有意思的場景,用戶想要開一周的窗口,一周的窗口變成了從周四開始的非自然周。因為誰也不會想到 1970 年 1 月 1 號那天居然是周四。在加入了 offset 的支持后就可以支持正確的自然周窗口。
SELECT room_id, COUNT(DISTINCT user_id) FROM MySource GROUP BY room_id, TUMBLE(ts, INTERVAL '7' DAY, INTERVAL '3', DAY)維表優化
1、延遲 Join
維表 Join 的場景下因為維表經常發生變化尤其是新增維度,而 Join 操作發生在維度新增之前,經常導致關聯不上。
所以用戶希望如果 Join 不到,則暫時將數據緩存起來之后再進行嘗試,并且可以控制嘗試次數,能夠自定義延遲 Join 的規則。這個需求場景不單單在字節內部,社區的很多同學也有類似的需求。
基于上面的場景實現了延遲 Join 功能,添加了一個可以支持延遲 Join 維表的算子。當 Join 沒有命中,local cache 不會緩存空的結果,同時將數據暫時保存在一個狀態中,之后根據設置定時器以及它的重試次數進行重試。
2、維表 Keyby 功能
通過拓撲我們發現 Cacl 算子和 lookUpJoin 算子是 chain 在一起的。因為它沒有一個 key 的語義。
當作業并行度比較大,每一個維表 Join 的 subtask,訪問的是所有的緩存空間,這樣對緩存來說有很大的壓力。
但觀察 Join 的 SQL,等值 Join 是天然具有 Hash 屬性的。直接開放了配置,運行用戶直接把維表 Join 的 key 作為 Hash 的條件,將數據進行分區。這樣就能保證下游每一個算子的 subtask 之間的訪問空間是獨立的,這樣可以大大的提升開始的緩存命中率。
除了以上的優化,還有兩點目前正在開發的維表優化。
1、廣播維表:有些場景下維表比較小,而且更新不頻繁,但作業的 QPS 特別高。如果依然訪問外部系統進行 Join,那么壓力會非常大。并且當作業 Failover 的時候 local cache 會全部失效,進而又對外部系統造成很大訪問壓力。那么改進的方案是定期全量 scan 維表,通過Join key hash 的方式發送到下游,更新每個維表 subtask 的緩存。
2、Mini-Batch:主要針對一些 I/O 請求比較高,系統又支持 batch 請求的能力,比如說 RPC、HBase、Redis 等。以往的方式都是逐條的請求,且 Async I/O 只能解決 I/O 延遲的問題,并不能解決訪問量的問題。通過實現 Mini-Batch 版本的維表算子,大量降低維表關聯訪問外部存儲次數。
Join 優化
目前 Flink 支持的三種 Join 方式;分別是 Interval Join、Regular Join、Temporal Table Function。
前兩種語義是一樣的流和流 Join。而 Temporal Table 是流和表的的 Join,右邊的流會以主鍵的形式形成一張表,左邊的流去 Join 這張表,這樣一次 Join 只能有一條數據參與并且只返回一個結果。而不是有多少條都能 Join 到。
它們之間的區別列了幾點:
可以看到三種 Join 方式都有它本身的一些缺陷。
- 不支持 DDl
- 不支持 out join 的語義 (FLINK-7865 的限制)
- 右側數據斷流導致 watermark 不更新,下游無法正確計算 (FLINK-18934)
對于以上的不足之處字節內部都做了對應的修改。
增強 Checkpoint 恢復能力
對于 SQL 作業來說一旦發生條件變化都很難從 checkpoint 中恢復。
SQL 作業確實從 checkpoint 恢復的能力比較弱,因為有時候做一些看起來不太影響 checkpoint 的修改,它仍然無法恢復。無法恢復主要有兩點;
- 第一點:operate ID 是自動生成的,然后因為某些原因導致它生成的 ID 改變了。
- 第二點:算子的計算的邏輯發生了改變,即算子內部的狀態的定義發生了變化。
例子1:并行度發生修改導致無法恢復。
source 是一個最常見的有狀態的算子,source 如果和之后的算子的 operator chain 邏輯發生了改變,是完全無法恢復的。
下圖左上是正常的社區版的作業會產生的一個邏輯, source 和后面的并行度一樣的算子會被 chain 在一起,用戶是無法去改變的。但算子并行度是常會會發生修改,比如說 source 由原來的 100 修改為 50,cacl 的并發是 100。此時 chain 的邏輯就會發生變化。
針對這種情況,字節內部做了修改,允許用戶去配置,即使 source 的并行度跟后面整體的作業的并行度是一樣的,也讓其不與之后的算子 chain 在一起。
例子2:DAG 改變導致無法恢復。
這是一種比較特殊的情況,有一條 SQL (上圖),可以看到 source 沒有發生變化,之后的三個聚合互相之間沒有關系,狀態竟然也是無法恢復。
作業之所以無法恢復,是因為 operator ID 生成規則導致的。目前 SQL 中 operator ID 的生成的規則與上游、本身配置以及下游可以 chain 在一起的算子的數量都有關系。 因為新增指標,會導致新增一個 Calc 的下游節點,進而導致 operator ID 發生變化。
為了處理這種情況,支持了一種特殊的配置模式,允許用戶配置生成 operator ID 的時候可以忽略下游 chain 在一起算子數量的條件。
例子3:新增聚合指標導致無法恢復
這塊是用戶訴求最大的,也是最復雜的部分。用戶期望新增一些聚合指標后,原來的指標要能從 checkpoint 中恢復。
可以看到圖中左部分是 SQL 生成的算子邏輯。count,sum,sum,count,distinct 會以一個 BaseRow 的結構存儲在 ValueState 中。distinct 比較特殊一些,還會單獨存儲在一個 MapState 中。
這導致了如新增或者減少指標,都會使原先的狀態沒辦法從 ValueState 中正常恢復,因為 VauleState 中存儲的狀態 “schema” 和新的(修改指標后)的 “schema”不匹配,無法正常反序列化。
在討論解決方案之前,我們先回顧一下正常的恢復流。先從 checkpoint 中恢復出狀態的 serializer,再通過 serializer 把狀態恢復。接下來 operator 去注冊新的狀態定義,新的狀態定義會和原先的狀態定義進行一個兼容性對比,如果是兼容則狀態恢復成功,如果不兼容則拋出異常任務失敗。
不兼容的另一種處理情況是允許返回一個 migration(實現兩個不匹配類型的狀態恢復)那么也可以恢復成功。
針對上面的流程做出對應的修改:
通過以上的修改基本就可以做到正常的,新增的聚合指標從拆開的方案恢復。
三、流批一體探索
業務現狀
字節跳動內部對流批一體和業務推廣之前,技術團隊提前做了大量技術方面的探索。整體判斷是 SQL 這一層是可以做到流批一體的語義,但實踐中卻又發現不少不同。
比如說流計算的 session window,或是基于處理時間的 window,在批計算中無法做到。同時 SQL 在批計算中一些復雜的 over window,在流計算中也沒有對應的實現。
但這些特別的場景可能只占 10% 甚至更少,所以用 SQL 去落實流批一體是可行的。
流批一體
這張圖是比較常見的和大多數公司里的架構都類似。這種架構有什么缺陷呢?
鑒于上面的問題,提出了基于 Flink 的流批一體架構來解決。
基于 Flink 實現的流批一體架構:
業務收益
四、未來工作和規劃
優化 retract 放大問題
什么是 retract 放大?
上圖有 4 張表,第一張表進行去重操作 (Dedup),之后分別和另外三張表做 Join。邏輯比較簡單,表 A 輸入(A1),最后產出 (A1,B1,C1,D1) 的結果。
當表 A 輸入一個 A2,因為 Dedup 算子,導致數據需要去重,則向下游發送一個撤回 A1 的操作 -(A1) 和一個新增 A2 的操作 +(A2)。第一個 Join 算子收到 -(A1) 后會將 -(A1) 變成 -(A1,B1) 和 +(null,B1)(為了保持它認為的正確語義) 發送到下游。之后又收到了 +(A2) ,則又向下游發送 -(null,B1) 和 +(A2,B1) 這樣操作就放大了兩倍。再經由下游的算子操作會一直被放大,到最終的 sink 輸出可能會被放大 1000 倍之多。
如何解決?
將原先 retract 的兩條數據變成一條 changelog 的格式數據,在算子之間傳遞。算子接收到 changelog 后處理變更,然后僅僅向下游發送一個變更 changelog 即可。
未來規劃
1.功能優化
- 支持所有類型聚合指標變更的 checkpoint 恢復能力
- window local-global
- 事件時間的 Fast Emit
- 廣播維表
- 更多算子的 Mini-Batch 支持:維表,TopN,Join 等
- 全面兼容 Hive SQL 語法
2.業務擴展
- 進一步推動流式 SQL 達到 80%
- 探索落地流批一體產品形態
- 推動實時數倉標準化
原文鏈接:https://developer.aliyun.com/article/781455?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的Flink SQL 在字节跳动的优化与实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 核桃编程 | 前端可观测性建设之路
- 下一篇: 你不知道的CDN圈内黑话有哪些?