[scala-spark]10. RDD转换操作
RDD提供了一組非常豐富的操作來操作數據,如:map,flatMap,filter等轉換操作,以及SaveAsTextFile,conutByKey等行動操作。這里僅僅綜述了轉換操作。
- map
map是對RDD中的每一個元素都執行一個指定的函數來產生一個新的RDD,RDD之間的元素是一對一的關系。
val rdd1: RDD[Int] = sc.parallelize(1 to 9, 3) val rdd2: RDD[Int] = rdd1.map(_ * 2) printResult("map", rdd2) // 結果:map >> List(2, 4, 6, 8, 10, 12, 14, 16, 18)- flapMap
flatMap類似于map,但是每一個輸入元素,會被映射為0到多個出輸出元素(即func函數的返回值是一個Seq,而不是單一元素)的新的RDD,RDD之間的元素是一對多關系。
val rdd3: RDD[Int] = rdd2.filter(x => x > 10).flatMap(x => x to 21) printResult("flatMap", rdd3) // 結果:flatMap >> List(12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 14, 15, 16, 17, 18, 19, 20, 21, 16, 17, 18, 19, 20, 21, 18, 19, 20, 21)- filter
filter是對RDD元素進行過濾,返回一個新的數據集,有經過func函數后返回值為true的元素組成。
val rdd4 = rdd2.filter(x => x > 11) printResult("filter", rdd4) // 結果:filter >> List(12, 14, 16, 18)- mapPartitions
mapPartitions是map的一個變種。map的輸入函數應用于RDD中的每一個元素,而mapPartitions的輸入函數應用于每一個分區的數據,也就是把每一個分區中的內容作為整體來處理。
函數定義:
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] val rdd5: RDD[(Int, Int)] = rdd1.repartition(2).mapPartitions((iter: Iterator[Int]) => { val lst: ListBuffer[(Int, Int)] = new ListBuffer[(Int, Int)]() var prev: Int = 0 var current: Int = 0while (iter.hasNext) {current = iter.nextlst += ((prev, current))prev = current}lst.iterator}) printResult("mapPartitions", rdd5) 結果:mapPartitions >> List((0,1), (1,3), (3,5), (5,7), (7,9), (0,2), (2,4), (4,6), (6,8))- mapPartitionsWithIndex
mapPartitionsWithIndex于mapPartitions的功能類似,只是多傳入split index而已,所有func函數必須是(Int,Iterator<T>)=> Iterator<U>類型。
val rdd6 = rdd1.repartition(2).mapPartitionsWithIndex((idx, iter) => { val lst = new ListBuffer[String]() var sum = 0 while (iter.hasNext) {sum += iter.next}lst += (idx + ":" + sum)lst.iterator }) printResult("mapPartitionsWithIndex", rdd6) // 結果:mapPartitionsWithIndex >> List(0:25, 1:20)- sample
sample(withReplacemet,fraction,seed)是根據給定的隨機種子seed,隨機抽樣出數量為fraction的數據。其中,withReplacement:是否放回抽樣;fraction:比抽樣比例,0.1表示抽樣10%,seed:隨機種子,相同的seed得到的隨機序列是一樣的。所以,如果沒有設置seed,同一段代碼執行兩遍得到的隨機序列是一樣的。
?
- union
union(otherDataset)是數據的合并,返回一個新的數據集,由原數據集和otherDataset合并而成的一個數據集RDD。
val rdd8 = rdd1.union(rdd2) printResult("union", rdd8) // 結果:union >> List(1, 2, 3, 4, 5, 6, 7, 8, 9, 2, 4, 6, 8, 10, 12, 14, 16, 18)- intersection
intersection(otherDataset)是數據交集,返回一個新的數據集,它是兩個數據集的交集數據。
val rdd9 = rdd8.intersection(rdd1) printResult("intersection", rdd9) // 結果:intersection >> List(4, 8, 1, 9, 5, 6, 2, 3, 7)- distinct
distince(numPartitions)是對數據集進行去重,返回一個新的數據集,它是對兩個數據集去掉重復數據后得到的一個數據集。其中,numPartitions參數是設置任務并行數量。
val rdd10 = rdd8.union(rdd9).distinct(2) printResult("distinct", rdd10) // 結果:distinct >> List(4, 16, 14, 6, 8, 12, 18, 10, 2, 1, 3, 7, 9, 5)- groupByKey
groupByKey(partitioner)是對數據進行分組操作,在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意,默認情況下,使用8個并行任務進行分組,可以傳入partitioner參數設置并行任務的分區數.
val rddMap: RDD[(Int, Int)] = rdd1.map(item => (item % 3, item)) val rdd11 = rddMap.groupByKey(); printResult("groupByKey", rdd11) // 結果:groupByKey >> List((0,CompactBuffer(3, 6, 9)), (2,CompactBuffer(2, 5, 8)), (1,CompactBuffer(1, 4, 7)))- reduceByKey
reduceByKey(func, numPartitions)是對數據進行分組聚合操作,在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集。Key是相同的值,都被使用指定的reduce函數聚合到一起。和groupByKey類似,可以通過參數numPartitions設置并行任務的分區數。
val rdd12 = rddMap.reduceByKey((x, y) => x + y) printResult("reduceByKey", rdd12) // 結果:reduceByKey >> List((0,18), (2,15), (1,12))- sortByKey
sortByKey(ascending, numPartitions)是對RDD中的數據集進行排序操作,對(K,V)類型的數據按照K進行排序,其中K需要實現Ordered方法。
第一個參數是ascending,該參數決定排序后RDD中的元素是升序還是降序,默認是true,按升序排序。
第二個參數是numPartitions,即排序分區的并行任務個數。
- aggregateByKey
aggregateByKey(zeroValue, numPartitions)(seqOp: (U, V) => U,combOp: (U, U) => U)和reduceByKey的不同在于:reduceByKey輸入/輸出都是(K,V),而aggregateByKey輸出是(K,U),可以不同于輸入(K,U)。
ggregateByKey的三個參數如下: ? ? ? ?
所以,可以將aggregateByKey函數抽象成更高級的,更靈活的reduce和group的組合。
val rdd13 = rddMap.aggregateByKey(0)(seqOp = (x, y) => {math.max(x, y)},combOp = (x, y) => {x + y}) printResult("aggregateByKey", rdd13) // 結果:aggregateByKey >> List((0,12), (2,10), (1,11))- combineByKey
combineByKey(createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,numPartitions: Int)是對RDD中的數據按照Key進行聚合操作。聚合操作的邏輯是通過自定義函數提供給combineByKey。?把(K,V)類型的RDD轉換為(K,C)類型的RDD,C和Vk可以不一樣。
combineyey的三個參數如下: ?
- jion
join(other, partitioner)是連接操作,將數據的數據集(K,V)和另外一個數據集(K,W)進行join,得到(K,(V,W))。該操作是對于相同K的V和W集合進行笛卡爾積操作,也即V和W的所有組合。連接操作除了join外,還有左連接,右連接,全連接操作函數:leftOuterJoin,rightOuterJoin和fullOuterJoin,它們的用法基本上是一樣的。
val rddMap1 = rdd1.map(item => (item % 4, item)) val rdd16 = rddMap.join(rddMap1) printResult("join", rdd16) // 結果:join >> List((0,(3,4)), (0,(3,8)), (0,(6,4)), (0,(6,8)), (0,(9,4)), (0,(9,8)), (2,(2,2)), (2,(2,6)), (2,(5,2)), (2,(5,6)), (2,(8,2)), (2,(8,6)), (1,(1,1)), (1,(1,5)), (1,(1,9)), (1,(4,1)), (1,(4,5)), (1,(4,9)), (1,(7,1)), (1,(7,5)), (1,(7,9)))- pipe
pipe(command: Seq[String],env: Map[String, String])是以shell命令處理RDD數據。
val rdd19: RDD[String] = rdd1.pipe("head -n 1") printResult("pipe", rdd19) // 結果:pipe >> List(1,4,7)- subtract
subtract(other: RDD[T], numPartitions: Int)是RDD對other數據集進行減法操作,將輸入的rdd中的元素減去other中的元素,得到他們差值的一個新的RDD。
val rdd21 = rdd1.subtract(sc.parallelize(1 to 5)) printResult("subtract", rdd21) // 結果:subtract >> List(6, 9, 7, 8)- zip
zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]是對兩個RDD進行拉鏈操作,得到一個新的RDD[T,U]。拉鏈操作還包括:zipWitIndex和zipPartitinos。
val rdd22 = rdd1.repartition(3).zip(sc.parallelize(Array("A", "B", "C", "D", "E", "F", "G", "H", "I"), 3)) printResult("zip", rdd22) // 結果: zip >> List((3,A), (6,B), (8,C), (1,D), (4,E), (9,F), (2,G), (5,H), (7,I))- coalesce 和?repartition
coalesce(numPartitions: Int)是對RDD進行重分區,默認不進行shuffle,且該RDD的分區個數等于numPartitions個數。
repartition(numPartitions: Int)是將RDD進行重新分區,分區過程中會進行shuffle,調整分區數量為numPartitions。
總結
以上是生活随笔為你收集整理的[scala-spark]10. RDD转换操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 从单纯聊天到在线生活
- 下一篇: 关于HOOK截入中文输入