Spark中的键值对操作-scala
?
1.PairRDD介紹
? ? Spark為包含鍵值對類型的RDD提供了一些專有的操作。這些RDD被稱為PairRDD。PairRDD提供了并行操作各個鍵或跨節點重新進行數據分組的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分別規約每個鍵對應的數據,還有join()方法,可以把兩個RDD中鍵相同的元素組合在一起,合并為一個RDD。
2.創建Pair RDD
? ? 程序示例:對一個英語單詞組成的文本行,提取其中的第一個單詞作為key,將整個句子作為value,建立 PairRDD
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me")); //獲取第一個單詞作為鍵 val words =rdd.map(x=>(x.split(" ")(0),x)); words.collect().foreach(println);輸出結果:
(this,this is a test)
(how,how are you)
(do,do you love me)
(can,can you tell me)
?
3.PairRDD的轉化操作
? ??PairRDD可以使用所有標準RDD上可用的轉化操作。傳遞函數的規則也適用于PairRDD。由于PairRDD中包含二元組,所以需要傳遞的函數應當操作而元素而不是獨立的元素。
? ??? ??? ??? ??? ??? ??? ??? ?PairRDD的相關轉化操作如下表所示
針對兩個PairRDD的轉化操作 rdd={(1,2),(3,4),(3,6)} other={(3,9)}
| 函數名 | 目的 | 示例 | 結果 |
| substractByKey | 刪掉RDD中鍵與other RDD 中的鍵相同的元素 | rdd.subtractByKey(other) | {(1,2)} |
| join | 對兩個RDD進行內連接 | rdd.join(other) | {(3,(4,9)),(3,(6,9))} |
| rightOuterJoin | 對兩個RDD進行連接操作,右外連接 | rdd.rightOuterJoin(other) | {(3,(4,9)),(3,(6,9))} |
| leftOuterJoin | 對兩個RDD進行連接操作,左外連接 | rdd.rightOuterJoin(other) | {(1,(2,None)),(3,(4,9)),(3,(6,9))} |
| cogroup | 將兩個RDD中擁有相同鍵的數據分組 | rdd.cogroup(other) | {1,([2],[]),(3,[4,6],[9])} |
程序實例:
針對2 中程序生成的PairRDD,刪選掉長度超過20個字符的行。
val results=words.filter(value => value._2.length()<20); results.foreach(println)? ??RDD上有fold(),combine(),reduce()等行動操作,pair RDD上則有相應的針對鍵的轉化操作。
? ? (1)reduceByKey()與reduce()操作類似,它們都接收一個函數,并使用該函數對值進行合并。reduceByKey()會為數據集中的每個鍵進行并行的規約操作,每個規約操作會將鍵相同的值合并起來。reduceBykey()最終返回一個由各鍵規約出來的結果值組成的新的RDD。
程序示例:用reduceByKey實現單詞計數
val rdd=sc.parallelize(List("this is a test","how are you","do you love me","can you tell me")); val words =rdd.flatMap(line => line.split(" ")); val results=words.map(word => (word,1)).reduceByKey( {case(x,y) => x+y}); results.foreach(println)輸出:
(are,1)
(this,1)
(is,1)
(you,3)
(can,1)
(a,1)
(love,1)
(do,1)
(how,1)
(tell,1)
(me,2)
(test,1)
?
? (2)foldByKey()與fold()操作類似,他們都使用一個與RDD和合并函數中的數據類型相同的零值作為初始值。與fold()一樣,foldByKey()操作所使用的合并函數對零值與另一個元素進行合并,結果仍為該元素。
? ? 程序示例:求對應key的value之和
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8))); val results=nums.foldByKey(0)({case(x,y)=>x+y}) results.collect().foreach(println)結果:
(1,4)
(2,10)
(3)
? ??combineByKey()是最為常用的基于鍵進行聚合的函數。大多數基于鍵聚合的函數都是用它實現的。和aggregate()一樣,combineByKey()可以讓用戶返回與輸入數據類型不同的返回值。combineByKey()會遍歷分區中的所有元素,因此,每個元素的鍵要么還么有遇到過,要么就和之前的某個元素的鍵相同。如果這是一個新的元素,combineByKey()會使用一個叫做createCombiner()的函數來創建那個鍵對應的累加器的初始值。需要注意的是,這一過程會在每個分區中第一次出現每個鍵時發生,而不是在整個RDD中第一次出現一個鍵時發生。
? ? 如果這是一個處理當前分區之前就已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合并。
? ? 由于每個分區都是獨立處理的,因此對于同一個鍵可以有多個累加器。如果有兩個或者更多的分區都有對應一個鍵的累加器,就需要使用用戶提供的mergeCombiners()方法將各個分區的結果進行合并。
? ??以下程序示例使用combineBykey()求每個鍵對應的平均值。
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8))); val results=nums.combineByKey((v)=>(v,1), (acc:(Int,Int),v) =>(acc._1+v,acc._2+1), (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2) ).map{case(key,value)=>(key,value._1/value._2.toFloat)} results.collectAsMap().map(println)結果:
(2,5.0)
(1,2.0)
成功求出每個key對應value對應的平均值
*(4)并行度調優
? ? 每個RDD都有固定數目的分區,分區數決定了在RDD上執行操作時的并行度。
? ? 在執行聚合或者分組操作時,可以要求Spark使用給定的分區數。Spark始終嘗試根據集群的大小推斷出一個有意義的默認值,但是你可以通過對并行度進行調優來獲得更好的性能表現。
? ? 在Scala中,combineByKey()函數和reduceByKey()函數的最后一個可選的參數用于指定分區的數目,即numPartitions,使用如下:
val results=nums.reduceByKey({(x,y) =>x+y},2);5.數據分組
(1)groupByKey()
? ? groupByKey()會使用RDD中的鍵來對數據進行分組。對于一個由類型K的鍵和類型V的值組成的RDD,得到的RDD類型會是[K,Iterable[v]]。
? ? 以下是程序示例,對PairRDD調用groupByKey()函數之后,返回的RDD類型是RDD[K,Iterable[v]]
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(1, 3), Tuple2(2, 2), Tuple2(2, 8))); val group=nums.groupByKey(); val results=group.collect(); for(value <- results){print(value._1+": ")for(elem <- value._2)print(elem+" ")println()}輸出結果:
1: 1 3?
2: 2 8?
(2)cogroup()
? ? 除了對單個RDD的數據進行分組,還可以使用cogroup()函數對對個共享同一個鍵的RDD進行分組。對兩個鍵的類型均為K而值得類型分別為V和W的RDD進行cogroup()時,得到結果的RDD類型為[(K,(Iterable[V],Iterable[W]))]。如果其中一個RDD對于另一個RDD中存在的某個鍵沒有對應的記錄,那么對應的迭代器則為空。
舉例:
val nums1 = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4))); val nums2 = sc.parallelize(List(Tuple2(1,1),Tuple2(1,3),Tuple2(2,3))) val results=nums1.cogroup(nums2) for(tuple2 <- results.collect()){print(tuple2._1+" [ ")for(it <- tuple2._2._1)print(it+" ")print("] [ ")for(it<-tuple2._2._2)print(it+" ")println("]") }輸出:
1 [ 1 3 ] [ 1 3 ]
3 [ 4 ] [ ]
2 [ 2 4 ] [ 3 ]
6.數據排序
在Scala中以字符串順序對正數進行自定義排序
(1)對RDD進行排序:
val nums =sc.parallelize(List(12,4,6,8,0,8)); //隱式轉換聲明排序的依據 implicit val sortIntegersByString = new Ordering[Int] {override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString()) } val results=nums.sortBy(value=>value); results.collect().foreach(println)(2)對PairRDD,按key的值進行排序
val nums = sc.parallelize(List(Tuple2(1, 1), Tuple2(2, 2), Tuple2(1, 3),Tuple2(2, 4),Tuple2(3, 4))); //隱式轉換聲明排序的依據 implicit val sortIntegersByString = new Ordering[Int] {override def compare(x: Int, y: Int): Int = x.toString().compareTo(y.toString()) } val results=nums.sortByKey(); results.collect().foreach(println)7.數據分區
(1)創建數據分區
? ? 在分布式程序中,通信的代價很大,控制數據分布以獲得最少的網絡傳輸可以極大地提升整體性能。Spark程序可以通過控制RDD分區的方式來減少通信消耗。只有當數據集多次在諸如連接這種基于鍵的操作中,分區才會有作用。
? ? Spark中所有的鍵值對RDD都可以進行分區。系統會根據一個針對鍵的函數對元素進行分組。Spark可以確保同一組的鍵出現在一個節點上。
? ? 舉個簡單的例子,應用如下:內存中保存著很大的用戶信息表,由(UserID,UserInfo[])組成的RDD,UserInfo是用戶所訂閱的所有主題列表。該應用會周期性地將這張表和一個小文件進行組合,這個小文件中存這過去5分鐘發生的時間,其實就是一系列(UserId,LinkInfo)RDD,其中LinkInfo是用戶訪問的鏈接的主題。我們需要對用戶訪問其未訂閱主題的頁面情況進行統計。我們可以使用Spark的join()操作進行組合操作。將兩者根據UserId連接之后,過濾出不在UserInfo[]中的LinkInfo,就是用戶訪問其未訂閱主題的情況。
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book"))) val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book")) val userData =sc.parallelize(list1) val events = sc.parallelize(list2) val joined=userData.join(events) val results=joined.filter({case (id, (info, link)) =>!info.contains(link)} ).count() println(results)輸出:1
? ? 這段代碼可以正確運行,但是效率不高。因為每5分鐘就要進行一次join()操作,而我們對數據集如何分區卻一無所知。默認情況下,連接操作會將兩個數據集中的所有鍵的哈希值都求出來,將該哈希值相同的記錄通過網絡傳到同一臺機器上,然后在那臺機器上對所有鍵相同的記錄進行連接操作。因為userData表比每5分鐘出現的訪問日志表events要大很多,所以要浪費時間進行額外的工作:在每次調用時都對userDAta表進行哈希值計算和跨節點數據混洗,雖然這些數據從來不會變化。
? ? 要解決此問題:在程序開始的時候,對userData表進行partitionBy()轉化操作,將這張表轉化為哈希分區。可以通過向patitionBy傳遞一個spark.HashPartitioner對象來實現該操作。
? ? scala自定義分區方式:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book"))) val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book")) val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY)? ? 這樣以后在調用join()時,Spark就知道了該RDD是根據鍵的哈希值來分區的,這樣在調用join()時,Spark就會利用這一點,只會對events進行數據混洗操作,將events中特定userId的記錄發送到userData的對應分區所在的那臺機器上。這樣,需要網絡傳輸的數據就大大減小了,程序運行的速度也顯著提高。
? ? 請注意,我們還對userData 這個RDD進行了持久化操作,默認情況下,每一個由轉化操作得到的RDD都會在每次執行啟動操作時重新計算生成,將userData持久化之后,就能保證userData能夠在訪問時被快速獲取。
? ? *進一步解釋數據分區帶來的好處:
? ? 如果沒有將partitionBy()轉化操作的結果進行持久化,那么后面每次用到這個RDD時都會重復對數據進行分區操作。不進行持久化會導致整個RDD譜系圖重新求值。那樣的話,partitionBy()帶來的好處就會抵消,導致重復對數據進行分區以及跨節點的混洗,和沒有指定分區方式時發生的情況是十分相似的。
(2)獲取數據分區的方式
接(1)中程序:
val list1 =List(Tuple2("zhou",List("it","math")),Tuple2("gan",List("money","book"))) val list2= List(Tuple2("zhou","it"),Tuple2("zhou","stock"),Tuple2("gan","money"),Tuple2("gan","book")) val userData =sc.parallelize(list1).partitionBy(new HashPartitioner(100)).persist(StorageLevel.MEMORY_ONLY) println(userData.partitioner)? RDD的屬性partitioner就是存儲了對應的分區方式
(3)從分區中獲益的操作
? ? Spark中的很多操作都引入了根據鍵跨結點進行混洗的過程。所有這些操作都會從數據分區中獲益。能夠從數據分區中獲益的操作有:groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),以及lockup()。
? ? 對于像reduceByKey()這樣只作用于單個RDD的操作,運行在未分區的RDD的時候或導致每個鍵所有對應值都在每臺機器上進行本地計算,只需要把本地最終歸約出的結果值從各工作節點傳回主節點,所以原本的網絡開銷就不太大。而對于諸如cogroup()和join()這樣的二元操作,預先進行數據分區會導致其中至少一個RDD(使用已知分區器的那個RDD)不發生數據混洗。如果兩個RDD使用同樣的分區方式,并且它們還緩存在同樣的機器上(比如一個RDD是通過mapValues()從另一個RDD中創建出來的,這兩個RDD就會擁有相同的鍵和分區方式),或者其中一個RDD還沒有計算出來,那么跨節點數據混洗就不會發生了。
(4)影響分區方式的操作
? ??所有會為生成的結果RDD設好分區方式的操作:cogroup(),groupWith(),join(),leftOuterJoin(),rightOuterJoin(),groupByKey(),reduceByKey(),combineByKey(),partitionBy(),sort(),mapValues()(如果父RDD有分區方式的話),filter()(如果父RDD有分區方式的話)。其他所有操作生成的結果都不會存在特定的分區方式。
注意:? ??
? ? 對于二元操作,輸出數據的分區方式取決于父RDD的分區方式。默認情況下,結果會采用哈希分區,分區的數量和操作的并行度是一樣的。如果其中一個父RDD已經設置過分區方式,那么結果就會采用那種分區方式;如果兩個父RDD都設置過分區方式,結果RDD會采用第一個RDD的分區方式。
8.示例程序-PageRank
? ??PageRank算法是一種從RDD分區中獲益的更復雜的算法,我們以它為例進行分析。PageRank算法用來根據外部文檔指向一個文檔的鏈接,對集合中每個文檔的重要程度賦一個度量值。該算法可以用于對網頁進行排序,當然,也可以用于排序科技文章或社交網絡中有影響的用戶。
? ? 算法會維護兩個數據集,一個由(pageID,linklist[])組成,包含每個頁面的鏈接到的頁面的列表;另一個由(pageID,rank)元素組成,包含每個頁面的當前排序值。它按以下步驟進行計算:
? ??① 將每個頁面的排序值初始化為1.0
? ??? ??②在每次迭代中,向每個有直接鏈接的頁面,發送一個值為rank(p)/numNeighbors(p)(出鏈數目) ? 的貢獻量
? ? ? ? ③將每個頁面的排序值設置為0.15+0.85*contributionsReceived
?? ?????最后兩步會重復幾個循環,在此過程中,算法會逐漸收斂于每個頁面的實際PageRank值。在實際操作中,收斂通常需要進行十個迭代。
下面用Scala來實現PageRank算法:
/* #以下是url的內容: www.baidu.com www.hao123.com www.baidu.com www.2345.com www.baidu.com www.zhouyang.com www.hao123.com www.baidu.com www.hao123.com www.zhouyang.com www.zhouyang.com www.baidu.com */ val inputs =sc.textFile("C:\\url.txt") //url,[urls] val links =inputs.map(x=>(x.split(" ")(0),x.split(" ")(1))).distinct().groupByKey().cache() //url,rank var ranks =links.mapValues(value =>1.0) for(i<-0 until 10){val contribs =links.join(ranks).flatMap({case(pageid,(links,rank))=>//url Double links.map(dest=>(dest,rank/links.size))})//reduce and add the contribs ranks=contribs.reduceByKey((x,y)=>x+y).mapValues(v => 0.15+0.85*v) } ranks.collect().foreach(println)結果:
(www.hao123.com,0.3685546839262259)
(www.baidu.com,0.761571325242544)
(www.2345.com,0.3685546839262259)
(www.zhouyang.com,0.5269013026650011)
9.Scala設置自定義分區方式
? ? Spark允許你通過自定義Partitioner對象來控制RDD的分區方式,這樣可以讓你利用領域知識進一步減少通信消耗。
? ? 舉個例子,假設我們要在一個網頁的集合上運行前一屆中的PageRank算法。在這里,每個頁面的ID是頁面的URL。當我們使用簡單的哈希函數進行分區時,擁有相似的URL的頁面比如?http://www.baidu.com/news?與?http://www.baidu.com/map?可能被分在完全不同的節點上。但是我們知道,同一個域名下的網頁更有可能相互連接。由于PageRank需要在每次迭代中從每個頁面向它所有相鄰的頁面發送一條消息,因襲把這些頁面分組在同一個分區中會更好。可以使用自定義的分區器來實現僅根據域名而不是整個URL進行分區。
? ? 要實現先自定義Partitioner,需要繼承Partitioner類并實現其下述方法:
? ??override def numPartitions: Int = ???
? ? 返回創建的分區數量
? ? override def getPartition(key: Any): Int = ???
? ? 返回給定鍵的數量
? ??? ??override def equals(other:Any):Boolean = ???
? ? Spark需要這個方法來檢查分區器對象是否與其他分區器實例相同,這樣Spark才能判斷兩個RDD的分區方式是否相同。
?
class DomainNamePartitioner (numParts:Int) extends Partitioner{override def numPartitions: Int = numParts//根據hashCode和numPartitions取余來得到Partition,因為返回的必須是非負數,所以對于hashCode為負的情況做了特殊處理 override def getPartition(key: Any): Int = {val domain = new URL(key.toString).getHost(); val code=(domain.hashCode%numPartitions)if(code<0){code+numPartitions}else{code}}override def equals(other:Any):Boolean = other match {//這個實例是DomainNamePartitioner的實例,并且numPartitions相同,返回true case dnp:DomainNamePartitioner =>dnp.numPartitions==numPartitions//否則,返回false case _ => false } }?
?
?
?
總結
以上是生活随笔為你收集整理的Spark中的键值对操作-scala的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 快速理解Spark Dataset
- 下一篇: Spark MLlib实现的广告点击预测