通俗说Spark
前面有一篇文章形象解析了Yarn的工作原理,這一篇文章通俗解析一下當前最火的大數據框架Spark。
通俗說基于Yarn的Map-Reduce過程
聽說過Spark 的人常聽到他強于Hadoop 的原因是他是基于內存的計算,因而比Hadoop快,可是數據量如此之大,怎么可能都放在內存里面呢?
當然不是所有的都在內存里面,Spark比hadoop快而是由Spark全新的運行機制決定的。
一提Spark 的大數據處理能力,有一個抽象的概念叫RDD,其實用戶可以邏輯地認為數據全在內存中,僅僅關注數據處理的邏輯即可,這有點像客戶提的需求,往往是抽象的,需要在實現的過程中慢慢的落地。(這里接著延續通俗說Yarn里面接項目的模式)
客戶開始口若懸河的描述他們想怎么處理這些數據,例如每個都加一(map),過濾掉一些(filter),合并一下(union),按照key做個匯總(reduceByKey),最后處理完了,寫入HDFS。
好了,你作為需求分析師開始記錄客戶想要做的改進,一一記下來,最后形成了一張如下的有向無環圖。
好了,接到需求了,開始干吧!等等,咱們統籌規劃一下,先別著急動手。
這是Spark和hadoop編程模型不同的地方,上述的所有的操作,map-reduce看到一條就做一條,例如每個都加一,構成一個map-reduce,讀一次磁盤,寫一次磁盤,同理過濾,構成另一個map-reduce,以此類推,所以整個處理過程比較慢。
Spark當看到一個需求的時候,判斷這是一個中間狀態的轉換(Transformation),還是客戶要的最后結果,如果是中間狀態,則等等看,如果是客戶要結果了,才開始真正的行動(Action)。
當要行動的時候,客戶的需求已經完全清楚了,可以統籌規劃。
這個做統籌規劃的人叫DAGScheduler,他不真正的執行任務,也不調度任務的執行,既不是程序員,也不是項目經理,而是需求分析師,給出來的常稱為High Level Design。
DAGScheduler中的DAG是Directed Acyclic Graph,有向無環圖,他講客戶的需求畫成一個圖,這樣看起來清晰多了,然后從最后客戶想要的結果入手進行分析。
客戶想要的結果肯定是最后一步finalStage,然后看要得到這個結果的上一步的數據是什么,因為數據量很大,就像上一次講Yarn一樣,需要分成多個團隊并行處理,但是有時候并行不起來,還需要匯總一下結果,在Spark里面也是這樣的,如果上一步的數據到這一步子團隊內部就能搞定,就不需要開會匯總,互相交換資源,例如上一步的數據是每個省的高考分數,這一步要求得到每個省的最高分數,這樣自己省里面自己就能搞定,這叫做窄依賴,可以在一個Stage里面搞定,這就是Spark比較好的一個模式,只要是窄依賴,不需要啟動另外的map-reducce,在一個子團隊一直做,如果每個子團隊處理數據量不大,就不用落盤,直到需要匯總的那個時候。
什么時候需要匯總呢?例如上一步是每個省的高考分數,這一步求全國的文科和理科狀元,這樣一個省內就搞不定了,需要大家開會匯總,每個省將自己的文科狀元和理科狀元報上來,分別在理科處理子團隊和文科處理子團隊進行排序處理,這個過程各個省之間的數據需要交換,因而這個過程往往成為數據處理的瓶頸,稱為寬依賴,寬依賴會將處理過程分成兩個Stage,因為一個子團隊內部搞不定了,需要人統籌開會,交換資源,這個過程叫做shuffle,專門組織會議,統籌這件事情的叫做shuffleManager
就是按照這種思路,DAGScheduler從最后的輸出結果往前推,凡是窄依賴的算作一個Stage,凡是寬依賴的算作另外的Stage,于是將整個處理的圖劃分為多個Stage,規劃階段就算可以了。
接下來是要執行了。DAGScheduler會創建一個Job,對于每一個Stage,因為Stage內部都是窄依賴,因而可以分成多個團隊并行處理的,當一個Stage處理完畢,進入下一個Stage的時候,會進行一次Shuffle,也即集中開會交換數據,然后下一個Stage同樣多個團隊并行處理,直到輸出結果。
對于DAGScheduler來講,作為需求分析師,將任務劃分為Stage以后,接下來要將每個Stage劃分為并行的Task,以便不同的團隊處理不同的Task。除了最后輸出階段的Task叫做ResultTask以外,前面的Stage的Task都稱為MapShuffleTask。
分好了Task,具體任務交給哪個團隊,需求分析師就不管了,是項目經理的事情,由另外的TaskScheduler負責分配。
TaskScheduler相當于項目經理,開始為這個job的task分配資源,但是往往項目經理接的項目不止一個,整個團隊的資源就這么多,因而需要一個調度算法,資源先給誰,后給誰。這里面有兩種算法,fifo是先來先得,哪個job先來,先給誰資源,fire是公平算法,對于已經接手的項目,資源平均分配。
當TaskScheduler決定要并行的運行一組Task的時候,就讓自己的助理SchedulerBackend去真的申請資源,去哪里申請呢?當然是Mesos,Yarn,或者Spark自己實現的standalone。Mesos和Yarn如何分配資源的,請參考另外的文章。
反正資源管理系統會在資源足夠的節點上為任務分配空間,然后會通知節點上啟動一個Executer來真正的做事情,是最終真正干活的人。
Executer創建好后會向SchedulerBackend報告,一個執行這個任務的虛擬小組就算成立了。
Executer做什么呢?當然聽領導安排了,領導會將任務要處理哪些數據,如何處理等配置序列化后發給它,它讀取解序列化后就知道要做啥了,開始干貨吧。
前面說過了,任務有兩種,一種是ShuffleMapTask,一種是ResultTask,處理方式各不相同。除了最后輸出結果是ResultTask,其他的處理任務都是ShuffleMapTask。
我們先來說ShuffleMapTask,為什么叫做ShuffleMapTask呢?因為作為Spark的中間過程,首先會從上一個Stage獲取結果,而從上一個Stage到這一個Stage,一定是一個寬依賴,所以一定是經歷了一個Shuffle的過程,這個Stage獲取到的結果,其實是上一Stage Shuffle的結果,得到結果后,在本Stage中運行Map任務,并行的處理這個Stage的所有處理,由于同一個Stage里面都是窄依賴,所以不需要匯總交換信息,當本Stage的Map處理完畢之后,接著進行下一個Stage,由于又要跨Stage了,又要進行一次Shuffle了,只不過這個Stage是Shuffle的前一半,下一半會在下一個Stage完成。
所以說一個shuffle的過程是跨兩個Stage的,跨兩個ShuffleMapTask的。Shuffle的過程由于比較復雜,就像需要開大會,匯總,交換信息來搞定,所以需要一些人來總管這件事情,稱為ShuffleManager,在Master端和Executor端都有進程,會統一管理上一個Stage會輸出多少個部分,下一個Stage會輸入多少個部分,稱為partition,誰的數據應該拷貝給誰,ShuffleManager有很多的實現方式,當前主流的是SortShuffleManager。
例如上面的高考分數例子中,上一個Stage的partition的數目是省份的數量,下一個Stage的partition的數量是2,一個文科,一個理科,如何匯總數據呢,當然應該將每個省里面的文科的分數和理科的分數單獨保存,然后每個省的文科的分數都會拷貝給下一個Stage文科這個partition,每個省的理科的分數都拷貝給下一個Stage理科這個partition。
在上一個Stage的Map階段,寫入結果的是SortShuffleWriter,他的工作機制如下:
-
對于每一個Partition,創建一個Array,將屬于這個Partition的key/value放入數組
-
如果Array超過閾值,寫寫入外部存儲,外部存儲會記錄這個Partition的ID和保存了多少條目
-
將所有寫入到外部存儲的文件進行歸并
-
生成最后的數據文件是,需要生成Index文件,記錄partition的位置
當結果寫入文件后,會將結果的狀態MapStatus匯報給SortShuffleManager,將這個結果保存在MapOutputTracker
這一個Stage中,會從SortShuffleReader中讀取上面的結果,會調用BlockStoreShuffleFetcher的fetch函數,先從MapOutputTracker中得到元數據。然后根據元數據獲取數據,如果在本地,則通過BlockManager讀取,如果在其他Executor上,則調用BlockTransferService進程傳輸。
當本Stage的Shuffle結束之后,接下來就是好好的做自己的事情了,每個Partition可以在自己的窄依賴里面轉換,不需要和其他的Partition溝通,例如map,filter等等。當本Stage的裝換做完,到了一個寬依賴的時候,就需要開啟新的Shuffle,為下一個Stage做準備了。
如此一個Stage一個Stage的下去,知道最后要輸出結果的Stage了,在這個Stage里面的分給每個Executor的任務稱為ResultTask。如果是ResultTask,則是最后一個Stage,獲取RDD和作用于RDD的函數func,對RDD這個partition的每一項,執行func函數,例如寫入HDFS。
總結
- 上一篇: 图解Linux的Socket
- 下一篇: 通俗说基于Yarn的Map-Reduce