Spark Transformations Operators (Explanation + Examples)
Typing out and running each transformation operator a few times yourself will give you a much deeper understanding.
Transformations are lazy: computation only happens once an action operator is called after them.
1. map(func)
Description: Returns a new RDD formed by passing each element of the source RDD through the function func.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array(1,2,3,4,5,6)
  val numRDD = sc.parallelize(arr)
  val resultRDD = numRDD.map(x => x * x)
  resultRDD.foreach(println)
}
Result: 1 4 9 16 25 36
2. filter(func)
Description: Returns a new RDD consisting of the elements of the source RDD for which func returns true.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array(1,2,3,4,5,6)
  // parallelize() creates an RDD
  val numRDD = sc.parallelize(arr)
  val resultRDD = numRDD.filter(_ % 2 == 0)
  resultRDD.foreach(println)
}
Result: 2 4 6
3. flatMap(func)
Description: Similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chun").setMaster("local")
  val sc = new SparkContext(conf)
  val words = Array("hello python","hello hadoop","hello spark")
  val wordRDD = sc.parallelize(words)
  wordRDD.flatMap(_.split(" ")).collect.foreach(println)
}
Result: hello python hello hadoop hello spark (each word printed on its own line)
4. mapPartitions(func)
Description: Similar to map, but runs separately on each partition of the RDD, so when running on an RDD of type T, func must be of type Iterator[T] => Iterator[U].
import scala.collection.mutable.ArrayBuffer

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chun").setMaster("local")
  val sc = new SparkContext(conf)
  val array = Array(1,2,1,2,2,3,4,5,6,7,8,9)
  val arrayRDD = sc.parallelize(array)
  arrayRDD.mapPartitions(elements => {
    val result = new ArrayBuffer[Int]()
    elements.foreach(e => {
      result += e
    })
    result.iterator
  }).foreach(println)
}
Result: 1 2 1 2 2 3 4 5 6 7 8 9 (one element per line)
5. mapPartitionsWithIndex(func)
Description: Similar to mapPartitions, but func takes an extra integer parameter representing the partition index, so when running on an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9), 2) // 2 is the number of partitions
  arrayRDD.mapPartitionsWithIndex((index, elements) => {
    println("partition index:" + index)
    val result = new ArrayBuffer[Int]()
    elements.foreach(e => {
      result += e
    })
    result.iterator
  }).foreach(println)
}
Result:
partition index:0
1 2 3 4
partition index:1
5 6 7 8 9
6. sample(withReplacement, fraction, seed)
Description: Samples a fraction of the data, with or without replacement, using a given random number generator seed.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arrayRDD = sc.parallelize(1 to 10000)
  val sampleRDD = arrayRDD.sample(true, 0.001) // true: sample with replacement
  println(sampleRDD.count())
}
Result: 10

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arrayRDD = sc.parallelize(1 to 10000)
  val sampleRDD = arrayRDD.sample(false, 0.001) // false: sample without replacement
  println(sampleRDD.count())
}
Result: 9
(Sampling is random, so the exact count differs from run to run.)
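Neither example passes the third argument. Below is a short sketch (not from the original article) showing that fixing the seed makes the sample reproducible across runs:

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arrayRDD = sc.parallelize(1 to 10000)
  // with the same seed, the same elements are chosen on every run
  val sampleA = arrayRDD.sample(false, 0.001, 42L)
  val sampleB = arrayRDD.sample(false, 0.001, 42L)
  println(sampleA.collect().sameElements(sampleB.collect())) // expected: true
}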
7. union(otherDataset)
Description: Returns a new RDD containing the union of the elements in the source RDD and the argument RDD.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd1 = sc.parallelize(1 to 10)
  val rdd2 = sc.parallelize(11 to 20)
  val resultRDD = rdd1.union(rdd2)
  resultRDD.foreach(print)
}
Result: 1234567891011121314151617181920 (1 through 20, concatenated by print)
8. intersection(otherDataset)
Description: Returns a new RDD containing the intersection of the elements in the source RDD and the argument RDD.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd1 = sc.parallelize(Array(1,3,5,7,8))
  val rdd2 = sc.parallelize(Array(3,5,7))
  val resultRDD = rdd1.intersection(rdd2)
  resultRDD.foreach(println)
}
Result: 3 7 5
9. distinct([numTasks])
Description: Returns a new RDD containing the distinct elements of the source RDD.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array(Tuple3("max","math",90),("max","english",85),("mike","math",100))
  val scoreRDD = sc.parallelize(arr)
  val studentNumber = scoreRDD.map(_._1).distinct().collect()
  println(studentNumber.mkString(","))
}
Result: max,mike
10. groupByKey([numTasks])
Description: When called on an RDD of (K, V) pairs, returns an RDD of (K, Iterable[V]) pairs.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array("chun1 chun2 chun3 chun1 chun1 chun2", "chun1")
  val arrRDD = sc.parallelize(arr)
  val resultRDD = arrRDD.flatMap(_.split(" ")).map((_, 1)).groupByKey()
  // resultRDD.foreach(println)
  resultRDD.foreach(element => {
    println(element._1 + " " + element._2.size)
  })
}
Result: chun1 4 chun3 1 chun2 2
11. reduceByKey(func, [numTasks])
Description: When called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs where the values for each key are aggregated using the given reduce function. As with groupByKey, the number of reduce tasks can be set via an optional second argument.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr = Array("chun1 chun2 chun3 chun1 chun1 chun2", "chun1")
  val arrRDD = sc.parallelize(arr)
  arrRDD.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_ + _).collect.foreach(println)
}
Result: (chun1,4) (chun3,1) (chun2,2)
12. aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
Description: When called on an RDD of (K, V) pairs, returns an RDD of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. As with groupByKey, the number of reduce tasks can be set via an optional second argument.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val data = List((1,3),(1,4),(2,3),(3,6),(1,2),(3,8))
  val rdd = sc.parallelize(data)
  rdd.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)
}
Result: (1,4) (3,8) (2,3)
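The example above runs in a single partition, so seqOp and combOp are hard to tell apart. Here is a sketch (not from the original article) that forces two partitions to show the difference: seqOp (math.max) runs inside each partition, and combOp (_ + _) then combines the per-partition results, so the same data can produce a different answer.

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val data = List((1,3),(1,4),(2,3),(3,6),(1,2),(3,8))
  // assuming an even split: partition 0 gets (1,3),(1,4),(2,3), partition 1 gets (3,6),(1,2),(3,8)
  val rdd = sc.parallelize(data, 2)
  rdd.aggregateByKey(0)(math.max(_, _), _ + _).collect().foreach(println)
  // expected values (output order may vary): (1,6) (2,3) (3,8)
  // key 1: max = 4 in partition 0 and 2 in partition 1, combined by combOp as 4 + 2 = 6
}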
13. sortByKey([ascending], [numTasks])
Description: When called on an RDD of (K, V) pairs where K implements Ordered, returns an RDD of (K, V) pairs sorted by key.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val scores = Array(Tuple2("mike",80),("max",90),("bob",100))
  val scoresRDD = sc.parallelize(scores)
  val sortByKeyRDD = scoresRDD
    .map(x => (x._2, x._1))  // swap (k, v) so the score becomes the key
    .sortByKey(false)        // sort by score, descending
    .map(x => (x._2, x._1))  // swap back after sorting
  sortByKeyRDD.collect.foreach(println)
}
Result: (bob,100) (max,90) (mike,80)
14. join(otherDataset, [numTasks])
Description: When called on RDDs of type (K, V) and (K, W), returns an RDD of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  // student info
  val students = Array(Tuple2(1,"max"), Tuple2(2,"mike"), Tuple2(3,"bob"))
  // scores
  val scores = Array(Tuple2(1,90), Tuple2(2,120), Tuple2(3,80))
  val stuRDD = sc.parallelize(students)
  val scoresRDD = sc.parallelize(scores)
  // joining two pair RDDs returns (k, (v, w))
  val resultRDD = stuRDD.join(scoresRDD).sortByKey()
  resultRDD.foreach(x => {
    println("id:" + x._1 + " name:" + x._2._1 + " score:" + x._2._2)
    println("=========================")
  })
}
Result:
id:1 name:max score:90
=========================
id:2 name:mike score:120
=========================
id:3 name:bob score:80
=========================
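The example above is an inner join. As a quick illustration of the outer joins mentioned in the description, here is a sketch (not from the original article) of leftOuterJoin with one student missing a score; the right-hand value comes back as an Option.

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val stuRDD = sc.parallelize(Array((1, "max"), (2, "mike"), (3, "bob")))
  val scoresRDD = sc.parallelize(Array((1, 90), (2, 120))) // no score for id 3
  // leftOuterJoin keeps every key from stuRDD; missing scores show up as None
  val resultRDD = stuRDD.leftOuterJoin(scoresRDD).sortByKey()
  resultRDD.collect().foreach(println)
  // expected: (1,(max,Some(90))) (2,(mike,Some(120))) (3,(bob,None))
}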
15. cogroup(otherDataset, [numTasks])
Description: When called on RDDs of type (K, V) and (K, W), returns an RDD of (K, (Iterable[V], Iterable[W])) tuples.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun1").setMaster("local")
  val sc = new SparkContext(conf)
  // student info
  val students = Array(("class1","max"),("class1","mike"),("class2","bob"))
  // scores
  val scores = Array(("class1",90),("class1",120),("class2",80))
  val stuRDD = sc.parallelize(students)
  val scoresRDD = sc.parallelize(scores)
  val resultRDD = stuRDD.cogroup(scoresRDD).sortByKey()
  resultRDD.foreach(x => {
    println("class:" + x._1)
    x._2._1.foreach(println)
    x._2._2.foreach(println) // drop this line to print only the names
    println("===========")
  })
}
Result:
class:class1
max
mike
90
120
===========
class:class2
bob
80
===========
16. cartesian(otherDataset)
Description: When called on datasets of types T and U, returns a dataset of (T, U) pairs (every combination of elements).
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arr1 = sc.parallelize(Array(1,3,5))
  val arr2 = sc.parallelize(Array(2,4,6))
  arr1.cartesian(arr2).collect().foreach(println)
}
Result: (1,2) (1,4) (1,6) (3,2) (3,4) (3,6) (5,2) (5,4) (5,6)
17. pipe(command, [envVars])
Description: Pipes each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin, and lines written to its stdout are returned as an RDD of strings.
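The original article gives no example for pipe, so here is a minimal sketch (not from the article) that pipes a partition through the standard Unix grep command; it assumes a Unix-like environment with grep on the PATH.

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  // keep a single partition: grep exits with a non-zero status for a partition
  // with no matching lines, which would make the piped job fail
  val linesRDD = sc.parallelize(Array("hello spark", "hello hadoop", "bye spark"), 1)
  // each element is written to grep's stdin; lines grep prints to stdout come back as an RDD[String]
  val pipedRDD = linesRDD.pipe("grep spark")
  pipedRDD.collect().foreach(println)
  // expected: hello spark, bye spark
}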
18. coalesce(numPartitions)
Description: Decreases the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val rdd1 = sc.parallelize(1 to 20, 10)
  println(rdd1.partitions.length)       // 10
  var rdd2 = rdd1.coalesce(15, true)
  println(rdd2.partitions.length)       // 15
  var rdd3 = rdd1.repartition(15)
  println(rdd3.partitions.length)       // 15
  var rdd4 = rdd1.coalesce(15, false)   // without a shuffle the partition count cannot be increased
  println(rdd4.partitions.length)       // 10
  var rdd5 = rdd1.coalesce(2, false)
  println(rdd5.partitions.length)       // 2
  rdd5.foreach(print)                   // partition 0: 12345678910  partition 1: 11121314151617181920
  var rdd6 = rdd1.coalesce(2, true)
  println(rdd6.partitions.length)       // 2
  rdd6.foreach(print)                   // partition 0: 135791113151719  partition 1: 2468101214161820
}
19. repartition(numPartitions)
Description: Reshuffles the data in the RDD randomly to create either more or fewer partitions and balances the data across them. This always performs a shuffle.
repartition vs. coalesce
Both repartition the RDD; repartition is simply a shorthand for coalesce with shuffle = true. (Assume the RDD has N partitions that need to be repartitioned into M.)
1) N < M. The N partitions usually hold unevenly distributed data, and a HashPartitioner is used to repartition the data into M partitions; in this case shuffle must be set to true.
2) N > M and N and M are of similar magnitude (say N is 1000 and M is 100): several of the N partitions can be merged into one new partition until only M remain, so shuffle can be set to false. With shuffle = false, if M > N coalesce has no effect: no shuffle takes place and the parent and child RDDs have a narrow dependency.
3) N > M and they differ greatly: if shuffle is set to false, the parent and child RDDs have a narrow dependency and live in the same stage, which may leave the Spark job with too little parallelism and hurt performance. In particular, when M is 1, set shuffle to true so that the operations before the coalesce still run with good parallelism.
In short: with shuffle = false, if the requested number of partitions is larger than the RDD's current number of partitions, the partition count stays the same; without a shuffle you cannot increase the number of partitions of an RDD.
20. repartitionAndSortWithinPartitions(partitioner)
Description: Repartitions the RDD according to the given partitioner and, within each resulting partition, sorts records by their keys. This is more efficient than calling repartition and then sorting within each partition, because the sorting can be pushed down into the shuffle machinery.
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val arrayRDD = sc.parallelize(Array(1,2,3,4,5,6,7,8,9), 3) // 3 is the number of partitions
  arrayRDD.mapPartitionsWithIndex((index, elements) => { // index is the partition index, elements its data
    println("partition index:" + index)
    val result = new ArrayBuffer[Int]()
    elements.foreach(e => {
      result += e
    })
    result.iterator
  }).foreach(println)
}
Result:
partition index:0
1 2 3
partition index:1
4 5 6
partition index:2
7 8 9
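The example above only inspects partition contents with mapPartitionsWithIndex; it does not call the operator itself. Here is a sketch (not from the original article) of repartitionAndSortWithinPartitions on a pair RDD, using a HashPartitioner with 2 partitions.

import org.apache.spark.HashPartitioner

def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("chunchun").setMaster("local")
  val sc = new SparkContext(conf)
  val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b"), (4, "d"), (1, "e")))
  // hash-partition by key into 2 partitions and sort by key inside each partition during the shuffle
  val sortedRDD = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
  sortedRDD.mapPartitionsWithIndex((index, elements) =>
    elements.map(e => "partition " + index + ": " + e)
  ).collect().foreach(println)
  // expected: keys 2 and 4 land in partition 0, keys 1 and 3 in partition 1,
  // and each partition is listed in ascending key order
}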