Flink 靠什么征服饿了么工程师?
?
阿里妹導(dǎo)讀:本文將為大家展示餓了么大數(shù)據(jù)平臺(tái)在實(shí)時(shí)計(jì)算方面所做的工作,以及計(jì)算引擎的演變之路,你可以借此了解Strom、Spark、Flink的優(yōu)缺點(diǎn)。如何選擇一個(gè)合適的實(shí)時(shí)計(jì)算引擎?Flink憑借何種優(yōu)勢(shì)成為餓了么首選?本文將帶你一一解開謎題。
本文作者:易偉平
整理:姬平&鄭寧
平臺(tái)現(xiàn)狀
下面是目前餓了么平臺(tái)現(xiàn)狀架構(gòu)圖:
來源于多個(gè)數(shù)據(jù)源的數(shù)據(jù)寫到kafka里,計(jì)算引擎主要是Storm,Spark和Flink,計(jì)算引擎出來的結(jié)果數(shù)據(jù)再落地到各種存儲(chǔ)上。
目前Storm任務(wù)大概有100多個(gè),Spark任務(wù)有50個(gè)左右,Flink暫時(shí)還比較少。
目前我們集群規(guī)模每天數(shù)據(jù)量有60TB,計(jì)算次數(shù)有1000000000,節(jié)點(diǎn)有400個(gè)。這里要提一下,Spark和Flink都是on yarn的,其中Flink onyarn主要是用作任務(wù)間jobmanager隔離, Storm是standalone模式。
應(yīng)用場(chǎng)景
1.一致性語義
在講述我們應(yīng)用場(chǎng)景之前,先強(qiáng)調(diào)實(shí)時(shí)計(jì)算一個(gè)重要概念, 一致性語義:
1) at-most-once:即fire and forget,我們通常寫一個(gè)java的應(yīng)用,不去考慮源頭的offset管理,也不去考慮下游的冪等性的話,就是簡(jiǎn)單的at-most-once,數(shù)據(jù)來了,不管中間狀態(tài)怎樣,寫數(shù)據(jù)的狀態(tài)怎樣,也沒有ack機(jī)制。
2) at-least-once: 重發(fā)機(jī)制,重發(fā)數(shù)據(jù)保證每條數(shù)據(jù)至少處理一次。
3) exactly-once: 使用粗Checkpoint粒度控制來實(shí)現(xiàn)exactly-once,我們講的exactly-once大多數(shù)指計(jì)算引擎內(nèi)的exactly-once,即每一步的operator內(nèi)部的狀態(tài)是否可以重放;上一次的job如果掛了,能否從上一次的狀態(tài)順利恢復(fù),沒有涉及到輸出到sink的冪等性概念。
4) at-least-one + idempotent = exactly-one:如果我們能保證說下游有冪等性的操作,比如基于mysql實(shí)現(xiàn) update on duplicate key;或者你用es, cassandra之類的話,可以通過主鍵key去實(shí)現(xiàn)upset的語義, 保證at-least-once的同時(shí),再加上冪等性就是exactly-once。
2. Storm
餓了么早期都是使用Storm,16年之前還是Storm,17年才開始有Sparkstreaming, Structed-streaming。Storm用的比較早,主要有下面幾個(gè)概念:
1) 數(shù)據(jù)是tuple-based
2) 毫秒級(jí)延遲
3) 主要支持java, 現(xiàn)在利用apache beam也支持python和go。
4) Sql的功能還不完備,我們自己內(nèi)部封裝了typhon,用戶只需要擴(kuò)展我們的一些接口,就可以使用很多主要的功能;flux是Storm的一個(gè)比較好的工具,只需要寫一個(gè)yaml文件,就可以描述一個(gè)Storm任務(wù),某種程度上說滿足了一些需求,但還是要求用戶是會(huì)寫java的工程師,數(shù)據(jù)分析師就使用不了。
★ 2.1 總結(jié)
1) 易用性:因?yàn)槭褂瞄T檻高,從而限制了它的推廣。
2)StateBackend:更多的需要外部存儲(chǔ),比如redis之類的kv存儲(chǔ)。
3) 資源分配方面:用worker和slot提前設(shè)定的方式,另外由于優(yōu)化點(diǎn)做的較少,引擎吞吐量相對(duì)比較低一點(diǎn)。
3. Sparkstreaming
有一天有個(gè)業(yè)務(wù)方過來提需求說 我們能不能寫個(gè)sql,幾分鐘內(nèi)就可以發(fā)布一個(gè)實(shí)時(shí)計(jì)算任務(wù)。 于是我們開始做Sparkstreaming。它的主要概念如下:
1) Micro-batch:需要提前設(shè)定一個(gè)窗口,然后在窗口內(nèi)處理數(shù)據(jù)。
2) 延遲是秒級(jí)級(jí)別,比較好的情況是500ms左右。
3) 開發(fā)語言是java和scala。
4)streaming SQL,主要是我們的工作,我們希望提供streaming SQL的平臺(tái)。
特點(diǎn):
1) Spark生態(tài)和SparkSQL: 這是Spark比較好的地方,技術(shù)棧是統(tǒng)一的,SQL,圖計(jì)算,machine learning的包都是可以互調(diào)的。因?yàn)樗茸龅氖桥幚?#xff0c;和Flink不一樣,所以它天然的實(shí)時(shí)和離線的api是統(tǒng)一的。
2) Checkpointon hdfs。
3) onyarn:Spark是屬于hadoop生態(tài)體系,和yarn集成度高。
4) 高吞吐: 因?yàn)樗荕icro-batch的方式,吞吐也是比較高的。
下面給大家大致展示一下我們平臺(tái)用戶快速發(fā)布一個(gè)實(shí)時(shí)任務(wù)的操作頁(yè)面,它需要哪些步驟。我們這里不是寫DDL和DML語句,而是ui展示頁(yè)面的方式。
頁(yè)面里面會(huì)讓用戶選一些必要的參數(shù), 首先會(huì)選哪一個(gè)kafka集群,每個(gè)分區(qū)消費(fèi)多少,反壓也是默認(rèn)開啟的。消費(fèi)位置需要讓用戶每次去指定,有可能用戶下一次重寫實(shí)時(shí)任務(wù)的時(shí)候,可以根據(jù)業(yè)務(wù)需求去選擇offset消費(fèi)點(diǎn)。
中間就是讓用戶描述pipeline。 SQL就是kafka的多個(gè)topic,輸出選擇一個(gè)輸出表,SQL把上面消費(fèi)的kafka DStream注冊(cè)成表,然后寫一串pipeline,最后我們幫用戶封裝了一些對(duì)外sink(剛剛提到的各種存儲(chǔ)都支持,如果存儲(chǔ)能實(shí)現(xiàn)upsert語義的話,我們都是支持了的)。
★ 3.1 MultiStream-Join
雖然剛剛滿足一般無狀態(tài)批次內(nèi)的計(jì)算要求,但就有用戶想說, 我想做流的join怎么辦, 早期的Spark1.5可以參考Spark-streamingsql這個(gè)開源項(xiàng)目把 DStream注冊(cè)為一個(gè)表,然后對(duì)這個(gè)表做join的操作,但這只支持1.5之前的版本,Spark2.0推出structured streaming之后項(xiàng)目就廢棄了。我們有一個(gè)tricky的方式:
讓Sparkstreaming去消費(fèi)多個(gè)topic,但是我根據(jù)一些條件把消費(fèi)的DStream里面的每個(gè)批次RDD轉(zhuǎn)化為DataFrame,這樣就可以注冊(cè)為一張表,根據(jù)特定的條件,切分為兩張表,就可以簡(jiǎn)單的做個(gè)join,這個(gè)join的問題完全依賴于本次消費(fèi)的數(shù)據(jù),它們join的條件是不可控的,是比較tricky的方式。比如說下面這個(gè)例子,消費(fèi)兩個(gè)topic,然后簡(jiǎn)單通過filer條件,拆成兩個(gè)表,然后就可以做個(gè)兩張表的join,但它本質(zhì)是一個(gè)流。
★ 3.2 Exactly-once
exactly-once需要特別注意一個(gè)點(diǎn):
我們必須要求數(shù)據(jù)sink到外部存儲(chǔ)后,offset才能commit,不管是到zk,還是mysql里面,你最好保證它在一個(gè)transaction里面,而且必須在輸出到外部存儲(chǔ)(這里最好保證一個(gè)upsert語義,根據(jù)unique key來實(shí)現(xiàn)upset語義)之后,然后這邊源頭driver再根據(jù)存儲(chǔ)的offeset去產(chǎn)生kafka RDD,executor再根據(jù)kafka每個(gè)分區(qū)的offset去消費(fèi)數(shù)據(jù)。如果滿足這些條件,就可以實(shí)現(xiàn)端到端的exactly-once. 這是一個(gè)大前提。
★ 3.3 總結(jié)
1) Stateful Processing SQL ( <2.x mapWithState、updateStateByKey):我們要實(shí)現(xiàn)跨批次帶狀態(tài)的計(jì)算的話,在1.X版本,我們通過這兩個(gè)接口去做,但還是需要把這個(gè)狀態(tài)存到hdfs或者外部去,實(shí)現(xiàn)起來比較麻煩一點(diǎn)。
2) Real Multi-Stream Join:沒辦法實(shí)現(xiàn)真正的多個(gè)流join的語義。
3)End-To-End Exactly-Once Semantics:它的端到端的exactly-once語義實(shí)現(xiàn)起來比較麻煩,需要sink到外部存儲(chǔ)后還需要手動(dòng)的在事務(wù)里面提交offset。
4. STRUCTUREDSTREAMING
我們調(diào)研然后并去使用了Spark2.X之后帶狀態(tài)的增量計(jì)算。下面這個(gè)圖是官方網(wǎng)站的:
所有的流計(jì)算都參照了Google的 data flow,里面有個(gè)重要的概念:數(shù)據(jù)的processing time和event time,即數(shù)據(jù)的處理時(shí)間和真正的發(fā)生時(shí)間有個(gè)gap。于是流計(jì)算領(lǐng)域還有個(gè)watermark,當(dāng)前進(jìn)來的事件水位需要watermark來維持,watermark可以指定時(shí)間delay的范圍,在延遲窗口之外的數(shù)據(jù)是可以丟棄的,在業(yè)務(wù)上晚到的數(shù)據(jù)也是沒有意義的。
下面是structuredstreaming的架構(gòu)圖:
這里面就是把剛才Sparkstreaming講exactly-once的步驟1,2,3都實(shí)現(xiàn)了,它本質(zhì)上還是分批的batch方式,offset自己維護(hù),狀態(tài)存儲(chǔ)用的hdfs,對(duì)外的sink沒有做類似的冪等操作,也沒有寫完之后再去commit offset,它只是再保證容錯(cuò)的同時(shí)去實(shí)現(xiàn)內(nèi)部引擎的exactly-once。
★ 4.1 特點(diǎn)
1) Stateful Processing SQL&DSL:可以滿足帶狀態(tài)的流計(jì)算
2) Real Multi-Stream Join:可以通過Spark2.3實(shí)現(xiàn)多個(gè)流的join,多個(gè)流的join做法和Flink類似,你需要先定義兩個(gè)流的條件(主要是時(shí)間作為一個(gè)條件),比如說有兩個(gè)topic的流進(jìn)來,然后你希望通過某一個(gè)具體的schema中某個(gè)字段(通常是event time)來限定需要buffer的數(shù)據(jù),這樣可以實(shí)現(xiàn)真正意義上的流的join。
3)比較容易實(shí)現(xiàn)端到端的exactly-once的語義,只需要擴(kuò)展sink的接口支持冪等操作是可以實(shí)現(xiàn)exactly-once的。
特別說一下,Structuredstreaming和原生的streaming的api有一點(diǎn)區(qū)別,它c(diǎn)reate表的Dataframe的時(shí)候,是需要指定表的schema的,意味著你需要提前指定schema。另外它的watermark是不支持SQL的,于是我們加了一個(gè)擴(kuò)展,實(shí)現(xiàn)完全寫sql,可以從左邊到右邊的轉(zhuǎn)換(下圖),我們希望用戶不止是程序員,也希望不會(huì)寫程序的數(shù)據(jù)分析師等同學(xué)也能用到。
★ 4.2 總結(jié)
1) Trigger(Processing Time、 Continuous ):2.3之前主要基于processing Time,每個(gè)批次的數(shù)據(jù)處理完了立馬觸發(fā)下一批次的計(jì)算。2.3推出了record by record的持續(xù)處理的trigger。
2)Continuous Processing (Only Map-Like Operations):目前它只支持map like的操作,同時(shí)sql的支持度也有些限制。
3) LowEnd-To-End Latency With Exactly-Once Guarantees:端到端的exactly-once的保證需要自己做一些額外的擴(kuò)展, 我們發(fā)現(xiàn)kafka0.11版本提供了事務(wù)的功能,是可以從基于這方面考慮從而去實(shí)現(xiàn)從source到引擎再到sink,真正意義上的端到端的exactly-once。
4) CEP(Drools):我們發(fā)現(xiàn)有業(yè)務(wù)方需要提供cep 這樣復(fù)雜事件處理的功能,目前我們的語法無法直接支持,我們讓用戶使用規(guī)則引擎Drools,然后跑在每個(gè)executor上面,依靠規(guī)則引擎功能去實(shí)現(xiàn)cep。
于是基于以上幾個(gè)Spark structuredstreaming的特點(diǎn)和缺點(diǎn),我們考慮使用Flink來做這些事情。
5.Flink
Flink目標(biāo)是對(duì)標(biāo)Spark,流這塊是領(lǐng)先比較多,它野心也比較大,圖計(jì)算,機(jī)器學(xué)習(xí)等它都有,底層也是支持yarn,tez等。對(duì)于社區(qū)用的比較多的存儲(chǔ),Flink社區(qū)官方都支持比較好,相對(duì)來說。
Flink的框架圖:
Flink中的JobManager,相當(dāng)于Spark的driver角色,taskManger相當(dāng)于executor,里面的task也有點(diǎn)類似Spark的那些task。 不過Flink用的rpc是akka,同時(shí)Flink core自定義了內(nèi)存序列化框架,另外task無需像Spark每個(gè)stage的task必須相互等待而是處理完后即往下游發(fā)送數(shù)據(jù)。
Flink binary data處理operator:
?
Spark的序列化用戶一般會(huì)使用kryo或者java默認(rèn)的序列化,同時(shí)也有Tungsten項(xiàng)目對(duì)Spark程序做一jvm層面以及代碼生成方面的優(yōu)化。相對(duì)于Spark,Flink自己實(shí)現(xiàn)了基于內(nèi)存的序列化框架,里面維護(hù)著key和pointer的概念,它的key是連續(xù)存儲(chǔ),在cpu層面會(huì)做一些優(yōu)化,cache miss概率極低。比較和排序的時(shí)候不需要比較真正的數(shù)據(jù),先通過這個(gè)key比較,只有當(dāng)它相等的時(shí)候,才會(huì)從內(nèi)存中把這個(gè)數(shù)據(jù)反序列化出來,再去對(duì)比具體的數(shù)據(jù),這是個(gè)不錯(cuò)的性能優(yōu)化點(diǎn)。
Flink task chain:
Task中operatorchain,是比較好的概念。如果上下游數(shù)據(jù)分布不需要重新shuffle的話,比如圖中source是kafka source,后面跟的map只是一個(gè)簡(jiǎn)單的數(shù)據(jù)filter,我們把它放在一個(gè)線程里面,就可以減少線程上下文切換的代價(jià)。
并行度概念
比如說這里面會(huì)有5個(gè)task,就會(huì)有幾個(gè)并發(fā)線程去跑,chain起來的話放在一個(gè)線程去跑就可以提升數(shù)據(jù)傳輸性能。Spark是黑盒的,每個(gè)operator無法設(shè)并發(fā)度,而Flink可以對(duì)每個(gè)operator設(shè)并發(fā)度,這樣可以更靈活一點(diǎn),作業(yè)運(yùn)行起來對(duì)資源利用率也更高一點(diǎn)。
Spark 一般通過Spark.default.parallelism來調(diào)整并行度,有shuffle操作的話,并行度一般是通Spark.sql.shuffle.partitions參數(shù)來調(diào)整,實(shí)時(shí)計(jì)算的話其實(shí)應(yīng)該調(diào)小一點(diǎn),比如我們生產(chǎn)中和kafka的partition數(shù)調(diào)的差不多,batch在生產(chǎn)上會(huì)調(diào)得大一點(diǎn),我們?cè)O(shè)為1000,左邊的圖我們?cè)O(shè)并發(fā)度為2,最大是10,這樣首先分2個(gè)并發(fā)去跑,另外根據(jù)key做一個(gè)分組的概念,最大分為10組,就可以做到把數(shù)據(jù)盡量的打散。
State & Checkpoint
因?yàn)镕link的數(shù)據(jù)是一條條過來處理,所以Flink中的每條數(shù)據(jù)處理完了立馬發(fā)給下游,而不像spark,需要等該operator所在的stage所有的task都完成了再往下發(fā)。
Flink有粗粒度的checkpoint機(jī)制,以非常小的代價(jià)為每個(gè)元素賦予一個(gè)snapshot概念,只有當(dāng)屬于本次snapshot的所有數(shù)據(jù)都進(jìn)來后才會(huì)觸發(fā)計(jì)算,計(jì)算完后,才把buffer數(shù)據(jù)往下發(fā),目前Flink sql沒有提供控制buffer timeout的接口,即我的數(shù)據(jù)要buffer多久才往下發(fā)??梢栽跇?gòu)建Flink context時(shí),指定buffer timeout為0,處理完的數(shù)據(jù)才會(huì)立馬發(fā)下去,不需要等達(dá)到一定閾值后再往下發(fā)。
Backend默認(rèn)是維護(hù)在jobmanager內(nèi)存,我們更多使用的的是寫到hdfs上,每個(gè)operator的狀態(tài)寫到rocksdb上,然后異步周期增量同步到外部存儲(chǔ)。
容錯(cuò)
圖中左半部分的紅色節(jié)點(diǎn)發(fā)生了failover,如果是at-least-once,則其最上游把數(shù)據(jù)重發(fā)一次就好;但如果是exactly-once,則需要每個(gè)計(jì)算節(jié)點(diǎn)從上一次失敗的時(shí)機(jī)重放。
Exactly Once Two-Phase Commit
Flink1.4之后有兩階段提交來支持exactly-once.它的概念是從上游kafka消費(fèi)數(shù)據(jù)后,每一步都會(huì)發(fā)起一次投票,來記錄狀態(tài),通過checkpoint的屏障來處理標(biāo)記,只有最后再寫到kafka(0.11之后的版本),只有最后完成之后,才會(huì)把每一步的狀態(tài)讓jobmanager中的cordinator去通知可以固化下來,這樣實(shí)現(xiàn)exactly-once。
Savepoints
還有一點(diǎn)Flink比較好的就是,基于它的checkpoint來實(shí)現(xiàn)savepoint功能。業(yè)務(wù)方需要每個(gè)應(yīng)用恢復(fù)節(jié)點(diǎn)不一樣,希望恢復(fù)到的版本也是可以指定的,這是比較好的。這個(gè)savepoint不只是數(shù)據(jù)的恢復(fù),也有計(jì)算狀態(tài)的恢復(fù)。
特點(diǎn):
1) Trigger (Processing Time、 Event Time、IngestionTime):對(duì)比下,Flink支持的流式語義更豐富,不僅支持Processing Time, 也支持Event time和Ingestion Time。
2)Continuous Processing & Window:支持純意義上的持續(xù)處理,recordby record的,window也比Spark處理的好。
3) Low End-To-End Latency With Exactly-Once Guarantees:因?yàn)橛袃呻A段提交,用戶是可以選擇在犧牲一定吞吐量的情況下,根據(jù)業(yè)務(wù)需求情況來調(diào)整來保證端到端的exactly-once。
4) CEP:支持得好。
5) Savepoints:可以根據(jù)業(yè)務(wù)的需求做一些版本控制。
也有做的還不好的:
1)SQL (Syntax Function、Parallelism):SQL功能還不是很完備,大部分用戶是從hive遷移過來,Spark支持hive覆蓋率達(dá)到99%以上。 SQL函數(shù)不支持,目前還無法對(duì)單個(gè)operator做并行度的設(shè)置。
2) ML、Graph等:機(jī)器學(xué)習(xí),圖計(jì)算等其他領(lǐng)域比Spark要弱一點(diǎn),但社區(qū)也在著力持續(xù)改進(jìn)這個(gè)問題。
我們期待和你一起,把Flink建設(shè)得更好,幫助更多開發(fā)者。
大數(shù)據(jù)計(jì)算引擎,你pick哪個(gè)?
訪問:http://cn.mikecrm.com/d0nUFOK?from=singlemessage&isappinstalled=0
參與問卷調(diào)研,約需15分鐘,認(rèn)真答題的小伙伴還有機(jī)會(huì)獲得定制禮品。
?
每天一篇技術(shù)文章,
看不過癮?
關(guān)注“阿里巴巴機(jī)器智能”微信公眾號(hào)
發(fā)現(xiàn)更多AI干貨。
總結(jié)
以上是生活随笔為你收集整理的Flink 靠什么征服饿了么工程师?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于阿里基础设施,你要知道的都在这里
- 下一篇: 一张图,看懂阿里云的“飞天”史