深入理解Spark 2.1 Core (一):RDD的原理与源码分析
摘要
本文提出了分布式內存抽象的概念——彈性分布式數據集(RDD,Resilient Distributed Datasets),它具備像MapReduce等數據流模型的容錯特性,并且允許開發人員在大型集群上執行基于內存的計算?,F有的數據流系統對兩種應用的處理并不高效:一是迭代式算法,這在圖應用和機器學習領域很常見;二是交互式數據挖掘工具。這兩種情況下,將數據保存在內存中能夠極大地提高性能。為了有效地實現容錯,RDD提供了一種高度受限的共享內存,即RDD是只讀的,并且只能通過其他RDD上的批量操作來創建(注:還可以由外部存儲系數據集創建,如HDFS)。盡管如此,RDD仍然足以表示很多類型的計算,包括MapReduce和專用的迭代編程模型(如Pregel)等。我們實現的RDD在迭代計算方面比Hadoop快20多倍,同時還可以在5-7秒內交互式地查詢1TB數據集。
1.引言
無論是工業界還是學術界,都已經廣泛使用高級集群編程模型來處理日益增長的數據,如MapReduce和Dryad。這些系統將分布式編程簡化為自動提供位置感知性調度、容錯以及負載均衡,使得大量用戶能夠在商用集群上分析超大數據集。
大多數現有的集群計算系統都是基于非循環的數據流模型。從穩定的物理存儲(如分布式文件系統)(注:即磁盤)中加載記錄,記錄被傳入由一組確定性操作構成的DAG,然后寫回穩定存儲。DAG數據流圖能夠在運行時自動實現任務調度和故障恢復。
盡管非循環數據流是一種很強大的抽象方法,但仍然有些應用無法使用這種方式描述。我們就是針對這些不太適合非循環模型的應用,它們的特點是在多個并行操作之間重用工作數據集。這類應用包括:(1)機器學習和圖應用中常用的迭代算法(每一步對數據執行相似的函數)(注:有許多機器學習算法需要將這次迭代權值調優后的結果數據集作為下次迭代的輸入,而使用MapReduce計算框架經過一次Reduce操作后輸出數據結果寫回磁盤,速度大大的降低了);(2)交互式數據挖掘工具(用戶反復查詢一個數據子集)?;跀祿鞯目蚣懿⒉幻鞔_支持工作集,所以需要將數據輸出到磁盤,然后在每次查詢時重新加載,這帶來較大的開銷。
我們提出了一種分布式的內存抽象,稱為彈性分布式數據集(RDD,Resilient Distributed Datasets)。它支持基于工作集的應用,同時具有數據流模型的特點:自動容錯、位置感知調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,后續的查詢能夠重用工作集,這極大地提升了查詢速度。
RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,只能通過在其他RDD執行確定的轉換操作(如map、join和group by)而創建,然而這些限制使得實現容錯的開銷很低。與分布式共享內存系統需要付出高昂代價的檢查點和回滾機制不同,RDD通過Lineage來重建丟失的分區:一個RDD中包含了如何從其他RDD衍生所必需的相關信息,從而不需要檢查點操作就可以重構丟失的數據分區。盡管RDD不是一個通用的共享內存抽象,但卻具備了良好的描述能力、可伸縮性和可靠性,但卻能夠廣泛適用于數據并行類應用。
第一個指出非循環數據流存在不足的并非是我們,例如,Google的Pregel[21],是一種專門用于迭代式圖算法的編程模型;Twister[13]和HaLoop[8],是兩種典型的迭代式MapReduce模型。但是,對于一些特定類型的應用,這些系統提供了一個受限的通信模型。相比之下,RDD則為基于工作集的應用提供了更為通用的抽象,用戶可以對中間結果進行顯式的命名和物化,控制其分區,還能執行用戶選擇的特定操作(而不是在運行時去循環執行一系列MapReduce步驟)。RDD可以用來描述Pregel、迭代式MapReduce,以及這兩種模型無法描述的其他應用,如交互式數據挖掘工具(用戶將數據集裝入內存,然后執行ad-hoc查詢)。
Spark是我們實現的RDD系統,在我們內部能夠被用于開發多種并行應用。Spark采用Scala語言[5]實現,提供類似于DryadLINQ的集成語言編程接口[34],使用戶可以非常容易地編寫并行任務。此外,隨著scala新版本解釋器的完善,Spark還能夠用于交互式查詢大數據集。我們相信Spark會是第一個能夠使用有效、通用編程語言,并在集群上對大數據集進行交互式分析的系統。
我們通過微基準和用戶應用程序來評估RDD。實驗表明,在處理迭代式應用上Spark比hadoop快高達20多倍,計算數據分析類報表的性能提高了40多倍,同時能夠在5-7秒的延時內交互式掃描1TB數據集。此外,我們還在Spark之上實現了Pregel和HaLoop編程模型(包括其位置優化策略),以庫的形式實現(分別使用了100和200行Scala代碼)。最后,利用RDD內在的確定性特性,我們還創建了一種Spark調試工具rddbg,允許用戶在任務期間利用Lineage重建RDD,然后像傳統調試器那樣重新執行任務。
本文首先在第2部分介紹了RDD的概念,然后第3部分描述Spark API,第4部分解釋如何使用RDD表示幾種并行應用(包括Pregel和HaLoop),第5部分討論Spark中RDD的表示方法以及任務調度器,第6部分描述具體實現和rddbg,第7部分對RDD進行評估,第8部分給出了相關研究工作,最后第9部分總結。
2.彈性分布式數據集(RDD)
本部分描述RDD和編程模型。首先討論設計目標(2.1),然后定義RDD(2.2),討論Spark的編程模型(2.3),并給出一個示例(2.4),最后對比RDD與分布式共享內存(2.5)。
2.1 目標和概述
我們的目標是為基于工作集的應用(即多個并行操作重用中間結果的這類應用)提供抽象,同時保持MapReduce及其相關模型的優勢特性:即自動容錯、位置感知性調度和可伸縮性。RDD比數據流模型更易于編程,同時基于工作集的計算也具有良好的描述能力。
在這些特性中,最難實現的是容錯性。一般來說,分布式數據集的容錯性有兩種方式:即數據檢查點和記錄數據的更新。我們面向的是大規模數據分析,數據檢查點操作成本很高:需要通過數據中心的網絡連接在機器之間復制龐大的數據集,而網絡帶寬往往比內存帶寬低得多,同時還需要消耗更多的存儲資源(在內存中復制數據可以減少需要緩存的數據量,而存儲到磁盤則會拖慢應用程序)。所以,我們選擇記錄更新的方式。但是,如果更新太多,那么記錄更新成本也不低。因此,RDD只支持粗粒度轉換,即在大量記錄上執行的單個操作。將創建RDD的一系列轉換記錄下來(即Lineage),以便恢復丟失的分區。
雖然只支持粗粒度轉換限制了編程模型,但我們發現RDD仍然可以很好地適用于很多應用,特別是支持數據并行的批量分析應用,包括數據挖掘、機器學習、圖算法等,因為這些程序通常都會在很多記錄上執行相同的操作。RDD不太適合那些異步更新共享狀態的應用,例如并行web爬蟲。因此,我們的目標是為大多數分析型應用提供有效的編程模型,而其他類型的應用交給專門的系統。
2.2 RDD抽象
RDD是只讀的、分區記錄的集合。RDD只能基于在穩定物理存儲中的數據集和其他已有的RDD上執行確定性操作來創建。這些確定性操作稱之為轉換,如map、filter、groupBy、join(轉換不是程序開發人員在RDD上執行的操作(注:這句話的意思可能是,轉換操作并不會觸發RDD真正的action。由于惰性執行,當進行action操作的時候,才會回溯去執行前面的轉換操作))。
RDD不需要物化。RDD含有如何從其他RDD衍生(即計算)出本RDD的相關信息(即Lineage),據此可以從物理存儲的數據計算出相應的RDD分區。
2.3 編程模型
在Spark中,RDD被表示為對象,通過這些對象上的方法(或函數)調用轉換。
定義RDD之后,程序員就可以在動作(注:即action操作)中使用RDD了。動作是向應用程序返回值,或向存儲系統導出數據的那些操作,例如,count(返回RDD中的元素個數),collect(返回元素本身),save(將RDD輸出到存儲系統)。在Spark中,只有在動作第一次使用RDD時,才會計算RDD(即延遲計算)。這樣在構建RDD的時候,運行時通過管道的方式傳輸多個轉換。
程序員還可以從兩個方面控制RDD,即緩存和分區。用戶可以請求將RDD緩存,這樣運行時將已經計算好的RDD分區存儲起來,以加速后期的重用。緩存的RDD一般存儲在內存中,但如果內存不夠,可以寫到磁盤上。
另一方面,RDD還允許用戶根據關鍵字(key)指定分區順序,這是一個可選的功能。目前支持哈希分區和范圍分區。例如,應用程序請求將兩個RDD按照同樣的哈希分區方式進行分區(將同一機器上具有相同關鍵字的記錄放在一個分區),以加速它們之間的join操作。在Pregel和HaLoop中,多次迭代之間采用一致性的分區置換策略進行優化,我們同樣也允許用戶指定這種優化。
(注:
)
2.4 示例:控制臺日志挖掘
本部分我們通過一個具體示例來闡述RDD。假定有一個大型網站出錯,操作員想要檢查Hadoop文件系統(HDFS)中的日志文件(TB級大小)來找出原因。通過使用Spark,操作員只需將日志中的錯誤信息裝載到一組節點的內存中,然后執行交互式查詢。首先,需要在Spark解釋器中輸入如下Scala代碼:
lines = spark.textFile("hdfs://...")errors = lines.filter(_.startsWith("ERROR"))errors.cache()- 1
第1行從HDFS文件定義了一個RDD(即一個文本行集合),第2行獲得一個過濾后的RDD,第3行請求將errors緩存起來。注意在Scala語法中filter的參數是一個閉包(什么是閉包?https://zhuanlan.zhihu.com/p/21346046)。
這時集群還沒有開始執行任何任務。但是,用戶已經可以在這個RDD上執行對應的動作,例如統計錯誤消息的數目:
errors.count()用戶還可以在RDD上執行更多的轉換操作,并使用轉換結果,如:
// Count errors mentioning MySQL: errors.filter(_.contains("MySQL")).count() // Return the time fields of errors mentioning // HDFS as an array (assuming time is field // number 3 in a tab-separated format): errors.filter(_.contains("HDFS")).map(_.split('\t')(3)).collect()- 1
使用errors的第一個action運行以后,Spark會把errors的分區緩存在內存中,極大地加快了后續計算速度。注意,最初的RDD lines不會被緩存。因為錯誤信息可能只占原數據集的很小一部分(小到足以放入內存)。
最后,為了說明模型的容錯性,圖1給出了第3個查詢的Lineage圖。在lines RDD上執行filter操作,得到errors,然后再filter、map后得到新的RDD,在這個RDD上執行collect操作。Spark調度器以流水線的方式執行后兩個轉換,向擁有errors分區緩存的節點發送一組任務。此外,如果某個errors分區丟失,Spark只在相應的lines分區上執行filter操作來重建該errors分區。
2.5 RDD與分布式共享內存
為了進一步理解RDD是一種分布式的內存抽象,表1列出了RDD與分布式共享內存(DSM,Distributed Shared Memory)[24]的對比。在DSM系統中,應用可以向全局地址空間的任意位置進行讀寫操作。(注意這里的DSM,不僅指傳統的共享內存系統,還包括那些通過分布式哈希表或分布式文件系統進行數據共享的系統,比如Piccolo[28](注:Spark生態系統中有一名為Alluxio的分布式內存文件系統,它通??勺鳛镾park和HDFS的中間層存在 ))DSM是一種通用的抽象,但這種通用性同時也使得在商用集群上實現有效的容錯性更加困難。
RDD與DSM主要區別在于,不僅可以通過批量轉換創建(即“寫”)RDD,還可以對任意內存位置讀寫。也就是說,RDD限制應用執行批量寫操作,這樣有利于實現有效的容錯。特別地,RDD沒有檢查點開銷,因為可以使用Lineage來恢復RDD。而且,失效時只需要重新計算丟失的那些RDD分區,可以在不同節點上并行執行,而不需要回滾整個程序。
表1 RDD與DSM對比
| 讀 | 批量或細粒度操作 | 細粒度操作 |
| 寫 | 批量轉換操作 | 細粒度操作 |
| 一致性 | 不重要(RDD是不可更改的) | 取決于應用程序或運行時 |
| 容錯性 | 細粒度,低開銷(使用Lineage) | 需要檢查點操作和程序回滾 |
| 落后任務的處理 | 任務備份 | 很難處理 |
| 任務安排 | 基于數據存放的位置自動實現 | 取決于應用程序(通過運行時實現透明性) |
| 如果內存不夠 | 與已有的數據流系統類似 | 性能較差 |
注意,通過備份任務的拷貝,RDD還可以處理落后任務(即運行很慢的節點),這點與MapReduce[12]類似。而DSM則難以實現備份任務,因為任務及其副本都需要讀寫同一個內存位置。
與DSM相比,RDD模型有兩個好處。第一,對于RDD中的批量操作,運行時將根據數據存放的位置來調度任務,從而提高性能。第二,對于基于掃描的操作,如果內存不足以緩存整個RDD,就進行部分緩存。把內存放不下的分區存儲到磁盤上,此時性能與現有的數據流系統差不多。
最后看一下讀操作的粒度。RDD上的很多動作(如count和collect)都是批量讀操作,即掃描整個數據集,可以將任務分配到距離數據最近的節點上。同時,RDD也支持細粒度操作,即在哈希或范圍分區的RDD上執行關鍵字查找。
3. Spark編程接口
Spark用Scala[5]語言實現了RDD的API。Scala是一種基于JVM的靜態類型、函數式、面向對象的語言。我們選擇Scala是因為它簡潔(特別適合交互式使用)、有效(因為是靜態類型)。但是,RDD抽象并不局限于函數式語言,也可以使用其他語言來實現RDD,比如像Hadoop[2]那樣用類表示用戶函數。
要使用Spark,開發者需要編寫一個driver程序,連接到集群以運行Worker,如圖2所示。Driver定義了一個或多個RDD,并調用RDD上的動作。Worker是長時間運行的進程,將RDD分區以Java對象的形式緩存在內存中。
圖2 Spark的運行時。用戶的driver程序啟動多個worker,worker從分布式文件系統中讀取數據塊,并將計算后的RDD分區緩存在內存中。
再看看2.4中的例子,用戶執行RDD操作時會提供參數,比如map傳遞一個閉包(closure,函數式編程中的概念)。Scala將閉包表示為Java對象,如果傳遞的參數是閉包,則這些對象被序列化,通過網絡傳輸到其他節點上進行裝載。Scala將閉包內的變量保存為Java對象的字段。例如,var x = 5; rdd.map(_ + x) 這段代碼將RDD中的每個元素加5??偟膩碚f,Spark的語言集成類似于DryadLINQ。
RDD本身是靜態類型對象,由參數指定其元素類型。例如,RDD[int]是一個整型RDD。不過,我們舉的例子幾乎都省略了這個類型參數,因為Scala支持類型推斷。
雖然在概念上使用Scala實現RDD很簡單,但還是要處理一些Scala閉包對象的反射問題。如何通過Scala解釋器來使用Spark還需要更多工作,這點我們將在第6部分討論。不管怎樣,我們都不需要修改Scala編譯器。
3.1 Spark中的RDD操作
表2列出了Spark中的RDD轉換和動作。每個操作都給出了標識,其中方括號表示類型參數。前面說過轉換是延遲操作,用于定義新的RDD;而動作啟動計算操作,并向用戶程序返回值或向外部存儲寫數據。
注意,有些操作只對鍵值對可用,比如join。另外,函數名與Scala及其他函數式語言中的API匹配,例如map是一對一的映射,而flatMap是將每個輸入映射為一個或多個輸出(與MapReduce中的map類似)。
除了這些操作以外,用戶還可以請求將RDD緩存起來。而且,用戶還可以通過Partitioner類獲取RDD的分區順序,然后將另一個RDD按照同樣的方式分區。有些操作會自動產生一個哈?;蚍秶謪^的RDD,像groupByKey,reduceByKey和sort等。
4. 應用程序示例
現在我們講述如何使用RDD表示幾種基于數據并行的應用。首先討論一些迭代式機器學習應用(4.1),然后看看如何使用RDD描述幾種已有的集群編程模型,即MapReduce(4.2),Pregel(4.3),和Hadoop(4.4)。最后討論一下RDD不適合哪些應用(4.5)。
4.1 迭代式機器學習
很多機器學習算法都具有迭代特性,運行迭代優化方法來優化某個目標函數,例如梯度下降方法。如果這些算法的工作集能夠放入內存,將極大地加速程序運行。而且,這些算法通常采用批量操作,例如映射和求和,這樣更容易使用RDD來表示。
例如下面的程序是邏輯回歸[15]的實現。邏輯回歸是一種常見的分類算法,即尋找一個最佳分割兩組點(即垃圾郵件和非垃圾郵件)的超平面w。算法采用梯度下降的方法:開始時w為隨機值,在每一次迭代的過程中,對w的函數求和,然后朝著優化的方向移動w。
val points = spark.textFile(...).map(parsePoint).persist() var w = // random initial vector for (i <- 1 to ITERATIONS) {val gradient = points.map{ p =>p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y}.reduce((a,b) => a+b)w -= gradient }首先定義一個名為points的緩存RDD,這是在文本文件上執行map轉換之后得到的,即將每個文本行解析為一個Point對象。然后在points上反復執行map和reduce操作,每次迭代時通過對當前w的函數進行求和來計算梯度。7.1小節我們將看到這種在內存中緩存points的方式,比每次迭代都從磁盤文件裝載數據并進行解析要快得多。
已經在Spark中實現的迭代式機器學習算法還有:kmeans(像邏輯回歸一樣每次迭代時執行一對map和reduce操作),期望最大化算法(EM,兩個不同的map/reduce步驟交替執行),交替最小二乘矩陣分解和協同過濾算法。Chu等人提出迭代式MapReduce也可以用來實現常用的學習算法[11]。
4.2 使用RDD實現MapReduce
MapReduce模型[12]很容易使用RDD進行描述。假設有一個輸入數據集(其元素類型為T),和兩個函數myMap: T => List[(Ki, Vi)] 和 myReduce: (Ki; List[Vi]) ) List[R],代碼如下:
data.flatMap(myMap).groupByKey().map((k, vs) => myReduce(k, vs))- 1
如果任務包含combiner,則相應的代碼為:
data.flatMap(myMap).reduceByKey(myCombiner).map((k, v) => myReduce(k, v))ReduceByKey操作在mapper節點上執行部分聚集,與MapReduce的combiner類似。
4.3 使用RDD實現Pregel
略
4.4 使用RDD實現HaLoop
略
4.5 不適合使用RDD的應用
在2.1節我們討論過,RDD適用于具有批量轉換需求的應用,并且相同的操作作用于數據集的每一個元素上。在這種情況下,RDD能夠記住每個轉換操作,對應于Lineage圖中的一個步驟,恢復丟失分區數據時不需要寫日志記錄大量數據。RDD不適合那些通過異步細粒度地更新來共享狀態的應用,例如Web應用中的存儲系統,或者增量抓取和索引Web數據的系統,這樣的應用更適合使用一些傳統的方法,例如數據庫、RAMCloud[26]、Percolator[27]和Piccolo[28]。我們的目標是,面向批量分析應用的這類特定系統,提供一種高效的編程模型,而不是一些異步應用程序。
5. RDD的描述及作業調度
我們希望在不修改調度器的前提下,支持RDD上的各種轉換操作,同時能夠從這些轉換獲取Lineage信息。為此,我們為RDD設計了一組小型通用的內部接口。
簡單地說,每個RDD都包含:(1)一組RDD分區(partition,即數據集的原子組成部分);(2)對父RDD的一組依賴,這些依賴描述了RDD的Lineage;(3)一個函數,即在父RDD上執行何種計算;(4)元數據,描述分區模式和數據存放的位置。例如,一個表示HDFS文件的RDD包含:各個數據塊的一個分區,并知道各個數據塊放在哪些節點上。而且這個RDD上的map操作結果也具有同樣的分區,map函數是在父數據上執行的。表3總結了RDD的內部接口。
表3 Spark中RDD的內部接口
| partitions() | 返回一組Partition對象 |
| preferredLocations(p) | 根據數據存放的位置,返回分區p在哪些節點訪問更快 |
| dependencies() | 返回一組依賴 |
| iterator(p, parentIters) | 按照父分區的迭代器,逐個計算分區p的元素 |
| partitioner() | 返回RDD是否hash/range分區的元數據信息 |
設計接口的一個關鍵問題就是,如何表示RDD之間的依賴。我們發現RDD之間的依賴關系可以分為兩類,即:(1)窄依賴(narrow dependencies):子RDD的每個分區依賴于常數個父分區(即與數據規模無關);(2)寬依賴(wide dependencies):子RDD的每個分區依賴于所有父RDD分區。例如,map產生窄依賴,而join則是寬依賴(除非父RDD被哈希分區)。另一個例子見圖5。
(注:我們可以這樣認為:
窄依賴指的是:每個parent RDD 的 partition 最多被 child RDD的一個partition使用
寬依賴指的是:每個parent RDD 的 partition 被多個 child RDD的partition使用
窄依賴每個child RDD 的partition的生成操作都是可以并行的,而寬依賴則需要所有的parent partition shuffle結果得到后再進行。
下面我們來看下,我們來看下org.apache.spark.Dependency.scala的源碼
抽象類Dependency:
abstract class Dependency[T] extends Serializable {def rdd: RDD[T] }- 1
Dependency有兩個子類,一個子類為窄依賴:NarrowDependency;一個為寬依賴ShuffleDependency
NarrowDependency也是一個抽象類,它實現了getParents 重寫了 rdd 函數,它有兩個子類,一個是 OneToOneDependency,一個是 RangeDependency
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {/*** Get the parent partitions for a child partition.* @param partitionId a partition of the child RDD* @return the partitions of the parent RDD that the child partition depends upon*/def getParents(partitionId: Int): Seq[Int]override def rdd: RDD[T] = _rdd }- 1
OneToOneDependency,可以看到getParents實現很簡單,就是傳進一個partitionId: Int,再把partitionId放在List里面傳出去,即去parent RDD 中取與該RDD 相同 partitionID的數據
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = List(partitionId) }RangeDependency,用于union。與上面不同的是,這里我們要算出該位置,設某個parent RDD 從 inStart 開始的partition,逐個生成了 child RDD 從outStart 開始的partition,則計算方式為: partitionId - outStart + inStart
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)extends NarrowDependency[T](rdd) {override def getParents(partitionId: Int): List[Int] = {if (partitionId >= outStart && partitionId < outStart + length) {List(partitionId - outStart + inStart)} else {Nil}} }ShuffleDependency,需要進行shuffle
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]] {override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getNameprivate[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName// Note: It's possible that the combiner class tag is null, if the combineByKey// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.private[spark] val combinerClassName: Option[String] =Option(reflect.classTag[C]).map(_.runtimeClass.getName) //獲取shuffleIDval shuffleId: Int = _rdd.context.newShuffleId() //向注冊shuffleManager注冊Shuffle信息val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(shuffleId, _rdd.partitions.length, this)_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) }- 1
)
區分這兩種依賴很有用。首先,窄依賴允許在一個集群節點上以流水線的方式(pipeline)計算所有父分區。例如,逐個元素地執行map、然后filter操作;而寬依賴則需要首先計算好所有父分區數據,然后在節點之間進行Shuffle,這與MapReduce類似。第二,窄依賴能夠更有效地進行失效節點的恢復,即只需重新計算丟失RDD分區的父分區,而且不同節點之間可以并行計算;而對于一個寬依賴關系的Lineage圖,單個節點失效可能導致這個RDD的所有祖先丟失部分分區,因而需要整體重新計算。
通過RDD接口,Spark只需要不超過20行代碼實現便可以實現大多數轉換。5.1小節給出了例子,然后我們討論了怎樣使用RDD接口進行調度(5.2),最后討論一下基于RDD的程序何時需要數據檢查點操作(5.3)。
5.2 Spark任務調度器
5.3 檢查點
盡管RDD中的Lineage信息可以用來故障恢復,但對于那些Lineage鏈較長的RDD來說,這種恢復可能很耗時。例如4.3小節中的Pregel任務,每次迭代的頂點狀態和消息都跟前一次迭代有關,所以Lineage鏈很長。如果將Lineage鏈存到物理存儲中,再定期對RDD執行檢查點操作就很有效。
一般來說,Lineage鏈較長、寬依賴的RDD需要采用檢查點機制。這種情況下,集群的節點故障可能導致每個父RDD的數據塊丟失,因此需要全部重新計算[20]。將窄依賴的RDD數據存到物理存儲中可以實現優化,例如前面4.1小節邏輯回歸的例子,將數據點和不變的頂點狀態存儲起來,就不再需要檢查點操作。
當前Spark版本提供檢查點API,但由用戶決定是否需要執行檢查點操作。今后我們將實現自動檢查點,根據成本效益分析確定RDD Lineage圖中的最佳檢查點位置。
值得注意的是,因為RDD是只讀的,所以不需要任何一致性維護(例如寫復制策略,分布式快照或者程序暫停等)帶來的開銷,后臺執行檢查點操作。
(注:
我們來閱讀下org.apache.spark.rdd.ReliableCheckpointRDD中的def writePartitionToCheckpointFile 和 def writeRDDToCheckpointDirectory:
writePartitionToCheckpointFile:把RDD一個Partition文件里面的數據寫到一個Checkpoint文件里面
def writePartitionToCheckpointFile[T: ClassTag](path: String,broadcastedConf: Broadcast[SerializableConfiguration],blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {val env = SparkEnv.get//獲取Checkpoint文件輸出路徑val outputDir = new Path(path)val fs = outputDir.getFileSystem(broadcastedConf.value.value)//根據partitionId 生成 checkpointFileNameval finalOutputName = ReliableCheckpointRDD.checkpointFileName(ctx.partitionId())//拼接路徑與文件名val finalOutputPath = new Path(outputDir, finalOutputName)//生成臨時輸出路徑val tempOutputPath =new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")if (fs.exists(tempOutputPath)) {throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")}//得到塊大小,默認為64MBval bufferSize = env.conf.getInt("spark.buffer.size", 65536)//得到文件輸出流val fileOutputStream = if (blockSize < 0) {fs.create(tempOutputPath, false, bufferSize)} else {// This is mainly for testing purposefs.create(tempOutputPath, false, bufferSize,fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)}//序列化文件輸出流val serializer = env.serializer.newInstance()val serializeStream = serializer.serializeStream(fileOutputStream)Utils.tryWithSafeFinally {//寫數據serializeStream.writeAll(iterator)} {serializeStream.close()}if (!fs.rename(tempOutputPath, finalOutputPath)) {if (!fs.exists(finalOutputPath)) {logInfo(s"Deleting tempOutputPath $tempOutputPath")fs.delete(tempOutputPath, false)throw new IOException("Checkpoint failed: failed to save output of task: " +s"${ctx.attemptNumber()} and final output path does not exist: $finalOutputPath")} else {// Some other copy of this task must've finished before us and renamed itlogInfo(s"Final output path $finalOutputPath already exists; not overwriting it")if (!fs.delete(tempOutputPath, false)) {logWarning(s"Error deleting ${tempOutputPath}")}}}}- 1
writeRDDToCheckpointDirectoryWrite,將一個RDD寫入到多個checkpoint文件,并返回一個ReliableCheckpointRDD來代表這個RDD
def writeRDDToCheckpointDirectory[T: ClassTag](originalRDD: RDD[T],checkpointDir: String,blockSize: Int = -1): ReliableCheckpointRDD[T] = {val sc = originalRDD.sparkContext// 生成 checkpoint文件 的輸出路徑val checkpointDirPath = new Path(checkpointDir)val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)if (!fs.mkdirs(checkpointDirPath)) {throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")}// 保存文件,并重新加載它作為一個RDDval broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))sc.runJob(originalRDD,writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)if (originalRDD.partitioner.nonEmpty) {writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)}val newRDD = new ReliableCheckpointRDD[T](sc, checkpointDirPath.toString, originalRDD.partitioner)if (newRDD.partitions.length != originalRDD.partitions.length) {throw new SparkException(s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")}newRDD}- 24
以上源碼有可以改進的地方,因為重新計算RDD其實是沒有必要的。
RDD checkpoint之后得到了一個新的RDD,那么child RDD 如何知道 parent RDD 有沒有被checkpoint過呢? 看 RDD的源碼,我們可以發現:
private var dependencies_ : Seq[Dependency[_]] = nulldependencies_ 用來存放checkpoint后的結果的,如為null,則就判斷沒checkpoint:
final def dependencies: Seq[Dependency[_]] = {checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {if (dependencies_ == null) {dependencies_ = getDependencies}dependencies_}}總結
以上是生活随笔為你收集整理的深入理解Spark 2.1 Core (一):RDD的原理与源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark中的容错
- 下一篇: 深入理解Spark 2.1 Core (