Flink + Iceberg,腾讯百亿级实时数据入湖实战
本文整理自騰訊數據湖研發高級工程師陳俊杰在 4 月 17 日 上海站 Flink Meetup 分享的《百億級實時數據入湖實戰》,文章內容為:
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點贊送 star~
一、騰訊數據湖介紹
從上圖可以看出來,整個平臺比較大,包括了數據接入、上層的分析、中間的管理 (如任務管理,分析管理和引擎管理),再到最下層的 Table Format。
二、百億級數據落地場景落地
1. 傳統平臺架構
如上圖所示,過去的傳統平臺架構無非是兩種,一種是 Lambda 架構,一種是 Kappa 架構:
Lambda 架構中,批和流是分開的,所以運維要有兩套集群,一套是 For Spark/Hive,一套是 For Flink。這存在幾個問題:
- 第一是運維的成本比較大;
- 第二是開發成本。例如在業務方面,一會要寫 Spark,一會要寫 Flink 或者 SQL,總體來說,開發成本對數據分析人員不是特別友好。
- 第二個是 Kappa 架構。其實就是消息隊列,到底層的傳輸,再到后面去做一些分析。它的特點是比較快,基于 Kafka 有一定的實時性。
這兩種架構各有利弊,最大的問題是存儲可能會不統一,導致數據鏈路割裂。目前我們平臺已經接入了 Iceberg,下面會根據不同場景,闡述遇到的問題及解決的過程。
2. 場景一: 手 Q 安全數據入湖
手機 QQ 安全數據入湖是一個非常典型的場景。
目前的業務場景是消息隊列 TubeMQ 通過 Flink 落地成 ODS 到 Iceberg,然后再用 Flink 做一些用戶表的關聯,之后做成一個寬表去做一些查詢,放到 COS 中,可能會在 BI 場景做一些分析。
這個過程看似平平無奇,但是要知道,手 Q 的用戶關聯維表為 28 億,每天的消息隊列是百億級的,因此會面臨一定的挑戰。
小文件挑戰
- Flink Writer 產生小文件
Flink 寫入沒有 shuffle,分發的數據無序,導致小文件多。
- 延遲要求高
checkpoint 間隔短,commit 間隔小,放大小文件問題。
- 小文件爆炸
幾天時間元數據和數據的小文件同時爆炸,集群壓力巨大。
- 合并小文件又放大問題
為了解決小文件問題,開 Action 進行小文件合并,結果產生更多文件。
- 來不及刪數據
刪除快照,刪孤兒文件,但是掃描文件太多,namenode 壓力巨大。
解決方案
Flink 同步合并
- 增加小文件合并 Operators;
- 增加 Snapshot 自動清理機制。
1)snapshot.retain-last.nums
2)snapshot.retain-last.minutes
Spark 異步合并
- 增加后臺服務進行小文件合并和孤兒文件刪除;
- 增加小文件過濾邏輯,逐步刪除小文件;
- 增加按分區合并邏輯,避免一次生成太多刪除文件導致任務 OOM。
- Flink 同步合并
把所有的 Data 文件 Commit 之后,會產生一個 Commit Result。我們會拿 Commit Result 生成一個壓縮的任務,再給它并發成多個 Task Manager 去做 Rewrite 的工作,最終把結果 Commit 到 Iceberg 表里面。
當然,這里面的關鍵所在是 CompactTaskGenerator 怎么做。剛開始的時候我們想盡量地合并,于是去做表的 scan,把很多文件都掃一遍。然而它的表非常大,小文件非常多,一掃使得整個 Flink 立馬掛掉。
我們想了個方法,每次合并完,增量地去掃數據。從上一個 Replace Operation 里面到現在做一個增量,看這中間又增了多少,哪些符合 Rewrite 的策略。
這里面其實有許多配置,去看達到了多少個 snapshot,或者達到了多少個文件可以去做合并,這些地方用戶可以自己設置。當然,我們本身也設有默認值,從而保證用戶無感知地使用這些功能。
- Fanout Writer 的坑
在 Fanout Writer 時,如果數據量大可能會遇到多層分區。比如手 Q 的數據分省、分市;但分完之后還是很大,于是又分 bucket。此時每個 Task Manager 里可能分到很多分區,每個分區打開一個 Writer,Writer 就會非常的多,造成內存不足。
這里我們做了兩件事情:
- 第一是 KeyBy 支持。根據用戶設置的分區做 KeyBy 的動作,然后把相同分區的聚集在一個 Task Manager 中,這樣它就不會打開那么多分區的 Writer。當然,這樣的做法會帶來一些性能上的損失。
- 第二是做 LRU Writer,在內存里面維持一個 Map。
3. 場景二:新聞平臺索引分析
上方是基于 Iceberg 流批一體的新聞文章在線索引架構。左邊是 Spark 采集 HDFS 上面的維表,右邊是接入系統,采集以后會用 Flink 和維表做一個基于 Window 的 Join,然后寫到索引流水表中。
功能
- 準實時明細層;
- 實時流式消費;
- 流式 MERGE INTO;
- 多維分析;
- 離線分析。
場景特點
上述場景有以下幾個特點:
- 數量級:索引單表超千億,單 batch 2000 萬,日均千億;
- 時延需求:端到端數據可見性分鐘級;
- 數據源:全量、準實時增量、消息流;
- 消費方式:流式消費、批加載、點查、行更新、多維分析。
挑戰:MERGE INTO
有用戶提出了 Merge Into 的需求,因此我們從三個方面進行了思考:
- 功能:將每個 batch join 后的流水表 Merge into 到實時索引表,供下游使用;
- 性能:下游對索引時效性要求高,需要考慮 merge into 能追上上游的 batch 消費窗口;
- 易用性:Table API?還是 Action API?又或是 SQL API?
解決方案
第一步
- 參考 Delta Lake 設計 JoinRowProcessor;
- 利用 Iceberg 的 WAP 機制寫臨時快照。
第二步
- 可選擇跳過 Cardinality-check;
- 寫入時可以選擇只 hash,不排序。
第三步
- 支持 DataframeAPI;
- Spark 2.4 支持 SQL;
- Spark 3.0 使用社區版本。
4. 場景三:廣告數據分析
廣告數據主要有以下幾個特點:
- 數量級:日均千億 PB 數據,單條 2K;
- 數據源:SparkStreaming 增量入湖;
- 數據特點:標簽不停增加,schema 不停變換;
- 使用方式:交互式查詢分析。
遇到的挑戰與對應的解決方案:
- 挑戰一:Schema 嵌套復雜,平鋪后近萬列,一寫就 OOM。
解決方案:默認每個 Parquet Page Size 設置為 1M,需要根據 Executor 內存進行 Page Size 設置。
- 挑戰二:30 天數據基本集群撐爆。
解決方案:提供 Action 進行生命周期管理,文檔區分生命周期和數據生命周期。
挑戰三:交互式查詢。
解決方案:
- 1)column projection;
- 2)predicate push down。
- 挑戰一:Schema 嵌套復雜,平鋪后近萬列,一寫就 OOM。
三、未來規劃
對于未來的規劃主要分為內核側與平臺側。
1. 內核側
在未來,我們希望在內核側有以下幾點規劃:
更多的數據接入
- 增量入湖支持;
- V2 Format 支持;
- Row Identity 支持。
更快的查詢
- 索引支持;
- Alloxio 加速層支持;
- MOR 優化。
更好的數據治理
- 數據治理 Action;
- SQL Extension 支持;
- 更好的元數據管理。
2. 平臺側
在平臺側我們有以下幾點規劃:
數據治理服務化
- 元數據清理服務化;
- 數據治理服務化。
增量入湖支持
- Spark 消費 CDC 入湖;
- Flink 消費 CDC 入湖。
指標監控告警
- 寫入數據指標;
- 小文件監控和告警。
四、總結
經過大量生產上的應用與實踐,我們得到三方面的總結:
- 可用性:通過多個業務線的實戰,確認 Iceberg 經得起日均百億,甚至千億的考驗。
- 易用性:使用門檻比較高,需要做更多的工作才能讓用戶使用起來。
- 場景支持:目前支持的入湖場景 還沒有 Hudi 多,增量讀取這塊也比較缺失,需要大家努力補齊。
另外~《Apache Flink-實時計算正當時》電子書重磅發布,本書將助您輕松 Get Apache Flink 1.13 版本最新特征,同時還包含知名廠商多場景 Flink 實戰經驗,學用一體,干貨多多!快點擊下方鏈接領取吧~
https://developer.aliyun.com/article/784856?spm=a2c6h.13148508.0.0.61644f0eskgxgo
原文鏈接:https://developer.aliyun.com/article/785032?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的Flink + Iceberg,腾讯百亿级实时数据入湖实战的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 放下表格——开箱即用的新冠疫苗接种统计模
- 下一篇: 实时数仓入门训练营:Hologres性能