快手基于 Flink 的持续优化与实践
本文由快手實(shí)時(shí)計(jì)算負(fù)責(zé)人董亭亭分享,主要介紹快手基于 Flink 的持續(xù)優(yōu)化與實(shí)踐的介紹。內(nèi)容包括:
一、Flink 穩(wěn)定性持續(xù)優(yōu)化
第一部分是 Flink 穩(wěn)定性的持續(xù)優(yōu)化。該部分包括兩個(gè)方面,第一個(gè)方面,主要介紹快手在 Flink Kafka Connector 方面做的一些高可用,是基于內(nèi)部的雙機(jī)房讀或雙機(jī)房寫和一些容錯(cuò)的策略。第二部分關(guān)于 Flink 任務(wù)的故障恢復(fù)。我們在加速故障恢復(fù)方面做了一些優(yōu)化工作。
首先,介紹 Source 方面的高可用。在公司內(nèi)部比較重要的數(shù)據(jù)寫 Kafka 時(shí),Kafka 層面為保障高可用一般都會(huì)創(chuàng)建雙集群的 topic。雙集群的 topic 共同承擔(dān)全部流量,如果單集群發(fā)生故障,上游自動(dòng)分流。Kafka 層面通過這種方式做到雙集群的高可用。但是 Flink 任務(wù)在消費(fèi)雙集群 topic 時(shí),本身是不能做到高可用的。Flink 任務(wù)通過兩個(gè) Source union 方式消費(fèi),Source 分別感知上游 topic 故障,單集群故障需手動(dòng)將故障 Source 摘除。這種方式的缺點(diǎn)是故障時(shí)需要人工的干預(yù),必須手動(dòng)去修改代碼的邏輯,程序內(nèi)部本身是不能做到高可用的。這是做雙機(jī)房讀的背景。
為了解決上述問題,我們封裝了一個(gè) Kafka 的 Cluster Source,它在 API 上支持讀取雙集群的 topic。同時(shí)做到,可以容忍單集群故障,集群故障恢復(fù)時(shí)也可以自動(dòng)將故障集群重新加入。
接下來是關(guān)于 Sink 方面的高可用。Flink 寫雙集群 Kafka topic,會(huì)定義不同集群 Sink,邏輯內(nèi)控制拆流。這種方式靈活性差,且不能容忍單機(jī)房故障。如果單集群發(fā)生故障,仍需要手動(dòng)摘除對應(yīng)的 Sink。
同樣,針對 sink 我們也定制了一個(gè) Cluster Sink,它 API 上支持寫雙集群 topic。具體寫的策略,可以支持輪詢和主從寫的方式。如果單集群發(fā)生故障,邏輯內(nèi)會(huì)自動(dòng)將流量切到正常集群 topic。如果單集群故障恢復(fù)之后,也能感知到集群的恢復(fù),可以自動(dòng)的再把相應(yīng)集群恢復(fù)回來。
另外,基于 Kafka 的 connector,我們也做了一些容錯(cuò)的策略,這里提到三點(diǎn)。
- 第一點(diǎn)就是 Kafka Sink 容忍丟失。該問題的背景是,如果 Kafka 服務(wù)異常引發(fā)任務(wù)失敗,并且業(yè)務(wù)可以容忍少量數(shù)據(jù)丟失,但是不期望任務(wù)掛掉的情況。針對該問題,我們的優(yōu)化是,設(shè)置 Kafka Sink 容忍 M 時(shí)間內(nèi) X% 丟失。具體實(shí)現(xiàn)上,Sink 單 task 統(tǒng)計(jì)失敗頻率,失敗頻率超過閾值任務(wù)才失敗。
- 第二點(diǎn)是 Kafka Source 一鍵丟 lag。該問題背景是, 一旦任務(wù) lag 較長時(shí)間,未及時(shí)發(fā)現(xiàn),或者任務(wù) debug 環(huán)節(jié),需要丟掉歷史驗(yàn)證。之前只能靠重啟任務(wù)來丟棄 lag,任務(wù)重啟代碼比較好,耗時(shí)長。我們優(yōu)化后,可以熱更新、無需重啟任務(wù)即可以丟棄 lag。實(shí)現(xiàn)邏輯是動(dòng)態(tài)發(fā)操作命令給 source,source 收到命令后 seek 到最新位置。
- 第三點(diǎn)是 Kafka broker 列表動(dòng)態(tài)獲取。該問題背景是, 生產(chǎn)環(huán)境中 Kafka broker 機(jī)器可能會(huì)故障下線,一旦請求到下線機(jī)器,會(huì)發(fā)生獲取 metadata 超時(shí),任務(wù)頻繁失敗。我們優(yōu)化后,Source task 啟動(dòng),可以獲取集群信息,動(dòng)態(tài)重新獲取 Kafka brokerlist,避免頻繁重啟。
第二部分是 Flink 任務(wù)的故障恢復(fù)優(yōu)化,分為兩個(gè)過程。一個(gè)是故障發(fā)現(xiàn),另外一個(gè)是故障恢復(fù)。實(shí)際的生產(chǎn)環(huán)境中,一些不穩(wěn)定的因素會(huì)導(dǎo)致故障恢復(fù)的時(shí)間特別的長,用戶的感知會(huì)比較差。同時(shí),內(nèi)部也有一些比較高優(yōu)的任務(wù),它對穩(wěn)定性的要求比較高。我們希望做一些事情,把整個(gè)故障恢復(fù)的時(shí)間盡可能縮短。我們定了一個(gè)優(yōu)化目標(biāo),20 秒內(nèi)做到一個(gè)自動(dòng)的恢復(fù)。
在故障發(fā)現(xiàn)階段的優(yōu)化包括三點(diǎn):
- 第一,內(nèi)部自研 Hawk 系統(tǒng),5s 發(fā)現(xiàn)宕機(jī)。
- 第二,Yarn 整合 Hawk,快速感知宕機(jī)。
- 第三,Flink 感知宕機(jī) container release。
在故障恢復(fù)階段的優(yōu)化包括:
- 第一,允許冗余部分 Container。
- 第二,適當(dāng)調(diào)整 cancel task timeout 時(shí)間。
- 第三,針對適合任務(wù)開啟 Region Failover。
二、Flink 任務(wù)啟動(dòng)優(yōu)化
第二部分是任務(wù)啟動(dòng)優(yōu)化,Flink 任務(wù)啟動(dòng)的時(shí)候,一般會(huì)涉及到比較多的角色,還有多個(gè)實(shí)例。如下圖所示,它的啟動(dòng)在客戶端包括,初始化 Client,構(gòu)建 jobGraph,上傳 Flink lib、job jar,申請 AM。在 Job Master,AM 啟動(dòng)后、初始化,構(gòu)建 ExectutionGraph,申請、啟動(dòng) Container,Job Task 調(diào)度。在 Task Manager 端, 容器申請到之后,啟動(dòng)下載 jar 包資源,再去初始化 Task Manager 服務(wù),然后收到 task 后才會(huì)去做部署。我們發(fā)現(xiàn),線上啟動(dòng)一個(gè)任務(wù)的時(shí)候,基本上在分鐘級別,耗時(shí)比較長。如果有一些任務(wù)需要升級,比如說,改了一些簡單的邏輯,需要將原來的任務(wù)停掉,然后再去重新啟動(dòng)一個(gè)新的任務(wù),這種場景可能就會(huì)更慢。因此,我在任務(wù)啟動(dòng)的時(shí)候做一些優(yōu)化,盡可能縮短任務(wù)啟動(dòng)的時(shí)間,業(yè)務(wù)的斷流時(shí)間也進(jìn)一步縮短。
在 Flink 新任務(wù)啟動(dòng)優(yōu)化方面,我們發(fā)現(xiàn) IO 交互會(huì)比較耗時(shí)。在客戶端的 IO 包括,Flink 引擎 lib 包上傳 HDFS,用戶上傳 jar 包上傳 HDFS。在 JobMaster 包括, Container 下載啟動(dòng)資源,TaskManager conf 上傳 HDFS。在 TaskManager 包括, Container 下載啟動(dòng)資源,Conf 文件下載。
因此,想盡量的減少這樣的一些 lO 的操作。針對 Flink 引擎 lib 包,設(shè)置 Public 權(quán)限,App 之間共享。對于用戶 jar 包,提供工具,提前預(yù)發(fā)布到集群機(jī)器。對于 Conf 文件,通過環(huán)境變量傳遞。針對 JobMaster 啟動(dòng) TM 頻繁文件判斷,增加 cache 緩存。
以上是針對一個(gè)新任務(wù)啟動(dòng)場景,下面介紹任務(wù)升級的場景。以前是同步升級,比如說,任務(wù) A 在運(yùn)行著,然后我要把任務(wù) A 停掉,再去啟動(dòng)新的任務(wù) B。如下圖所示,不可用時(shí)間包括停任務(wù) A 和啟動(dòng)新任務(wù) B。是否可以不用等任務(wù) A 完全停掉之后,再啟動(dòng)任務(wù) B。針對這個(gè)想法我們做了一個(gè)異步升級的策略。新任務(wù)提前啟動(dòng),初始化到 JobMaster 階段。舊任務(wù)停掉后,完成新任務(wù)后續(xù)啟動(dòng)工作,這樣新舊任務(wù)無縫切換。通過內(nèi)部提交平臺(tái)將該步驟串聯(lián)起來,目標(biāo)是異步升級在 20s 以內(nèi)完成。
三、Flink SQL 實(shí)踐與優(yōu)化
第三部分會(huì)介紹一下我們在使用 Flink SQL 的一些實(shí)踐和優(yōu)化。首先介紹一下 Flink SQL 在快手的現(xiàn)狀。目前,我們內(nèi)部 Flink SQL 的任務(wù)占比在 30% 左右。Flink SQL 的任務(wù)個(gè)數(shù)是 360 多個(gè)。然后它的峰值處理的條目數(shù)還是比較高的,大約是 4億每秒。在我們內(nèi)部的一些重要活動(dòng)的實(shí)時(shí)大屏的場景下,目前 Flink SQL 也作為一條鏈路,參與了相關(guān)指標(biāo)的計(jì)算。
接下來介紹一下我們在使用 Flink SQL 的時(shí)候遇到的一些問題,以及我們做的一些優(yōu)化。首先,關(guān)于 Flink SQL 的傾斜問題,在 UnBounded Agg 場景下的傾斜問題,已經(jīng)有比較全面的思路去解決,總結(jié)為三點(diǎn)。
- 第一,MiniBatch Aggregation,思路是內(nèi)存緩存 batch 數(shù)據(jù)再進(jìn)行聚合,減少狀態(tài)訪問次數(shù)。
- 第二,Local Global Aggregation,思路是聚合操作拆分為兩階段, Local 階段預(yù)聚合減少數(shù)據(jù)條數(shù),Global 解決全局聚合。
- 第三,Split Distinct Aggregation,思路是針對 count distinct 場景, 對分組 key 先分桶預(yù)聚合, 再對分桶結(jié)果全局聚合。
所以我們解決的第一個(gè)問題就是 Bounded Agg 的傾斜問題。如下圖所示,拿左邊的 SQL 作為例子,group by一個(gè)user,假定一天的窗口,然后去 select 每一個(gè)用戶總的交易額。右邊的圖,假定有一些用戶的交易特別多,就會(huì)造成某一些 Window Agg 的數(shù)據(jù)量特別大。
解決思路分為兩點(diǎn)。
- 第一,兩階段聚合,分為 Local window Agg 和 Global window Agg。Local window Agg:預(yù)聚合 window 大小與 global 階段保持一致,checkpoint 時(shí)將結(jié)果寫出,不保存狀態(tài) 。Global window Agg:全量聚合。
- 第二,增加 mini-batch,好處是 local 階段 mini-batch 避免數(shù)據(jù)量緩存過多,Global 階段 mini-batch 減少狀態(tài)訪問次數(shù)。
我們解決的第二個(gè)問題是 Flink SQL 下的 UDF 函數(shù)復(fù)用的問題。如下圖所示,以左邊的 SQL 為例,可以看到有兩個(gè) UDF 的函數(shù),這兩個(gè)函數(shù)在 SQL 里面都重復(fù)出現(xiàn)了多次。
- 優(yōu)化前:相同 UDF 多次執(zhí)行,性能變差。
- 優(yōu)化后:同一條數(shù)據(jù)下 UDF 結(jié)果復(fù)用,避免多次調(diào)用執(zhí)行,節(jié)約資源,性能也得到提升。拿示例 SQL 來說,性能提升了 2 倍。
四、未來工作
第四部分介紹我們未來的一些規(guī)劃,分為三塊。
- 第一,關(guān)于資源利用率。目標(biāo)是提升集群整體資源利用均衡性,Flink 任務(wù)內(nèi)調(diào)度均衡性,以及 Flink 任務(wù)資源使用合理性。
- 第二,關(guān)于 Flink SQL。我們會(huì)持續(xù)的去做推廣。我們希望提升 SQL 任務(wù)穩(wěn)定性和 SQL 任務(wù)資源的利用率。
- 第三,探索流批統(tǒng)一,這也是業(yè)界的一個(gè)方向。我們希望可以一套代碼就解決問題,不用重復(fù)開發(fā)兩套任務(wù)。
原文鏈接:https://developer.aliyun.com/article/782330?
版權(quán)聲明:本文內(nèi)容由阿里云實(shí)名注冊用戶自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識(shí)產(chǎn)權(quán)保護(hù)指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進(jìn)行舉報(bào),一經(jīng)查實(shí),本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。總結(jié)
以上是生活随笔為你收集整理的快手基于 Flink 的持续优化与实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 阿里云AIoT正式发布IoT安全中心和I
- 下一篇: ACK正式支持对基于Alibaba Cl