spark常用函数比较
2019獨角獸企業重金招聘Python工程師標準>>>
算法分類:轉換(transformation)和執行(action)
查看算子使用demo
coalesce & repartition & partitionBy
reparation是coalesce的特殊情況 ,reparation會將coalesce中的shuffle參數設置為true,會使用HashPartitioner重新混洗分區,如果原有分區數據不均勻可以用reparation來重新混洗分區,使數據均勻分布,重新混洗過的分區和新的分區時寬依賴的關系
coalesce shuffle參數為false的情況 不會重新混洗分區,它是合并分區,比如把原來1000個分區合并成100個,父rdd和子rdd是窄依賴,
coalesce當shuffle參數設置為false時,如果設置的新partition數量大于之前的,則按照之前的分區數量重新分區。如果shuffle參數設置為true則效果和repartition一致。
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true) }partitionBy需要指定分區函數和分區數量
var rdd2=rdd.partitionBy(new HashPartitioner(2))range
// range函數是閉開區間[) range(1,4,1) //輸出:1,2,3 // to 函數是閉閉區間[] sc.makeRDD(1 to 5,2) // 輸出:1,2,3,4,5zip & zipWithIndex & zipWithUniqueId
zip
1.如果兩個RDD分區數不同,則拋出異常:Can’t zip RDDs with unequal numbers of partitions
2.如果兩個RDD的元素個數不同,則拋出異常:Can only zip RDDs with same number of elements in each partition
zipPartitions
zipPartitions函數將多個RDD按照partition組合成為新的RDD。
該函數需要組合的RDD具有相同的分區數,但對于每個分區內的元素數量沒有要求。
var rdd1=sparkSession.range(1,4,1).rdd var rdd2=sparkSession.range(4,7,1).rdd var rdd3=sparkSession.range(7,10,1).rdd // zip函數用于將兩個RDD組合成Key/Value形式的RDD,這里默認兩個RDD的partition數量以及每個partition的元素數量都相同,否則會拋出異常。 var rdd5=rdd1 zip rdd2 zip rdd3 /*** +-----+---+* | _1| _2|* +-----+---+* |[1,4]| 7|* |[2,5]| 8|* |[3,6]| 9|* +-----+---+*/ // 該函數將RDD中的元素和這個元素在RDD中的ID(索引號)組合成鍵/值對。 var rdd6=rdd1.zipWithIndex/*** +---+---+* | _1| _2|* +---+---+* | 1| 0|* | 2| 1|* | 3| 2|* +---+---+*/ var rdd7=sparkSession.range(1,10,2).rdd // 該函數將RDD中元素和一個唯一ID組合成鍵/值對,該唯一ID生成算法如下: // 每個分區中第一個元素的唯一ID值為:該分區索引號 // 每個分區中第N個元素的唯一ID值為:(前一個元素的唯一ID值) + (該RDD總的分區數) var rdd8=rdd7.zipWithUniqueId()/*** +---+---+* | _1| _2|* +---+---+* | 1| 0|* | 3| 2|* | 5| 1|* | 7| 3|* | 9| 5|* +---+---+*/mapPartitionsWithIndex
var rdd1 = sparkSession.sparkContext.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")),2)// 函數作用同mapPartitions相同,不過提供了兩個參數,第一個參數為分區的索引var rdd2 = rdd1.mapPartitionsWithIndex {(partIdx, iter) => {var part_map = scala.collection.mutable.Map[String, List[(Int, String)]]()while (iter.hasNext) {var part_name = "part_" + partIdxvar elem = iter.next()if (part_map.contains(part_name)) {var elems = part_map(part_name)elems ::= elempart_map(part_name)=elems} else {part_map(part_name) = List[(Int, String)] {elem}}}part_map.iterator}}.collect()/*** +------+--------------+* | _1| _2|* +------+--------------+* |part_0|[[2,B], [1,A]]|* |part_1|[[4,D], [3,C]]|* +------+--------------+*/map & mapValues
var rdd1=sparkSession.sparkContext.makeRDD(Array((1, "A"), (2, "B"), (3, "C"), (4, "D")),2) // 對[K,V]整體操作 var rdd3=rdd1.map(_+"_").foreach(println(_)) /*** (1,A)_* (3,C)_* (2,B)_* (4,D)_*/ var rdd2=rdd1.mapValues(_+"_")/*** +---+---+* | _1| _2|* +---+---+* | 1| A_|* | 2| B_|* | 3| C_|* | 4| D_|* +---+---+*/// 鍵值對轉換rdd1.map(_.swap).foreach(println(_))/*** (C,3)* (D,4)* (A,1)* (B,2)*/// 使用map實現mapValues rdd1.map(x=>(x._1,x._2+"_")).foreach(println(_))/*** (1,A_)* (2,B_)* (3,C_)* (4,D_)*/flodByKey
val rdd4=sparkSession.sparkContext.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1))) val rdd5=rdd4.foldByKey(2)(_+_).collect()/*** +---+---+* | _1| _2|* +---+---+* | B| 5|* | A| 4|* | C| 3|* +---+---+*/groupByKey & reduceByKey & aggregateByKey & flodByKey
reduceByKey現在map過程中先進行聚合,再到reduce端聚合,減少數據太大帶來的壓力,減小RPC過程中的傳輸壓力。groupByKey是直接在reduce端進行聚合的,所以效率比reduceByKey低。
foldByKey和reduceByKey的功能是相似的,都是在map端先進行聚合,再到reduce聚合。不同的是flodByKey需要傳入一個參數。該參數是計算的初始值。
groupByKey是對每個key進行合并操作,但只生成一個sequence,groupByKey本身不能自定義操作函數。spark只能先將所有的鍵值對都移動,這樣的后果是集群節點之間的開銷很大,導致傳輸延時,詳情。
val words = Array("one", "two", "two", "three", "three", "three") val wordsRDD = sparkSession.sparkContext.parallelize(words).map(word => (word, 1)) val wordsCountWithGroup = wordsRDD.groupByKey().map(w => (w._1, w._2.sum)).collect() val wordsCountWithReduce = wordsRDD.reduceByKey(_ + _).collect() val wordsCountWithAggregate=wordsRDD.aggregateByKey(0)((u:Int,v)=>u+v,_+_).foreach(println)// aggregate簡寫seqOp和comOp使用同一個函數 val wordsCountWithFlod=wordsRDD.flodByKey(0)(_+_) val wordsCountWithCombe=wordsRDD.combineByKey((v: Int) => v,(c: Int, v: Int) => c+v,(c1: Int, c2: Int) => c1 + c2).collectcombineByKey
注意:
cogroup & union
cogroup相當于SQL中的全外連接full outer join,返回左右RDD中的記錄,關聯不上的為空??芍付ǚ謪^數和分區函數,返回的是key和每個RDD的迭代器
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] var rdd1 = sparkSession.sparkContext.makeRDD(Array(("A","1"),("B","2")),2) var rdd2 = sparkSession.sparkContext.makeRDD(Array(("A","3"),("C","4")),2) var rdd3 = sparkSession.sparkContext.makeRDD(Array(("A","5"),("C","6"),("D","8")),2) rdd1.cogroup(rdd2,rdd3).collect().foreach(x=>println("("+x._1+","+x._2._1+","+x._2._2+x._2._3+")"))/*** output:* (B,CompactBuffer(2),CompactBuffer()CompactBuffer())* (D,CompactBuffer(),CompactBuffer()CompactBuffer(8))* (A,CompactBuffer(1),CompactBuffer(3)CompactBuffer(5))* (C,CompactBuffer(),CompactBuffer(4)CompactBuffer(6))*/ rdd1.union(rdd2).collect().foreach(x=>println("("+x._1+","+x._2)+")")jion
// join相當于SQL中的內關聯join,只返回兩個RDD根據K可以關聯上的結果,join只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。 def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] // leftOuterJoin類似于SQL中的左外關聯left outer join,返回結果以前面的RDD為主,關聯不上的記錄為空。只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可 def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] // rightOuterJoin類似于SQL中的有外關聯right outer join,返回結果以參數中的RDD為主,關聯不上的記錄為空。只能用于兩個RDD之間的關聯,如果要多個RDD關聯,多關聯幾次即可。 def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] // cogroup相當于SQL中的全外連接full outer join,返回左右RDD中的記錄,關聯不上的為空。 def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]注意:
rdd1.leftOuterJoin(rdd2)和rdd2.rightOuterJoin(rdd1)的結果是相同的,但是輸出格式是不一致的,不管是left jion還是right jion,輸出結果都是先輸出左邊的rdd對應的列,再輸出右邊的RDD對象的列
union & intersection & subtract
subtractByKey和基本轉換操作中的subtract類似,返回在主RDD中出現,并且不在otherRDD中出現的元素,可指定輸出分區數量和分區函數。
transformation
- map/mapValues/flatMap/mapPartitions/mapPartitionsWithIndex
- filter
- distinct:并局部無序而整體有序返回
action
- rdd.foreach
- rdd.first
- rdd.take(10): 從第一個分區的第一行數據開始取,不排序
- rdd.takeOrdered(10):與top函數類似,但是與top函數的排序方式相反
- rdd.top(10):默認按照降序的方式取前10個元素,可自定義排序規則
- rdd.sortBy(x=>x._2,true):按照RDD第二列進行升序排列(false為降序)
- rdd.countByValue():countByValue()函數與tuple元組中的(k,v)中的v 沒有關系,這點要搞清楚,countByValue是針對Rdd中的每一個元素對象。
- rdd.aggregate(1)({(x:Int,y:Int)=>x+y},{(sum1:Int,sum2:Int)=>sum1+sum2})
- rdd. fold(1)()(x:Int,y:Int)=>x+y): aggregate簡寫seqOp和comOp使用同一個函數
- saveAsTextFile,saveAsObjectFile,saveAsSequenceFile
- rdd.takeSample
sparkSql
object aggregatesFun extends Catalogs_Tutorial{import org.apache.spark.sql.functions._questionsDataFrame.filter("id > 400 and id< 450").filter("owner_userid is not null").join(dfTags,dfQuestions.col("id").equalTo(dfTags("id"))).groupBy(dfQuestions.col("owner_userid")).agg( avg("score"),max("answer_count")) // .sparkSession.conf.set("retainGroupColumns",false) // 結果是否展示分組字段.show() } +------------+----------+-----------------+ |owner_userid|avg(score)|max(answer_count)| +------------+----------+-----------------+ | 268| 26.0| 1| | 136| 57.6| 9| | 123| 20.0| 3| +------------+----------+-----------------+統計函數
- 基本統計函數:avg,mean,max,min,sum
- 高級統計函數:皮爾遜相關性(corr),協方差(cov),頻繁項(freqItems),交叉表(crosstabe),行列轉換(透視(pivot)),抽樣(sample)分層抽樣(sampleBy),詞頻統計(countMinSketch),布隆過濾器
- 顯示對dataFrame的統計結果:describe,包含標準差(stddev)和avg,max,min,count
手寫wordCount
object LocalWorldCount {def main(args: Array[String]): Unit = {val conf=new SparkConf()conf.setAppName("my first spark local App")conf.setMaster("local")val sc=new SparkContext(conf)val lines=sc.textFile("file:\\E:\\data\\worldCount.txt")val words=lines.flatMap(line=>line.split(" "))val pairs=words.map(word=>(word,1))val worldCount=pairs.reduceByKey(_+_)val sortedWordCount=worldCount.map(pair=>(pair._2,pair._1)).sortByKey(true).map(pair=>(pair._2,pair._1))sortedWordCount.collect.foreach(println)sc.stop()} } // 對應sql lines.算子選擇
mapPartitions/reduceByKey/foreachPartition/
使用filter之后進行coalesce操作。
使用repartitionAndSortWithinPartitions替代repartition與sort類操作。
repartitionAndSortWithinPartitions是Spark官網推薦的一個算子。官方建議,如果是需要在repartition重分區之后還要進行排序,就可以直接使用repartitionAndSortWithinPartitions算子。因為該算子可以一邊進行重分區的shuffle操作,一邊進行排序。shuffle與sort兩個操作同時進行,比先shuffle再sort來說,性能可能是要高的。
轉載于:https://my.oschina.net/freelili/blog/3037961
總結
以上是生活随笔為你收集整理的spark常用函数比较的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SD卡格式化怎么恢复?只需要五个步骤
- 下一篇: 总结了点React,咱也不敢说