实时流处理框架Storm、Spark Streaming、Samza、Flink,孰优孰劣?!
https://mp.weixin.qq.com/s?__biz=MzU1NDA4NjU2MA==&mid=2247486490&idx=1&sn=e25a05be8cf98ce7eb56a8a2a64ab1bd&chksm=fbe9b5d5cc9e3cc38541559dc54225c5be7ea2d15847badb5d958c30f5cb06855277a3ba0d13&scene=27#wechat_redirect
?
?
分布式流處理需求日益增加,包括支付交易、社交網(wǎng)絡、物聯(lián)網(wǎng)(IOT)、系統(tǒng)監(jiān)控等。業(yè)界對流處理已經(jīng)有幾種適用的框架來解決,下面我們來比較各流處理框架的相同點以及區(qū)別。
?
分布式流處理是對無邊界數(shù)據(jù)集進行連續(xù)不斷的處理、聚合和分析。它跟MapReduce一樣是一種通用計算,但我們期望延遲在毫秒或者秒級別。這類系統(tǒng)一般采用有向無環(huán)圖(DAG)。
?
DAG是任務鏈的圖形化表示,我們用它來描述流處理作業(yè)的拓撲。如下圖,數(shù)據(jù)從sources流經(jīng)處理任務鏈到sinks。單機可以運行DAG,但本篇文章主要聚焦在多臺機器上運行DAG的情況。?
?
關注點
當選擇不同的流處理系統(tǒng)時,有以下幾點需要注意的:
?
-
運行時和編程模型:平臺框架提供的編程模型決定了許多特色功能,編程模型要足夠處理各種應用場景。這是一個相當重要的點,后續(xù)會繼續(xù)。
-
函數(shù)式原語:流處理平臺應該能提供豐富的功能函數(shù),比如,map或者filter這類易擴展、處理單條信息的函數(shù);處理多條信息的函數(shù)aggregation;跨數(shù)據(jù)流、不易擴展的操作join。
-
狀態(tài)管理:大部分應用都需要保持狀態(tài)處理的邏輯。流處理平臺應該提供存儲、訪問和更新狀態(tài)信息。
-
消息傳輸保障:消息傳輸保障一般有三種:at most once,at least once和exactly once。At most once的消息傳輸機制是每條消息傳輸零次或者一次,即消息可能會丟失;A t least once意味著每條消息會進行多次傳輸嘗試,至少一次成功,即消息傳輸可能重復但不會丟失;Exactly once的消息傳輸機制是每條消息有且只有一次,即消息傳輸既不會丟失也不會重復。
-
容錯:流處理框架中的失敗會發(fā)生在各個層次,比如,網(wǎng)絡部分,磁盤崩潰或者節(jié)點宕機等。流處理框架應該具備從所有這種失敗中恢復,并從上一個成功的狀態(tài)(無臟數(shù)據(jù))重新消費。
-
性能:延遲時間(Latency),吞吐量(Throughput)和擴展性(Scalability)是流處理應用中極其重要的指標。?
平臺的成熟度和接受度:成熟的流處理框架可以提供潛在的支持,可用的庫,甚至開發(fā)問答幫助。選擇正確的平臺會在這方面提供很大的幫助。
運行時和編程模型
運行時和編程模型是一個系統(tǒng)最重要的特質(zhì),因為它們定義了表達方式、可能的操作和將來的局限性。因此,運行時和編程模型決定了系統(tǒng)的能力和適用場景。
?
實現(xiàn)流處理系統(tǒng)有兩種完全不同的方式:一種是稱作原生流處理,意味著所有輸入的記錄一旦到達即會一個接著一個進行處理。?
?
?
第二種稱為微批處理。把輸入的數(shù)據(jù)按照某種預先定義的時間間隔(典型的是幾秒鐘)分成短小的批量數(shù)據(jù),流經(jīng)流處理系統(tǒng)。?
?
?
兩種方法都有其先天的優(yōu)勢和不足。
?
首先以原生流處理開始,原生流處理的優(yōu)勢在于它的表達方式。數(shù)據(jù)一旦到達立即處理,這些系統(tǒng)的延遲性遠比其它微批處理要好。除了延遲性外,原生流處理的狀態(tài)操作也容易實現(xiàn),后續(xù)將詳細講解。
?
一般原生流處理系統(tǒng)為了達到低延遲和容錯性會花費比較大的成本,因為它需要考慮每條記錄。原生流處理的負載均衡也是個問題。比如,我們處理的數(shù)據(jù)按key分區(qū),如果分區(qū)的某個key是資源密集型,那這個分區(qū)很容易成為作業(yè)的瓶頸。
?
接下來看下微批處理。將流式計算分解成一系列短小的批處理作業(yè),也不可避免的減弱系統(tǒng)的表達力。像狀態(tài)管理或者join等操作的實現(xiàn)會變的困難,因為微批處理系統(tǒng)必須操作整個批量數(shù)據(jù)。并且,batch interval會連接兩個不易連接的事情:基礎屬性和業(yè)務邏輯。
?
相反地,微批處理系統(tǒng)的容錯性和負載均衡實現(xiàn)起來非常簡單,因為微批處理系統(tǒng)僅發(fā)送每批數(shù)據(jù)到一個worker節(jié)點上,如果一些數(shù)據(jù)出錯那就使用其它副本。微批處理系統(tǒng)很容易建立在原生流處理系統(tǒng)之上。
?
編程模型一般分為組合式和聲明式。組合式編程提供基本的構(gòu)建模塊,它們必須緊密結(jié)合來創(chuàng)建拓撲。新的組件經(jīng)常以接口的方式完成。相對應地,聲明式API操作是定義的高階函數(shù)。它允許我們用抽象類型和方法來寫函數(shù)代碼,并且系統(tǒng)創(chuàng)建拓撲和優(yōu)化拓撲。聲明式API經(jīng)常也提供更多高級的操作(比如,窗口函數(shù)或者狀態(tài)管理)。后面很快會給出樣例代碼。
主流流處理系統(tǒng)
有一系列各種實現(xiàn)的流處理框架,不能一一列舉,這里僅選出主流的流處理解決方案,并且支持Scala API。因此,我們將詳細介紹Apache Storm,Trident,Spark Streaming,Samza和Apache Flink。前面選擇講述的雖然都是流處理系統(tǒng),但它們實現(xiàn)的方法包含了各種不同的挑戰(zhàn)。這里暫時不講商業(yè)的系統(tǒng),比如Google MillWheel或者Amazon Kinesis,也不會涉及很少使用的Intel GearPump或者Apache Apex。?
?
Apache Storm最開始是由Nathan Marz和他的團隊于2010年在數(shù)據(jù)分析公司BackType開發(fā)的,后來BackType公司被Twitter收購,接著Twitter開源Storm并在2014年成為Apache頂級項目。毋庸置疑,Storm成為大規(guī)模流數(shù)據(jù)處理的先鋒,并逐漸成為工業(yè)標準。Storm是原生的流處理系統(tǒng),提供low-level的API。Storm使用Thrift來定義topology和支持多語言協(xié)議,使得我們可以使用大部分編程語言開發(fā),Scala自然包括在內(nèi)。
?
Trident是對Storm的一個更高層次的抽象,Trident最大的特點以batch的形式進行流處理。Trident簡化topology構(gòu)建過程,增加了窗口操作、聚合操作或者狀態(tài)管理等高級操作,這些在Storm中并不支持。相對應于Storm的At most once流傳輸機制,Trident提供了Exactly once傳輸機制。Trident支持Java,Clojure和Scala。
?
當前Spark是非常受歡迎的批處理框架,包含Spark SQL,MLlib和Spark Streaming。Spark的運行時是建立在批處理之上,因此后續(xù)加入的Spark Streaming也依賴于批處理,實現(xiàn)了微批處理。接收器把輸入數(shù)據(jù)流分成短小批處理,并以類似Spark作業(yè)的方式處理微批處理。Spark Streaming提供高級聲明式API(支持Scala,Java和Python)。
?
Samza最開始是專為LinkedIn公司開發(fā)的流處理解決方案,并和LinkedIn的Kafka一起貢獻給社區(qū),現(xiàn)已成為基礎設施的關鍵部分。Samza的構(gòu)建嚴重依賴于基于log的Kafka,兩者緊密耦合。Samza提供組合式API,當然也支持Scala。
?
最后來介紹Apache Flink。Flink是個相當早的項目,開始于2008年,但只在最近才得到注意。Flink是原生的流處理系統(tǒng),提供high level的API。Flink也提供API來像Spark一樣進行批處理,但兩者處理的基礎是完全不同的。Flink把批處理當作流處理中的一種特殊情況。在Flink中,所有的數(shù)據(jù)都看作流,是一種很好的抽象,因為這更接近于現(xiàn)實世界。
?
快速的介紹流處理系統(tǒng)之后,讓我們以下面的表格來更好清晰的展示它們之間的不同:?
?
Word Count
Wordcount之于流處理框架學習,就好比hello world之于編程語言學習。它能很好的展示各流處理框架的不同之處,讓我們從Storm開始看看如何實現(xiàn)Wordcount:
?
TopologyBuilder?builder?=?new?TopologyBuilder();
builder.setSpout("spout",?new?RandomSentenceSpout(),?5);
builder.setBolt("split",?new?Split(),?8).shuffleGrouping("spout");
builder.setBolt("count",?new?WordCount(),?12).fieldsGrouping("split",?newFields("word"));
?
...
?
Map<String,?Integer>?counts?=?new?HashMap<String,?Integer>();
?
public?void?execute(Tuple?tuple,?BasicOutputCollector?collector)?{
??String?word?=?tuple.getString(0);
??Integer?count?=?counts.containsKey(word)???counts.get(word)?+?1?:?1;
? counts.put(word,?count);
? collector.emit(new?Values(word,?count));
}
?
首先,定義topology。第二行代碼定義一個spout,作為數(shù)據(jù)源。然后是一個處理組件bolt,分割文本為單詞。接著,定義另一個bolt來計算單詞數(shù)(第四行代碼)。也可以看到魔數(shù)5,8和12,這些是并行度,定義集群每個組件執(zhí)行的獨立線程數(shù)。第八行到十五行是實際的WordCount bolt實現(xiàn)。因為Storm不支持內(nèi)建的狀態(tài)管理,所有這里定義了一個局部狀態(tài)。
?
按之前描述,Trident是對Storm的一個更高層次的抽象,Trident最大的特點以batch的形式進行流處理。除了其它優(yōu)勢,Trident提供了狀態(tài)管理,這對wordcount實現(xiàn)非常有用。
?
public?static?StormTopology?buildTopology(LocalDRPC?drpc)?{
FixedBatchSpout?spout?=?...
?
TridentTopology?topology?=?new?TridentTopology();
TridentState?wordCounts?=?topology.newStream("spout1",?spout)
.each(new?Fields("sentence"),new?Split(),?new?Fields("word"))
.groupBy(new?Fields("word"))
.persistentAggregate(new?MemoryMapState.Factory(),
new?Count(),?new?Fields("count"));
?
...
?
}
?
如你所見,上面代碼使用higher level操作,比如each(第七行代碼)和groupby(第八行代碼)。并且使用Trident管理狀態(tài)來存儲單詞數(shù)(第九行代碼)。
?
下面是時候祭出提供聲明式API的Apache Spark。記住,相對于前面的例子,這些代碼相當簡單,幾乎沒有冗余代碼。下面是簡單的流式計算單詞數(shù):
?
val conf?=?new?SparkConf().setAppName("wordcount")
val ssc?=?new?StreamingContext(conf,?Seconds(1))
?
val text?=?...
?
val counts?=?text.flatMap(line?=>?line.split(" "))
.map(word?=>?(word,?1))
.reduceByKey(_?+?_)
?
counts.print()
?
ssc.start()
ssc.awaitTermination()
?
每個Spark Streaming的作業(yè)都要有StreamingContext,它是流式函數(shù)的入口。StreamingContext加載第一行代碼定義的配置conf,但更重要地,第二行代碼定義batch interval(這里設置為1秒)。第六行到八行代碼是整個單詞數(shù)計算。這些是標準的函數(shù)式代碼,Spark定義topology并且分布式執(zhí)行。第十二行代碼是每個Spark Streaming作業(yè)最后的部分:啟動計算。記住,Spark Streaming作業(yè)一旦啟動即不可修改。?
接下來看下Apache Samza,另外一個組合式API例子:
?
class?WordCountTask?extends?StreamTask?{
?
?override?def?process(envelope:?IncomingMessageEnvelope,?collector:MessageCollector,
? ?coordinator:?TaskCoordinator)?{
?
? ?val text?=?envelope.getMessage.asInstanceOf[String]
?
? ?val counts?=?text.split(" ").foldLeft(Map.empty[String,?Int])?{
? ? ?(count,?word)?=>?count?+?(word?->?(count.getOrElse(word,?0)?+?1))
? ?}
?
? ?collector.send(new?OutgoingMessageEnvelope(new?SystemStream("kafka","wordcount"),?counts))
?
}
?
Samza的屬性配置文件定義topology,為了簡明這里并沒把配置文件放上來。定義任務的輸入和輸出,并通過Kafka topic通信。在單詞數(shù)計算整個topology是WordCountTask。在Samza中,實現(xiàn)特殊接口定義組件StreamTask,在第三行代碼重寫方法process。它的參數(shù)列表包含所有連接其它系統(tǒng)的需要。第八行到十行簡單的Scala代碼是計算本身。?
?
Flink的API跟Spark Streaming是驚人的相似,但注意到代碼里并未設置batch interval。
?
val env?=?ExecutionEnvironment.getExecutionEnvironment
?
val text?=?env.fromElements(...)
val counts?=?text.flatMap?(?_.split(" ")?)
??.map?(?(_,?1)?)
??.groupBy(0)
??.sum(1)
?
counts.print()
?
env.execute("wordcount")
?
上面的代碼是相當?shù)闹卑?#xff0c;僅僅只是幾個函數(shù)式調(diào)用,Flink支持分布式計算。
容錯性
流處理系統(tǒng)的容錯性與生俱來的比批處理系統(tǒng)難實現(xiàn)。當批處理系統(tǒng)中出現(xiàn)錯誤時,我們只需要把失敗的部分簡單重啟即可;但對于流處理系統(tǒng),出現(xiàn)錯誤就很難恢復。因為線上許多作業(yè)都是7 x 24小時運行,不斷有輸入的數(shù)據(jù)。流處理系統(tǒng)面臨的另外一個挑戰(zhàn)是狀態(tài)一致性,因為重啟后會出現(xiàn)重復數(shù)據(jù),并且不是所有的狀態(tài)操作是冪等的。容錯性這么難實現(xiàn),那下面我們看看各大主流流處理框架是如何處理這一問題。
?
Apache Storm:Storm使用上游數(shù)據(jù)備份和消息確認的機制來保障消息在失敗之后會重新處理。消息確認原理:每個操作都會把前一次的操作處理消息的確認信息返回。Topology的數(shù)據(jù)源備份它生成的所有數(shù)據(jù)記錄。當所有數(shù)據(jù)記錄的處理確認信息收到,備份即會被安全拆除。失敗后,如果不是所有的消息處理確認信息收到,那數(shù)據(jù)記錄會被數(shù)據(jù)源數(shù)據(jù)替換。這保障了沒有數(shù)據(jù)丟失,但數(shù)據(jù)結(jié)果會有重復,這就是at-least once傳輸機制。
?
Storm采用取巧的辦法完成了容錯性,對每個源數(shù)據(jù)記錄僅僅要求幾個字節(jié)存儲空間來跟蹤確認消息。純數(shù)據(jù)記錄消息確認架構(gòu),盡管性能不錯,但不能保證exactly once消息傳輸機制,所有應用開發(fā)者需要處理重復數(shù)據(jù)。Storm存在低吞吐量和流控問題,因為消息確認機制在反壓下經(jīng)常誤認為失敗。?
?
Spark Streaming:Spark Streaming實現(xiàn)微批處理,容錯機制的實現(xiàn)跟Storm不一樣的方法。微批處理的想法相當簡單。Spark在集群各worker節(jié)點上處理micro-batches。每個micro-batches一旦失敗,重新計算就行。因為micro-batches本身的不可變性,并且每個micro-batches也會持久化,所以exactly once傳輸機制很容易實現(xiàn)。?
?
Samza:Samza的實現(xiàn)方法跟前面兩種流處理框架完全不一樣。Samza利用消息系統(tǒng)Kafka的持久化和偏移量。Samza監(jiān)控任務的偏移量,當任務處理完消息,相應的偏移量被移除。消息的偏移量會被checkpoint到持久化存儲中,并在失敗時恢復。但是問題在于:從上次checkpoint中修復偏移量時并不知道上游消息已經(jīng)被處理過,這就會造成重復。這就是at least once傳輸機制。?
?
Apache Flink:Flink的容錯機制是基于分布式快照實現(xiàn)的,這些快照會保存流處理作業(yè)的狀態(tài)(本文對Flink的檢查點和快照不進行區(qū)分,因為兩者實際是同一個事物的兩種不同叫法。Flink構(gòu)建這些快照的機制可以被描述成分布式數(shù)據(jù)流的輕量級異步快照,它采用Chandy-Lamport算法實現(xiàn)。)。
?
如果發(fā)生失敗的情況,系統(tǒng)可以從這些檢查點進行恢復。Flink發(fā)送checkpoint的柵欄(barrier)到數(shù)據(jù)流中(柵欄是Flink的分布式快照機制中一個核心的元素),當checkpoint的柵欄到達其中一個operator,operator會接所有收輸入流中對應的柵欄(比如,圖中checkpoint n對應柵欄n到n-1的所有輸入流,其僅僅是整個輸入流的一部分)。
?
所以相對于Storm,Flink的容錯機制更高效,因為Flink的操作是對小批量數(shù)據(jù)而不是每條數(shù)據(jù)記錄。但也不要讓自己糊涂了,Flink仍然是原生流處理框架,它與Spark Streaming在概念上就完全不同。Flink也提供exactly once消息傳輸機制。?
狀態(tài)管理
大部分大型流處理應用都涉及到狀態(tài)。相對于無狀態(tài)的操作(其只有一個輸入數(shù)據(jù),處理過程和輸出結(jié)果),有狀態(tài)的應用會有一個輸入數(shù)據(jù)和一個狀態(tài)信息,然后處理過程,接著輸出結(jié)果和修改狀態(tài)信息。
?
因此,我們不得不管理狀態(tài)信息,并持久化。我們期望一旦因某種原因失敗,狀態(tài)能夠修復。狀態(tài)修復有可能會出現(xiàn)小問題,它并不總是保證exactly once,有時也會出現(xiàn)消費多次,但這并不是我們想要的。
?
據(jù)我們所知,Storm提供at-least once的消息傳輸保障。那我們又該如何使用Trident做到exactly once的語義。概念上貌似挺簡單,你只需要提交每條數(shù)據(jù)記錄,但這顯然不是那么高效。所以你會想到小批量的數(shù)據(jù)記錄一起提交會優(yōu)化。Trident定義了幾個抽象來達到exactly once的語義,見下圖,其中也會有些局限。?
Spark Streaming是微批處理系統(tǒng),它把狀態(tài)信息也看做是一種微批量數(shù)據(jù)流。在處理每個微批量數(shù)據(jù)時,Spark加載當前的狀態(tài)信息,接著通過函數(shù)操作獲得處理后的微批量數(shù)據(jù)結(jié)果并修改加載過的狀態(tài)信息。?
?
?
Samza實現(xiàn)狀態(tài)管理是通過Kafka來處理的。Samza有真實的狀態(tài)操作,所以其任務會持有一個狀態(tài)信息,并把狀態(tài)改變的日志推送到Kafka。如果需要狀態(tài)重建,可以很容易的從Kafka的topic重建。為了達到更快的狀態(tài)管理,Samza也支持把狀態(tài)信息放入本地key-value存儲中,所以狀態(tài)信息不必一直在Kafka中管理,見下圖。不幸的是,Samza只提供at-least once語義,exactly once的支持也在計劃中。?
?
Flink提供狀態(tài)操作,和Samza類似。Flink提供兩種類型的狀態(tài):一種是用戶自定義狀態(tài);另外一種是窗口狀態(tài)。如圖,第一個狀態(tài)是自定義狀態(tài),它和其它的的狀態(tài)不相互作用。這些狀態(tài)可以分區(qū)或者使用嵌入式Key-Value存儲狀態(tài)[文檔一和二]。當然Flink提供exactly-once語義。下圖展示Flink長期運行的三個狀態(tài)。?
單詞計數(shù)例子中的狀態(tài)管理
單詞計數(shù)的詳細代碼見上篇文章,這里僅關注狀態(tài)管理部分。
?
讓我們先看Trident:
?
public?static?StormTopology?buildTopology(LocalDRPC?drpc)?{
??FixedBatchSpout?spout?=?...
?
??TridentTopology?topology?=?new?TridentTopology();
?
??TridentState?wordCounts?=?topology.newStream("spout1",?spout)
? ??.each(new?Fields("sentence"),new?Split(),?new?Fields("word"))
? ??.groupBy(new?Fields("word"))
? ??.persistentAggregate(new?MemoryMapState.Factory(),?new?Count(),?newFields("count"));
?
...
?
}
?
在第九行代碼中,我們通過調(diào)用persistentAggregate創(chuàng)建一個狀態(tài)。其中參數(shù)Count存儲單詞數(shù),如果你想從狀態(tài)中處理數(shù)據(jù),你必須創(chuàng)建一個數(shù)據(jù)流。從代碼中也可以看出實現(xiàn)起來不方便。
?
Spark Streaming聲明式的方法稍微好點:
?
// Initial RDD input to updateStateByKey
val initialRDD?=?ssc.sparkContext.parallelize(List.empty[(String,?Int)])
?
val lines?=?...
val words?=?lines.flatMap(_.split(" "))
val wordDstream?=?words.map(x?=>?(x,?1))
?
val trackStateFunc?=?(batchTime:?Time,?word:?String,?one:?Option[Int],
?state:?State[Int])?=>?{
? ?val sum?=?one.getOrElse(0)?+?state.getOption.getOrElse(0)
? ?val output?=?(word,?sum)
? ?state.update(sum)
? ?Some(output)
?}
?
val stateDstream?=?wordDstream.trackStateByKey(
?StateSpec.function(trackStateFunc).initialState(initialRDD))
?
首先我們需要創(chuàng)建一個RDD來初始化狀態(tài)(第二行代碼),然后進行transformations(第五行和六行代碼)。接著在第八行到十四行代碼,我們定義函數(shù)來處理單詞數(shù)狀態(tài)。函數(shù)計算并更新狀態(tài),最后返回結(jié)果。第十六行和十七行代碼,我們得到一個狀態(tài)信息流,其中包含單詞數(shù)。
?
接著我們看下Samza:
?
class?WordCountTask?extends?StreamTask?with?InitableTask?{
?
?private?var?store:?CountStore?=?_
?
?def?init(config:?Config,?context:?TaskContext)?{
? ?this.store?=?context.getStore("wordcount-store")
? ? ?.asInstanceOf[KeyValueStore[String,?Integer]]
?}
?
override?def?process(envelope:?IncomingMessageEnvelope,
? collector:?MessageCollector,?coordinator:?TaskCoordinator)?{
?
? val words?=?envelope.getMessage.asInstanceOf[String].split(" ")
?
? words.foreach?{?key?=>
? ? val count:?Integer?=?Option(store.get(key)).getOrElse(0)
? ? store.put(key,?count?+?1)
? ? collector.send(new?OutgoingMessageEnvelope(new?SystemStream("kafka","wordcount"),
? ? ??(key,?count)))
??}
}
?
首先在第三行代碼定義狀態(tài),進行Key-Value存儲,在第五行到八行代碼初始化狀態(tài)。接著在計算中使用,上面的代碼已經(jīng)很直白。
?
最后,講下Flink使用簡潔的API實現(xiàn)狀態(tài)管理:
?
val env?=?ExecutionEnvironment.getExecutionEnvironment
?
val text?=?env.fromElements(...)
val words?=?text.flatMap?(?_.split(" ")?)
?
words.keyBy(x?=>?x).mapWithState?{
?(word,?count:?Option[Int])?=>
? ?{
? ? ?val newCount?=?count.getOrElse(0)?+?1
? ? ?val output?=?(word,?newCount)
? ? ?(output,?Some(newCount))
? ?}
}
?
我們僅僅需要在第六行代碼中調(diào)用mapwithstate函數(shù),它有一個函數(shù)參數(shù)(函數(shù)有兩個變量,第一個是單詞,第二個是狀態(tài)。然后返回處理的結(jié)果和新的狀態(tài))。
流處理框架性能
這里所講的性能主要涉及到的是延遲性和吞吐量。
?
對于延遲性來說,微批處理一般在秒級別,大部分原生流處理在百毫秒以下,調(diào)優(yōu)的情況下Storm可以很輕松的達到十毫秒。
?
同時也要記住,消息傳輸機制保障,容錯性和狀態(tài)恢復都會占用機器資源。例如,打開容錯恢復可能會降低10%到15%的性能,Storm可能降低70%的吞吐量。總之,天下沒有免費的午餐。對于有狀態(tài)管理,Flink會降低25%的性能,Spark Streaming降低50%的性能。
?
也要記住,各大流處理框架的所有操作都是分布式的,通過網(wǎng)絡發(fā)送數(shù)據(jù)是相當耗時的,所以進了利用數(shù)據(jù)本地性,也盡量優(yōu)化你的應用的序列化。
項目成熟度
當你為應用選型時一定會考慮項目的成熟度。下面來快速瀏覽一下:?
Storm是第一個主流的流處理框架,后期已經(jīng)成為長期的工業(yè)級的標準,并在像Twitter,Yahoo,Spotify等大公司使用。
?
Spark Streaming是最近最流行的Scala代碼實現(xiàn)的流處理框架。現(xiàn)在Spark Streaming被公司(Netflix, Cisco, DataStax, Intel, IBM等)日漸接受。
?
Samza主要在LinkedIn公司使用。Flink是一個新興的項目,很有前景。
?
你可能對項目的貢獻者數(shù)量也感興趣。Storm和Trident大概有180個代碼貢獻者;整個Spark有720多個;根據(jù)github顯示,Samza有40個;Flink有超過130個代碼貢獻者。
流處理框架推薦
應用選型是大家都會遇到的問題,一般是根據(jù)應用具體的場景來選擇特定的流處理框架。下面給出幾個作者認為優(yōu)先考慮的點:
?
-
High level API:具有high level API的流處理框架會更簡潔和高效;
-
狀態(tài)管理:大部分流處理應用都涉及到狀態(tài)管理,因此你得把狀態(tài)管理作為評價指標之一;
-
exactly once語義:exactly once會使得應用開發(fā)變得簡單,但也要看具體需求,可能at least once 或者at most once語義就滿足你得要求;
-
自動恢復:確保流處理系統(tǒng)能夠快速恢復,你可以使用Chaos Monkey或者類似的工具進行測試。快速的恢復是流處理重要的部分。
?
Storm:Storm非常適合任務量小但速度要求高的應用。如果你主要在意流處理框架的延遲性,Storm將可能是你的首先。但同時也要記住,Storm的容錯恢復或者Trident的狀態(tài)管理都會降低整體的性能水平。也有一個潛在的Storm更新項目-Twitter的Heron,Heron設計的初衷是為了替代Storm,并在每個單任務上做了優(yōu)化但同時保留了API。
?
Spark Streaming:如果你得基礎架構(gòu)中已經(jīng)設計到Spark,那Spark Streaming無疑是值得你嘗試的。因為你可以很好的利用Spark各種library。如果你需要使用Lambda架構(gòu),Spark Streaming也是一個不錯的選擇。但你要時刻記住微批處理的局限性,以及它的延遲性問題。
?
Samza:如果你想使用Samza,那Kafka應該是你基礎架構(gòu)中的基石,好在現(xiàn)在Kafka已經(jīng)成為家喻戶曉的組件。像前面提到的,Samza一般會搭配強大的本地存儲一起,這對管理大數(shù)據(jù)量的狀態(tài)非常有益。它可以輕松處理上萬千兆字節(jié)的狀態(tài)信息,但要記住Samza只支持at least once語義。
?
Flink:Flink流處理系統(tǒng)的概念非常不錯,并且滿足絕大多數(shù)流處理場景,也經(jīng)常提供前沿的功能函數(shù),比如,高級窗口函數(shù)或者時間處理功能,這些在其它流處理框架中是沒有的。同時Flink也有API提供給通用的批處理場景。但你需要足夠的勇氣去上線一個新興的項目,并且你也不能忘了看下Flink的roadmap。
Dataflow和開源
最后,我們來聊下Dataflow和它的開源。Dataflow是Google云平臺的一部分,Google云平臺包含很多組件:大數(shù)據(jù)存儲,BigQuery,Cloud PubSub,數(shù)據(jù)分析工具和前面提到的Dataflow。
?
Dataflow是Google管理批處理和流處理的統(tǒng)一API。它是建立在MapReduce(批處理),FlumeJava(編程模型)和MillWheel(流處理)之上。Google最近決定開源Dataflow SDK,并完成Spark和Flink的runner。現(xiàn)在可以通過Dataflow的API來定義Google云平臺作業(yè)、Flink作業(yè)或者Spark作業(yè),后續(xù)會增加對其它引擎的支持。
?
Google為Dataflow提供Java、Python的API,社區(qū)已經(jīng)完成Scalable的DSL支持。除此之外,Google及其合作者提交Apache Beam到Apache。?
?
結(jié)論
本系列文章粗略的講述各大流行的流處理框架,并討論了它們的相似性、區(qū)別、折衷權(quán)衡和使用的場景。希望這些將會給你設計流處理方案有幫助。
?
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/articles/10655201.html
總結(jié)
以上是生活随笔為你收集整理的实时流处理框架Storm、Spark Streaming、Samza、Flink,孰优孰劣?!的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 基于 Flink 的严选实时数仓实践
- 下一篇: UNIX 网络协议的深度分析