编程实现将rdd转换为dataframe:源文件内容如下(_大数据 什么是RDD?可以干什么?为什么要有RDD?...
什么是RDD
彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset,RDD)是 Spark 中的核心概念。
RDD在抽象上來講是一種抽象的分布式的數(shù)據(jù)集。它是被分區(qū)的,每個分區(qū)分布在集群中的不同的節(jié)點上。從而可以讓數(shù)據(jù)進行并行的計算它主要特點就是彈性和容錯性。彈性:RDD的數(shù)據(jù)默認情況下存放在內存中的,但是在內存資源不足時,Spark會自動將RDD數(shù)據(jù)寫入磁盤容錯性:RDD可以自動從節(jié)點失敗中恢復過來。即如果某個節(jié)點上的RDD partition,因為節(jié)點故障,導致數(shù)據(jù)丟了,那么RDD會自動通過自己的數(shù)據(jù)來源重新計算該partition。RDD來源:通常是Hadoop的HDFS,Hive 表等等;也可以通過Linux的本地文件;應用程序中的數(shù)組;jdbc(mysql 等);也可以是kafka、flume數(shù)據(jù)采集工具、中間件等轉化而來的RDD
RDD的創(chuàng)建
進行Spark核心編程時,首先要做的第一件事,就是創(chuàng)建一個初始的RDD。該RDD中,通常就代表和包含了Spark應用程序的輸入源數(shù)據(jù)。然后在創(chuàng)建了初始的RDD之后,才可以通過Spark Core提供的transformation算子,對該RDD進行轉換,來獲取其他的RDD。Spark Core提供了三種創(chuàng)建RDD的方式,包括:使用程序中的集合創(chuàng)建RDD;使用本地文件創(chuàng)建RDD;使用HDFS文件創(chuàng)建RDD。
1、使用程序中的集合創(chuàng)建RDD,主要用于進行測試,在實際部署到集群運行之前,自己使用集合構造測試數(shù)據(jù),來測試后面的spark應用流程。
2、使用本地文件創(chuàng)建RDD,主要用于臨時性地處理一些存儲了大量數(shù)據(jù)的文件。
3、使用HDFS文件創(chuàng)建RDD,應該是最常用的生產環(huán)境處理方式,主要可以針對HDFS上存儲的大數(shù)據(jù),進行離線批處理操作。
RDD特性
Spark RDD五大特性
1.分區(qū)列表
Spark RDD是被分區(qū)的,每一個分區(qū)都會被一個計算任務(Task)處理,分區(qū)數(shù)決定了并行計算的數(shù)量,RDD的并行度默認從父RDD傳給子RDD。默認情況下,一個HDFS上的數(shù)據(jù)分片就是一個 partiton,RDD分片數(shù)決定了并行計算的力度,可以在創(chuàng)建RDD時指定RDD分片個數(shù)(分區(qū)。
如果不指定分區(qū)數(shù)量,當RDD從集合創(chuàng)建時,則默認分區(qū)數(shù)量為該程序所分配到的資源的CPU核數(shù)(每個Core可以承載2~4個 partition),如果是從HDFS文件創(chuàng)建,默認為文件的 Block數(shù)。
2.每一個分區(qū)(分片)都有一個計算函數(shù)
每個分區(qū)都會有計算函數(shù), Spark的RDD的計算函數(shù)是以分片為基本單位的,每個RDD都會實現(xiàn) compute函數(shù),對具體的分片進行計算,RDD中的分片是并行的,所以是分布式并行計算。
有一點非常重要,就是由于RDD有前后依賴關系,遇到寬依賴關系,如reduce By Key等這些操作時劃分成 Stage, Stage內部的操作都是通過 Pipeline進行的,在具體處理數(shù)據(jù)時它會通過 Blockmanager來獲取相關的數(shù)據(jù),因為具體的 split要從外界讀數(shù)據(jù),也要把具體的計算結果寫入外界,所以用了一個管理器,具體的 split都會映射成 BlockManager的Block,而具體的splt會被函數(shù)處理,函數(shù)處理的具體形式是以任務的形式進行的。
3.依賴于其他RDD的列表( a list of dependencies on other RDDS)
由于RDD每次轉換都會生成新的RDD,所以RDD會形成類似流水線一樣的前后依賴關系,當然寬依賴就不類似于流水線了,寬依賴后面的RDD具體的數(shù)據(jù)分片會依賴前面所有的RDD的所有數(shù)據(jù)分片,這個時候數(shù)據(jù)分片就不進行內存中的 Pipeline,一般都是跨機器的,因為有前后的依賴關系,所以當有分區(qū)的數(shù)據(jù)丟失時, Spark會通過依賴關系進行重新計算,從而計算出丟失的數(shù)據(jù),而不是對RDD所有的分區(qū)進行重新計算。
RDD之間的依賴有兩種:窄依賴( Narrow Dependency)和寬依賴( Wide Dependency)。
RDD是 Spark的核心數(shù)據(jù)結構,通過RDD的依賴關系形成調度關系。通過對RDD的操作形成整個 Spark程序。
4.key- value數(shù)據(jù)類型的RDD分區(qū)器( a Partitioner for key- alue RDDS)、控制分區(qū)策略和分區(qū)數(shù)
每個key- value形式的RDD都有 Partitioner屬性,它決定了RDD如何分區(qū)。
當然,Partiton的個數(shù)還決定了每個Stage的Task個數(shù)。
RDD的分片函數(shù)可以分區(qū)( Partitioner),可傳入相關的參數(shù),如 Hash Partitioner和 Range Partitioner,它本身針對key- value的形式,如果不是key-value的形式它就不會有具體的 Partitioner, Partitioner本身決定了下一步會產生多少并行的分片,同時它本身也決定了當前并行( Parallelize) Shuffle輸出的并行數(shù)據(jù),從而使Spark具有能夠控制數(shù)據(jù)在不同結點上分區(qū)的特性,用戶可以自定義分區(qū)策略,如Hash分區(qū)等。
spark提供了 partition By運算符,能通過集群對RDD進行數(shù)據(jù)再分配來創(chuàng)建一個新的RDD。
5.每個分區(qū)都有一個優(yōu)先位置列表( a list of preferred locations to compute each split on)
優(yōu)先位置列表會存儲每個 Partition的優(yōu)先位置,對于一個HDFS文件來說,就是每個Partition塊的位置。
觀察運行 Spark集群的控制臺就會發(fā)現(xiàn),Spark在具體計算、具體分片以前,它已經(jīng)清楚地知道任務發(fā)生在哪個結點上,也就是說任務本身是計算層面的、代碼層面的,代碼發(fā)生運算之前它就已經(jīng)知道它要運算的數(shù)據(jù)在什么地方,有具體結點的信息。
這就符合大數(shù)據(jù)中數(shù)據(jù)不動代碼動的原則,即“移動數(shù)據(jù)不如移動計算”的理念。數(shù)據(jù)不動代碼動的最高境界是數(shù)據(jù)就在當前結點的內存中。這時候有可能是 Memory級別或 Tachyon級別的,Spark本身在進行任務調度時會盡可能地將任務分配到處理數(shù)據(jù)的數(shù)據(jù)塊所在的具體位置。
Scala源代碼函數(shù)get Parferredlocations可知,每次計算都符合完美的數(shù)據(jù)本地性。可在RDD類源代碼文件中找到4個方法和1個屬性,對應上述所闡述的RDD的五大特性,源代碼剪輯如下
RDD有哪些操作
reduce()
接受一個函數(shù),作用在RDD兩個類型相同的元素上,返回新元素。
可以實現(xiàn),RDD中元素的累加,計數(shù),和其他類型的聚集操作。
scala> val rdd = sc.parallelize(Array(1,2,3,3))rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[16] at parallelize at :24scala> rdd.collect()res9: Array[Int] = Array(1, 2, 3, 3)scala> rdd.reduce((x,y)=>x+y)res10: Int = 9collect()
遍歷整個RDD,向driver program返回RDD的內容
需要單機內存能夠容納下(因為數(shù)據(jù)要拷貝到driver,測試使用)
大數(shù)據(jù)的時候,使用saveAsTextFile() action等。
take(n)
返回RDD的n個元素(同時嘗試訪問最少的partitions)
返回結果是無序的,測試使用。
scala> rdd.take(2)res11: Array[Int] = Array(1, 2)scala> rdd.take(3)res12: Array[Int] = Array(1, 2, 3)top()
排序(根據(jù)RDD中數(shù)據(jù)的比較器)
scala> rdd.top(1)res13: Array[Int] = Array(3)scala> rdd.top(2)res14: Array[Int] = Array(3, 3)scala> rdd.top(3)res15: Array[Int] = Array(3, 3, 2)foreach()
計算RDD中的每個元素,但不返回到本地。
可以配合println()友好的打印出數(shù)據(jù)。
map
map的輸入變換函數(shù)應用于RDD中所有元素,而mapPartitions應用于所有分區(qū)。區(qū)別于mapPartitions主要在于調用粒度不同。如parallelize(1 to 10, 3),map函數(shù)執(zhí)行10次,而mapPartitions函數(shù)執(zhí)行3次。
每個方框表示一個 RDD 分區(qū),左側的分區(qū)經(jīng)過用戶自定義函數(shù) f:T->U 映射為右側的新 RDD 分區(qū)。但是,實際只有等到 Action算子觸發(fā)后,這個 f 函數(shù)才會和其他函數(shù)在一個stage 中對數(shù)據(jù)進行運算。在圖 1 中的第一個分區(qū),數(shù)據(jù)記錄 V1 輸入 f,通過 f 轉換輸出為轉換后的分區(qū)中的數(shù)據(jù)記錄 V’1。
filter(function)
過濾操作,滿足filter內function函數(shù)為true的RDD內所有元素組成一個新的數(shù)據(jù)集。如:filter(a == 1)。
flatMap(function)
map是對RDD中元素逐一進行函數(shù)操作映射為另外一個RDD,而flatMap操作是將函數(shù)應用于RDD之中的每一個元素,將返回的迭代器的所有內容構成新的RDD。而flatMap操作是將函數(shù)應用于RDD中每一個元素,將返回的迭代器的所有內容構成RDD。
flatMap與map區(qū)別在于map為“映射”,而flatMap“先映射,后扁平化”,map對每一次(func)都產生一個元素,返回一個對象,而flatMap多一步就是將所有對象合并為一個對象。
將原來 RDD 中的每個元素通過函數(shù) f 轉換為新的元素,并將生成的 RDD 的每個集合中的元素合并為一個集合,內部創(chuàng)建 FlatMappedRDD(this,sc.clean(f))。
表 示 RDD 的 一 個 分 區(qū) ,進 行 flatMap函 數(shù) 操 作, flatMap 中 傳 入 的 函 數(shù) 為 f:T->U, T和 U 可以是任意的數(shù)據(jù)類型。將分區(qū)中的數(shù)據(jù)通過用戶自定義函數(shù) f 轉換為新的數(shù)據(jù)。外部大方框可以認為是一個 RDD 分區(qū),小方框代表一個集合。 V1、 V2、 V3 在一個集合作為 RDD 的一個數(shù)據(jù)項,可能存儲為數(shù)組或其他容器,轉換為V’1、 V’2、 V’3 后,將原來的數(shù)組或容器結合拆散,拆散的數(shù)據(jù)形成為 RDD 中的數(shù)據(jù)項。
mapPartitions(function)
區(qū)于foreachPartition(屬于Action,且無返回值),而mapPartitions可獲取返回值。與map的區(qū)別前面已經(jīng)提到過了,但由于單獨運行于RDD的每個分區(qū)上(block),所以在一個類型為T的RDD上運行時,(function)必須是Iterator => Iterator類型的方法(入?yún)?。
mapPartitionsWithIndex(function)
與mapPartitions類似,但需要提供一個表示分區(qū)索引值的整型值作為參數(shù),因此function必須是(int, Iterator)=>Iterator類型的。
sample(withReplacement, fraction, seed)
采樣操作,用于從樣本中取出部分數(shù)據(jù)。withReplacement是否放回,fraction采樣比例,seed用于指定的隨機數(shù)生成器的種子。(是否放回抽樣分true和false,fraction取樣比例為(0, 1]。seed種子為整型實數(shù)。)
union(otherDataSet)
對于源數(shù)據(jù)集和其他數(shù)據(jù)集求并集,不去重。
intersection(otherDataSet)
對于源數(shù)據(jù)集和其他數(shù)據(jù)集求交集,并去重,且無序返回。
distinct([numTasks])
返回一個在源數(shù)據(jù)集去重之后的新數(shù)據(jù)集,即去重,并局部無序而整體有序返回。(詳細介紹見
groupByKey([numTasks])
在一個PairRDD或(k,v)RDD上調用,返回一個(k,Iterable)。主要作用是將相同的所有的鍵值對分組到一個集合序列當中,其順序是不確定的。groupByKey是把所有的鍵值對集合都加載到內存中存儲計算,若一個鍵對應值太多,則易導致內存溢出。
在此,用之前求并集的union方法,將pair1,pair2變?yōu)橛邢嗤I值的pair3,而后進行groupByKey
reduceByKey(function,[numTasks])
與groupByKey類似,卻有不同。如(a,1), (a,2), (b,1), (b,2)。groupByKey產生中間結果為( (a,1), (a,2) ), ( (b,1), (b,2) )。而reduceByKey為(a,3), (b,3)。
reduceByKey主要作用是聚合,groupByKey主要作用是分組。(function對于key值來進行聚合)
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
類似reduceByKey,對pairRDD中想用的key值進行聚合操作,使用初始值(seqOp中使用,而combOpenCL中未使用)對應返回值為pairRDD,而區(qū)于aggregate(返回值為非RDD)
sortByKey([ascending], [numTasks])
同樣是基于pairRDD的,根據(jù)key值來進行排序。ascending升序,默認為true,即升序;numTasks
join(otherDataSet,[numTasks])
加入一個RDD,在一個(k,v)和(k,w)類型的dataSet上調用,返回一個(k,(v,w))的pair dataSet。
cartesian(otherDataSet)
求笛卡爾乘積。該操作不會執(zhí)行shuffle操作。
pipe(command,[envVars])
通過一個shell命令來對RDD各分區(qū)進行“管道化”。通過pipe變換將一些shell命令用于Spark中生成的新RDD
coalesce(numPartitions)
重新分區(qū),減少RDD中分區(qū)的數(shù)量到numPartitions。
repartition(numPartitions)
repartition是coalesce接口中shuffle為true的簡易實現(xiàn),即Reshuffle RDD并隨機分區(qū),使各分區(qū)數(shù)據(jù)量盡可能平衡。若分區(qū)之后分區(qū)數(shù)遠大于原分區(qū)數(shù),則需要shuffle。
repartitionAndSortWithinPartitions(partitioner)
該方法根據(jù)partitioner對RDD進行分區(qū),并且在每個結果分區(qū)中按key進行排序。
Action:
first()
返回數(shù)據(jù)集的第一個元素(類似于take(1))
takeSample(withReplacement, num, [seed])
對于一個數(shù)據(jù)集進行隨機抽樣,返回一個包含num個隨機抽樣元素的數(shù)組,withReplacement表示是否有放回抽樣,參數(shù)seed指定生成隨機數(shù)的種子。
該方法僅在預期結果數(shù)組很小的情況下使用,因為所有數(shù)據(jù)都被加載到driver端的內存中。
take(n)
返回一個包含數(shù)據(jù)集前n個元素的數(shù)組(從0下標到n-1下標的元素),不排序。
takeOrdered(n,[ordering])
返回RDD中前n個元素,并按默認順序排序(升序)或者按自定義比較器順序排序。
saveAsTextFile(path)
將dataSet中元素以文本文件的形式寫入本地文件系統(tǒng)或者HDFS等。Spark將對每個元素調用toString方法,將數(shù)據(jù)元素轉換為文本文件中的一行記錄。
若將文件保存到本地文件系統(tǒng),那么只會保存在executor所在機器的本地目錄。
saveAsSequenceFile(path)(Java and Scala)
將dataSet中元素以Hadoop SequenceFile的形式寫入本地文件系統(tǒng)或者HDFS等。(對pairRDD操作)
saveAsObjectFile(path)(Java and Scala)
將數(shù)據(jù)集中元素以ObjectFile形式寫入本地文件系統(tǒng)或者HDFS等。
countByKey()
用于統(tǒng)計RDD[K,V]中每個K的數(shù)量,返回具有每個key的計數(shù)的(k,int)pairs的hashMap。
寬依賴和窄依賴
窄依賴(narrow dependencies)
子RDD的每個分區(qū)依賴于常數(shù)個父分區(qū)(與數(shù)據(jù)規(guī)模無關)輸入輸出一對一的算子,且結果RDD的分區(qū)結構不變。主要是map/flatmap輸入輸出一對一的算子,但結果RDD的分區(qū)結構發(fā)生了變化,如union/coalesce從輸入中選擇部分元素的算子,如filter、distinct、substract、sample寬依賴(wide dependencies)
子RDD的每個分區(qū)依賴于所有的父RDD分區(qū)對單個RDD基于key進行重組和reduce,如groupByKey,reduceByKey對兩個RDD基于key進行join和重組,如join經(jīng)過大量shuffle生成的RDD,建議進行緩存。這樣避免失敗后重新計算帶來的開銷。總結
以上是生活随笔為你收集整理的编程实现将rdd转换为dataframe:源文件内容如下(_大数据 什么是RDD?可以干什么?为什么要有RDD?...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pythonos基础_python基础之
- 下一篇: python grid用法_Python