【Spark 深入学习 04】再说Spark底层运行机制
本節(jié)內(nèi)容
· spark底層執(zhí)行機(jī)制
· 細(xì)說RDD構(gòu)建過程
· Job Stage的劃分算法
· Task最佳計算位置算法
?
一、spark底層執(zhí)行機(jī)制
? ? 對于Spark底層的運行原理,找到了一副很好的圖,先貼上
?
?
? ??客戶端提交應(yīng)用后,spark是如何執(zhí)行的要有一個整體的概念,做到心中有數(shù),先整體把握,才能更好的分模塊開墾細(xì)節(jié),廢話不多說,先來看該圖如何更好的理解。
????1)提交前的聯(lián)系
?????Worker向Master或則ResourceManager匯報自己有哪些資源(內(nèi)存、CPU、磁盤空間、網(wǎng)絡(luò)等),Master或則ResourceManager與Worker一直保持心跳
? ? 2)應(yīng)用提交后
???Spark通過RDD對分布式的數(shù)據(jù)進(jìn)行管理,RDD記錄了轉(zhuǎn)換成“spark格式”后的數(shù)據(jù)分區(qū)(記錄數(shù)據(jù)的存儲位置)和數(shù)據(jù)分區(qū)對應(yīng)的操作
???· 應(yīng)用提交后,形成RDD Graph,并且在后臺創(chuàng)建DAG對象(spark不僅僅用DAG建模,而且還會執(zhí)行它,并且里面不是用對象表示,而是用RDD對象之間的關(guān)系)
???· DAG Scheduler 優(yōu)先使用pipeline方法,把RDD的transformation壓縮,當(dāng)碰到wide transformation 時,narrow無法和wide pipeline,那DAG scheduler會把前面的transformation定義成一個stage,DAG Scheduler的工作結(jié)果就是將RDD產(chǎn)生一組stages
???· 將DAG Scheduler產(chǎn)生的stages傳送給task scheduler,task scheduler使用集群管理器依次執(zhí)行task,task被分配到各個work下執(zhí)行,當(dāng)所有的task執(zhí)行完畢,一個stage標(biāo)記完成,再運行下一個stage,直到整個spark job完成。
?
? ? 簡單理解, Spark 把要處理的數(shù)據(jù),處理中間結(jié)果,和輸出結(jié)果都定義成 RDD. 這樣一個常見的 Spark job 就類似于:
????? 從數(shù)據(jù)源讀取數(shù)據(jù),把輸入生成一個 RDD;
????? 通過運算把輸入 RDD 轉(zhuǎn)換成另一個RDD;
????? 再通過運算把生成的 RDD 轉(zhuǎn)換成另一個RDD,重復(fù)需要進(jìn)行的 RDD 轉(zhuǎn)換操作 (此處省略一千遍);
????? 最后運算成結(jié)果 RDD,處理結(jié)果;
?
? ? ?Spark的運行流程: Client提交應(yīng)用,master找到一個worker啟動driver[也可以其他],driver向master請求資源,之后將應(yīng)用轉(zhuǎn)化為RDD Graph,再由DAGScheduler將RDD Graph轉(zhuǎn)換為stage的DAG提交給TaskScheduler,由TaskScheduler提交任務(wù)給executor。
? ? ?從調(diào)度來看,經(jīng)歷了如下調(diào)度:application調(diào)度 -> Job調(diào)度 -> Stage調(diào)度 -> Task調(diào)度
?
二、細(xì)說RDD構(gòu)建過程
? ? ?從前面的學(xué)習(xí)我們發(fā)現(xiàn)?RDD 其實就是數(shù)據(jù)集,是一組數(shù)據(jù)被處理到一個階段的狀態(tài)。
? ? ?每一個?Spark Job 就是定義了由輸入 RDD,如何把它轉(zhuǎn)化成下一個狀態(tài),再下一個狀態(tài) …… 直到轉(zhuǎn)化成我們的輸出。這些轉(zhuǎn)化就是對 RDD 里每一個 data record 的操作。用個高大上點的語言,一個 Spark job 就是一系列的 RDD 以及他們之間的轉(zhuǎn)換關(guān)系。那么用戶如何才能定義 RDD 和轉(zhuǎn)換關(guān)系呢?換句話說,用戶如何使用 Spark 呢?
? ? ?用戶需要定義一個包含主函數(shù)的?Java (main) 類。在這個 main 函數(shù)中,無論業(yè)務(wù)邏輯多么復(fù)雜,無論你需要使用多少 Java 類,如果從 Spark 的角度簡化你的程序,那么其實就是:
? ? ???首先生成?JavaSparkContext 類的對象.
? ?????從?JavaSparkContext 類的對象里產(chǎn)生第一個輸入RDD. 以讀取 HDFS 作為數(shù)據(jù)源為例,調(diào)用 JavaSparkContext.textFile() 就生成第一個 RDD.
? ?????每個?RDD 都定義了一些標(biāo)準(zhǔn)的常用的變化,比如我們上面提到的 map, filter, reduceByKey …… 這些變化在 Spark 里叫做 transformation.
? ?????之后可以按照業(yè)務(wù)邏輯,調(diào)用這些函數(shù)。這些函數(shù)返回的也是?RDD, 然后繼續(xù)調(diào)用,產(chǎn)生新的RDD …… 循環(huán)往復(fù),構(gòu)建你的 RDD 關(guān)系圖。
? ?????注意?RDD 還定義了其他一些函數(shù),比如 collect, count, saveAsTextFile 等等,他們的返回值不是 RDD. 這些函數(shù)在 Spark 里叫做 actions, 他們通常作為 job 的結(jié)尾處理。
?? ????用戶調(diào)用?actions 產(chǎn)生輸出結(jié)果,Job 結(jié)束。
? ? Action 都是類似于 “數(shù)數(shù)這個 RDD 里有幾個 data record”, 或者 ”把這個 RDD 存入一個文件” 等等。想想他們作為結(jié)尾其實非常合理:我們使用 Spark 總是來實現(xiàn)業(yè)務(wù)邏輯的吧?處理得出的結(jié)果自然需要寫入文件,或者存入數(shù)據(jù)庫,或者數(shù)數(shù)有多少元素,或者其他一些統(tǒng)計什么的。所以 Spark 要求只有用戶使用了一個 action,一個 job 才算結(jié)束。當(dāng)然,一個 job 可以有多個 action,比如我們的數(shù)據(jù)既要存入文件,我們又期望知道有多少個元素。
? ? ?這些?RDD 組成的關(guān)系在 Spark 里叫做 DAG,就是有向無循環(huán)圖,圖論里的一個概念,大家有興趣可以專門翻翻這個概念。可以發(fā)現(xiàn),實踐中絕大部分業(yè)務(wù)邏輯都可以用 DAG 表示,所以 spark 把 job 定義成 DAG 也就不足為奇了。
?
RDD 的兩種變化
? ? ?我們上面剛剛介紹了?transformation 的概念。在 Spark 眼中,transformation 被分成 narrow transformation 和 wide transformation. 這是什么東西呢?
上文提到過?RDD 被分成幾個分區(qū),分散在多臺機(jī)器上。當(dāng)我們把一個 RDD A 轉(zhuǎn)化成下一個 RDD B 時,這里有兩種情況:
?· 有時候只需要一個?A 里面的一個分區(qū),就可以產(chǎn)生 B 里的一個分區(qū)了,比如 map 的例子:A 和 B 之間每個分區(qū)是一一對應(yīng)的關(guān)系,這就是 narrow transofmration.
?· 還有一類?transformation,可能需要 A 里面所有的分區(qū),才能產(chǎn)生 B 里的一個分區(qū),比如 reduceByKey的例子,這就是 wide transformation.
?
Narrow 或者 Wide 有什么關(guān)系嗎?
? ? 一個?Spark job 中可能需要連續(xù)地調(diào)用 transformation, 比如先 map,后 filter,然后再 map …… 那這些 RDD 的變化用圖表示就是:
?
?
? ? 我們可以大膽設(shè)想一下,如果每個分區(qū)里的數(shù)據(jù)就待在那臺機(jī)器的內(nèi)存里,我們逐一的調(diào)用?map, filter, map 函數(shù)到這些分區(qū)里,Job 就很好的完成。
更重要的是,由于數(shù)據(jù)沒有轉(zhuǎn)移到別的機(jī)器,我們避免了?Network IO 或者 Disk IO. 唯一的任務(wù)就是把 map / filter 的運行環(huán)境搬到這些機(jī)器上運行,這對現(xiàn)代計算機(jī)來說,overhead 幾乎可以忽略不計。
? ? ?這種把多個操作合并到一起,在數(shù)據(jù)上一口氣運行的方法在?Spark 里叫 pipeline (其實 pipeline 被廣泛應(yīng)用的很多領(lǐng)域,比如 CPU)。這時候不同就出現(xiàn)了:只有 narrow transformation 才可以進(jìn)行 pipleline 操作。對于 wide transformation, RDD 轉(zhuǎn)換需要很多分區(qū)運算,包括數(shù)據(jù)在機(jī)器間搬動,所以失去了 pipeline 的前提。
RDD 的執(zhí)行
? ?當(dāng)用戶調(diào)用?actions 函數(shù)時,Spark 會在后臺創(chuàng)建出一個 DAG. 就是說 Spark 不僅用 DAG 建模,而且真正地創(chuàng)建出一個 DAG, 然后執(zhí)行它(順便說一句 DAG 在 Spark 里不是用一個對象表示的,而是用 RDD 對象之間的關(guān)系)。?
?
?
? ? ?Spark 會把這個 DAG 交給一個叫 DAG scheduler 的模塊,DAG scheduler 會優(yōu)先使用 pipeline 方法,把 RDD 的 transformation 壓縮;當(dāng)我們遇到 wide transformation 時,由于之前的 narrow transformation 無法和 wide transformation pipeline, 那 DAG scheduler 會把前面的 transformation 定義成一個 stage.
重要的事情說三遍:DAG scheduler 會分析 Spark Job 所有的 transformation, 用 wide transformation 作為邊界,把所有 transformation 分成若干個stages. 一個 stage 里的一個分區(qū)就被 Spark 叫做一個task. 所以一個 task 是一個分區(qū)的數(shù)據(jù)和數(shù)據(jù)上面的操作,這些操作可能包括一個 transformation,也可能是多個,但一定是 narrow transformation.
? ? ?DAG scheduler 工作的結(jié)果就是產(chǎn)生一組 stages. 這組 stages 被傳到 Spark 的另一個組件 task scheduler, task scheduler 會使用集群管理器依次執(zhí)行 task, 當(dāng)所有的 task 執(zhí)行完畢,一個 stage 標(biāo)記完成;再運行下一個 stage …… 直到整個 Spark job 完成。
?
?
三、Job Stage的劃分算法
? ? ?從前文了解到的處理流程,RDD Graph->DAG Scheduler->Task Scheduler,DAG Scheduler將RDD轉(zhuǎn)換為Job Stage。
? ? ?由于Spark的算子構(gòu)建一般都是鏈?zhǔn)降?#xff0c;這就涉及了要如何進(jìn)行這些鏈?zhǔn)接嬎?#xff0c;Spark的策略是對這些算子,先劃分Stage,然后在進(jìn)行計算。
?????? 由于數(shù)據(jù)是分布式的存儲在各個節(jié)點上的,所以為了減少網(wǎng)絡(luò)傳輸?shù)拈_銷,就必須最大化的追求數(shù)據(jù)本地性,所謂的數(shù)據(jù)本地性是指,在計算時,數(shù)據(jù)本身已經(jīng)在內(nèi)存中或者利用已有緩存無需計算的方式獲取數(shù)據(jù)。
1. ?Stage劃分算法思想
(1)一個Job由多個Stage構(gòu)成
? ? ?一個Job可以有一個或者多個Stage,Stage劃分的依據(jù)就是寬依賴,產(chǎn)生寬依賴的算子:reduceByKey、groupByKey等等
(2)根據(jù)依賴關(guān)系,從前往后依次執(zhí)行多個Stage
?????? SparkApplication 中可以因為不同的Action觸發(fā)眾多的Job,也就是說一個Application中可以有很多的Job,每個Job是有一個或者多個Stage構(gòu)成,后面的Stage依賴前面的Stage,也就是說只有前面的Stage計算完后,后面的Stage才會運行。
?(3)Stage的執(zhí)行時Lazy級別的
?????? 所有的Stage會形成一個DAG(有向無環(huán)圖),由于RDD的Lazy特性,導(dǎo)致Stage也是Lazy級別的,只有遇到了Action才會真正發(fā)生作業(yè)的執(zhí)行,在Action之前,Spark框架只是將要進(jìn)行的計算記錄下來,并沒有真的執(zhí)行。Action導(dǎo)致作業(yè)執(zhí)行的代碼如下:觸發(fā)作業(yè),發(fā)送消息。消息的接收和處理:
(1)DAGScheduler啟動一個線程EventLoop(消息循環(huán)器),不斷地從消息隊列中取消息。消息是通過EventLoop的put方法放入消息隊列,當(dāng)EventLoop拿到消息后會回調(diào)DAGScheduler的OnReceive,進(jìn)而調(diào)用doOnReceive方法進(jìn)行處理。
為什么要開辟線程來執(zhí)行消息的讀、取?這樣可以提交更多的Job,異步處理多Job,處理的業(yè)務(wù)邏輯一致(調(diào)用自己方法也是發(fā)送消息),解耦合,擴(kuò)展性好。
(2)在doOnReceive中通過模式匹配的方式把JobSubmitted封裝的內(nèi)容路由到handleJobSubmitted。
(3)在handleJobSubmitted中首先創(chuàng)建finalStage。
(4)通過遞歸的方式創(chuàng)建DAG。
?
四、Task最佳計算位置算法
1.Task任務(wù)本算法運用場景??
? ? ? ?在上一節(jié),我們介紹了Job Stage劃分算法,并最終得到了DAG圖中的Result Stage(final Stage)。接下來我們通過查看Task任務(wù)本地性(為了保證Data Locality)的運用場景----Task的運行調(diào)度處理,來引入Task任務(wù)本地性算法。
? ? ? 在得到邏輯上Result Stage,Spark為了進(jìn)行計算就必須先報任務(wù)以一定的集群可識別形式提交給集群進(jìn)行計算。Spark的任務(wù)提交過程如下:
(1)生成ActiveJob,為提交finalStage做準(zhǔn)備。
(2)提交finalStage
提交Stage,如果有未提交的ParentStage,則會遞歸提交這些ParentStage,只有所有ParentStage都計算完了,才能提交當(dāng)前Stag
(3)提交MissingTask
??missingTask會最先會再到需要計算的分片,然后對Stage的運行環(huán)境進(jìn)行設(shè)定,然后取得Task計算的本地性級別,最后會根據(jù)這些信息建立Tasks來處理每個分片,在提交給底層TaskScheduler之前,Spark還會將Tasks封裝成TaskSet。最后提交TaskSet給TaskScheduler,等待TaskScheduler最終向集群提交這些Task,并且DAGScheduler會監(jiān)聽這些Task的狀態(tài)。
2.數(shù)據(jù)本地性
(1)這里我們來著重講解獲取數(shù)據(jù)本地性部分的代碼:
?
? ? ? ?這里會將要計算的分片(Partition)轉(zhuǎn)換為(id, getPreferredLocs(stage.rdd, id)) 類型的truple,進(jìn)而由truple轉(zhuǎn)換未一個Map映射,在Task構(gòu)造時需要一個locs參數(shù),便可以利用這個映射由id得到相應(yīng)Partition的本地性級別。
? ? ? 在每個分片(Partition)內(nèi)部則是通過getPreferredLocs方法得到的
在具體算法實現(xiàn)的時候,首先查詢DAGScheduler的內(nèi)存數(shù)據(jù)結(jié)構(gòu)中是否存在當(dāng)前partition的數(shù)據(jù)本地性信息,若有的話就直接放回該信息;若沒有首先會調(diào)用rdd.getPreferredLocations來得到數(shù)據(jù)的本地性。
? ? ? 例如想讓Spark運行在Hbase上或者是一種現(xiàn)在Spark還沒有直接支持的數(shù)據(jù)庫上,此時開發(fā)者需要自定義RDD,為了保證Task計算的數(shù)據(jù)本地性,最為關(guān)鍵的方式就是必須實現(xiàn)RDD的getPreferredLocations方法,來支持各種來源的數(shù)據(jù)。
? ? ?DAGScheduler計算數(shù)據(jù)本地性時,巧妙的借助了RDD自身的getPreferredLocations中的數(shù)據(jù),最大化的優(yōu)化效率,因為getPreferredLocations中表明了每個Partition的數(shù)據(jù)本地性。雖然當(dāng)然Partition可能被persist或checkpoint,但是persist或checkpoint默認(rèn)情況下肯定和getPreferredLocations中的partition的數(shù)據(jù)本地性是一致的。所以,這中算法就極大的簡化了Task數(shù)據(jù)本地性算法的實現(xiàn),并且優(yōu)化了效率
?
五、參考資料
1.http://mp.weixin.qq.com/s/nDRt1VQTYmsYcW98q4cvEQ-五分鐘深入 Spark 運行機(jī)制
2.http://blog.csdn.net/sinat_25306771/article/details/51429984
?
轉(zhuǎn)載于:https://www.cnblogs.com/licheng/p/6815297.html
總結(jié)
以上是生活随笔為你收集整理的【Spark 深入学习 04】再说Spark底层运行机制的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【Tomcat】Tomcat 系统架构与
- 下一篇: Jakarta Commons Logg