唯品会在 Flink 容器化与平台化上的建设实践
轉(zhuǎn)自dbaplus社群公眾號(hào)
作者:王康,唯品會(huì)數(shù)據(jù)平臺(tái)高級(jí)開發(fā)工程師
GitHub 地址
https://github.com/apache/flink
歡迎大家給 Flink 點(diǎn)贊送 star~
自 2017 年起,為保障內(nèi)部業(yè)務(wù)在平時(shí)和大促期間的平穩(wěn)運(yùn)行,唯品會(huì)就開始基于 Kubernetes 深入打造高性能、穩(wěn)定、可靠、易用的實(shí)時(shí)計(jì)算平臺(tái),現(xiàn)在的平臺(tái)支持 Flink、Spark、Storm 等主流框架。
本文將分為五個(gè)方面,分享唯品會(huì) Flink 的容器化實(shí)踐應(yīng)用以及產(chǎn)品化經(jīng)驗(yàn):
- 發(fā)展概覽
- Flink 容器化實(shí)踐
- Flink SQL 平臺(tái)化建設(shè)
- 應(yīng)用案例
- 未來(lái)規(guī)劃
一、發(fā)展概覽
1、集群規(guī)模
在集群規(guī)模方面,我們有 2000+ 的物理機(jī),主要部署 Kubernetes 異地雙活的集群,利用 Kubernetes 的 namespaces,labels 和 taints 等實(shí)現(xiàn)業(yè)務(wù)隔離以及初步的計(jì)算負(fù)載隔離。
Flink 任務(wù)數(shù)、Flink SQL 任務(wù)數(shù)、Storm 任務(wù)數(shù)、Spark 任務(wù)數(shù),這些線上實(shí)時(shí)應(yīng)用加起來(lái)有 1000 多個(gè)。目前我們主要支持 Flink SQL 這一塊,因?yàn)?SQL 化是一個(gè)趨勢(shì),所以我們要支持 SQL 任務(wù)的上線平臺(tái)。
2、平臺(tái)架構(gòu)
我們從下往上進(jìn)行解析實(shí)時(shí)計(jì)算平臺(tái)的整體架構(gòu):
- 資源調(diào)度層(最底層)
實(shí)際上是用 deployment 的模式運(yùn)行 Kubernetes 上,平臺(tái)雖然支持 yarn 調(diào)度,但是 yarn 調(diào)度與批任務(wù)共享資源,所以主流任務(wù)還是運(yùn)行在 Kubernetes 上的。并且,yarn 調(diào)度這一層主要是離線部署的一套 yarn 集群。在 2017 年的時(shí)候,我們自研了 Flink on Kubernetes 的一套方案,因?yàn)榈讓诱{(diào)度分了兩層,所以在大促資源緊張的時(shí)候,實(shí)時(shí)跟離線就可以做一個(gè)資源的借調(diào)。
- 存儲(chǔ)層
主要用來(lái)支持公司內(nèi)部基于 Kafka 的實(shí)時(shí)數(shù)據(jù) vms,基于 binlog 的 vdp 數(shù)據(jù)和原生 Kafka 作為消息總線,狀態(tài)存儲(chǔ)在 HDFS 上,數(shù)據(jù)主要存入 Redis、MySQL、HBase、Kudu、HDFS、ClickHouse 等。
- 計(jì)算引擎層
主要是 Flink、Storm、Spark,目前主推的是 Flink,每個(gè)框架會(huì)都會(huì)支持幾個(gè)版本的鏡像以滿足不同的業(yè)務(wù)需求。
- 實(shí)時(shí)平臺(tái)層
主要提供作業(yè)配置、調(diào)度、版本管理、容器監(jiān)控、job 監(jiān)控、告警、日志等功能,提供多租戶的資源管理(quota,label 管理)以及 Kafka 監(jiān)控。資源配置也分為大促日和平常日,大促的資源和平常的資源是不一樣的,資源的權(quán)限管控也是不一樣的。在 Flink 1.11 版本之前,平臺(tái)自建元數(shù)據(jù)管理系統(tǒng)為 Flink SQL 管理 schema;從 1.11 版本開始,則是通過 Hive metastore 與公司元數(shù)據(jù)管理系統(tǒng)融合。
- 應(yīng)用層
主要是支持實(shí)時(shí)大屏、推薦、實(shí)驗(yàn)平臺(tái)、實(shí)時(shí)監(jiān)控和實(shí)時(shí)數(shù)據(jù)清洗的一些場(chǎng)景。
二、Flink容器化實(shí)踐
1、容器化方案
上面是實(shí)時(shí)平臺(tái) Flink 容器化的架構(gòu)圖。Flink 容器化其實(shí)是基于 Standalone 模式部署的。
我們的部署模式共有 Client、Job Manager、Task Manager 三個(gè)角色,每一個(gè)角色都會(huì)有一個(gè) Deployment 來(lái)控制。
用戶通過平臺(tái)上傳任務(wù) jar 包、配置等,存儲(chǔ)于 HDFS 上。同時(shí)由平臺(tái)維護(hù)的配置、依賴等也存儲(chǔ)在 HDFS 上,當(dāng) pod 啟動(dòng)時(shí),就會(huì)進(jìn)行拉取等初始化操作。
Client 中主進(jìn)程是一個(gè)由 go 開發(fā)的 agent,當(dāng) Client 啟動(dòng)時(shí),會(huì)首先檢查集群狀態(tài),當(dāng)集群準(zhǔn)備好后,從 HDFS 上拉取 jar 包,再向這個(gè)集群提交任務(wù)。Client 的主要任務(wù)是做容錯(cuò),它主要功能還有監(jiān)控任務(wù)狀態(tài),做 savepoint 等操作。
通過部署在每臺(tái)物理機(jī)上的 smart-agent 采集容器的指標(biāo)寫入 m3,以及通過 Flink 暴漏的接口將 metrics 寫入 prometheus,結(jié)合 grafana 展示。同樣通過部署在每臺(tái)物理機(jī)上的 vfilebeat 采集掛載出來(lái)的相關(guān)日志寫入 es,在 dragonfly 可以實(shí)現(xiàn)日志檢索。
1)Flink 平臺(tái)化
在實(shí)踐過程中,一定要結(jié)合具體場(chǎng)景和易用性,再去考慮做平臺(tái)化工作。
2)Flink 穩(wěn)定性
在我們應(yīng)用部署以及運(yùn)行過程中,異常是不可避免的,這時(shí)候平臺(tái)就需要做一些保證任務(wù)在出現(xiàn)異常狀況后,依舊保持穩(wěn)定性的一些策略。
- pod 的健康和可用:
由 livenessProbe 和 readinessProbe 檢測(cè),同時(shí)指定 pod 的重啟策略,Kubernetes 本身可以做一個(gè) pod 的拉起。
Flink 任務(wù)產(chǎn)生異常時(shí):
- Flink 有自已本身的一套 restart 策略和 failover 機(jī)制,這是它的第一層保障。
- 在 Client 中會(huì)定時(shí)監(jiān)控 Flink 狀態(tài),同時(shí)將最新的 checkpoint 地址更新到自己的緩存中,并匯報(bào)到平臺(tái),然后固化到 MySQL 中。當(dāng) Flink 無(wú)法再重啟時(shí),由 Client 重新從最新的成功 checkpoint 提交任務(wù)。這是它的第二層保障。
這一層將 checkpoint 固化到 MySQL 中后,就不再使用 Flink HA 機(jī)制了,少了 zk 的組件依賴。
- 當(dāng)前兩層無(wú)法重啟時(shí)或集群出現(xiàn)異常時(shí),由平臺(tái)自動(dòng)從固化到 MySQL 中的最新 checkpoint 重新拉起一個(gè)集群,提交任務(wù),這是它的第三層保障。
機(jī)房容災(zāi):
- 用戶的 jar 包,checkpoint 都做了異地雙 HDFS 存儲(chǔ)。
- 異地雙機(jī)房雙集群。
2、Kafka 監(jiān)控方案
Kafka 監(jiān)控是任務(wù)監(jiān)控里非常重要的一個(gè)環(huán)節(jié),整體的流程如下:
平臺(tái)提供監(jiān)控 Kafka 堆積,用戶在界面上,可以配置自己的 Kafka 監(jiān)控,告知在怎樣的集群,以及用戶消費(fèi) message 等配置信息??梢詮?MySQL 中將用戶 Kafka 監(jiān)控配置提取后,再通過 jmx 監(jiān)控 Kafka,這樣的信息采集之后,寫入下游 Kafka,再通過另一個(gè) Flink 任務(wù)實(shí)時(shí)監(jiān)控告警,同時(shí)將這些數(shù)據(jù)同步寫入 ck 里面,從而反饋給我們的用戶(這里也可以不用 ck,用 Prometheus 去做監(jiān)控也是可以的,但 ck 會(huì)更加適合),最后再用 Grafana 組件去展示給用戶。
三、Flink SQL 平臺(tái)化建設(shè)
有了前面 Flink 的容器化方案之后,就要開始 Flink SQL 平臺(tái)化建設(shè)了。大家都知道,這樣流式的 api 開發(fā)起來(lái),還是有一定的成本的。 Flink 肯定是比 Storm 快的,也相對(duì)比較穩(wěn)定、容易一些,但是對(duì)于一些用戶,特別是 Java 開發(fā)的一些同學(xué)來(lái)說,做這個(gè)是有一定門檻的。
Kubernetes 的 Flink 容器化實(shí)現(xiàn)以后,方便了 Flink api 應(yīng)用的發(fā)布,但是對(duì)于 Flink SQL 的任務(wù)仍然不夠便利。于是平臺(tái)提供了更加方便的在線編輯發(fā)布、SQL 管理等一棧式開發(fā)平臺(tái)。
1、 Flink SQL 方案
平臺(tái)的 Flink SQL 方案如上圖所示,任務(wù)發(fā)布系統(tǒng)與元數(shù)據(jù)管理系統(tǒng)是完全解耦的。
1)Flink SQL 任務(wù)發(fā)布平臺(tái)化
在實(shí)踐過程中,需要考慮易用性,做平臺(tái)化工作,主操作界面如下圖所示:
- Flink SQL 的版本管理、語(yǔ)法校驗(yàn)、拓?fù)鋱D管理等;
- UDF 通用和任務(wù)級(jí)別的管理,支持用戶自定義 udf;
- 提供參數(shù)化的配置界面,方便用戶上線任務(wù)。
下圖是一個(gè)用戶界面配置的例子:
下圖是一個(gè)集群配置的范例:
2)元數(shù)據(jù)管理
平臺(tái)在 1.11 之前通過構(gòu)建自己的元數(shù)據(jù)管理系統(tǒng) UDM,MySQL 存儲(chǔ) Kafka,Redis 等 schema,通過自定義 catalog 打通 Flink 與 UDM,從而實(shí)現(xiàn)元數(shù)據(jù)管理。
在 1.11 之后,Flink 集成 Hive 逐漸完善,平臺(tái)重構(gòu)了 Flink SQL 框架,并通過部署一個(gè) SQL-gateway service 服務(wù),中間調(diào)用自己維護(hù)的 SQL-Client jar 包,從而與離線元數(shù)據(jù)打通,實(shí)現(xiàn)了實(shí)時(shí)離線元數(shù)據(jù)的統(tǒng)一,為之后的流批一體打好了基礎(chǔ)。
在元數(shù)據(jù)管理系統(tǒng)創(chuàng)建的 Flink 表操作界面如下圖所示:創(chuàng)建 Flink 表的元數(shù)據(jù),持久化到 Hive 里,Flink SQL 啟動(dòng)時(shí)從 Hive 里讀取對(duì)應(yīng)表的 table schema 信息。
2、Flink SQL 相關(guān)實(shí)踐
平臺(tái)對(duì)于官方原生支持或者不支持的 connector 進(jìn)行整合和開發(fā),鏡像和 connector,format 等相關(guān)依賴進(jìn)行解耦,可以快捷的進(jìn)行更新與迭代。
1)Flink SQL 相關(guān)實(shí)踐
Flink SQL 主要分為以下三層:
connector 層
- 支持 VDP connector 讀取 source 數(shù)據(jù)源;
- 支持 Redis string、hash 等數(shù)據(jù)類型的 sink & 維表關(guān)聯(lián);
- 支持 kudu connector & catalog & 維表關(guān)聯(lián);
- 支持 protobuf format 解析實(shí)時(shí)清洗數(shù)據(jù);
- 支持 vms connector 讀取 source 數(shù)據(jù)源;
- 支持 ClickHouse connector sink 分布式表 & 本地表高 TPS 寫入;
- Hive connector 支持?jǐn)?shù)坊 Watermark Commit Policy 分區(qū)提交策略 & array、decimal 等復(fù)雜數(shù)據(jù)類型。
runtime 層
- 主要支持拓?fù)鋱D執(zhí)行計(jì)劃修改;
- 維表關(guān)聯(lián) keyBy 優(yōu)化 cache 提升查詢性能;
- 維表關(guān)聯(lián)延遲 join。
平臺(tái)層
- Hive UDF;
- 支持 json HLL 相關(guān)處理函數(shù);
- 支持 Flink 運(yùn)行相關(guān)參數(shù)設(shè)置如 minibatch、聚合優(yōu)化參數(shù);
- Flink 升級(jí) hadoop3。
2)拓?fù)鋱D執(zhí)行計(jì)劃修改
針對(duì)現(xiàn)階段 SQL 生成的 stream graph 并行度無(wú)法修改等問題,平臺(tái)提供可修改的拓?fù)漕A(yù)覽修改相關(guān)參數(shù)。平臺(tái)會(huì)將解析后的 FlinkSQL 的 excution plan json 提供給用戶,利用 uid 保證算子的唯一性,修改每個(gè)算子的并行度,chain 策略等,也為用戶解決反壓?jiǎn)栴}提供方法。例如針對(duì) ClickHouse sink 小并發(fā)大批次的場(chǎng)景,我們支持修改 ClickHouse sink 并行度,source 并行度 = 72,sink 并行度 = 24,提高 ClickHouse sink tps。
3)維表關(guān)聯(lián) keyBy 優(yōu)化 cache
針對(duì)維表關(guān)聯(lián)的情況,為了降低 IO 請(qǐng)求次數(shù),降低維表數(shù)據(jù)庫(kù)讀壓力,從而降低延遲,提高吞吐,有以下三種措施:
下面是維表關(guān)聯(lián) KeyBy 優(yōu)化 cache 的圖:
在優(yōu)化之前的時(shí)候,維表關(guān)聯(lián) LookupJoin 算子和正常算子 chain 在一起,優(yōu)化之間維表關(guān)聯(lián) Lookup Join 算子和正常算子不 chain 在一起,將join key 作為 hash 策略的 key。
采用這種方式優(yōu)化后,例如原來(lái)的 3000W 數(shù)據(jù)量維表,10 個(gè) TM 節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)都要緩存 3000W 的數(shù)據(jù),總共需要緩存 3 億的量。而經(jīng)過 keyBy 優(yōu)化之后,每個(gè) TM 節(jié)點(diǎn)只需要緩存 3000W/10 = 300W 的數(shù)據(jù)量,總共緩存的數(shù)據(jù)量只有 3000W,這非常大程度減少了緩存數(shù)據(jù)量。
4)維表關(guān)聯(lián)延遲 join
維表關(guān)聯(lián)中,有很多業(yè)務(wù)場(chǎng)景,在維表數(shù)據(jù)新增數(shù)據(jù)之前,主流數(shù)據(jù)已經(jīng)發(fā)生 join 操作,會(huì)出現(xiàn)關(guān)聯(lián)不上的情況。因此,為了保證數(shù)據(jù)的正確,將關(guān)聯(lián)不上的數(shù)據(jù)進(jìn)行緩存,進(jìn)行延遲 join。
最簡(jiǎn)單的做法是,在維表關(guān)聯(lián)的 function 里設(shè)置重試次數(shù)和重試間隔,這個(gè)方法會(huì)增大整個(gè)流的延遲,但主流 qps 不高的情況下,可以解決問題。
增加延遲 join 的算子,當(dāng) join 維表未關(guān)聯(lián)時(shí),先緩存起來(lái),根據(jù)設(shè)置重試次數(shù)和重試間隔從而進(jìn)行延遲的 join。
四、應(yīng)用案例
1、實(shí)時(shí)數(shù)倉(cāng)
1)實(shí)時(shí)數(shù)據(jù)入倉(cāng)
實(shí)時(shí)數(shù)倉(cāng)主要分為三個(gè)過程:
- 流量數(shù)據(jù)一級(jí) Kafka 進(jìn)行實(shí)時(shí)數(shù)據(jù)清洗后,可以寫到二級(jí)清洗 Kafka,主要是 protobuf 格式,再通過 Flink SQL 寫入 Hive 5min 表,以便做后續(xù)的準(zhǔn)實(shí)時(shí) ETL,加速 ods 層數(shù)據(jù)源的準(zhǔn)備時(shí)間。
- MySQL 業(yè)務(wù)庫(kù)的數(shù)據(jù),通過 VDP 解析形成 binlog cdc 消息流,再通過 Flink SQL 寫入 Hive 5min 表,同時(shí)會(huì)提交到自定義分區(qū),再把分區(qū)狀態(tài)匯報(bào)到服務(wù)接口,最后再做一個(gè)離線的調(diào)度。
- 業(yè)務(wù)系統(tǒng)通過 VMS API 產(chǎn)生業(yè)務(wù) Kafka 消息流,通過 Flink SQL 解析之后寫入 Hive 5min 表??梢灾С?string、json、csv 等消息格式。
使用 Flink SQL 做流式數(shù)據(jù)入倉(cāng)是非常方便的,而且 1.12 版本已經(jīng)支持了小文件的自動(dòng)合并,解決了大數(shù)據(jù)層一個(gè)非常普遍的痛點(diǎn)。
我們自定義分區(qū)提交策略,當(dāng)前分區(qū) ready 時(shí)候會(huì)調(diào)一下實(shí)時(shí)平臺(tái)的分區(qū)提交 api,在離線調(diào)度定時(shí)調(diào)度通過這個(gè) api 檢查分區(qū)是否 ready。
采用 Flink SQL 統(tǒng)一入倉(cāng)方案以后,我們可獲得以下成果:
- 首先我們不僅解決了以往 Flume 方案不穩(wěn)定的問題,用戶也可以實(shí)現(xiàn)自助入倉(cāng),大大降低入倉(cāng)任務(wù)的維護(hù)成本,穩(wěn)定性也可以得到保障。
- 其次我們還提升了離線數(shù)倉(cāng)的時(shí)效性,從小時(shí)級(jí)降低至 5min 粒度入倉(cāng),時(shí)效性可以增強(qiáng)。
2)實(shí)時(shí)指標(biāo)計(jì)算
- 實(shí)時(shí)應(yīng)用消費(fèi)清洗后 Kafka,通過 Redis 維表、api 等方式關(guān)聯(lián),再通過 Flink window 增量計(jì)算 UV,持久化寫到 HBase 里。
- 實(shí)時(shí)應(yīng)用消費(fèi) VDP 消息流之后,通過 Redis 維表、api 等方式關(guān)聯(lián),再通過 Flink SQL 計(jì)算出銷售額等相關(guān)指標(biāo),增量 upsert 到 kudu 里,方便根據(jù) range 分區(qū)批量查詢,最終通過數(shù)據(jù)服務(wù)對(duì)實(shí)時(shí)大屏提供最終服務(wù)。
以往指標(biāo)計(jì)算通常采用 Storm 方式,這個(gè)方式需要通過 api 定制化開發(fā),采用這樣 Flink 方案以后,我們可以獲得了以下成果:
- 將計(jì)算邏輯切到 Flink SQL 上,降低計(jì)算任務(wù)口徑變化快,解決修改上線周期慢等問題;
- 切換至 Flink SQL 可以做到快速修改,并且實(shí)現(xiàn)快速上線,降低了維護(hù)的成本。
3)實(shí)時(shí)離線一體化ETL數(shù)據(jù)集成
具體的流程如下圖所示:
Flink SQL 在最近的版本中持續(xù)強(qiáng)化了維表 join 的能力,不僅可以實(shí)時(shí)關(guān)聯(lián)數(shù)據(jù)庫(kù)中的維表數(shù)據(jù),還能關(guān)聯(lián) Hive 和 Kafka 中的維表數(shù)據(jù),能靈活滿足不同工作負(fù)載和時(shí)效性的需求。
基于 Flink 強(qiáng)大的流式 ETL 的能力,我們可以統(tǒng)一在實(shí)時(shí)層做數(shù)據(jù)接入和數(shù)據(jù)轉(zhuǎn)換,然后將明細(xì)層的數(shù)據(jù)回流到離線數(shù)倉(cāng)中。
我們通過將 presto 內(nèi)部使用的 HyperLogLog(后面簡(jiǎn)稱 HLL)實(shí)現(xiàn)引入到 Spark UDAF 函數(shù)里,打通 HLL 對(duì)象在 Spark SQL 與 presto 引擎之間的互通。如 Spark SQL 通過 prepare 函數(shù)生成的 HLL 對(duì)象,不僅可以在 Spark SQL 里 merge 查詢而且可以在 presto 里進(jìn)行 merge 查詢。
具體流程如下:
UV 近似計(jì)算示例:
2、實(shí)驗(yàn)平臺(tái)(Flink 實(shí)時(shí)數(shù)據(jù)入 OLAP)
唯品會(huì)實(shí)驗(yàn)平臺(tái)是通過配置多維度分析和下鉆分析,提供海量數(shù)據(jù)的 A/B-test 實(shí)驗(yàn)效果分析的一體化平臺(tái)。一個(gè)實(shí)驗(yàn)是由一股流量(比如用戶請(qǐng)求)和在這股流量上進(jìn)行的相對(duì)對(duì)比實(shí)驗(yàn)的修改組成。實(shí)驗(yàn)平臺(tái)對(duì)于海量數(shù)據(jù)查詢有著低延遲、低響應(yīng)、超大規(guī)模數(shù)據(jù)(百億級(jí))的需求。
整體數(shù)據(jù)架構(gòu)如下:
- 離線數(shù)據(jù)是通過 waterdrop 導(dǎo)入到 ClickHouse 里面去;
- 實(shí)時(shí)數(shù)據(jù)通過 Flink SQL 將 Kafka 里的數(shù)據(jù)清洗解析展開等操作之后,通過 Redis 維表關(guān)聯(lián)商品屬性,通過分布式表寫入到 ClickHouse,然后通過數(shù)據(jù)服務(wù) adhoc 查詢,通過數(shù)據(jù)服務(wù)提供對(duì)外的接口。
業(yè)務(wù)數(shù)據(jù)流如下:
我們的實(shí)驗(yàn)平臺(tái)有一個(gè)很重要的 ES 場(chǎng)景,我們上線一個(gè)應(yīng)用場(chǎng)景后,如果我想看效果如何,包括上線產(chǎn)生的曝光、點(diǎn)擊、加購(gòu)、收藏是怎樣的。我們需要把每一個(gè)數(shù)據(jù)的明細(xì),比如說分流的一些數(shù)據(jù),根據(jù)場(chǎng)景分區(qū),寫到 ck 里面去。
我們通過 Flink SQL Redis connector,支持 Redis 的 sink 、source 維表關(guān)聯(lián)等操作,可以很方便地讀寫 Redis,實(shí)現(xiàn)維表關(guān)聯(lián),維表關(guān)聯(lián)內(nèi)可配置 cache ,極大提高應(yīng)用的 TPS。通過 Flink SQL 實(shí)現(xiàn)實(shí)時(shí)數(shù)據(jù)流的 pipeline,最終將大寬表 sink 到 CK 里,并按照某個(gè)字段粒度做 murmurHash3_64 存儲(chǔ),保證相同用戶的數(shù)據(jù)都存在同一 shard 節(jié)點(diǎn)組內(nèi),從而使得 ck 大表之間的 join 變成 local 本地表之間的 join,減少數(shù)據(jù) shuffle 操作,提升 join 查詢效率。
五、未來(lái)規(guī)劃
1、提高Flink SQL易用性
Flink SQL 對(duì)于 Hive 用戶來(lái)說,使用起來(lái)還是有一點(diǎn)不一樣的地方。不管是 Hive,還是 Spark SQL,都是批量處理的一個(gè)場(chǎng)景。
所以當(dāng)前我們的 Flink SQL 調(diào)試起來(lái)仍有很多不方便的地方,對(duì)于做離線 Hive 的用戶來(lái)說還有一定的使用門檻,例如手動(dòng)配置 Kafka 監(jiān)控、任務(wù)的壓測(cè)調(diào)優(yōu)。所以如何能讓用戶的使用門檻降至最低,讓用戶只需要懂 SQL 或者懂業(yè)務(wù),把 Flink SQL 里面的概念對(duì)用戶屏蔽掉,簡(jiǎn)化用戶的使用流程,是一個(gè)比較大的挑戰(zhàn)。
將來(lái)我們考慮做一些智能監(jiān)控,告訴用戶當(dāng)前任務(wù)存在的問題,不需要用戶去學(xué)習(xí)太多的東西,盡可能自動(dòng)化并給用戶一些優(yōu)化建議。
2、數(shù)據(jù)湖CDC分析方案落地
一方面,我們做數(shù)據(jù)湖主要是為了解決我們 binlog 實(shí)時(shí)更新的場(chǎng)景,目前我們的 VDP binlog 消息流,通過 Flink SQL 寫入到 Hive ods 層,以加速 ods 層數(shù)據(jù)源的準(zhǔn)備時(shí)間,但是會(huì)產(chǎn)生大量重復(fù)消息去重合并。我們會(huì)考慮 Flink + 數(shù)據(jù)湖的 cdc 入倉(cāng)方案來(lái)做增量入倉(cāng)。
另一方面我們希望通過數(shù)據(jù)湖,來(lái)替代我們 Kudu,我們這邊一部分重要的業(yè)務(wù)在用 Kudu。雖然 Kudu 沒有大量的使用,但鑒于 Kudu 的運(yùn)維比一般的數(shù)據(jù)庫(kù)運(yùn)維復(fù)雜得多、比較小眾,并且像訂單打?qū)捴蟮?Kafka 消息流、以及聚合結(jié)果都需要非常強(qiáng)的實(shí)時(shí) upsert 能力,所以我們就開始調(diào)研 CDC+數(shù)據(jù)湖這種解決方案,用這種方案的增量 upsert 能力來(lái)替換 kudu 增量 upsert 場(chǎng)景。
Q&A
Q1:vdp connector 是 MySQL binlog 讀取嗎?和 canal是一種工具嗎?
A1 :vdp 是公司 binlog 同步的一個(gè)組件,將 binlog 解析之后發(fā)送到 Kafka。是基于 canal 二次開發(fā)的。我們定義了一個(gè) cdc format 可以對(duì)接公司的 vdp Kafka 數(shù)據(jù)源,與 Canal CDC format 有點(diǎn)類似。目前沒有開源,使我們公司用的 binlog 的一個(gè)同步方案。
Q2 : uv 數(shù)據(jù)輸出到 HBase,銷售數(shù)據(jù)輸出到 kudu,輸出到了不同的數(shù)據(jù)源,主要是因?yàn)槭裁床扇〉倪@種策略?
A2 :kudu 的應(yīng)用場(chǎng)景沒有 HBase 這么廣泛。uv 實(shí)時(shí)寫入的 TPS 比較高,HBase 比較適合單條查詢的場(chǎng)景,寫入 HBase 高吞吐 + 低延遲,小范圍查詢延遲低;kudu 的話具備一些 OLAP 的特性,可以存訂單類明細(xì),列存加速,結(jié)合 Spark、presto 等做 OLAP 分析。
Q3 : 請(qǐng)問一下,你們?cè)趺唇鉀Q的 ClickHouse 的數(shù)據(jù)更新問題?比如數(shù)據(jù)指標(biāo)更新。
A3 : ck 的更新是異步 merge,只能在同一 shard 同一節(jié)點(diǎn)同一分區(qū)內(nèi)異步 merge,是弱一致性。對(duì)于指標(biāo)更新場(chǎng)景不太建議使用 ck。如果在 ck 里有更新強(qiáng)需求的場(chǎng)景,可以嘗試 AggregatingMergeTree 解決方案,用 insert 替換 update,做字段級(jí)的 merge。
Q4:binlog 寫入怎么保證數(shù)據(jù)的去重和一致性?
A4 : binlog 目前還沒有寫入 ck 的場(chǎng)景,這個(gè)方案看起來(lái)不太成熟。不建議這么做,可以用采用 CDC + 數(shù)據(jù)湖的解決方案。
Q5 : 如果 ck 各個(gè)節(jié)點(diǎn)寫入不均衡,怎么去監(jiān)控,怎么解決?怎么樣看數(shù)據(jù)傾斜呢?
A5 :可以通過 ck 的 system.parts 本地表監(jiān)控每臺(tái)機(jī)器每個(gè)表每個(gè)分區(qū)的寫入數(shù)據(jù)量以及 size,來(lái)查看數(shù)據(jù)分區(qū),從而定位到某個(gè)表某臺(tái)機(jī)器某個(gè)分區(qū)。
Q6 : 你們?cè)趯?shí)時(shí)平臺(tái)是如何做任務(wù)監(jiān)控或者健康檢查的?又是如何在出錯(cuò)后自動(dòng)恢復(fù)的?現(xiàn)在用的是 yarn-application 模式嗎?存在一個(gè) yarn application 對(duì)應(yīng)多個(gè) Flink job 的情況嗎?
A6 : 對(duì)于 Flink 1.12+ 版本,支持了 PrometheusReporter 方式暴露一些 Flink metrics 指標(biāo),比如算子的 watermark、checkpoint 相關(guān)的指標(biāo)如 size、耗時(shí)、失敗次數(shù)等關(guān)鍵指標(biāo),然后采集、存儲(chǔ)起來(lái)做任務(wù)監(jiān)控告警。
Flink 原生的 restart 策略和 failover 機(jī)制,作為第一層的保證。
在 Client 中會(huì)定時(shí)監(jiān)控 Flink 狀態(tài),同時(shí)將最新的 checkpoint 地址更新到自己的緩存中,并匯報(bào)到平臺(tái),固化到 MySQL 中。當(dāng) Flink 無(wú)法再重啟時(shí),由 Client 重新從最新的成功 checkpoint 提交任務(wù)。作為第二層保證。這一層將 checkpoint 固化到 MySQL 中后,就不再使用 Flink HA 機(jī)制了,少了 zk 的組件依賴。
當(dāng)前兩層無(wú)法重啟時(shí)或集群出現(xiàn)異常時(shí),由平臺(tái)自動(dòng)從固化到 MySQL 中的最新 chekcpoint 重新拉起一個(gè)集群,提交任務(wù),作為第三層保證。
我們支持 yarn-per-job 模式,主要基于 Flink on Kubernetes 模式部署 standalone 集群。
Q7 : 目前你們大數(shù)據(jù)平臺(tái)上所有的組件都是容器化的還是混合的?
A7 :目前我們實(shí)時(shí)這一塊的組件 Flink、Spark 、Storm、Presto 等計(jì)算框架實(shí)現(xiàn)了容器化,詳情可看上文 1.2 平臺(tái)架構(gòu)。
Q8 :kudu 不是在 Kubernetes 上跑的吧?
A8 :kudu 不是在 Kubernetes 上運(yùn)行,這個(gè)目前還沒有特別成熟的方案。并且 kudu 是基于 cloudera manager 運(yùn)維的,沒有上 Kubernetes 的必要。
Q9 : Flink 實(shí)時(shí)數(shù)倉(cāng)維度表存到 ck 中,再去查詢 ck,這樣的方案可以嗎?
A9:這是可以的,是可以值得嘗試的。事實(shí)表與維度表數(shù)據(jù)都可以存,可以按照某個(gè)字段做哈希(比如 user_id),從而實(shí)現(xiàn) local join 的效果。
原文鏈接:https://developer.aliyun.com/article/784822?
版權(quán)聲明:本文內(nèi)容由阿里云實(shí)名注冊(cè)用戶自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,阿里云開發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請(qǐng)查看《阿里云開發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開發(fā)者社區(qū)知識(shí)產(chǎn)權(quán)保護(hù)指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫侵權(quán)投訴表單進(jìn)行舉報(bào),一經(jīng)查實(shí),本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。總結(jié)
以上是生活随笔為你收集整理的唯品会在 Flink 容器化与平台化上的建设实践的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink 和 Iceberg 如何解决
- 下一篇: PolarDB-X 2.0:使用一个透明