Apache SparkStreaming 简介和编程模型
1. 簡介
圖5.22 SparkStreaming[16]
????Spark Streaming是Spark API核心擴展,提供對實時數(shù)據(jù)流進行流式處理,具備可擴展、高吞吐和容錯等特性。Spark Streaming支持從多種數(shù)據(jù)源中提取數(shù)據(jù),例如Twitter、Kafka、Flume、ZeroMQ和TCP套接字,并提供了一些高級的API來表示復雜處理算法,如map、reduce、join、windows等,最后可以將得到的結(jié)果存儲到分布式文件系統(tǒng)(如HDFS)、數(shù)據(jù)庫或者其他輸出,Spark的機器學習和圖計算的算法也可以應用于Spark Streaming的數(shù)據(jù)流中。Spark Streaming的本質(zhì)實際上是一個微批處理系統(tǒng),正因如此,Spark Streaming具有一些現(xiàn)有的流處理模型所沒有的特性。它可以對故障節(jié)點和慢節(jié)點實現(xiàn)秒級的恢復,且具有高吞吐量。但其實時計算延遲是在秒級的,而現(xiàn)有的流處理系統(tǒng)(如Storm)一般是在毫秒級,所以Spark Streaming不適用于一些實時性要求很高的場景,如實時金融系統(tǒng)等。
????許多數(shù)據(jù)需要實時進行處理,也就是說數(shù)據(jù)產(chǎn)生時的價值最大。例如,一個社交網(wǎng)絡想在分鐘級別內(nèi)確定某個交流話題的趨勢,搜索網(wǎng)站想根據(jù)用戶的訪問訓練模型,服務商想在秒級內(nèi)通過挖掘日志找到錯誤信息。設計適用于這些場景的模型極具挑戰(zhàn)性,因為對于一些應用場景(如機器學習、實時日志分析),集群規(guī)模會達到百級以上,在這樣的規(guī)模下會存在兩個主要問題:節(jié)點故障(faults)和慢節(jié)點(slow nodes)問題。這兩個問題在大規(guī)模集群下都是經(jīng)常存在的,所以快速恢復在流系統(tǒng)應用中是十分重要的,否則流式應用可能無法及時做出關鍵的決定。但現(xiàn)有的一些流處理系統(tǒng)在這兩個問題的處理上都十分有限,大多數(shù)流處理系統(tǒng)(如Storm、TimeStream、MapReduce Online等)都是基于純實時的計算模型(a-record-at-a-time,來一條數(shù)據(jù)就處理一條數(shù)據(jù)),雖然這個模型能夠有較小的計算時延,但是很難解決節(jié)點故障和慢節(jié)點的問題。一些傳統(tǒng)的流式處理方法在小規(guī)模集群下運行較好,但在大規(guī)模情況下卻面臨著實質(zhì)性的問題。
????Spark Streaming提供了一種抽象的連續(xù)數(shù)據(jù)流,即Discretized Stream(離散流),一個離散流本質(zhì)上就是一個序列化的RDD(Resilient Distributed Datasets,彈性分布式數(shù)據(jù)集)。離散流模型利用其并行恢復(ParallelRecovery)解決了節(jié)點故障和減輕了慢節(jié)點所帶來的問題,還保證了一致性語義。
2. 系統(tǒng)架構(gòu)
????離散流是Spark Streaming提供的基礎抽象,它代表持續(xù)性的數(shù)據(jù)流,這些數(shù)據(jù)流既可以從外部源(如Kafka、Flume等)獲取,也可以通過離散流的算子操作來獲得。實質(zhì)上,離散流由一組時間上連續(xù)的RDD組成,每個RDD都包含著一定時間片的數(shù)據(jù),如圖5.23所示:
圖5.23Discretized Stream[17]
圖5.24 SparkStreaming 整體架構(gòu)[18]
?
????如圖5.24所示,這是Spark Streaming系統(tǒng)的整體架構(gòu),它將實時的流數(shù)據(jù)分解成一系列很小的批處理作業(yè)。批處理引擎使用的是Spark Core,也就是把輸入數(shù)據(jù)按照一定的時間片(如1s)分成一段一段數(shù)據(jù),每一段數(shù)據(jù)都會轉(zhuǎn)換成Spark的RDD輸入到Spark Core中,然后再將離散流的操作轉(zhuǎn)換為RDD的算子操作,RDD算子操作產(chǎn)生的中間結(jié)果會保存在內(nèi)存中,最后整個流式計算可以將中間結(jié)果輸出到外部。
3. 一致性語義和容錯
????對于流式計算,容錯性的重要性在第一小節(jié)已經(jīng)詳細說明過了。首先,我們需要回憶Spark中RDD的容錯機制。RDD是一個彈性不可變的分布式數(shù)據(jù)集,Spark記錄著確定性的RDD轉(zhuǎn)換的操作繼承關系(lineage),所以只要輸入的數(shù)據(jù)是可容錯的,任何一個RDD的分區(qū)出錯時,都可以根據(jù)lineage對原始輸入數(shù)據(jù)進行轉(zhuǎn)換操作,從而重新計算。圖5.25是Spark Streaming的一個RDD繼承關系圖:
圖5.25 統(tǒng)計網(wǎng)頁瀏覽量的lineragegraph[18]
????圖中每個橢圓代表的是一個RDD,橢圓中的每一個圓形是一個RDD的分區(qū),圖中的每一列的所有RDD代表的是一個離散流(圖中一共有3個離散流),間隔[0,1)和[1,2)代表的是不同時間分片,圖中每一行的最后一個RDD代表的是中間結(jié)果RDD。
????并行恢復(Parallel Recovery):系統(tǒng)會周期性的checkpoint RDD的數(shù)據(jù),異步的備份到其他節(jié)點(默認復制數(shù)是2),因為RDD是不可變的,所以checkpoint不會鎖住當前時間片的執(zhí)行。一個Spark Streaming的流式應用,系統(tǒng)會每分鐘對中間結(jié)果RDD進行checkpoint。當一個節(jié)點發(fā)生故障了,系統(tǒng)監(jiān)測出丟失的RDD,系統(tǒng)會選擇上一個checkpoint的數(shù)據(jù)來進行重新計算。離散流可以利用充分利用分區(qū)的并行性來達到更快的恢復速度:1)與批處理系統(tǒng)很相似的是,每個節(jié)點上運行多個task,每一個時間片的轉(zhuǎn)換操作會在每個節(jié)點創(chuàng)建多個RDD分區(qū)(例如在100個節(jié)點的集群上有1000個RDD分區(qū))。這樣當一個節(jié)點發(fā)生故障時,可以讓RDD的不同分區(qū)并行恢復。2)繼承關系圖(lineage graph)可以使不同時間片的數(shù)據(jù)并行恢復。如果一個機器節(jié)點發(fā)生故障,系統(tǒng)在每一個時間片可能丟失一些map操作的輸出,從圖5.26的瀏覽量統(tǒng)計應用的lineage graph可以看出,不同時間片的map可以并行地恢復計算,所以并行恢復的速度是比上游緩存策略更快的。
????慢節(jié)點問題:在現(xiàn)有的純實時流處理系統(tǒng)中,基本都沒有解決慢節(jié)點的問題。離散流則與批處理系統(tǒng)類似,通過運行慢任務的副本來減輕慢節(jié)點帶來的影響。Spark Streaming最開始采用一種簡單的閾值來判斷一個任務是否是慢任務:當一個任務是這個階段(stage)的中間任務運行時間的1.4倍,則判斷這是個慢任務。
????一致性語義:離散流還有一個好處就是提供了強一致性。例如,考慮一個系統(tǒng)統(tǒng)計男女網(wǎng)頁瀏覽量的比例,一個節(jié)點統(tǒng)計男性網(wǎng)頁瀏覽量,另一個節(jié)點統(tǒng)計女性網(wǎng)頁瀏覽量。如果一個節(jié)點落后于另一個節(jié)點,那么最終的結(jié)果也將有誤。一些系統(tǒng)(如Borealis)利用同步節(jié)點來避免這個問題,而Storm就直接忽略了這個問題。而且Storm只能保證一個記錄最少被處理一次,可能存在錯誤記錄被多次處理,這就會使可變更的狀態(tài)因更新兩次而導致結(jié)果不正確,雖然Storm提供了Trident可以確保每條記錄有且僅被處理一次,但是非常慢且需要用戶去實現(xiàn)。使用離散流可以保證一致性是很明顯的,因為時間被劃分成時間片,每一個時間片的輸出RDD都與這個時間片的輸入和前面時間片有關(參考圖5.26),而RDD是不可變的,因此最終的結(jié)果是不會改變的。
4. Apache SparkStreaming編程模型
4.1 數(shù)據(jù)模型
?
????在第2節(jié)我們知道,Spark Streaming就是把數(shù)據(jù)流劃分為微批交給Spark Core處理的。Spark Core的處理的數(shù)據(jù)被抽象成了一個RDD,而Spark Streaming的處理數(shù)據(jù)被抽象成了一系列的DStream。實質(zhì)上,離散流由一組時間上連續(xù)的RDD組成,每個RDD都包含著一定時間片的數(shù)據(jù),如圖5.23所示。
?
4.2 計算模型
?
????Spark Streaming的編程模型可以看成是一個批處理Spark Core的編程模型,除了API是調(diào)用Spark Streaming的API,很多概念都是一樣的。在Spark Core編寫程序時,只需要指定初始RDD的生成,然后對初始RDD進行一系列轉(zhuǎn)換的操作,不斷生成新的RDD,最后生成最終的結(jié)果RDD。
????Spark Streaming也是類似的計算模型,DStream本質(zhì)是一組時間上連續(xù)的RDD組成的,RDD是依靠著分區(qū)(Partition)來保證并行性的。在編寫Spark Streaming程序的時候,我們需要指定初始DStream的輸入源,生成初始的DStream,然后定義一些轉(zhuǎn)換操作,這些DStream的操作最終都會轉(zhuǎn)換成RDD的操作,然后在每一個時間片內(nèi),可以獲得最終的結(jié)果DStream對應的RDD(也可以將結(jié)果選擇輸出到外部文件中),可以參考后面單詞計數(shù)的實例分析。
PS:關于Spark Core中RDD的編程模型不屬于本章所要講的重點,在這里就不做贅述。
?
4.3 基本操作
?
????從Spark Streaming的系統(tǒng)架構(gòu)可知,Spark Streaming中對DStream的各種操作,最終會在Spark Core中轉(zhuǎn)換成RDD的操作,因此對DStream的操作是與Spark Core對RDD的操作是十分類似的。Spark Streaming在其數(shù)據(jù)模型DStream的模型下,為DStream提供了一系列的操作方法,這些操作大概可以分為3類:普通的轉(zhuǎn)換操作、窗口轉(zhuǎn)換操作和輸出操作。常用的普通轉(zhuǎn)換操作有flatMap、map、filter、reduceByKey、countByKey等操作,并且Spark Streaming支持將DStream的數(shù)據(jù)輸出到外部系統(tǒng),如數(shù)據(jù)庫或文件系統(tǒng)。具體Spark Streaming支持的所有操作,可以到官網(wǎng)查看。
?
4.4 編程模型實例分析
??????
?????? 下面用最基本的wordcount例子來解釋其編程模型,其DStream的轉(zhuǎn)換如下所示:
???? 圖XXX:單詞計數(shù)的DStream轉(zhuǎn)換圖
????如上圖所示,一共定義了四個離散流,wordCounts的離散流是我們最終要的結(jié)果。LinesDStream可以從文件系統(tǒng)、數(shù)據(jù)庫、kafka等獲取,然后對其進行flatMap操作,將每一行的文本分割成單詞,形成新的離散流words DStream,隨即進行mapToPair操作,將其映射成<word,1>的模式,最后用reduceByKey操作對每個單詞進行計數(shù),得到最終的結(jié)果離散流wordCountsDStream。
Java核心代碼如下:
?
//創(chuàng)建SparkConf對象 //與Spark Core的有一點不同,設置Master屬性的時候,使用local模式時, // local后面必須跟一個方括號,里面填寫一個數(shù)字,數(shù)字代表了用幾個線程執(zhí)行Spark Streaming程序。 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordCountLocal");//創(chuàng)建SparkStreamingContext對象,還需指定每隔多長時間的數(shù)據(jù)劃分為一個batch,這里是1s JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));//首先,創(chuàng)建一個DStream,代表了從一個數(shù)據(jù)源(這里是socket)來的持續(xù)不斷的實時數(shù)據(jù)流 JavaReceiverInputDStream<String> lines = jsc.socketTextStream("localhost", 9999);//將一行行的文本用flatMap切分成多個單詞,words DStream的RDD元素類型為一個個單詞 JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@Overridepublic Iterator<String> call(String line) throws Exception {return Arrays.asList(line.split(" ")).iterator();} });/ //接著開始進行mapToPair操作,將單詞映射成<word,1>的pair格式,得到離散流pairs JavaPairDStream<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String word) throws Exception {return new Tuple2<String, Integer>(word,1);} });//對離散流pairs進行reduceByKey操作,進行單詞計數(shù),得到wordCounts離散流 JavaPairDStream<String,Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer v1, Integer v2) throws Exception {return v1 + v2;} }); //最后,每次計算完,就打印這一秒鐘的單詞計數(shù)情況 wordCounts.print(); //必須調(diào)用JavaStreamingContext的start()方法,整個Java Streaming Application才會啟動執(zhí)行 //否則,不會執(zhí)行 jsc.start(); try {jsc.awaitTermination();//等待應用程序的終止,可以使用CTRL+C手動停止//也可以通過調(diào)用JavaStreamingContext的stop()方法來終止程序 } catch (InterruptedException e) {e.printStackTrace(); } jsc.close();
總結(jié)
以上是生活随笔為你收集整理的Apache SparkStreaming 简介和编程模型的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Apache Flink 简介和编程模型
- 下一篇: Twitter Heron 实时流处理系