Building a Unified Stream-Batch System with Apache Hudi
1. Introduction
當(dāng)前公司的大數(shù)據(jù)實時鏈路如下圖,數(shù)據(jù)源是MySQL數(shù)據(jù)庫,然后通過Binlog Query的方式消費或者直接客戶端采集到Kafka,最終通過基于Spark/Flink實現(xiàn)的批流一體計算引擎處理,最后輸出到下游對應(yīng)的存儲。
2. Evolution of the Model Feature Architecture
2.1 First-Generation Architecture
In the early days of the advertising business, to speed up strategy iteration, we put together a generic feature production framework made up of three parts: feature statistics, feature pushing, and feature retrieval for model training, as shown in the figure below:
- Client-side and server-side data is first sunk to HDFS through a unified service
- Based on the HDFS data, statistical features such as totals and distributions along specific dimensions are computed and pushed to Codis
- Features are fetched from Codis for hourly incremental model training, while HDFS files are read for daily incremental training
This approach was good enough for algorithm iteration, but it had several problems:
- Because the server side puts local files directly onto HDFS, precise partitioning by event time is impossible, so different data sources end up with inconsistent metric definitions
- Uncontrolled small-file and empty-file problems
- A single data format: only JSON is supported
- High cost of use: feature extraction requires constant hand-written code
- Poor extensibility of the overall architecture
To address these problems we evolved and improved the first-generation design and built the second-generation unified stream-batch architecture (this upgrade also follows the same evolutionary path the author took when upgrading the architecture at Ele.me).
2.2 Second-Generation Architecture
2.2.1 Building the Unified Stream-Batch Platform
First we converted the data pipeline to a real-time architecture, unified the SQL syntax of Spark Structured Streaming (SS hereafter) with Flink SQL, built a unified stream-batch layer whose syntax is largely consistent with Flink SQL, and added a number of functional enhancements and optimizations on top.
為什么有了Flink還需要支持SS呢?主要有以下幾點原因
- The Spark ecosystem is relatively more mature (although Flink has caught up a lot by now)
- User habits: some users have little motivation to migrate from Spark to Flink
- SS's micro-batch engine abstraction makes unifying batch and streaming smoother
- Compared with Flink's pure in-memory computation model, Spark is friendlier for latency-insensitive scenarios
As an example, here is how the unified engine creates a Kafka table and writes it to ClickHouse with SS and with Flink; the syntax for each is shown below.
The Spark Structured Streaming syntax is as follows:
--Spark Structured Streaming
CREATE STREAM spark (
  ad_id STRING,
  ts STRING,
  event_ts as to_timestamp(ts)
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx',
  'properties.group.id' = 'xx',
  'startingOffsets' = 'earliest',
  'eventTimestampField' = 'event_ts',
  'watermark' = '60 seconds',
  'format' = 'json'
);

create SINK ck (
  ad_id STRING,
  ts STRING,
  event_ts timestamp
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:clickhouse://host:port/db',
  'table-name' = 'table',
  'username' = 'user',
  'password' = 'pass',
  'sink.buffer-flush.max-rows' = '10',
  'sink.buffer-flush.interval' = '5s',
  'sink.parallelism' = '3',
  'checkpointLocation' = 'checkpoint_path'
);

insert into ck select * from spark;

The Flink SQL syntax is as follows:
CREATE TABLE flink (
  ad_id STRING,
  ts STRING,
  event_ts as to_timestamp(ts)
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx',
  'properties.group.id' = 'xx',
  'scan.topic-partition-discovery.interval' = '300s',
  'format' = 'json'
);

CREATE TABLE ck (
  ad_id VARCHAR,
  ts VARCHAR,
  event_ts timestamp(3),
  PRIMARY KEY (ad_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:clickhouse://host:port/db',
  'table-name' = 'table',
  'username' = 'user',
  'password' = 'pass',
  'sink.buffer-flush.max-rows' = '10',
  'sink.buffer-flush.interval' = '5s',
  'sink.parallelism' = '3'
);

insert into ck select * from flink;

2.2.2 New Architecture for Model Feature Processing
The new model feature processing adopts the unified stream-batch architecture; the upstream data source is still Kafka. The model side has two main requirements:
- Support incremental reads so that models can be updated with lower latency
- Use CDC to backfill features (a sketch follows this list)
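As a rough illustration of what the CDC-driven backfill can look like, here is a minimal sketch using the open-source Hudi Spark datasource rather than our internal SQL engine. The DataFrame `corrections` (repaired feature rows keyed by `uuid`), the table name, and the path are all hypothetical; an upsert by record key rewrites only the affected records instead of whole partitions.

import org.apache.spark.sql.{DataFrame, SaveMode}

// `corrections` holds corrected feature rows (e.g. derived from upstream CDC events).
// Upserting by record key replaces only the matching records in the Hudi table.
def backfillFeatures(corrections: DataFrame): Unit = {
  corrections.write
    .format("hudi")
    .option("hoodie.table.name", "ed_click")                      // illustrative table name
    .option("hoodie.datasource.write.operation", "upsert")
    .option("hoodie.datasource.write.recordkey.field", "uuid")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .option("hoodie.datasource.write.partitionpath.field", "dt,hour")
    .option("hoodie.datasource.write.keygenerator.class",
            "org.apache.hudi.keygen.ComplexKeyGenerator")
    .mode(SaveMode.Append)
    .save("hdfs:///path/to/ed_click")                              // hypothetical base path
}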
The overall flow is shown in the figure below.
2.2.3 Hudi, Delta, or Iceberg?
All three are active open-source data lake projects. A full feature-by-feature comparison would take too long, so here is a rough list of their respective strengths and weaknesses.
The comparison shows that each has its pros and cons, and because requirements differ, it is common in production for more than one of the three to coexist. So why did we ultimately choose Hudi for the model feature scenario? Mainly for the following reasons:
- The Hudi community in China is very active, so problems get resolved quickly
- Hudi's support for Spark 2 is friendlier, and the company's algorithm stack is still mostly on Spark 2
- The algorithm team wants incremental query capability, which is a native, headline Hudi feature and matches our scenario very well (see the read sketch after this list)
- Hudi fits CDC scenarios very well and its support for them is mature
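For reference, a minimal sketch of a Hudi incremental read through the Spark datasource (outside our SQL layer), assuming a hypothetical table path and begin instant; depending on the Hudi version, the load path may also need partition globs.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-incremental-read").getOrCreate()

// Read only the commits performed after `beginTime` (a Hudi instant such as "20210801120000").
// In a real job the instant is usually taken from wherever the previous run left off.
val beginTime = "20210801120000"  // hypothetical instant

val increments = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", beginTime)
  .load("hdfs:///path/to/ed_click")  // hypothetical base path

increments.createOrReplaceTempView("ed_click_increments")
spark.sql("select uuid, ts, is_click from ed_click_increments").show(false)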
2.2.4 Rolling Out the Solution
We planned to run Spark and Flink side by side and pick the compute engine based on data quality and resource cost. The chosen case joins the ad impression (ed) stream with the user click stream, lands the result in Hudi, and then the algorithm incrementally queries the table to extract features and update the model.
2.2.4.1 The Flink Approach
We started with Flink 1.12.2 + Hudi 0.8.0, but the job did not run smoothly; after switching to the latest master build (0.9.0-SNAPSHOT) it ran as expected. The Flink SQL is as follows:
CREATE TABLE ed (
  `value` VARCHAR,
  ts as get_json_object(`value`, '$.ts'),
  event_ts as to_timestamp(ts),
  WATERMARK FOR event_ts AS event_ts - interval '1' MINUTE,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'ed',
  'properties.bootstrap.servers' = 'xx',
  'properties.group.id' = 'xx',
  'scan.startup.mode' = 'group-offsets',
  'scan.topic-partition-discovery.interval' = '100s',
  'format' = 'schemaless'
);

CREATE TABLE click (
  req_id VARCHAR,
  ad_id VARCHAR,
  ts VARCHAR,
  event_ts as to_timestamp(ts),
  WATERMARK FOR event_ts AS event_ts - interval '1' MINUTE,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka',
  'topic' = 'click',
  'properties.bootstrap.servers' = 'xx',
  'properties.group.id' = 'xx',
  'scan.startup.mode' = 'group-offsets',
  'scan.topic-partition-discovery.interval' = '100s',
  'format' = 'json'
);

CREATE TABLE hudi (
  uuid VARCHAR,
  ts VARCHAR,
  json_info VARCHAR,
  is_click INT,
  dt VARCHAR,
  `hour` VARCHAR,
  PRIMARY KEY (uuid) NOT ENFORCED
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs:///xx',
  'write.tasks' = '10',
  'write.precombine.field' = 'ts',
  'compaction.tasks' = '1',
  'table.type' = 'COPY_ON_WRITE'
);

insert into hudi
SELECT concat(req_id, ad_id) uuid,
       concat(ts, '.', cast(is_click AS STRING)) AS ts,
       json_info,
       is_click,
       date_format(event_ts, 'yyyyMMdd') AS dt,
       date_format(event_ts, 'HH') AS `hour`
FROM (
  SELECT t1.req_id, t1.ad_id, t1.ts, t1.event_ts, t1.json_info,
         if(t2.req_id <> t1.req_id, 0, 1) as is_click,
         ROW_NUMBER() OVER (PARTITION BY t1.req_id, t1.ad_id, t1.ts
                            ORDER BY if(t2.req_id <> t1.req_id, 0, 1) DESC) as row_num
  FROM (
    select ts, event_ts, map_info['req_id'] req_id, map_info['ad_id'] ad_id, `value` as json_info
    from ed, LATERAL TABLE(json_tuple(`value`, 'req_id', 'ad_id')) as T(map_info)
  ) t1
  LEFT JOIN click t2
    ON t1.req_id = t2.req_id
   AND t1.ad_id = t2.ad_id
   AND t2.event_ts between t1.event_ts - INTERVAL '10' MINUTE and t1.event_ts + INTERVAL '4' MINUTE
) a
where a.row_num = 1;

Note: a few things in the SQL above differ from official Flink SQL, mainly a "schemaless" format that normalizes the whole record into a single column, plus get_json_object and json_tuple UDFs whose semantics are essentially the same as in Spark/Hive. These are just a small part of the functional enhancements made in the unified stream-batch engine.
After a week of running, however, the pressure of delaying the business launch plus two exposed problems forced us to set the Flink approach aside for the time being:
- Backpressure: no matter how we adjusted resources, the job suffered severe backpressure. We eventually worked around it by adding an upsert-kafka stage before the Hudi write, but the longer pipeline was not what we wanted.
- Risk of data loss: compared with the Spark run, the Flink job showed a risk of losing data.
Note: this is not to say the Flink-Hudi integration is lacking; there are already many production deployments in China writing to Hudi with Flink. In our scenario we simply had no time to debug the issues carefully because of the launch deadline. In fact, 95% of our Kafka -> Hive jobs have already replaced Spark Structured Streaming (SS) with Flink.
2.2.4.2 The Spark Approach
由于沒有在Hudi官方網(wǎng)站上找到SS集成的說明,一開始筆者快速實現(xiàn)了SS與Hudi的集成,但是在通讀Hudi代碼之后發(fā)現(xiàn)其實社區(qū)早已有了SS的完整實現(xiàn),另外咨詢社區(qū)同學(xué)leesf之后給出的反饋是當(dāng)前SS的實現(xiàn)也很穩(wěn)定。稍作適配SS版本的任務(wù)也在一天之內(nèi)上線了,任務(wù)SQL如下
CREATE STREAM ed (
  value STRING,
  ts as get_json_object(value, '$.ts'),
  event_ts as to_timestamp(get_json_object(value, '$.ts'))
) WITH (
  'connector' = 'kafka',
  'topic' = 'ed',
  'properties.bootstrap.servers' = 'xx',
  'properties.group.id' = 'xx',
  'startingOffsets' = 'earliest',
  'minPartitions' = '60',
  'maxOffsetsPerTrigger' = '250000',
  'eventTimestampField' = 'event_ts',
  'watermark' = '60 seconds',
  'format' = 'schemaless'
);

CREATE STREAM click (
  req_id STRING,
  ad_id STRING,
  ts STRING,
  event_ts as to_timestamp(ts)
) WITH (
  'connector' = 'kafka',
  'topic' = 'click',
  'properties.bootstrap.servers' = 'xxxx',
  'properties.group.id' = 'dw_ad_algo_naga_dsp_ed_click_rt',
  'startingOffsets' = 'earliest',
  'maxOffsetsPerTrigger' = '250000',
  'minPartitions' = '60',
  'eventTimestampField' = 'event_ts',
  'watermark' = '60 seconds',
  'format' = 'json'
);

-- python/java/scala UDFs can be registered dynamically
create python function py_f with (
  'code' = 'def apply(self,m): return "python_{}".format(m)',
  'methodName' = 'apply',
  'dataType' = 'string'
);

create SINK hudi (
  uuid STRING,
  dt STRING,
  hour STRING,
  ts STRING,
  json_info STRING,
  is_click INT
) WITH (
  'connector' = 'hudi',
  'hoodie.table.name' = 'ed_click',
  'path' = 'hdfs:///xx',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'hoodie.datasource.write.precombine.field' = 'ts',
  'hoodie.datasource.write.operation' = 'upsert',
  'hoodie.datasource.write.partitionpath.field' = 'dt,hour',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexKeyGenerator',
  'hoodie.datasource.write.table.type' = 'COPY_ON_WRITE',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hoodie.datasource.write.streaming.ignore.failed.batch' = 'false',
  'hoodie.keep.min.commits' = '120',
  'hoodie.keep.max.commits' = '180',
  'hoodie.cleaner.commits.retained' = '100',
  --'hoodie.datasource.write.insert.drop.duplicates' = 'true',
  --'hoodie.fail.on.timeline.archiving' = 'false',
  --'hoodie.datasource.hive_sync.table' = 'true',
  --'hoodie.datasource.hive_sync.database' = 'ods_test',
  --'hoodie.datasource.hive_sync.table' = 'ods_test_hudi_test2',
  --'hoodie.datasource.hive_sync.use_jdbc' = 'false',
  --'hoodie.datasource.meta.sync.enable' = 'true',
  --'hoodie.datasource.hive_sync.partition_fields' = 'dt,hour',
  --'hoodie.datasource.hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
  'trigger' = '30',
  'checkpointLocation' = 'checkpoint_path'
);

INSERT INTO hudi
SELECT concat(req_id, ad_id) uuid,
       date_format(ts, 'yyyyMMdd') dt,
       date_format(ts, 'HH') hour,
       concat(ts, '.', cast(is_click AS STRING)) AS ts,
       json_info,
       is_click
FROM (
  SELECT t1.req_id, t1.ad_id, t1.ts, t1.json_info,
         IF(t2.req_id is null, 0, 1) AS is_click
  FROM (
    select ts, event_ts, req_id, ad_id, value as json_info
    from ed lateral view json_tuple(value, 'req_id', 'ad_id') tt as req_id, ad_id
  ) t1
  LEFT JOIN click t2
    ON t1.req_id = t2.req_id
   AND t1.ad_id = t2.ad_id
   AND t2.event_ts BETWEEN t1.event_ts - INTERVAL 10 MINUTE AND t1.event_ts + INTERVAL 4 MINUTE
) tmp;

Note: the streaming syntax of the Spark unified stream-batch engine is aligned with Flink as far as possible, and we implemented dynamic registration of Python/Java/Scala UDFs to make the engine easier to use.
3. Benefits of the New Solution
Upgrading the pipeline to the new Flink/Spark + Hudi unified stream-batch architecture brought the following benefits:
- The unified stream-batch architecture built on Hudi is pure SQL, which greatly speeds up user development
- Hudi's optimizations for both COW and MOR scenarios give users more choices in how to read the data, and incremental queries let the algorithm achieve minute-level model updates, which was a strong user demand
- The event-time semantics of SS and Flink close the gap in metric definitions
- Hudi's automatic compaction plus intelligent small-file handling greatly reduces the engineering burden compared with the first-generation implementation, and even more so compared with setups that require manual compaction
4. Pitfalls We Hit
- Risk of data loss when Hudi write retries fail. Fix: set hoodie.datasource.write.streaming.ignore.failed.batch to false; otherwise the task only retries hoodie.datasource.write.streaming.retry.count times (default 3) at intervals of hoodie.datasource.write.streaming.retry.interval.ms (default 2000 ms) and then moves on, dropping the failed batch. (The write sketch after this list shows these options together.)
- An incremental query range that is too large: algorithm jobs retrying data from more than one hour ago get empty results. Fix: increase the number of retained commit versions via hoodie.keep.min.commits and hoodie.keep.max.commits, and increase the cleanup retention via hoodie.cleaner.commits.retained.
- Data loss in upsert mode. Fix: set hoodie.datasource.write.insert.drop.duplicates to false; when enabled, this option drops records whose keys already exist in the index, so updates to existing records are lost.
- Spark reads of the Hudi table may fail with "path not exists", which is caused by cleanup. Fix: adjust the number of retained file versions and retry the read.
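To make the fixes concrete, here is a minimal Structured Streaming write sketch using the open-source Hudi Spark datasource rather than our internal SQL engine. The streaming DataFrame `joined`, the paths, and the option values are illustrative assumptions; they mirror the settings used in the job SQL above.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// `joined` is the streaming DataFrame produced by the impression/click join.
def writeToHudi(joined: DataFrame): Unit = {
  joined.writeStream
    .format("hudi")
    .option("hoodie.table.name", "ed_click")
    .option("hoodie.datasource.write.operation", "upsert")
    .option("hoodie.datasource.write.recordkey.field", "uuid")
    .option("hoodie.datasource.write.precombine.field", "ts")
    .option("hoodie.datasource.write.partitionpath.field", "dt,hour")
    // Fail the batch instead of silently dropping it once the retries are exhausted.
    .option("hoodie.datasource.write.streaming.ignore.failed.batch", "false")
    // Keep updates to already-indexed records instead of discarding them.
    .option("hoodie.datasource.write.insert.drop.duplicates", "false")
    // Retain enough commits for downstream incremental readers that lag or retry.
    .option("hoodie.keep.min.commits", "120")
    .option("hoodie.keep.max.commits", "180")
    .option("hoodie.cleaner.commits.retained", "100")
    .option("checkpointLocation", "hdfs:///path/to/checkpoints/ed_click")  // hypothetical path
    .trigger(Trigger.ProcessingTime("30 seconds"))
    .outputMode("append")
    .start("hdfs:///path/to/ed_click")                                     // hypothetical base path
    .awaitTermination()
}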
Summary