RDD的依赖与分区
1?寬依賴和窄依賴
RDD從具體的依賴的角度講,有窄依賴和寬依賴2種情況。
窄依賴:指每個父RDD的一個Partition最多被子RDD的一個Partition所使用,例如map、filter等都會產生窄依賴;
寬依賴:指一個父RDD的Partition會被多個子RDD的一個Partition所使用,如groupByKey,reduceByKey等操作都會產生寬依賴。
總結:如果父RDD的一個Partition被一個子RDD的Partition使用就是窄依賴,否則就是寬依賴。如果子RDD的Partition對父RDD的Partition依賴的數量不會隨著RDD數據規模的改變而改變就是窄依賴,否則就是寬依賴。
特別說明:對join操作有兩種情況,如果說join操作的時候每個partition僅僅和已知的Partition進行join,這次是join操作就是窄依賴;其它情況(input?not?co-partitioned?會產生shuffle操作,而co-partitioned是哪幾個固定的Partition進行join)的join操作就是寬依賴;
因為是確定的partition數量的依賴關系,所有就是窄依賴,得出一個推論,窄依賴不僅包含一對一的窄依賴,還包含一對固定個數的窄依賴(也就是說對父RDD的依賴的Partition的數量不會隨著RDD數據規模的改變而改變)
2?RDD根據依賴關系構成Stage
RDD基于不同的依賴關系構成了Stage,我們到底能不能把所有的RDD放在一個任務中運行:
假設一:Spark一開始是將所有相關聯的RDD構成一個Stage,上圖中的ABCDEFG?這些RDD都放在一個Stage中,groupByKey,join等操作,其中的嚴重問題是,需要挨個執行,產生了大量的中間數據(中間數據需要被存儲起來,下一步才會執行,導致內存無法釋放)。
從執行邏輯圖的角度來看在每個RDD中不同的Partition是獨立的,這個是數據分片的一個基本特征,也就是在RDD內部每個Partition數據彼此之間不會干擾。假設G是最后一個RDD,為最后一個RDD每個Partition分配一個具體的任務。最后一個RDD有3個Partition,為每個Partition分配一個task。這個時候第一個數據分片來自B和F,B的數據來自A的3個分片,這樣做的話有很大的問題:①耗性能,重復計算:task太大,而且遇到shuffle級別操作的時候就必須計算依賴的RDD的所有Partition,而且都發送在一個task中計算。而且第2,3個Partition還要重復計算②存儲浪費;從后往前的依賴關系,看哪些RDD進行cache,如果從G的角度看,3個分片各自算各自的,這也是數據存儲的浪費。
上述假設,核心問題都是在遇到shuffle依賴(寬依賴)的時候,無法進行pipeline。則采取在有shuffle依賴的時候斷開處理,原因是浪費內存、重復計算、任務太大、不方便管理。所以要采用現在從后往前推理,遇到寬依賴就斷開,遇到窄依賴就把當前的RDD加入到該Stage中的方式。
回溯,血統從計算的角度說到底都是pipeline,每個Stage里面的Task數量是由該Stage中最后一個RDD的Partition的數量所決定的。最后一個Stage里面的任務的類型是ResultTask,前面其它所有的Stage里面的任務的類型多是ShuffleMapTask。
我們的計算表面上看是數據在流動,實質上是算子在流動:有兩層含義①集群計算角度:數據不動代碼動②在一個Stage內部算子為何會流動(pipeline)?首先是算子合并也就是,也就是所謂的函數式編程,執行的時候會最終進行函數的展開從而把一個Stage內部的多個算子合并成為一個大算子(其內部包含當前Stage所有算子對數據的計算邏輯);其次是由于Tranformation操作的lazy特征!!!在具體算子交給集群的executor計算之前首先會通過DAGScheduler進行算子的優化(基于數據本地性的pipeline)那就不會產生中間結果。
注意:shuffle依賴一般是k-v的形式,并且k不可以是數組
3?RDD依賴源碼
spark的寬依賴(narrow dependency)和窄依賴(wide dependency)繼承于Dependency。Dependency是個抽象類,只包含一個rdd,是它依賴的parentRDD.
Dependency一個很重要的要求是,子RDD可以為其每個partition根據dependency找到它所對應的父RDD的partition,或者是找到計算的數據來源,所以每個實現的Dependency都要提供根據partitionID獲取parentRDD的partition的方法。
3.1?窄依賴
NarrowDependency中子RDD的每個分區依賴少量(一個或多個)parent RDD分區,即parent RDD的partition至多被子RDD的某個partition使用一次。
窄依賴繼承于Dependency并定義了一個獲取parent rdd的方法,下面看窄依賴的子類: OneToOneDependency和RangeDependency。
OneToOneDependency
OneToOneDependency是一對一依賴關系,子RDD的每個partition依賴單個parentRdd的一個partition。常見的OneToOneDependency有map, filter, join等。源碼如下:
實現了getParents方法:子RDD以及父RDD之間,每個partition是對應(如果子RDD中存在的話)的,所以兩個RDD中的對應的partition應該具有相同的partitionId。
此類的Dependency中parent中的partitionId與childRDD中的partitionId是一對一的關系,也就是partition本身范圍不會改變,一個parition經過transform還是一個partition,雖然內容發生了變化,所以可以在local完成,此類場景通常像mapreduce中只有map的場景。
RangeDependency
rangeDependency是子rdd的每個partition依賴多個父parentRdd的一個partition。常見的RangeDependency有union。
幾個私有變量解釋如下:Rdd:父RDD;inStart:父RDDpartition的起始位置;outStart:UnionRDD的起始位置;length:父RDD partition的數量。父RDD中的partition通常是子RDD中,連續的某塊partition區間的父partition,所以對應關系應該是parentPartitionId = childPartitionId - childStart + parentStart。
重寫了getParents方法:parentRDD在最終的rdd的位置[outStart, outStart + parentRDDLength]
該依賴關系仍然是一一對應,但是parentRDD中的某個區間的partitions對應到childRDD中的某個區間的partitions。典型的操作是union,多個parentRDD合并到一個childRDD,所以將每個parentRDD都對應到childRDD中的一個區間。需要注意的是:union不會把多個partition合并成一個partition,而是的簡單的把多個RDD中的partitions放到一個RDD里面,partition不會發生變化。
它僅僅被org.apache.spark.rdd.UnionRDD使用。UnionRDD是把多個RDD合成一個RDD,這些RDD是被拼接而成,即每個parent RDD的Partition的相對順序不會變,只不過每個parent RDD在UnionRDD中的Partition的起始位置不同
3.2?寬依賴
寬依賴,是在shuffle stage的時候的依賴關系,是劃分Stage的重要標志,依賴首先要求是PariRdd即k,v的形式才能做shuffle。寬依賴只有ShuffleDependency一個實現。每個Shuffle過程會有一個Id,ShuffleDependency可以根據這個ShuffleId去獲得所依賴的partition的數據,所以ShuffleDependency所需要記錄的就是要能夠通過ShuffleId去獲得需要的數據。
ShuffleDependency類有3個泛型參數,K代表鍵類型,V代表值類型,而C則代表Combiner的類型。因為Shuffle過程對鍵值型數據才有意義,所以ShuffleDependency對父RDD的泛型類型有限制,必須是Product2[K,V]或者其子類,Product2在Scala中代表兩個元素的笛卡爾積。
構造方法參數說明:①partitioner:分區器②serializer:閉包序列化器,SparkEnv中已經創建,為JavaSerializer。③keyOrdering:可選的對鍵類型K排序的排序規則。④aggregator:可選的Map端數據聚合邏輯。⑤mapSideCombine:指定是否啟用Map數據預聚合。
還會調用SparkContext.newShuffleId()方法分配一個新的Shuffle ID,以及調用ShuffleManager的registerShuffle方法注冊該Shuffle,返回Shuffle句柄(ShuffleHandle)
?
4、Partitioner分區器
4.1 Partitioner類
Partitioner是一個抽象類,用于處理key-value類型的RDD,按照key進行元素的劃分
只有兩個方法:numPartitions獲取分區個數和getPartition(key: Any)根據Key值得到分區ID
在Partitioner的伴生對象中有defaultPartitioner方法,HashPartitioner 是Spark默認的分區器,除非RDD已經指定了一個分區器;對于分區數量,如果設置了配置項spark.default.parallelism,那么使用該配置,否則使用上游分區的最大數目
4.2?HashPartitioner
根據上圖源碼可知:傳入的參數partitions決定總的分區數;重寫的numPartitions方法也只是簡單返回該值;重寫的getPartition實際上是調用了Utils工具類的nonNegativeMod方法,將以key的hashcode和numPartitions作為參數
nonNegativeMod方法將對key的hashCode和numPartitions進行取模運算,得到key對應的分區索引。使用哈希和取模的方式,可以方便地計算出下游RDD的各個分區將具體處理哪些key。由于上游RDD所處理的key的哈希值在取模后很可能產生數據傾斜,所以HashPartitioner并不是一個均衡的分區計算器
reduceByKey,aggregateByKey,join內部默認都是使用HashPartitioner
4.3?RangePartitioner
HashPartitioner的實現原理可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據,這是不希望的。而RangePartitioner分區則盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,也就是說一個分區中的元素肯定都是比另一個分區內的元素小或者大;但是分區內的元素是不能保證順序的。簡單的說就是將一定范圍內的數映射到某一個分區內。
RangePartitioner分區執行原理概述:
1.計算總體的數據抽樣大小sampleSize,計算規則是:至少每個分區抽取20個數據或者最多1e6的樣本的數據量。
2.根據sampleSize和分區數量計算每個分區的數據抽樣樣本數量最大值sampleSizePrePartition。
3.根據以上兩個值進行水塘抽樣,返回RDD的總數據量,分區中總元素的個數和每個分區的采樣數據。
4.計算出數據量較大的分區通過RDD.sample進行重新抽樣。
5.通過抽樣數組 candidates: ArrayBuffer[(K, wiegth)]計算出分區邊界的數組BoundsArray
6.在取數據時,如果分區數小于128則直接獲取,如果大于128則通過二分法,獲取當前Key屬于那個區間,返回對應的BoundsArray下標即為partitionsID。
RangePartitioner分區器的主要作用就是:將一定范圍內的數映射到某一個分區內,所以它的實現中分界的方法rangeBounds尤為重要
private?var?rangeBounds:?Array[K]?獲取對應分區的邊界。每個Range內的數據進入一個分區。首先當Partitioner大于1,那么給定總的數據抽樣大小,最多1M的數據量(10^6),最少20倍的RDD分區數量,也就是每個RDD分區至少抽取20條數據,sampleSize是初步的負載均衡;sampleSizePerPartition是由于依賴的父RDD數據時不均勻的,有些Partition數據量會很大,有些Partition數據量會很小。其中乘3的目的是保證數據量特別小的分區能夠抽取到足夠的數據,同時保證數據量非常大的分區能夠進行二次抽樣。
然后確定各partition對應的邊界,即rangeBounds。首先,利用 RangePartitioner伴生對象的sketch()方法對輸入的RDD的每一個 partition進行抽樣,抽樣方法采取的是水塘抽樣(Reservoir Sampling),可以在不知道總size的情況下進行抽樣,特別適用于數據在內存存不下的情況。
因為分區只需要對key進行操作,所以RangePartitioner.sketch的第一個參數是rdd.map(_._1)。該函數返回值是val (numItems, sketched) ,其中numItems相當于記錄rdd元素的總數;而sketched的類型是Array[(Int, Long, Array[K])],記錄的是分區的編號、該分區中總元素的個數以及從父RDD中每個分區采樣的數據。
sketch函數對父RDD中的每個分區進行采樣,并記錄下分區的ID和分區中數據總和。reservoirSampleAndCount函數就是典型的水塘抽樣實現,唯一不同的是該算法還記錄下i的值,這個就是該分區中元素的總和。
回到RangePartitioner方法中,如果獲取的數據分布不均勻,則邊界方法rangeBounds會再次抽樣,但是只對抽象數少于要求的partition進行sample,其他抽樣好的不會
最后獲取到每個partition中每個樣本和對應的weight( 類似candidates += ((key, weight))),weight為partition中元素數量與抽樣數量的比值,對于重新抽樣的,則為1。
最后通過RangePartitioner伴生對象的determineBounds()方法進行邊界確定,獲得邊界值組成的數組Array[K]; ?Array[K]被賦值給rangeBounds,即各partition對應的邊界
determineBounds主要對 candidates先按key進行排序,然后獲取總抽樣元素除以partition大小即為每個partition理論的大小,即代碼中的step。
然后再對排序好的ordered進行遍歷,當所代表的權重大于step的整數倍時,返回此時的key,作為劃分條件。然后依次類推,獲得每個partition的邊界key。
總結:RangePartitioner是采取抽樣的策略,每個partition理論的是抽取20個元素,實際采用水塘抽樣(Reservoir Sampling)時為了避免抽樣少于期望,會乘以3.然后再用determineBounds對抽樣數據進行排序,weight是每個key所代表的抽樣數量,再按weight確定每個partition接近理論的邊界,并進行返回,即為partitionid(getPartition返回值)。
getPartition查找某個元素應該所屬的partitionid時,如果partition數量過大,會采取二分查找。
4.4?水塘抽樣算法(Reservoir Sampling)
水塘抽樣是一系列的隨機算法,其目的在于從包含n個項目的集合S中選取k個樣本,其中n為一很大或未知的數量,尤其適用于不能把所有n個項目都存放到主內存的情況。
首先假設要從一個不知道行數的文本中等概率抽取一行,要如何做的?定義取出的行號為num,第一次以第一行作為取出行 num,而后第二次以二分之一概率決定是否用第二行替換 num,第三次以三分之一的概率決定是否以第三行替換 num……,以此類推。得出結論,在取第n個數據的時候,我們生成一個0到1的隨機數p,如果p小于1/n,保留第n個數。大于1/n,繼續保留前面的數。直到數據流結束,返回此數,算法結束。
將上面的條件變為,k為任意整數的情況,即要求最終返回的元素有k個,這就是水塘抽樣(Reservoir Sampling)問題。要求是:取到第n個元素時,前n個元素被留下的幾率相等,即k/n。
算法同上面思路類似,將1/n換乘k/n即可。在取第n個數據的時候,我們生成一個0到1的隨機數p,如果p小于k/n,替換池中任意一個為第n個數。大于k/n,繼續保留前面的數。直到數據流結束,返回此k個數。但是為了保證計算機計算分數額準確性,一般是生成一個0到n的隨機數,跟k相比,道理是一樣的。
用偽代碼表示如下所示:
| 從S中抽取首k項放入「水塘」中 對于每一個S[j]項(j ≥ k): ???隨機產生一個范圍0到j的整數r ???若 r < k 則把水塘中的第r項換成S[j]項 |
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
?
總結
- 上一篇: android 打开移动开关,教你一个让
- 下一篇: 协议数据分析