滴滴 Flink-1.10 升级之路
作者|Alan
導(dǎo)讀:滴滴實時計算引擎從 Flink-1.4 無縫升級到 Flink-1.10 版本,做到了完全對用戶透明。并且在新版本的指標、調(diào)度、SQL 引擎等進行了一些優(yōu)化,在性能和易用性上相較舊版本都有很大提升。
這篇文章介紹了我們升級過程中遇到的困難和思考,希望能給大家?guī)韱l(fā)。
一、 背景
在本次升級之前,我們使用的主要版本為 Flink-1.4.2,并且在社區(qū)版本上進行了一些增強,提供了 StreamSQL 和低階 API 兩種服務(wù)形式?,F(xiàn)有集群規(guī)模達到了 1500 臺物理機,運行任務(wù)數(shù)超過 12000 ,日均處理數(shù)據(jù) 3 萬億條左右。
不過隨著社區(qū)的發(fā)展,尤其是 Blink 合入 master 后有很多功能和架構(gòu)上的升級,我們希望能通過版本升級提供更好的流計算服務(wù)。今年 2 月份,里程碑版本 Flink-1.10 發(fā)布,我們開始在新版上上進行開發(fā)工作,踏上了充滿挑戰(zhàn)的升級之路。
二、 Flink-1.10 新特性
作為 Flink 社區(qū)至今為止的最大的一次版本升級,加入的新特性解決了之前遇到很多的痛點。
1. 原生 DDL 語法與 Catalog 支持
Flink SQL 原生支持了 DDL 語法,比如 CREATE TABLE/CREATE FUNCTION,可以使用 SQL 進行元數(shù)據(jù)的注冊,而不需要使用代碼的方式。
也提供了 Catalog 的支持,默認使用 InMemoryCatalog 將信息臨時保存在內(nèi)存中,同時也提供了 HiveCatalog 可以與 HiveMetastore 進行集成。也可以通過自己拓展 Catalog 接口實現(xiàn)自定義的元數(shù)據(jù)管理。
2.Flink SQL 的增強
- 基于 ROW_NUMBER 實現(xiàn)的 TopN 和去重語法,拓展了 StreamSQL 的使用場景。
- 實現(xiàn)了 BinaryRow 類型作為內(nèi)部數(shù)據(jù)交互,將數(shù)據(jù)直接以二進制的方式構(gòu)建而不是對象數(shù)組,比如使用一條數(shù)據(jù)中的某個字段時,可以只反序列其中部分數(shù)據(jù),減少了不必要的序列化開銷。
- 新增了大量內(nèi)置函數(shù),例如字符串處理、FIRST/LAST_VALUE 等等,由于不需要轉(zhuǎn)換為外部類型,相較于自定義函數(shù)效率更高。
- 增加了 MiniBatch 優(yōu)化,通過微批的處理方式提升任務(wù)的吞吐
3.內(nèi)存配置優(yōu)化
之前對 Flink 內(nèi)存的管理一直是一個比較頭疼的問題,尤其是在使用 RocksDB 時,因為一個 TaskManager 中可能存在多個 RocksDB 實例,不好估算內(nèi)存使用量,就導(dǎo)致經(jīng)常發(fā)生內(nèi)存超過限制被殺。
在新版上增加了一些內(nèi)存配置,例如 state.backend.rocksdb.memory.fixed-per-slot 可以輕松限制每個 slot的RocksDB 內(nèi)存的使用上限,避免了 OOM 的風(fēng)險。
三、挑戰(zhàn)與應(yīng)對
本次升級最大的挑戰(zhàn)是,如何保證 StreamSQL 的兼容性。StreamSQL 的目的就是為了對用戶屏蔽底層細節(jié),能夠更加專注業(yè)務(wù)邏輯,而我們可以通過版本升級甚至更換引擎來提供更好的服務(wù)。保證任務(wù)的平滑升級是最基本的要求。
1. 內(nèi)部 patch 如何兼容
由于跨越多個版本架構(gòu)差距巨大,內(nèi)部 patch 基本無法直接合入,需要在新版本上重新實現(xiàn)。我們首先整理了所有的歷史 commit,篩選出那些必要的修改并且在新版上進行重新實現(xiàn),目的是能覆蓋已有的所有功能,確保新版本能支持現(xiàn)有的所有任務(wù)需求。
例如:
- 新增或修改 Connectors 以支持公司內(nèi)部需要,例如 DDMQ(滴滴開源消息隊列產(chǎn)品),權(quán)限認證功能等。
- 新增 Formats 實現(xiàn),例如 binlog,內(nèi)部日志采集格式的解析等。
- 增加 ADD JAR 語法,可以在 SQL 任務(wù)中引用外部依賴,比如 UDF JAR,自定義 Source/Sink。
- 增加 SET 語法,可以在 SQL 中設(shè)置 TableConfig,指導(dǎo)執(zhí)行計劃的生成
2. StreamSQL 語法兼容
社區(qū)在 1.4 版本時,FlinkSQL還處于比較初始的階段,也沒有原生的 DDL 語法支持,我們使用 Antlr 實現(xiàn)了一套自定義的 DDL 語法。但是在 Flink1.10 版本上,社區(qū)已經(jīng)提供了原生的 DDL 支持,而且與我們內(nèi)部的語法差別較大。現(xiàn)在擺在我們面前有幾條路可以選擇:
- 放棄內(nèi)部語法的支持,修改全部任務(wù)至新語法。(違背了平滑遷移的初衷,而且對已有用戶學(xué)習(xí)成本高)
- 修改 Flink 內(nèi)語法解析的模塊(sql-parser),支持對內(nèi)部語法的解析。(實現(xiàn)較為復(fù)雜,且不利于后續(xù)的版本升級)
- 在 sql-parser 之上封裝一層語法轉(zhuǎn)換層,將原本的 SQL 解析提取有效信息后,通過字符串拼接的方式組織成社區(qū)語法再運行。
最終我們選用了第三種方案,這樣可以最大限度的減少和引擎的耦合,作為插件運行,未來再有引擎升級完全可以復(fù)用現(xiàn)有的邏輯,能夠降低很多的開發(fā)成本。
例如:我們在舊版本上使用 "json-path" 的庫實現(xiàn)了 json 解析,通過在建表語句里定義類似 $.status 的表達式表示如何提取此字段。
新版本上原生的 json 類型解析可以使用 ROW 類型來表示嵌套結(jié)構(gòu),在轉(zhuǎn)換為新語法的過程中,將原本的表達是解析為樹并構(gòu)建出新的字段類型,再使用計算列的方式提取出原始表中的字段,確保表結(jié)構(gòu)與之前一致。類型名稱、配置屬性也通過映射轉(zhuǎn)換為社區(qū)語法。
3. 兼容性測試
最后是測試階段,需要進行完善的測試確保所有任務(wù)都能做到平滑升級。我們原本的計劃是準備進行回歸測試,對已有的所有任務(wù)替換配置后進行回放,但是在實際操作中有很多問題:
- 測試流程過長,一次運行可能需要數(shù)個小時。
- 出現(xiàn)問題時不好定位,可能發(fā)生在任務(wù)的整個生命周期的任何階段。
- 無法驗證計算結(jié)果,即新舊版本語義是否一致
所以我們按任務(wù)的提交流程分成多個階段進行測試,只有在當(dāng)前階段能夠全部測試通過后后進入下一個階段測試,提前發(fā)現(xiàn)問題,將問題定位范圍縮小到當(dāng)前階段,提高測試效率。
- 轉(zhuǎn)換測試:對所有任務(wù)進行轉(zhuǎn)換,測試結(jié)果符合預(yù)期,抽象典型場景為單元測試。
- 編譯測試:確保所有任務(wù)可以通過 TablePlanner 生成執(zhí)行計劃,在編譯成 JobGraph,真正提交運行前結(jié)束。
- 回歸測試:在測試環(huán)境對任務(wù)替換配置后進行回放,確認任務(wù)可以提交運行
- 對照測試:對采樣數(shù)據(jù)以文件的形式提交至新舊兩個版本中運行,對比結(jié)果是否完全一致(因為部分任務(wù)結(jié)果不具有確定性,所以使用舊版本連續(xù)運行 2 次,篩選出確定性任務(wù),作為測試用例)
四、引擎增強
除了對舊版本的兼容,我們也結(jié)合了新版本的特性,對引擎進行了增強。
1. Task-Load 指標
我們一直希望能精確衡量任務(wù)的負載狀況,使用反壓指標指標只能粗略的判斷任務(wù)的資源夠或者不夠。
結(jié)合新版的 Mailbox 線程模型,所有互斥操作全部運行在 TaskThread 中,只需統(tǒng)計出線程的占用時間,就可以精確計算任務(wù)負載的百分比。
未來可以使用指標進行任務(wù)的資源推薦,讓任務(wù)負載維持在一個比較健康的水平。
2. SubTask 均衡調(diào)度
在 FLIP-6 后,Flink 修改了資源調(diào)度模型,移除了--container 參數(shù),slot 按需申請確保不會有閑置資源。但是這也導(dǎo)致了一個問題,Source 的并發(fā)數(shù)常常是小于最大并發(fā)數(shù)的,而 SubTask 調(diào)度是按 DAG 的拓撲順序調(diào)度,這樣 SourceTask 就會集中在某些 TaskManager 中導(dǎo)致熱點。
我們加入了"最小 slot 數(shù)"的配置,保證在 Flink session 啟動后立即申請相應(yīng)數(shù)量的 slot,且閑置時也不主動退出,搭配 cluster.evenly-spread-out-slots 參數(shù)可以保證在 slot 數(shù)充足的情況下,SubTask 會均勻分布在所有的 TaskManager 上。
3. 窗口函數(shù)增強
以滾動窗口為例 TUMBLE(time_attr, INTERVAL '1' DAY),窗口為一天時開始和結(jié)束時間固定為每天 0 點 -24 點,無法做到生產(chǎn)每天 12 點-次日 12 點的窗口。
對于代碼可以通過指定偏移量實現(xiàn),但是 SQL 目前還未實現(xiàn),通過增加參數(shù) TUMBLE(time_attr, INTERVAL '1' DAY, TIME '12:00:00') 表示偏移時間為 12 小時。
還有另外一種場景,比如統(tǒng)計一天的 UV,同時希望展示當(dāng)前時刻的計算結(jié)果,例如每分鐘觸發(fā)窗口計算。對于代碼開發(fā)的方式可以通過自定義 Trigger 的方式?jīng)Q定窗口的觸發(fā)邏輯,而且 Flink 也內(nèi)置了一些 Tigger 實現(xiàn),比如 ContinuousTimeTrigger 就很適合這種場景。所以我們又在窗口函數(shù)里增加了一種可選參數(shù),代表窗口的觸發(fā)周期,TUMBLE(time_attr, INTERVAL '1' DAY, INTERVAL '1' MINUTES) 。
通過增加 offset 和 tiggger 周期參數(shù)(TUMBLE(time_attr, size[,offset_time][,trigger_interval])),拓展了 SQL 中窗口的使用場景,類似上面的場景可以直接使用 SQL 開發(fā)而不需要使用代碼的方式。
4. RexCall 結(jié)果復(fù)用
在很多 SQL 的使用場景里,會多次使用上一個計算結(jié)果,比如將 JSON 解析成 Map 并提取多個字段 。
雖然通過子查詢,看起來 json 解析只調(diào)用一次,但是經(jīng)過引擎的優(yōu)化后,通過結(jié)果表的投影 (Projection) 生成函數(shù)調(diào)用鏈 (RexCall),結(jié)果類似:
這樣會導(dǎo)致 json 解析的計算重復(fù)運行了3次,即使使用視圖分割成兩步操作,經(jīng)過 Planner 的優(yōu)化一樣會變成上邊的樣子。
對于確定性 (isDeterministic=true) 的函數(shù)來說,相同的輸入一定代表相同的結(jié)果,重復(fù)執(zhí)行 3 次 json 解析其實是沒有意義的,如何優(yōu)化才能實現(xiàn)對函數(shù)結(jié)果的復(fù)用呢?
在代碼生成時,將 RexCall 生成的唯一標識(Digest)和變量符號的映射保存在 CodeGenContext 中,如果遇到 Digest 相同的函數(shù)調(diào)用,則可以復(fù)用已經(jīng)存在的結(jié)果變量,這樣解析 JSON 只需要執(zhí)行第一次,之后就可以復(fù)用第一次的結(jié)果。
五、總結(jié)
通過幾個月的努力,新版本已經(jīng)上線運行,并且作為 StreamSQL 的默認引擎,任務(wù)重啟后直接使用新版本運行。兼容性測試的通過率達到 99.9%,可以基本做到對用戶的透明升級。對于新接觸 StreamSQL 用戶可以使用社區(qū) SQL 語法進行開發(fā),已有任務(wù)也可以修改 DML 部分語句來使用新特性?,F(xiàn)在新版本已經(jīng)支持了公司內(nèi)許多業(yè)務(wù)場景,例如公司實時數(shù)據(jù)倉庫團隊依托于新版本更強的表達能力和性能,承接了多種多樣的數(shù)據(jù)需求做到穩(wěn)定運行且與離線口徑保持一致。
版本升級不是我們的終點,隨著實時計算的發(fā)展,公司內(nèi)也有越來越多團隊需要使用 Flink 引擎, 也向我們提出了更多的挑戰(zhàn),例如與 Hive 的整合做到將結(jié)果直接寫入 Hive 或直接使用 Flink 作為批處理引擎,這些也是我們探索和發(fā)展的方向,通過不斷的迭代向用戶提供更加簡單好用的流計算服務(wù)。
原文鏈接:https://developer.aliyun.com/article/781604?
版權(quán)聲明:本文內(nèi)容由阿里云實名注冊用戶自發(fā)貢獻,版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識產(chǎn)權(quán)保護指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進行舉報,一經(jīng)查實,本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。總結(jié)
以上是生活随笔為你收集整理的滴滴 Flink-1.10 升级之路的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 云效故障定位研究论文被ICSE 2021
- 下一篇: 回看2020-数据库大讲堂