RDD(弹性分布式数据集)
1、什么是RDD
RDD(分布式彈性數據集)是對分布式計算的抽象,代表要處理的數據,一個數據集,RDD是只讀分區的集合。數據被分片,分成若干個數據分片,存儲到不同的節點中,可以被并行的操作,所以叫分布式數據集。計算時優先考慮放于內存中,如果放不下把一部分放在磁盤上保存。
RDD(分布式彈性數據集)是整個Spark抽象的基石,是基于工作集的應用抽象。Spark的各個子框架,Spark SQL、Spark Streaming、SparkR、GraphX、ML等,其底層封裝的都是RDD。(也就是說①RDD提供了通用的抽象;②開發者可用根據自己所在的領域進行建模,開發出相應的子框架)。
RDD本身會有一系列的數據分片,RDD在邏輯上抽象的代表了底層的一個輸入文件,可能是一個文件夾,但是實際上是按分區partition分為多個分區,分區會放在Spark集群中不同的機器節點上,假設有1億條數據,可能每臺機器上放10萬條,需要1000臺機器,而且這1000臺機器上的10萬條數據是按照partition為單位去管理的。所謂partition就是特定規模的數據大小,就是數據集合。Spark中一切操作皆RDD。
?
2、工作集與數據集
基于工作集和基于數據集都提供一些特征如位置感知(具體數據在哪里,只不過是不同的實現),容錯,負載均衡。
(1)基于數據集的工作方式:從物理存儲加載數據,然后操作數據,然后寫入物理存儲設備。如Hadoop的MapReduce是基于數據集的。基于數據集的有幾種場景不太適用:①不適合大量的迭代,如機器學習,算法比較復雜的時候②不適合交互式查詢,每次的查詢都需要從磁盤讀取數據,然后再查詢寫會數據結果,每一次都這樣。(重點是基于數據集的方式不能改復用曾的結果或計算中間結果)
(2)基于工作集的工作方式,具有基于數據集的工作方式的優點即自動容錯、位置感知性調度和可伸縮性,同時夠對基于工作集的計算任務也具有良好的描述能力,即支持中間結果的復用場景。
3、RDD的彈性表現在哪幾個方面
Spark的RDD是基于工作集的,不僅具具有基于數據集的特點,而且RDD本身還有其特點:Resilient(彈性).
(1)自動進行內存和磁盤的數據存儲的切換:RDD代表一系列數據分片在不同的節點中存儲,默認優先考慮在內存中,如果放不下把一部分放在磁盤上保存,而這一切對用戶來說是透明的,不用關心RDD的partition放在哪里,只要針對RDD計算處理就行了。所以說RDD本身會自動的進行磁盤和內存的切換;
(2)自動Lineage血統的高效容錯:在運行階段,會有一系列的RDD,以用于容錯恢復,假設一個計算鏈條有900個步驟,假設第888步出錯,由于有血統關系,可以從第887個步驟恢復,不需要從第一個步驟開始計算,這極大的提升了錯誤恢復的速度;
(3)task失敗會自動進行特定次數的重試,默認4次:假設900個計算步驟的任務作為一個task,進行容錯,恢復的時候從第800個步驟開始恢復,恢復好幾次都沒有成功這個task就失敗了,調度器底層會自動進行容錯。
(4)Stage(一個計算階段)如果失敗,會自動進行特定次數的重試,只計算失敗的數據分片,默認3次:就是task底層嘗試好幾次都失敗,這個時候整個階段就會失敗,整個階段會有很多并行的數據分片,他們計算邏輯一樣只是處理的數據分片不一樣。是再次提交Stage的時候如果這個Stage中假設有100個數據分片只是3,5個失敗,再次提交Stage的時候會看看其他成功的任務有沒有輸出,有的話就不會第二次提交的時候把這100個任務再次提交,只會提交失敗的那幾個。
(5)checkpoint和persist:checkpoint(每次對RDD的操作都會產生新的RDD,除了action觸發job以外,有時處理鏈條比較長,計算比較笨重時,需要考慮將數據落地);persist:(內存、磁盤的復用[效率和容錯的延伸])
(6)數據調度彈性:DAG? TASK 和資源管理無關
(7)數據分片的高度彈性:repartition和coalesce①在數據計算時,會產生很多分片,這時如果partition非常小,每個分片每次都消耗一個線程進行處理,會降低處理效率,但是如果把把幾個partition合并成一個比較大的partition,會提高效率②如果每個partition的block比較大可能內存不足,這時會考慮將其變成更小的分片,這時Spark會出現更多的處理批次,避險出現OOM
注意:repartition內部調用的是coalesce,傳進的shuffle為true。coalesce默認shuffle為fasle。所以數據分片由多變為少的用coalesce不進行shuffle,如果數據分片由少到多不經過shuffle是不行的,使用repartition。
4、RDD的lazy特性
由于RDD是只讀分區的集合,那么每次的操作都會改變數據,會產生中間結果,這時就采用lazy的級別,對數據不進行計算。
RDD的核心之一就是他的lazy級別,因為不算,開始的時候只對數據處理做標記,包括textfile根本不從磁盤讀數據,faltMap根本就沒開始計算,他只不過是產生了一個操作的標記而已。
上圖為flatMap的源碼,flatMap產生了一個new MapPartitionRDD,但是看它的構造,第一個參數是this,this是當前對象,指父RDD,即生成的RDD所依賴的RDD。這樣,Spark的RDD是只讀的,且是lazy級別的,每次構建的新的RDD時,都是將其父RDD作為第一個參數傳遞進來生成新的RDD,這樣就構成了一個鏈條結構
5、常規的容錯方式
常規的容錯方式:數據檢查點和記錄數據更新的方式。
5.1 數據檢查點
分布式的計算數據檢查點的基本工作方式就是:通過數據中心的網絡連接,不同的機器每次操作的時候都要負責整個數據集,就相當于每次都有一個拷貝,這個是需要網絡的,復制到其他機器上,而網絡帶寬就是分布式的瓶頸。每次拷貝對存儲資源也是非常大的消耗。
5.2 記錄數據更新
記錄數據更新的工作方式:每次數據變化我們就記錄一下,這個方式不需要重新拷貝一份數據,但是這種方式復雜,而且更新的話就變成數據可更新,那很多操作全局數據容易失控,原子性對分布式來說太可怕了第一復雜第二耗性能。
因為RDD是從后往前的鏈條依賴關系,所以容錯的開銷非常低
5.3 Spark的RDD的容錯方式
Spark就是記錄數據更新的方式,原因又2點:①、RDD是不可變的+lazy(因為不可變不存在全局修改的問題,控制難度就極大的下降,在這基礎上有計算鏈條,假設901個步驟錯了,從900個步驟開始恢復(這個前提是要持久化persit/checkpoint或者上一個Stage結束))。②RDD是粗粒度的操作,為了效率,每次操作的時候作用所有數據集合(所謂的粗粒度就是每次操作都作用于全部的數據集)。如果更新力度太細記錄成本就會高效率就低了。對RDD的所有寫或者修改都是粗粒度的,通過元數據記錄數據更新是寫操作,我們在這邊說RDD是粗粒度的指的是RDD的寫操作是粗粒度的,但是RDD的讀操作即可是粗粒度的也可以是細粒度的(例如通過RDD讀取數據庫可以讀取一條記錄)。RDD的寫操作是粗粒度的限制了他的使用場景,例如說網絡爬蟲就不適合,但是現實世界中,大多數的場景是粗粒度的
5.4 RDD中的幾個核心方法及屬性
(1)partitioner:分區器,類似MapReduce的的Partitioner接口,控制key到哪個reduce
?
(2)compute:compute方法是針對RDD的每個Partition進行計算的
所有的RDD操作返回的都是一個迭代器,這個好處就是假設用spark sql提取出數據后產生新的RDD,機器學習訪問這個RDD不用關心他是不是sparksql,因為是基于iterator,那就可以用hasNext看下有么有下個元素,用next讀取下個元素,這就讓所有框架無縫集成。
compute傳進的第一個參數split是Partition類型的,Partition是RDD并行的劃分單元,其在Spark中的抽象定義十分簡單如下:
它定義了一個index唯一表示這個partition,它更像一個指針指向實體數據,Partition的具體實現有很多,包括HadoopPartition, JdbcPartition, ParallelCollectionPartition等。
(3)getPartition:getPartitions返回的是一系列partitions的集合,即一個Partition類型的數組。是在partitions方法中調用getPartition方法的。
(4)getDependencies:獲取所有依賴關系
(5)getPreferredLocations:輸入參數是Partition類型的split分片,輸出結果是一組優先的節點位置。
5.5?HadoopRDD
(1)getPartition實現
?
首先getJobConf():用來獲取job Configuration,獲取配置方式有clone和非clone方式,clone方式是線程不安全的,,非clone方式可以從cache中獲取,如cache中沒有那就創建一個新的,然后再放到cache中;然后獲得InputFormat實例對象;調用getSplits方法來計算分片,然后把分片HadoopPartition包裝到到array里面返回
(2)compute實現
輸入值是一個Partition,返回是一個Iterator[(K, V)]類型的數,compute方法是通過分片來獲得Iterator接口,以遍歷分片的數據把Partition轉成HadoopPartition
通過InputSplit創建一個RecordReader
重寫Iterator的getNext方法,通過創建的reader調用next方法讀取下一個值
(3)getPreferredLocations
調用InputSplit的getLocations方法獲得所在的位置
6、RDD的生命周期
6.1 創建RDD
Spark程序中創建的第一個RDD代表了Spark應用程序輸入數據的來源。通過Transformation來對RDD進行各種算子的轉換,實現算法。
常見的創建初始RDD的方式①使用程序中的集合,②使用本地文件系統創建RDD,③使用HDFS創建RDD,④基于DB創建RDD,⑤基于NOSQL創建RDD,⑥基于S3創建RDD,⑦基于數據流創建RDD
6.2 構建執行計劃
RDD 在調用Transformation算子和action 算子后構成一個RDD鏈條,即血緣,然后DAGScheduler 會根據 RDD 之間的依賴關系劃分Stage ,最后終封裝成 TaskSetManager 根據不同的調度模型加入不同的調度隊列。
6.3 調度任務執行
由 TaskScheduler和TaskSetManager 對TaskSet進行進一步資源封裝和最佳位置計算,然后進行調度到相應的Executor上去執行。
6.4 結果返回
將最終的執行結果返回給 Driver 或者輸出到指定的位置。
7、RDD的操作類型
RDD本身有3種操作類型Transformation和Action和Controller。
Transformation進行數據狀態的轉換,根據已有的RDD創建一個新的RDD;Action觸發具體的作業,主要是對RDD進行最后取結果的一種操作;Controller(是控制算子,包括cache,persist,checkpoint)對性能,效率還有容錯方面的支持。
Transformation級別的RDD是lazy的,也就是說使用Transformation只是標記對我們的數據進行操作,不會真正的執行,這是算法的描述,當我們遇到Action或者checkpoint的時候他才會真正的操作。通過這種lazy特性,底層就可以對我們spark應用程序優化,因為一直是延遲執行,spark框架可以看見很多步驟,看見步驟越多優化的空間就越大。
8、常用的算子
81 map
map:使用自定義的函數f,對其中的每個元素進行處理,產生U類型的結果,傳入的RDD的元素類型為T類型,生成的RDD元素類型為U類型
withScope{body}?是為了確保運行body代碼塊產生的所有RDDs都在同一個scope里面。首先調用了SparkContext的clean方法,實際上調用了ClosureCleaner的clean方法,這里一再清除閉包中的不能序列化的變量,防止RDD在網絡傳輸過程中反序列化失敗。(scala支持閉包(jvm上的閉包當然也是一個對像),閉包會把它對外的引用(閉包里面引用了閉包外面的對像)保存到自己內部,?這個閉包就可以被單獨使用了,而不用擔心它脫離了當前的作用域;?但是在spark這種分布式環境里,這種作法會帶來問題,如果對外部的引用是不可serializable的,它就不能正確被發送到worker節點上去了;?還有一些引用,可能根本沒有用到,這些沒有使用到的引用是不需要被發到worker上的; ClosureCleaner.clean()就是用來完成這個事的;?ClosureCleaner.clean()通過遞歸遍歷閉包里面的引用,檢查不能serializable的, 去除unused的引用;?這個方法在SparkContext中用得很多,對rpc方法,只要傳入的是閉包,基本都會使用這個方法,它可以降低網絡io,提高executor的內存效率)然后new了一個MapPartitionsRDD,還把清除閉包中的不能序列化的變量的匿名函數f傳進去。MapPartitionsRDD源碼如下
MapPartitionsRDD繼承RDD[U](prev),他的源碼如下。它把RDD復制給了deps,這個OneToOneDependency是一個窄依賴,子RDD直接依賴于父RDD。
MapPartitionsRDD重寫了Partitioner,getPartitions,compute和clearDependencies,發現大量出現firstParent[T]源碼如下,返回第一個父RDD
所以partitioner和它的第一個parent RDD的partitioner保持一致(如果需要保留partitioner的話),它的partitions就是它的firstParent的partitions。它的compute函數只是調用了flatMap實例化它時輸入的函數,compute函數是在父RDD遍歷每一行數據時只是調用了flatMap實例化它時輸入的函數。
看compute實際傳遞的函數和調用它的代碼,iter:Iterator[T]是一個Partition上的元素迭代器,用來遍歷RDD[T]的第pid個partition上的所有元素。?firstParent[T].iterator(split, context)?就是返回parentRDD的對應partition的迭代器iter:Iterator[T]: 如果已經保存了就直接讀取,否則重新計算(可以跳轉看它的實現)。有了這個迭代器iter之后,然后用?iter.flatMap(cleanF)?來產生新的迭代器,返回類型是Iterator[U],這個就是最終返回的RDD: RDD[U]的partition的迭代器。
compute函數作用:在沒有依賴的條件下,根據分片的信息生成遍歷數據的Iterable接口;在有前置依賴的條件下,在父RDD的Iterable接口上給遍歷每個元素的時候再套上一個方法
8.2 flatMap
flatMap:使用自定義的函數f,對其中的每個元素進行處理,將產生的結果合并成一個大的集合。
flatMap和map函數區別主要在于:map調用的是迭代器的map方法,flatMap調用的是迭代器的flatMap方法是針對RDD的每個元素利用函數f生成多個元素,然后把這些結果全部串聯起來
8.3?reduceByKey
reduceByKey這個方法不是在RDD中的,而是在PairRDDFunctions里面,因為在RDD的伴生對象里面已經導入了,RDD內部會發生隱式轉換,轉換為PairRDDFunctions,然后再調用這個方法。
reduceByKey內部調用的是combineByKey
底層是基于combineByKeyWithClassTag的,combineByKey是combineByKeyWithClassTag的簡寫的版本
require方法首先判斷mergeCombiners(定義兩個C類型數據的組合函數)是否定義,沒有則拋異常
然后keyClass.isArray判斷如果key是Array類型,是不支持在map端合并的(mapSideCombine默認為true即進行本地預聚合),也不支持HashPartitioner(要想進行Map段合并和Hash分區,那么Key就必須可以通過比較內容是否相同來確定Key是否相等以及通過內容計算hash值,進而進行合并和分區,然而數組判斷相等和計算hash值并不是根據它里面的內容,而是根據數組在堆棧中的信息來實現的。);
然后?Aggregator創建一個聚合器,用于對數據進行聚合,對參數函數執行clean方法保證是可以被序列化的。Aggregator是核心,聚合全是交給它來完成的
Aggregator的三個泛型,第一個K,這個是你進行combineByKey也就是聚合的條件Key,可以是任意類型。后面的V,C兩個泛型是需要聚合的值的類型,和聚合后的值的類型,兩個類型是可以一樣,也可以不一樣,例如,Spark中用的多的reduceByKey這個方法,若聚合前的值為long,那么聚合后仍為long。再比如groupByKey,若聚合前為String,那么聚合后為Iterable<String>。再看三個自定義方法:①createCombiner:這個方法會在每個分區上都執行的,而且只要在分區里碰到在本分區里沒有處理過的Key,就會執行該方法。執行的結果就是在本分區里得到指定Key的聚合類型C(可以是數組,也可以是一個值,具體還是得看方法的定義了。) ②?mergeValue:這方法也會在每個分區上都執行的,和createCombiner不同,它主要是在分區里碰到在本分區內已經處理過的Key才執行該方法,執行的結果就是將目前碰到的Key的值聚合到已有的聚合類型C中。其實方法1和2放在一起看,就是一個if判斷條件,進來一個Key,就去判斷一下若以前沒出現過就執行方法1,否則執行方法2.?③mergeCombiner:前兩個方法是實現分區內部的相同Key值的數據合并,而這個方法主要用于分區間的相同Key值的數據合并,形成最終的結果。
然后看下他的三個方法:①combineValuesByKey:實現的就是分區內部的數據合并②combineCombinersByKey:主要是實現分區間的數據合并,也就是合并combineValuesByKey的結果③updateMetrics:刷磁盤有關,就是記錄下,當前是否刷了磁盤,刷了多少
回到combineByKeyWithClassTag方法中,?實例化Aggregator后,接著就是判斷,是否需要重新分區(shuffle)。然后self.partitioner == Some(partitioner)判斷分區器是否相同如果分區器相同,self.partitioner是指A這個RDD的partitioner,它指明了A這個RDD中的每個key在哪個partition中。而等號右邊的partitioner,指明了B這個RDD的每個key在哪個partition中。當二者==時,就會用self.mapPartitions生成MapPartitionsRDD, 這和map這種transformation生成的RDD是一樣的,此時reduceByKey不會引發shuffle。
①當self.partitioner == Some(partitioner)時,也就是分區實例是同一個的時候,就不需要分區了,因此只需要對先用的分區進行combineValuesByKey操作就好了,沒有分區間的合并了,也不需要shuffle了。②兩個分區器不一樣,需要對現在分區的零散數據按Key重新分區,目的就是在于將相同的Key匯集到同一個分區上,由于數據分布的不確定性,因此有可能現在的每個分區的數據是由重新分區后的所有分區的部分數據構成的(寬依賴),因此需要shuffle,則構建ShuffledRDD
combineByKey的關鍵在于分區器partitioner,它是針對分區的一個操作,分區器的選擇就決定了執行combineByKey后的結果,如果所給的分區器不能保證相同的Key值被分區到同一個分區,那么最終的合并的結果可能存在多個分區里有相同的Key。Shuffle的目的就是將零散于所有分區的數據按Key分區并集中。
8.4 join
join就是sql中的inner join。join也是PairRDDFunctions中的方法,sparkcore中支持的連接有:笛卡爾積、內連接join,外連接(左leftOuterJoin、右rightOuterJoin、全fullOuterJoin)
不指定分區函數時默認使用HashPartitioner;提供numPartitions參數時,其內部的分區函數是HashPartitioner(numPartitions)
我們發現join的內部其實是調用cogroup。即rdd1.join(rdd2) => rdd1.cogroup(rdd2,partitioner) => flatMapValues(遍歷兩個value的迭代器)。
返回值的是(key,(v1,v2))這種形式的元組
8.5 cogroup
首先先判斷一下如果使用HashPartitioner分區,并且key是數組的話拋異常。然后構造一個CoGroupedRDD其鍵值對中的value要求是Iterable[V]和Iterable[W]類型。
重寫的RDD的getDependencies: 如果rdd和給定分區函數相同就是窄依賴,否則就是寬依賴
這里返回一個帶有Partitioner.numPartitions個分區類型為CoGroupPartition的數組
總結:cogroup算子,根據rdd1,rdd2創建了一個CoGroupedRDD;分析了CoGroupedRDD的依賴關系,看到如果兩個rdd的分區函數相同,那么生成的rdd分區數不變,它們之間是一對一依賴,也就是窄依賴,從而可以減少依次shuffle;CoGroupedRDD的分區函數就是將兩個rdd的相同分區索引的分區合成一個新的分區,并且通過NarrowCoGroupSplitDep這個類實現了序列化。
join返回的類型是 RDD[(K, (V, W))],CoGroup返回的是RDD[(K, (Iterable[V], Iterable[W]))]
8.6 reduce
reduce函數:對RDD中的所有元素進行聚合操作,將最終的結果返回給Driver。同時元素之間還要符合結合律和交換律[原因:在進行reduce的操作時,并不知道那個數據先過來,所有要符合交換律,在交換律的基礎上,滿足結合律才能進行reduce]
8.7?collect
collect方法是匯總所有節點中的計算結果到Driver端,collect后得到的是數組,Array中就是一個元素,只不過這個元素是一個Tuple,Array即為元組數組。返回的是一個數組,包含了所有程序運行結果的數組,其中使用concat(results:?_*)方法將各個節點的數據加入到數組中。
8.8?saveAsTextFile
該函數將數據輸出,以文本文件的形式寫入本地文件系統或者HDFS等。Spark將對每個元素調用toString方法,將數據元素轉換為文本文件中的一行記錄。若將文件保存到本地文件系統,那么只會保存在executor所在機器的本地目錄
總結
以上是生活随笔為你收集整理的RDD(弹性分布式数据集)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql 混合模式,SQLServer
- 下一篇: centos6.5装mysql好难_Ce