58 集团大规模 Storm 任务平滑迁移至 Flink 的秘密
Flink-Storm 是 Flink 官方提供的用于 Flink 兼容 Storm 程序 beta 工具,并且在 Release 1.8 之后去掉相關(guān)代碼。本文主要講述 58 實(shí)時(shí)計(jì)算平臺(tái)如何優(yōu)化 Flink-Storm 以及基于 Flink-Storm 實(shí)現(xiàn)真實(shí)場景下大規(guī)模 Storm 任務(wù)平滑遷移 Flink。
背景
58 實(shí)時(shí)計(jì)算平臺(tái)旨在為集團(tuán)業(yè)務(wù)部門提供穩(wěn)定高效實(shí)時(shí)計(jì)算服務(wù),主要基于 Storm 和 Spark Streaming 構(gòu)建,但在使用過程中也面臨一些問題,主要包括 Storm 在吞吐量不足以及多集群帶來運(yùn)維問題,Spark Streaming 又無法滿足低延遲的要求。Apache Flink 開源之后,其在架構(gòu)設(shè)計(jì)、計(jì)算性能和穩(wěn)定性上體現(xiàn)出的優(yōu)勢,使我們決定采用 Flink 作為新一代實(shí)時(shí)計(jì)算平臺(tái)的計(jì)算引擎。同時(shí)基于 Flink 開發(fā)了一站式高性能實(shí)時(shí)計(jì)算平臺(tái) Wstream,支持 Flink jar,Stream Sql,Flink-Storm 等多樣化任務(wù)構(gòu)建方式。
在完善 Flink 平臺(tái)建設(shè)的同時(shí),我們也啟動(dòng) Storm 任務(wù)遷移 Flink 計(jì)劃,旨在提升實(shí)時(shí)計(jì)算平臺(tái)整體效率,減少機(jī)器成本和運(yùn)維成本。
Storm vs Flink
盡管 Flink 作為高性能計(jì)算引擎可以很好兼容 Storm,但在業(yè)務(wù)遷移過程中,我們?nèi)匀挥龅搅艘恍﹩栴}:
1 .用戶對 Flink 的學(xué)習(xí)成本;
因此我們決定采用 Flink 官方提供的 Flink-Storm 進(jìn)行遷移,在保障遷移穩(wěn)定性同時(shí)無需用戶修改 Storm 代碼邏輯。
Flink-Storm 原理
通過 Storm 原生 TopologyBuilder 構(gòu)建好 Storm topology。
FlinkTopology.createTopology(builder) 將 StormTopology 轉(zhuǎn)換為 Flink 對應(yīng)的 Streaming Dataflow。
SpoutWrapper 用于將 spout 轉(zhuǎn)換為 RichParallelSourceFunction,spout 的OutputFields轉(zhuǎn)換成 source 的T ypeInformation。
BoltWrapper 用于將 bolt 轉(zhuǎn)換成對應(yīng)的 operator,其中 grouping 轉(zhuǎn)換為對 spout 的 DataStream 的對應(yīng)操作。
構(gòu)建完 FlinkTopology 之后,就可以通過 StreamExecutionEnvironment 生成 StreamGraph 獲取 JobGraph,之后將 JobGraph 提交到 Flink 運(yùn)行時(shí)環(huán)境。
實(shí)踐
Flink-Storm 作為官方提供 Flink 兼容 Storm 程序?yàn)槲覀儗?shí)現(xiàn)無縫遷移提供了可行性,但是作為 beta 版本,在實(shí)際使用過程中存在很多無法滿足現(xiàn)實(shí)場景的情況,主要包括版本,功能 bug,復(fù)雜邏輯兼容,無法支持 yarn 等,下面將主要分為平臺(tái)層面和用戶層面講述我們的使用和改進(jìn)。
平臺(tái)層面
1. 版本
當(dāng)前線上使用 Apache Flink 1.6 版本,Flink-Storm 模塊基于 Storm 1.0 開發(fā),我們平臺(tái)運(yùn)行 Storm 版本為 0.9.5 和 1.2 。
1.1 對于 Storm 1.2 運(yùn)行任務(wù),Storm 1.0 API 完全兼容 1.2 版本,因此只需切換 Flink-Storm 模塊依賴的 storm-core 到 1.2.
1.2 對于 Storm 0.9.5 任務(wù),由于 Storm 1.0 API 無法兼容 0.9.5,需要修改依賴 storm-core 為 0.9.5,同時(shí)修改 Flink-Storm 模塊中所有與 Storm 相關(guān)的 API,主要是切換 package 路徑。
1.3 重新構(gòu)建 flink-storm 包 mvn clean package -Dmaven.test.skip=true -Dcheckstyle.skip=true
2.功能
2.1 傳遞語義保證
Storm 使用 ACK 機(jī)制來實(shí)現(xiàn)傳遞語義保證,我們沒有將 Storm 的 ACK 機(jī)制移植到Flink-Storm。因此,某些依賴 ACK 機(jī)制的功能會(huì)受到限制。比如,Kafka spout 將消費(fèi)狀態(tài)存儲(chǔ)在 ZK,狀態(tài)的更新需要依賴 ACK 機(jī)制,tuple 樹結(jié)束后,spout 才會(huì)觸發(fā)狀態(tài)更新,表示這條消息已經(jīng)被完全處理,從而實(shí)現(xiàn) at least once 的傳遞保證。Storm 也提供了at most once 的支持,spout 發(fā)送消息后,無需等待 tuple 樹結(jié)束直接觸發(fā)狀態(tài)更新。我們使用了 Storm 的實(shí)現(xiàn) at most once 的方式,在 Kafka spout 實(shí)現(xiàn) at most once 的基礎(chǔ)上,通過實(shí)現(xiàn) Flink Checkpoint 的狀態(tài)機(jī)制,實(shí)現(xiàn)了 Flink-storm 任務(wù)的 at least once。Storm 任務(wù)遷移到 Flink,傳遞保證不變。
2.2 tick tuple 機(jī)制
Storm 使用 tick tuple 機(jī)制實(shí)現(xiàn)定時(shí)功能,消息超時(shí)重發(fā)、Bolt 定時(shí)觸發(fā)等功能都要依賴 tick tuple 機(jī)制。Storm 0.9.5 版本沒有實(shí)現(xiàn)窗口功能,用戶可以使用 tick tuple 機(jī)制簡單實(shí)現(xiàn)窗口功能。我們同樣為 Flink-Storm 增加了 tick tuple 機(jī)制的支持,使用方式也和 Storm 中使用方式一樣,配置 topology.tick.tuple. freq.secs 參數(shù),即開啟了 tick tuple 功能。
2.3 多輸入下 AllGrouping 支持
AllGrouping 分組方式對應(yīng)于 Flink 是 Broadcast。如圖,bolt-1 有兩個(gè)輸入,這種情況下,原 flink-storm 的實(shí)現(xiàn),spout-2 到 bolt-1 的數(shù)據(jù)分區(qū)的表現(xiàn)形式和Rebalance(Flink 術(shù)語)一樣,而不是 Broadcast。我們優(yōu)化了這種場景,使其數(shù)據(jù)分組表現(xiàn)和 Storm 中是一樣的。
3.Runtime
Flink-Storm 默認(rèn)支持 local 和 standalong 模式任務(wù)提交,無法將任務(wù)提交到 yarn 集群,我們在建設(shè) Flink 集群一開始就選擇了 yarn 模式,便于集群資源管理和統(tǒng)一實(shí)時(shí)計(jì)算平臺(tái),因此需要自行實(shí)現(xiàn)支持 yarn 的 runtime 功能,這里主要涉及 yarn client 端設(shè)計(jì)。
YARN Client 實(shí)現(xiàn)機(jī)制
整個(gè)模塊主要分為四個(gè)部分,其中 client 用于調(diào)用 Flink-Storm 程序轉(zhuǎn)化接口,得到 Flink jobGraph。配置參數(shù)用于初始化 Flink 及 yarn 相關(guān)配置,構(gòu)建運(yùn)行時(shí)環(huán)境,命令行工具主要用于更加靈活的管理。yarnClient 主要實(shí)現(xiàn) ApplicationClientProtocol 接口,完成與 ResourceManager 與 ApplicationMaster 的交互,實(shí)現(xiàn) Flink job 提交和監(jiān)控。
4.任務(wù)部署
為便于任務(wù)提交和集成到 Wstream 平臺(tái),提供類似 Flink 命令行提交方式:
用戶層面
1.maven 依賴
平臺(tái)將編譯好的包上傳到公司 maven 私服供用戶下載對應(yīng)版本 Flink-Storm 依賴包:
2.代碼改動(dòng)
用戶需要將 Storm 提交任務(wù)的方式改成 Flink 提交,其他無需變動(dòng)。
總結(jié)
通過對 Fink-Storm 的優(yōu)化和使用,我們已經(jīng)順利完成多個(gè) Storm 集群任務(wù)遷移和下線,在保障實(shí)時(shí)性及吞吐量的基礎(chǔ)上可以節(jié)約計(jì)算資源 40% 以上,同時(shí)借助 yarn 統(tǒng)一管理實(shí)時(shí)計(jì)算平臺(tái)無需維護(hù)多套 Storm 集群,整體提升了平臺(tái)資源利用率,減輕平臺(tái)運(yùn)維工作量。
原文鏈接
本文為云棲社區(qū)原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
總結(jié)
以上是生活随笔為你收集整理的58 集团大规模 Storm 任务平滑迁移至 Flink 的秘密的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 今天起,我们喝的百年牛奶要变了!
- 下一篇: AliOS Things 3.0应用笔记