Spark Streaming 实现思路与模块概述
Spark Streaming 實現思路與模塊概述
[酷玩 Spark] Spark Streaming 源碼解析系列?,返回目錄請?猛戳這里
「騰訊·廣點通」技術團隊榮譽出品
本文內容適用范圍:
- 2016.01.04 update, Spark 1.6 全系列 √ (1.6.0)
- 2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2)
- 2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1)
- 2015.04.17 update, Spark 1.3 全系列 √ (1.3.0, 1.3.1)?
一、基于 Spark 做 Spark Streaming 的思路
Spark Streaming 與 Spark Core 的關系可以用下面的經典部件圖來表述:
在本節,我們先探討一下基于 Spark Core 的 RDD API,如何對 streaming data 進行處理。理解下面描述的這個思路非常重要,因為基于這個思路詳細展開后,就能夠充分理解整個 Spark Streaming 的模塊劃分和代碼邏輯。
第一步,假設我們有一小塊數據,那么通過 RDD API,我們能夠構造出一個進行數據處理的 RDD DAG(如下圖所示)。
第二步,我們對連續的 streaming data 進行切片處理 —— 比如將最近 200ms 時間的 event 積攢一下 —— 每個切片就是一個 batch,然后使用第一步中的 RDD DAG 對這個 batch 的數據進行處理。
注意: 這里我們使用的是 batch 的概念 —— 其實 200ms 在其它同類系統中通常叫做 mini-batch,不過既然 Spark Streaming 官方的叫法就是 batch,我們這里就用 batch 表達 mini-batch 的意思了 :)
所以,針對連續不斷的 streaming data 進行多次切片,就會形成多個 batch,也就對應出來多個 RDD DAG(每個 RDD DAG 針對一個 batch 的數據)。如此一來,這多個 RDD DAG 之間相互同構,卻又是不同的實例。我們用下圖來表示這個關系:
所以,我們將需要:
(1) 一個靜態的 RDD DAG 的模板,來表示處理邏輯;
(2) 一個動態的工作控制器,將連續的 streaming data 切分數據片段,并按照模板復制出新的 RDD DAG 的實例,對數據片段進行處理。
第三步,我們回過頭來看 streaming data 本身的產生。Hadoop MapReduce, Spark RDD API 進行批處理時,一般默認數據已經在 HDFS, HBase 或其它存儲上。而 streaming data —— 比如 twitter 流 —— 又有可能是在系統外實時產生的,就需要能夠將這些數據導入到 Spark Streaming 系統里,就像 Apache Storm 的 Spout,Apache S4 的 Adapter 能夠把數據導入系統里的作用是一致的。所以,我們將需要:
- (3) 原始數據的產生和導入。
第四步,我們考慮,有了以上 (a)(b)(c) 3 部分,就可以順利用 RDD API 處理 streaming data 了嗎?其實相對于 batch job 通常幾個小時能夠跑完來講,streaming job 的運行時間是 +∞(正無窮大)的,所以我們還將需要:
- (4) 對長時運行任務的保障,包括輸入數據的失效后的重構,處理任務的失敗后的重調。
至此,streaming data 的特點決定了,如果我們想基于 Spark Core 進行 streaming data 的處理,還需要在 Spark Core 的框架上解決剛才列出的 (1)(2)(3)(4) 這四點問題:
二、Spark Streaming 的整體模塊劃分
根據 Spark Streaming 解決這 4 個問題的不同 focus,可以將 Spark Streaming 劃分為四個大的模塊:
- 模塊 1:DAG 靜態定義
- 模塊 2:Job 動態生成
- 模塊 3:數據產生與導入
- 模塊 4:長時容錯
其中每個模塊涉及到的主要的類,示意如下:
這里先不用糾結每個類的具體用途,我們將在本文中簡述,并在本系列的后續文章里對每個模塊逐一詳述。
2.1 模塊 1:DAG 靜態定義
通過前面的描述我們知道,應該首先對計算邏輯描述為一個 RDD DAG 的“模板”,在后面 Job 動態生成的時候,針對每個 batch,Spark Streaming 都將根據這個“模板”生成一個 RDD DAG 的實例。
DStream 和 DStreamGraph
其實在 Spark Streaming 里,這個 RDD “模板”對應的具體的類是?DStream,RDD DAG “模板”對應的具體類是DStreamGraph。而?RDD?本身也有很多子類,幾乎每個子類都有一個對應的?DStream,如?UnionRDD?的對應是UnionDStream。RDD?通過?transformation?連接成 RDD DAG(但 RDD DAG 在 Spark Core 里沒有對應的具體類),DStream?也通過?transformation?連接成?DStreamGraph。
DStream 的全限定名是:org.apache.spark.streaming.dstream.DStream DStreamGraph 的全限定名是:org.apache.spark.streaming.DStreamGraphDStream 和 RDD 的關系
既然?DStream?是?RDD?的模板,而且?DStream?和?RDD?具有相同的?transformation?操作,比如 map(), filter(), reduce() ……等等(正是這些相同的?transformation?使得?DStreamGraph?能夠忠實記錄 RDD DAG 的計算邏輯),那?RDD?和?DStream?有什么不一樣嗎?
還真不一樣。
比如,DStream?維護了對每個產出的?RDD?實例的指針。比如下圖里,DStream A?在 3 個 batch 里分別實例化了 3 個?RDD,分別是?a[1],?a[2],?a[3],那么?DStream A?就保留了一個?batch → 所產出的 RDD?的哈希表,即包含?batch 1 → a[1],?batch 2 → a[2],?batch 3 → a[3]?這 3 項。
另外,能夠進行流量控制的?DStream?子類,如?ReceiverInputDStream,還會保存關于歷次 batch 的源頭數據條數、歷次 batch 計算花費的時間等數值,用來實時計算準確的流量控制信息,這些都是記在?DStream?里的,而?RDD a[1]?等則不會保存這些信息。
我們在考慮的時候,可以認為,RDD?加上 batch 維度就是?DStream,DStream?去掉 batch 維度就是?RDD?—— 就像?RDD = DStream at batch T
不過這里需要特別說明的是,在DStreamGraph的圖里,DStream(即數據)是頂點,DStream之間的 transformation(即計算)是邊,這與 Apache Storm 等是相反的。
在 Apache Storm 的 topology 里,頂點是計算,邊是 stream(連續的 tuple),即數據。這一點也是比較熟悉 Storm 的同學剛開始一下子不太理解 DStream 的原因--我們再重復一遍,DStream 在有向圖里是頂點,是數據本身,而不是邊。
2.2 模塊 2:Job 動態生成
現在有了?DStreamGraph?和?DStream,也就是靜態定義了的計算邏輯,下面我們來看 Spark Streaming 是如何將其動態調度的。
在 Spark Streaming 程序的入口,我們都會定義一個 batchDuration,就是需要每隔多長時間就比照靜態的?DStreamGraph?來動態生成一個 RDD DAG 實例。在 Spark Streaming 里,總體負責動態作業調度的具體類是?JobScheduler,在 Spark Streaming 程序開始運行的時候,會生成一個?JobScheduler?的實例,并被 start() 運行起來。
JobScheduler?有兩個非常重要的成員:JobGenerator?和?ReceiverTracker。JobScheduler?將每個 batch 的 RDD DAG 具體生成工作委托給?JobGenerator,而將源頭輸入數據的記錄工作委托給?ReceiverTracker。
JobScheduler 的全限定名是:org.apache.spark.streaming.scheduler.JobScheduler JobGenerator 的全限定名是:org.apache.spark.streaming.scheduler.JobGenerator ReceiverTracker 的全限定名是:org.apache.spark.streaming.scheduler.ReceiverTrackerJobGenerator?維護了一個定時器,周期就是我們剛剛提到的 batchDuration,定時為每個 batch 生成 RDD DAG 的實例。具體的,每次 RDD DAG 實際生成包含 5 個步驟:
- (1)?要求?ReceiverTracker?將目前已收到的數據進行一次 allocate,即將上次 batch 切分后的數據切分到到本次新的 batch 里
- (2)?要求?DStreamGraph?復制出一套新的 RDD DAG 的實例,具體過程是:DStreamGraph?將要求圖里的尾?DStream?節點生成具體的 RDD 實例,并遞歸的調用尾?DStream?的上游?DStream?節點……以此遍歷整個?DStreamGraph,遍歷結束也就正好生成了 RDD DAG 的實例
- (3)?獲取第 1 步?ReceiverTracker?分配到本 batch 的源頭數據的 meta 信息
- (4) 將第 2 步生成的本 batch 的 RDD DAG,和第 3 步獲取到的 meta 信息,一同提交給?JobScheduler?異步執行
- (5) 只要提交結束(不管是否已開始異步執行),就馬上對整個系統的當前運行狀態做一個 checkpoint
上述 5 個步驟的調用關系圖如下:
2.3 模塊 3:數據產生與導入
下面我們看 Spark Streaming 解決第三個問題的模塊分析,即數據的產生與導入。
DStream?有一個重要而特殊的子類?ReceiverInputDStream:它除了需要像其它?DStream?那樣在某個 batch 里實例化?RDD?以外,還需要額外的?Receiver?為這個?RDD?生產數據!
具體的,Spark Streaming 在程序剛開始運行時:
(1) 由?Receiver?的總指揮?ReceiverTracker?分發多個 job(每個 job 有 1 個 task),到多個 executor 上分別啟動ReceiverSupervisor?實例;
(2) 每個?ReceiverSupervisor?啟動后將馬上生成一個用戶提供的?Receiver?實現的實例 —— 該?Receiver?實現可以持續產生或者持續接收系統外數據,比如?TwitterReceiver?可以實時爬取 twitter 數據 —— 并在?Receiver?實例生成后調用Receiver.onStart()。
(1)(2) 的過程由上圖所示,這時?Receiver?啟動工作已運行完畢。
接下來?ReceiverSupervisor?將在 executor 端作為的主要角色,并且:
(3)?Receiver?在?onStart()?啟動后,就將持續不斷地接收外界數據,并持續交給?ReceiverSupervisor?進行數據轉儲;
(4)?ReceiverSupervisor?持續不斷地接收到?Receiver?轉來的數據:
- 如果數據很細小,就需要?BlockGenerator?攢多條數據成一塊(4a)、然后再成塊存儲(4b 或 4c)
反之就不用攢,直接成塊存儲(4b 或 4c)
這里 Spark Streaming 目前支持兩種成塊存儲方式,一種是由?blockManagerskManagerBasedBlockHandler?直接存到 executor 的內存或硬盤,另一種由?WriteAheadLogBasedBlockHandler?是同時寫 WAL(4c) 和 executor 的內存或硬盤
(5) 每次成塊在 executor 存儲完畢后,ReceiverSupervisor?就會及時上報塊數據的 meta 信息給 driver 端的ReceiverTracker;這里的 meta 信息包括數據的標識 id,數據的位置,數據的條數,數據的大小等信息。
(6)?ReceiverTracker?再將收到的塊數據 meta 信息直接轉給自己的成員?ReceivedBlockTracker,由ReceivedBlockTracker?專門管理收到的塊數據 meta 信息。
這里 (3)(4)(5)(6) 的過程是一直持續不斷地發生的,我們也將其在上圖里標識出來。
后續在 driver 端,就由?ReceiverInputDStream?在每個 batch 去檢查?ReceiverTracker?收到的塊數據 meta 信息,界定哪些新數據需要在本 batch 內處理,然后生成相應的?RDD?實例去處理這些塊數據,這個過程在模塊 1:DAG 靜態定義?模塊2:Job 動態生成?里描述過了。
2.4 模塊 4:長時容錯
以上我們簡述完成 Spark Streamimg 基于 Spark Core 所新增功能的 3 個模塊,接下來我們看一看第 4 個模塊將如何保障 Spark Streaming 的長時運行 —— 也就是,如何與前 3 個模塊結合,保障前 3 個模塊的長時運行。
通過前 3 個模塊的關鍵類的分析,我們可以知道,保障模塊 1 和 2 需要在 driver 端完成,保障模塊 3 需要在 executor 端和 driver 端完成。
executor 端長時容錯
先看 executor 端。
在 executor 端,ReceiverSupervisor?和?Receiver?失效后直接重啟就 OK 了,關聯是保障收到的塊數據的安全。保障了源頭塊數據,就能夠保障 RDD DAG (Spark Core 的 lineage)重做。
Spark Streaming 對源頭塊數據的保障,分為 4 個層次,全面、相互補充,又可根據不同場景靈活設置:
- (1) 熱備:熱備是指在存儲塊數據時,將其存儲到本 executor、并同時 replicate 到另外一個 executor 上去。這樣在一個 replica 失效后,可以立刻無感知切換到另一份 replica 進行計算。實現方式是,在實現自己的 Receiver 時,即指定一下StorageLevel?為?MEMORY_ONLY_2?或?MEMORY_AND_DISK_2?就可以了。
// 1.5.2 update 這已經是默認了。
- (2) 冷備:冷備是每次存儲塊數據前,先把塊數據作為 log 寫出到?WriteAheadLog?里,再存儲到本 executor。executor 失效時,就由另外的 executor 去讀 WAL,再重做 log 來恢復塊數據。WAL 通常寫到可靠存儲如 HDFS 上,所以恢復時可能需要一段 recover time。
(3) 重放:如果上游支持重放,比如 Apache Kafka,那么就可以選擇不用熱備或者冷備來另外存儲數據了,而是在失效時換一個 executor 進行數據重放即可。
(4) 忽略:最后,如果應用的實時性需求大于準確性,那么一塊數據丟失后我們也可以選擇忽略、不恢復失效的源頭數據。
我們用一個表格來總結一下:
| ? | 圖示 | 優點 | 缺點 |
| (1) 熱備 | 無 recover time | 需要占用雙倍資源 | |
| (2) 冷備 | 十分可靠 | 存在 recover time | |
| (3) 重放 | 不占用額外資源 | 存在 recover time | |
| (4) 忽略 | 無 recover time | 準確性有損失 |
driver 端長時容錯
前面我們講過,塊數據的 meta 信息上報到 ReceiverTracker,然后交給?ReceivedBlockTracker?做具體的管理。ReceivedBlockTracker?也采用 WAL 冷備方式進行備份,在 driver 失效后,由新的?ReceivedBlockTracker?讀取 WAL 并恢復 block 的 meta 信息。
另外,需要定時對?DStreamGraph?和?JobScheduler?做?Checkpoint,來記錄整個?DStreamGraph?的變化、和每個 batch 的 job 的完成情況。
注意到這里采用的是完整 checkpoint 的方式,和之前的 WAL 的方式都不一樣。Checkpoint?通常也是落地到可靠存儲如 HDFS。Checkpoint?發起的間隔默認的是和?batchDuration 一致;即每次 batch 發起、提交了需要運行的 job 后就做Checkpoint,另外在 job 完成了更新任務狀態的時候再次做一下?Checkpoint。
這樣一來,在 driver 失效并恢復后,可以讀取最近一次的 Checkpoint 來恢復作業的?DStreamGraph?和 job 的運行及完成狀態。
總結一下本節內容為上述表格,可以看到,Spark Streaming 的長時容錯特性,能夠提供不重、不丟,exactly-once 的處理語義。
三、入口:StreamingContext
上面我們花了很多篇幅來介紹 Spark Streaming 的四大模塊,我們在最后介紹一下?StreamingContext。
下面我們用這段僅 11 行的完整?quick example,來說明用戶 code 是怎么通過?StreamingContext?與前面幾個模塊進行交互的:
import org.apache.spark._ import org.apache.spark.streaming._// 首先配置一下本 quick example 將跑在本機,app name 是 NetworkWordCount val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") // batchDuration 設置為 1 秒,然后創建一個 streaming 入口 val ssc = new StreamingContext(conf, Seconds(1))// ssc.socketTextStream() 將創建一個 SocketInputDStream;這個 InputDStream 的 SocketReceiver 將監聽本機 9999 端口 val lines = ssc.socketTextStream("localhost", 9999)val words = lines.flatMap(_.split(" ")) // DStream transformation val pairs = words.map(word => (word, 1)) // DStream transformation val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation wordCounts.print() // DStream transformation // 上面 4 行利用 DStream transformation 構造出了 lines -> words -> pairs -> wordCounts -> .print() 這樣一個 DStreamGraph // 但注意,到目前是定義好了產生數據的 SocketReceiver,以及一個 DStreamGraph,這些都是靜態的// 下面這行 start() 將在幕后啟動 JobScheduler, 進而啟動 JobGenerator 和 ReceiverTracker // ssc.start() // -> JobScheduler.start() // -> JobGenerator.start(); 開始不斷生成一個一個 batch // -> ReceiverTracker.start(); 開始往 executor 上分布 ReceiverSupervisor 了,也會進一步創建和啟動 Receiver ssc.start()// 然后用戶 code 主線程就 block 在下面這行代碼了 // block 的后果就是,后臺的 JobScheduler 線程周而復始的產生一個一個 batch 而不停息 // 也就是在這里,我們前面靜態定義的 DStreamGraph 的 print(),才一次一次被在 RDD 實例上調用,一次一次打印出當前 batch 的結果 ssc.awaitTermination()所以我們看到,StreamingContext?是 Spark Streaming 提供給用戶 code 的、與前述 4 個模塊交互的一個簡單和統一的入口。
四、總結與回顧
在最后我們再把?Sark Streaming 官方 Programming Guide?的部分內容放在這里,作為本文的一個回顧和總結。請大家看一看,如果看懂了本文的內容,是不是讀下面這些比較 high-level 的介紹會清晰化很多 :-)
Spark Streaming?is an extension of the?core Spark API?that enables?scalable,?high-throughput,?fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.
Internally, it works as follows.?Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches.
Spark Streaming provides a high-level abstraction called?discretized stream?or?DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams.?Internally, a DStream is represented as a sequence of RDDs.
...
(本文完,參與本文的討論請?猛戳這里,返回目錄請?猛戳這里)
轉載于:https://www.cnblogs.com/dailidong/p/7571134.html
總結
以上是生活随笔為你收集整理的Spark Streaming 实现思路与模块概述的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 享受大自然的感悟句子219个
- 下一篇: 关于春夏秋冬四季的网名122个