【核心API开发】Spark入门教程[3]
本教程源于2016年3月出版書籍《Spark原理、機制及應用》?,在此以知識共享為初衷公開部分內容,如有興趣,請支持正版書籍。
? ? ? ???
? ? ? ? Spark綜合了前人分布式數據處理架構和語言的優缺點,使用簡潔、一致的函數式語言Scala作為主要開發語言,同時為了方便更多語言背景的人使用,還支持Java、Python和R語言。Spark因為其彈性分布式數據集(RDD)的抽象數據結構設計,通過實現抽象類RDD可以產生面對不同應用場景的子類。本章將先介紹Spark編程模型、RDD的相關概念、常用API源碼及應用案例,然后具體介紹四大應用框架,為后續進一步學習Spark框架打下基礎。
?
3.1 Spark 編程模型概述
?
Spark的編程模型如圖3-1所示。
?
圖3-1?Spark編程模型
? ? ? ?開發人員在編寫Spark應用的時候,需要提供一個包含main函數的驅動程序作為程序的入口,開發人員根據自己的需求,在main函數中調用Spark提供的數據操縱接口,利用集群對數據執行并行操作。
?????? Spark為開發人員提供了兩類抽象接口。第一類抽象接口是彈性分布式數據集(Resilient Distributed Dataset,下文簡稱RDD),顧名思義,RDD是對數據集的抽象封裝,開發人員可以通過RDD提供的開發接口來訪問和操縱數據集合,而無需了解數據的存儲介質(內存或磁盤)、文件系統(本地文件系統、HDFS或Tachyon)、存儲節點(本地或遠程節點)等諸多實現細節;第二類抽象是共享變量(Shared Variables),通常情況下,一個應用程序在運行的時候會被劃分成分布在不同執行器之上的多個任務,從而提高運算的速度,每個任務都會有一份獨立的程序變量拷貝,彼此之間互不干擾,然而在某些情況下需要任務之間相互共享變量,Apache Spark提供了兩類共享變量,它們分別是:廣播變量(Broadcast Variable)和累加器(Accumulators)。第3.3節將介紹RDD的基本概念和RDD提供的編程接口,并在后面詳細解讀接口的源碼實現,從而加深對RDD的理解,此外會在第3.4節中介紹兩類共享變量的使用方法。
?
3.2 Spark Context
? ?????SparkContext是整個項目程序的入口,無論從本地讀取文件(textfile方法)還是從HDFS讀取文件或者通過集合并行化獲得RDD,都先要創建SparkContext對象,然后使用SparkContext對RDD進行創建和后續的轉換操作。本節主要介紹SparkContext類的作用和創建過程,然后通過一個簡單的例子向讀者介紹SparkContext的應用方法,從應用角度來理解其作用。
?
3.2.1 ?SparkContext的作用
?
SparkContext除了是Spark的主要入口,它也可以看作是對用戶的接口,它代表與Spark集群的連接對象,由圖3-2可以看到,SparkContext主要存在于Driver Program中。可以使用SparkContext來創建集群中的RDD、累積量和廣播量,在后臺SparkContext還能發送任務給集群管理器。每一個JVM只能有運行一個程序,即對應只有一個SparkContext處于激活狀態,因此在創建新的SparkContext前需要把舊的SparkContext停止。
?
圖3-2? SparkContext在Spark架構圖中的位置
3.2.2 SparkContext創建
?
? ? ? ?SparkContext的創建過程首先要加載配置文件,然后創建SparkEnv、TaskScheduler和DAGScheduler,具體過程和源碼分析如下。
1.加載配置文件SparkConf
? ? ? ?SparkConf在初始化時,需先選擇相關的配置參數,包含master、appName、sparkHome、jars、environment等信息,然后通過構造方法傳遞給SparkContext,這里的構造函數有多種表達形式,當SparkContex獲取了全部相關的本地配置信息后開始下一步操作。 def this(master: String, appName: String, conf: SparkConf) = this(SparkContext.updatedConf(conf, master, appName)) def this( master: String, appName: String, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map(), preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = { this(SparkContext.updatedConf(newSparkConf(),master,appName,sparkHome,jars,environment)) this.preferredNodeLocationData = preferredNodeLocationData } 2.創建SparkEnv 創建SparkConf后就需要創建SparkEnv,這里面包括了很多Spark執行時的重要組件,包括 MapOutputTracker、ShuffleFetcher、BlockManager等,在這里源碼是通過SparkEnv類的伴生對象SparkEnv Object內的createDriverEnv方法實現的。 private[spark] defcreateDriverEnv( conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus, mockOutputCommitCoordinator:Option[OutputCommitCoordinator] = None): SparkEnv = { assert(conf.contains("spark.driver.host"),"spark.driver.host is not set on the driver!") assert(conf.contains("spark.driver.port"),"spark.driver.port is not set on the driver!") val hostname =conf.get("spark.driver.host") val port =conf.get("spark.driver.port").toInt create( conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, isDriver = true, isLocal = isLocal, listenerBus = listenerBus, mockOutputCommitCoordinator =mockOutputCommitCoordinator ) } <p style="background:#F3F3F3;"> </p>3.創建TaskScheduler
? ? ? ? 創建SparkEnv后,就需要創建SparkContext中調度執行方面的變量TaskScheduler。
private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master) private val heartbeatReceiver = env.actorSystem.actorOf( Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver") @volatile private[spark] var dagScheduler: DAGScheduler = _ try { dagScheduler = new DAGScheduler(this) } catch { case e: Exception => { try { stop() } finally { throw new SparkException("Error while constructing DAGScheduler", e) } } } // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's // constructor taskScheduler.start()TaskScheduler是依據Spark的執行模式進行初始化的,詳細代碼在SparkContext中的createTaskScheduler方法中。在這里以Standalone模式為例,它會將sc傳遞給TaskSchedulerImpl,然后創建SparkDeploySchedulerBackend并初始化,最后返回Scheduler對象。
case SPARK_REGEX(sparkUrl) => val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) (backend, scheduler)4.創建DAGScheduler
創建TaskScheduler對象后,再將TaskScheduler對象傳至DAGScheduler,用來創建DAGScheduler對象。
@volatile private[spark] var dagScheduler: DAGScheduler = _ try { dagScheduler = new DAGScheduler(this) } catch { case e: Exception => { try { stop() } finally { throw new SparkException("Error while constructing DAGScheduler", e) } } } def this(sc: SparkContext) = this(sc, sc.taskScheduler)創建DAGScheduler后再調用其start()方法將其啟動。以上4點是整個SparkContext的創建過程,這其中包含了很多重要的步驟,從這個過程能理解Spark的初始啟動情況。?
?
3.2.3 使用shell
除了單獨編寫一個應用程序的方式之外,Spark還提供了一個交互式Shell來使用。在Shell中,用戶的每條語句都能在輸入完畢后及時得到結果,而無需手動編譯和運行程序。Shell的使用十分簡單,改變當前工作路徑到Spark的安裝目錄,執行指令$ ./bin/spark-shell即可進入Shell。
在Shell中,系統根據命令提供的參數自動配置和生成了一個SparkContext對象sc,直接使用即可,無需再手動實例化SparkContext。除了結果會實時顯示之外,其余操作與編寫單獨應用程序類似。讀者可直接參考Spark官方提供的Spark ProgrammingGuide等文檔,在此不做具體介紹。
3.2.4 應用實踐
?這里向讀者介紹一段用于統計文件中字母a和字母b出現頻率的Spark應用,通過這個程序向讀者展示SparkContext的用法。
【例3-1】簡單的Spark程序
/* SimpleApp.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // 本地文件目錄 val conf = new SparkConf().setAppName("Simple Application") //給Application命名 val sc = new SparkContext(conf) //創建SparkContext val logData = sc.textFile(logFile, 2).cache() //緩存文件 val numAs = logData.filter(line => line.contains("a")).count() //計算字母a的個數 val numBs = logData.filter(line => line.contains("b")).count() //計算字母b的個數 println("Lines with a: %s, Lines with b: %s".format(numAs, numBs)) //打印結果 }這個例子中,首先創建本地文件目錄logFile和配置文件conf,然后使用配置信息conf實例化SparkContext得到sc,得到sc之后就可以從本地文件中讀取數據并把數據轉化成RDD,并命名為logData,然后logData調用filter方法分別計算包含字母a的行數和包含字母b的行數,最后打印出結果。該例子中使用了SparkContext的實例化對象創建RDD數據集。
?
3.3 RDD簡介
? ? ? ?本節主要介紹彈性分布式數據集RDD的相關概念,其中包括RDD創建來源、兩種重要的Transformation和Action操作、數據持久化和檢查點機制,通過對Spark中RDD核心抽象的深入理解,能幫助讀者全面理解后面的RDD的分區、并行計算和依賴等機制和變換過程。
3.3.1 RDD創建?? ? ? ?RDD是Spark應用程序開發過程中最為基本也最為重要的一類數據結構,RDD被定義為只讀、分區化的記錄集合,更為通俗來講,RDD是對原始數據的進一步封裝,封裝導致兩個結果:第一個結果是數據訪問權限被限制,數據只能被讀,而無法被修改;第二個結果是數據操作功能被強化,使得數據能夠實現分布式存儲、并發處理、自動容錯等等諸多功能。Spark的整個計算過程都是圍繞數據集RDD來進行,下面將會對RDD的創建以及數據結構進行簡單介紹。
1.RDD的兩類來源
??? 1)將未被封裝的原始數據進行封裝操作得到,根據原始數據的存在形式,又可被進一步分成由集合并行化獲得或從外部數據集中獲得。
? ? 2)由其他RDD通過轉換操作獲得,由于RDD的只讀特性,內部的數據無法被修改,因此RDD內部提供了一系列數據轉換(Transformation)操作接口,這類接口可返回新的RDD,而不影響原來的RDD內容。在后面第3章3.3節中將會對RDD的創建方法進行更加詳盡的說明。
2.RDD內部數據結構
1)分區信息的列表
2)對父RDD的依賴信息
3)對Key-Value鍵值對數據類型的分區器(可選)
4)計算分區的函數
5)每個數據分區的物理地址列表(可選)
? ? ? ? RDD的數據操作并非在調用內部接口的一刻便開始計算,而是遇到要求將數據返回給驅動程序,或者寫入到文件的接口時,才會進行真正的計算,這類會觸發計算的操作稱為動作(Action)操作,而這種延時計算的特性,被稱為RDD計算的惰性(Lazy),在第六章機篇將分別講述動作操作和惰性特征。
??? 在第1章中說過,Spark是一套內存計算框架,其能夠將頻繁使用的中間數據存儲在內存當中,數據被使用的頻率越高,性能提升越明顯。數據的內存化操作在RDD層次上,體現為RDD的持久化操作,在3.3.4節描述RDD的持久化操作。除此之外,RDD還提供了類似于持久化操作的檢查點機制,表面看上去與存儲在HDFS的持久化操作類似,實際使用上又有諸多不同,在3.3.5小節描述RDD的檢查點機制。
?
3.3.2 RDD轉換操作
?? ? ? ? 轉換(Transformation)操作是由一個RDD轉換到另一個新的RDD,例如,map操作在RDD中是一個轉換操作,map轉換會讓RDD中的每一個數據都通過一個指定函數得到一個新的RDD。
? ? ? ? RDD內部可以封裝任意類型的數據,但某些操作只能應用在封裝鍵值對類型數據的RDD之上,例如轉換操作reduceByKey、groupByKey和countByKey等。
?????? 表3-1展示了RDD所提供的所有轉換操作及其含義。
表3-1:RDD提供的轉換操作
| Transformation | 算子作用 |
| map(func) | 新RDD中的數據由原RDD中的每個數據通過函數func得到 |
| filter(func) | 新RDD種的數據由原RDD中每個能使函數func返回true值的數據組成 |
| flatMap(func) | 類似于map轉換,但func的返回值是一個Seq對象,Seq中的元素個數可以是0或者多個 |
| mapPartitions(func) | 類似于map轉換,但func的輸入不是一個數據項,則是一個分區,若RDD內數據類型為T,則func必須是Iterator<T> => Iterator<U>類型 |
| mapPartitionsWithIndex(func) | 類似于mapPartitions轉換,但func的數據還多了一個分區索引,即func類型是(Int, Iterator<T> => Iterator<U>) |
| sample(withReplacement, fraction, seed) | 對fraction中的數據進行采樣,可以選擇是否要進行替換,需要提供一個隨機數種子 |
| union(otherDataset) | 新RDD中數據是原RDD與RDD otherDataset中數據的并集 |
| Intersection(otherDataset) | 新RDD中數據是原RDD與RDD otherDataset中數據的交集 |
| distinct([numTasks]) | 新RDD中數據是原RDD中數據去重的結果 |
| groupByKey([numTasks]) | 原RDD中數據類型為(K, V)對,新RDD中數據類型為(K, Iterator(V))對,即將相同K的所有V放到一個迭代器中 |
| reduceByKey(func, [numTasks]) | 原RDD和新RDD數據的類型都為(K, V)對,讓原RDD相同K的所有V依次經過函數func,得到的最終值作為K的V |
| aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) | 原RDD數據的類型為(K, V),新RDD數據的類型為(K, U),類似于groupbyKey函數,但聚合函數由用戶指定。鍵值對的值的類型可以與原RDD不同 |
| sortByKey([ascending], [numTasks]) | 原RDD和新RDD數據的類型為(K, V)鍵值對,新RDD的數據根據ascending的指定順序或者逆序排序 |
| join(otherDataset, [numTasks]) | 原RDD數據的類型為(K, V),otherDataset數據的類型為(K, W),對于相同的K,返回所有的(K, (V, W)) |
| cogroup(otherDataset, [numTasks]) | 原RDD數據的類型為(K, V),otherDataset數據的類型為(K, W),對于相同的K,返回所有的(K, Iterator<V>, Iterator<W>) |
| catesian(otherDataset) | 原RDD數據的類型為為T,otherDataset數據的類型為U,返回所有的(T, U) |
| pipe(command, [envValue]) | 令原RDD中的每個數據以管道的方式依次通過命令command,返回得到的標準輸出 |
| coalesce(numPartitions) | 減少原RDD中分區的數目至指定值numPartitions |
| repartition(numPartitions) | 修改原RDD中分區的數目至指定值numPartitions |
3.3.3 RDD動作操作
?? ? ? ? 相對于轉換,動作(Action)操作用于向驅動(Driver)程序返回值或者將值寫入到文件當中。例如reduce動作會使用同一個指定函數讓RDD中的所有數據做一次聚合,把運算的結果返回。表3-2展示了RDD所提供的所有動作操作及其含義。
表3-2:RDD提供的動作操作
| Action | 算子作用 |
| reduce(func) | 令原RDD中的每個值依次經過函數func,func的類型為(T, T) => T,返回最終結果 |
| collect() | 將原RDD中的數據打包成數組并返回 |
| count() | 返回原RDD中數據的個數 |
| first() | 返回原RDD中的第一個數據項 |
| take(n) | 返回原RDD中前n個數據項,返回結果為數組 |
| takeSample(withReplacement, num, [seed]) | 對原RDD中的數據進行采樣,返回num個數據項 |
| saveAsTextFile(path) | 將原RDD中的數據寫入到文本文件當中 |
| saveAsSequenceFile(path)(Java and Scala) | 將原RDD中的數據寫入到序列文件當中 |
| savaAsObjectFile(path)(Java and Scala) | 將原RDD中的數據序列化并寫入到文件當中。可以通過SparkContext.objectFile()方法加載 |
| countByKey() | 原RDD數據的類型為(K, V),返回hashMap(K, Int),用于統計K出現的次數 |
| foreach(func) | 對于原RDD中的每個數據執行函數func,返回數組 |
?
3.3.4 惰性計算
?? ? ? ? 需要注意的是,一個RDD執行轉換操作之后,數據的計算是延遲的,新生成的RDD會記錄轉換的相關信息,包括父RDD的編號、用戶指定函數等等,但并不會立即執行計算操作,真正的計算操作過程得等到遇到一個動作操作(Action)才會執行,此外,除非用戶指定持久化操作,否則轉換過程中產生的中間數據在計算完畢后會被丟棄,即數據是非持久化。即使對同一個RDD執行相同的轉換操作,數據同樣會被重新計算。
?????? Spark采取惰性計算機制有其道理所在。例如可以實現通過map方法創建的一個新數據集,然后使用reduce方法,最終只返回 reduce 的結果給driver,而不是整個大的新數據集。
?
3.3.5 RDD持久化
? ? ?? ? 惰性計算的缺陷也是明顯的:中間數據默認不會保存,每次動作操作都會對數據重復計算,某些計算量比較大的操作可能會影響到系統的運算效率,因此Spark允許將轉換過程中手動將某些會被頻繁使用的RDD執行持久化操作,持久化后的數據可以被存儲在內存、磁盤或者Tachyon當中,這將使得后續的動作(Actions)變得更加迅速(通常快10倍)。
? ? ? ? 通過調用RDD提供的cache或persist函數即可實現數據的持久化,persist函數需要指定存儲級別(StorageLevel),cache等價于采用默認存儲級別的persist函數,Spark提供的存儲級別及其含義如表3-3所示。在6.4節會繼續討論RDD持久化過程在源碼級別上的實現細節。
表3-3? RDD的存儲級別
| 存儲級別 | 含義 |
| MEMORY_ONLY | 把RDD以非序列化狀態存儲在內存中,如果內存空間不夠,則有些分區數據會在需要的時候進行計算得到 |
| MEMORY_AND_DISK | 把RDD以非序列化存儲在內存中,如果內存空間不夠,則存儲在硬盤中 |
| ? MEMORY_ONLY_SER | 把RDD以Java對象序列化儲存在內存中,序列化后占用空間更小,尤其當使用快速序列化庫(如Kyro[1])時效果更好。缺點是讀數據要反序列化,會消耗CPU計算資源 |
| MEMORY_AND_DISK_SER | 類似MEMORY_ONLY_SER,區別是當內存不夠的時候會把RDD持久化到磁盤中,而不是在需要它們的時候實時計算 |
| DISK_ONLY | 只把RDD存儲到磁盤中 |
| MEMORY_ONLY_2, | 類似MEMORY_ONLY,不同的是會復制一個副本到另一個集群節點 |
| MEMORY_AND_DISK_2, etc. | 類似MEMORY_AND_DISK,不同的是會復制一個副本到另一個集群節點 |
| ? OFF_HEAP | 把RDD以序列化形式存儲在Tachyon中,與MEMORY_ONLY_SER不同的是,使用OFF-HEAP模式會減少垃圾回收的開銷,此外還能讓執行器共享內存,這種模式更適應于多并發和對內存要求高的環境 |
?
3.3.6 RDD檢查點? ? ? ? 因為DAG中血統(lineage)如果太長,當重計算的時候開銷會很大,故使用檢查點機制,將計算過程持久化到磁盤,這樣如果出現計算故障的時候就可以在檢查點開始重計算,而不需要從頭開始。RDD的檢查點(Checkpoint)機制類似持久化機制中的persist(StorageLevel.DISK_ONLY),數據會被存儲在磁盤當中,兩者最大的區別在于:持久化機制所存儲的數據,在驅動程序運行結束之后會被自動清除;檢查點機制則會將數據永久存儲在磁盤當中,如果不手動刪除,數據會一直存在。換句話說,檢查點機制存儲的數據能夠被下一次運行的應用程序所使用。
? ? ? ? 檢查點的使用與持久化類似,調用RDD的checkpoint方法即可。在6.4小節中繼續介紹檢查點機制的實現以及其與持久化過程的區別。
?
3.4 共享變量? ? ? ? 因為在tasks之間讀寫共享變量會很低效,spark提供兩種類型的共享變量類型,即broadcast variables和accumulators。
?
?
3.4.1 廣播變量 廣播變量(Broadcast variables)允許用戶將一個只讀變量緩存到每一臺機器之上,而不像傳統變量一樣,拷貝到每一個任務當中,同一臺機器上的不同任務可以共享該變量值。如例3-1代碼所示,對于變量v,只需要調用SparkContext.broadcast(v)即可得到變量v的廣播變量broadcastVar,通過調用broadcastVar的value方法即可取得變量值。【例3-1】廣播變量的用法
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar:spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) 3.4.2 累加器? ? ? ? 累加器(Accumulators)是另外一種共享變量。累加器變量只能執行加法操作,但其支持并行操作,這意味著不同任務多次對累加器執行加法操作后,加法器最后的值等于所有累加的和。累加器的值只能被驅動程序訪問,集群中的任務無法訪問該值。
【例3-2】累加器的用法
scala> val accum = sc.accumulator(0, "My Accumulator") scala> accum.value() //(通過這種方法進行讀取原始變量值) accum: spark.Accumulator[Int] = 0 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) res2:Int = 10 3.5 Spark核心開發實踐? ? ? ? 本節主要介紹核心開發中RDD的兩個主要操作算子Transformation和Action的使用方法,由于Spark是基于延遲計算,Transforamation算子并不立即執行,這時只是保存計算狀態,當Action算子出現才真正執行計算。為此下面就這兩個算子分別學習主要的API方法和應用實例,如果想了解更多關于RDD的API操作,建議讀者參考拉籌伯大學教授的個人主頁http://homepage.cs.latrobe.edu.au/zhe/。
3.5.1 單值型Tranformation算子
單值型的算子就是輸入為單個值形式,這里主要介紹map、flatMap、mapPartitions、union、cartesian、groupBy、filter、distinct、subtract、foreach、cache、persist、sample以及takeSample方法,如表3-4列出各方法的簡要概述。
表3-4? 單值型Transformation算子
| 方法名 | 方法定義 |
| map | def map[U](f: (T) ? U)(implicit arg0: ClassTag[U]): RDD[U] |
| flatMap | defmapPartitions[U](f: (Iterator[T])??Iterator[U], preservesPartitioning: Boolean = false) |
| mapPartition | def mapPartitions[U](f: (Iterator[T])??Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U] |
| mapPartitionsWith Index | def mapPartitionsWithIndex[U](f: (Int, Iterator[T])??Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U] |
| foreach | def foreach(f: (T)???Unit): Unit |
| foreachPartition | def foreachPartition(f: (Iterator[T])??Unit): Unit |
| glom | def glom(): RDD[Array[T]] |
| union | def union(other: RDD[T]): RDD[T] |
| cartesian | def cartesian[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)] |
| groupBy | def groupBy[K](f: (T)???K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] |
| filter | def filter(f: (T)???Boolean): RDD[T] |
| distinct | def distinct(): RDD[T] |
| subtract | def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] |
| cache | def cache(): RDD.this.type |
| persist | def persist(): RDD.this.type |
| sample | def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] |
| takeSample | def takeSample(withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] |
1.map
? ? ? ? 對原來每一個輸入的RDD數據集進行函數轉換,返回的結果為新的RDD,該方法對分區操作是一對一的。
方法源碼實現: def map[U: ClassTag](f: T =>U): RDD[U] = new MappedRDD(this, sc.clean(f))【例3-3】map方法應用樣例
val a = sc.parallelize(List("bit", "linc", "xwc", "fjg", "wc","spark"), 3) //創建RDD val b = a.map(word => word.length) //計算每個單詞的長度 val c = a.zip(b) //拉鏈方法,把兩列數據對應配對成鍵值對格式 c.collect //把結果轉換為數組 res0: Array[(String, Int)] = Array((bit,3), (linc,4), (xwc,3), (fjg,3), (wc,2),(spark,5))這個例子中map方法從a中依次讀入一個單詞,然后計算單詞長度,把最后計算的長度賦給b,然后因為a和b的長度相同,使用zip方法將a、b中對應元素組成K-V鍵值對形式,最后使用Action算子中的collect方法把鍵值對以數組形式輸出。
圖3-3? map方法應用樣例
2.flatMap
flapMap方法與map方法類似,但是允許在一次map方法中輸出多個對象,而不是map中的一個對象經過函數轉換生成另一個對象。
方法源碼實現:
def flatMap[U: ClassTag](f: T=> TraversableOnce[U]): RDD[U] =new FlatMappedRDD(this, sc.clean(f))
【例3-4】flatMap方法應用樣例
val a = sc.parallelize(1 to 10, 5) //生成從1到10的序列,5個分區 a.flatMap(num => 1 to num).collect //方法的作用是把每一個num映射到從1到num的序列 res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)這個例子先得到從1到10的序列,然后調用flatMap方法對輸入num依次生成從1到num的序列,最后使用collect方法轉換成數組輸出。
3.mapPartitions
mapPartitions是map的另一個實現。map的輸入函數是應用于RDD中每個元素,而mapPartitions的輸入函數是作用于每個分區,也就是把每個分區中的內容作為整體來處理的。
方法源碼實現:
def mapPartitions[U:ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean =false): RDD[U] = {val func = (context: TaskContext, index:Int, iter: Iterator[T]) => f(iter)new MapPartitionsRDD(this, sc.clean(func),preservesPartitioning)}【例3-5】mapPartitions方法應用樣例
scala> val a = sc.parallelize(1 to 9, 3) scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next res .::= (pre, cur) pre = cur } res.iterator } scala> a.mapPartitions(myfunc).collect res3: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))如圖3-4,這個例子是先得到從1到9的序列,因為有3個分區,所以每個分區數據分別是(1,2,3),(4,5,6)和(7,8,9),然后調用mapPartitions方法,因為scala是函數式編程,函數能作為參數值,所以mapPartition方法輸入參數是myfunc函數。myfunc函數的作用是先構造一個空list集合,輸入單元素集合iter,輸出雙元素Tuple集合,把分區中一個元素和它的下一個元素組成一個Tuple。因為每個分區中最后一個元素沒有下一個元素,所以(3,4)和(6,7)不在結果中。
mapPartitions還有其他的類似實現,比如mapPartitionsWithContext,它能把處理過程中的一些狀態信息傳遞給用戶指定的輸入函數,此外還有mapPartitionsWithIndex,它能把分區中的index信息傳遞給用戶指定的輸入函數,這些其他類似的實現都是基于map方法,只是細節不同,這樣做更方面使用者在不同場景下的應用。
圖3-4? mapPartitions方法應用樣例
4.mapPartitionWithIndex
mapPartitionWithIndex方法與mapPartitions方法功能類似,不同的是mapPartition-WithIndex還會對原始分區的索引進行追蹤,這樣能知道分區所對應的元素,方法的參數為一個函數,函數的輸入為整型索引和迭代器。
方法源碼實現:
def mapPartitionsWithIndex[U:ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning:Boolean = false): RDD[U] = {val func = (context: TaskContext, index:Int, iter: Iterator[T]) => f(index, iter)new MapPartitionsRDD(this, sc.clean(func),preservesPartitioning)}【例3-6】mapPartitionWithIndex方法應用樣例
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = { iter.toList.map(x => index + "," + x).iterator } x.mapPartitionsWithIndex(myfunc).collect() res10: Array[String] = Array(0,1, 0,2, 0,3, 1,4, 1,5, 1,6, 2,7, 2,8, 2,9, 2,10)這個例子中先得到一個名為的x序列,然后調用mapPartitionsWithIndex方法,參數為myfunc函數,這個函數的實現是把輸入經過map方法映射為分區索引加值的形式。結果中的0,1表示分區下標0和第一個輸入值1,后面依次輸出其他分區和對應的值,說明分區數是從下標0開始的。
5.foreach
foreach方法主要是對輸入的數據對象執行循環操作,該方法常用來輸出RDD中的內容。
方法源碼實現:
def foreach(f: T => Unit) {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) =>iter.foreach(cleanF))}【例3-7】foreach方法應用樣例
val c = sc.parallelize(List("xwc", "fjg", "wc", "dcp", "zq", "snn", "mk", "zl", "hk", "lp"), 3) c.foreach(x => println(x + " are from BIT")) xwc are from BIT fjg are from BIT wc are from BIT dcp are from BIT zq are from BIT ssn are from BIT mk are from BIT zl are from BIT hk are from BIT lp are from BIT這個方法比較直觀,直接對c變量中的每一個元素對象使用println函數,打印對象內容。
6.foreachPartition
foreachPartition方法的作用是通過迭代器參數對RDD中每一個分區的數據對象應用函數。mapPartitions方法的作用于foreachPartition方法作用非常相似,區別就在于使用的參數是否有返回值。
方法源碼實現:
def foreachPartition(f:Iterator[T] => Unit) {val cleanF = sc.clean(f)sc.runJob(this, (iter: Iterator[T]) =>cleanF(iter))}【例3-8】foreachPartition方法應用樣例
val b = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 3) b.foreachPartition(x => println((a,b) => x.reduce(a + b))) 6 15 24這個例子是將序列b中的每一個元素進行reduce操作,對每個分區中輸入的每一個元素累加,例如對于分區0,輸入1和2相加等于3,然后把上個結果3與下一個輸入3相加就等于6,其他分區的運算與該分區一樣。
7.glom
作用類似collect,但它不知直接將所有RDD直接轉化為數組形式,glom方法的作用是將RDD中分區數據進行組裝到數組類型RDD中,每一個返回的數組包含一個分區的元素,按分區轉化為數組,最后有幾個分區就返回幾個數組類型的RDD。
方法源碼實現:
def glom(): RDD[Array[T]] = newGlommedRDD(this)private[spark] classGlommedRDD[T: ClassTag](prev: RDD[T])extends RDD[Array[T]](prev) {overridedef getPartitions: Array[Partition] = firstParent[T].partitionsoverridedef compute(split: Partition, context: TaskContext) =Array(firstParent[T].iterator(split,context).toArray).iterator}【例3-9】glom方法應用樣例
val a = sc.parallelize(1 to 99, 3) a.glom.collect res5: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33), Array(34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66), Array(67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99))這個例子很簡潔,在執行glom方法后就調用collect方法獲得Array數組并輸出,可以看出a.glom方法輸出的是三個數組組成的RDD,其中每個數組代表一個分區數據。
8.union
union方法(等價于“++”)是將兩個RDD取并集,取并集過程中不會把相同元素去掉。union操作是輸入分區與輸出分區多對一模式,如圖所示。
方法源碼實現:
def union(other: RDD[T]):RDD[T] = new UnionRDD(sc, Array(this, other))class UnionRDD[T: ClassTag](sc: SparkContext,var rdds: Seq[RDD[T]])extends RDD[T](sc, Nil) {overridedef getPartitions: Array[Partition] = {val array = newArray[Partition](rdds.map(_.partitions.size).sum)var pos = 0for ((rdd, rddIndex) <- rdds.zipWithIndex;split <- rdd.partitions) {array(pos) = new UnionPartition(pos,rdd, rddIndex, split.index)pos += 1}array}overridedef getDependencies: Seq[Dependency[_]] = {valdeps = new ArrayBuffer[Dependency[_]]varpos = 0for(rdd <- rdds) {deps += new RangeDependency(rdd, 0, pos,rdd.partitions.size)pos += rdd.partitions.size}deps}override def compute(s: Partition, context:TaskContext): Iterator[T] = {valpart = s.asInstanceOf[UnionPartition[T]]parent[T](part.parentRddIndex).iterator(part.parentPartition,context)}overridedef getPreferredLocations(s: Partition): Seq[String] =s.asInstanceOf[UnionPartition[T]].preferredLocations()overridedef clearDependencies() {super.clearDependencies()rdds= null}}【例3-10】union方法應用樣例
val a = sc.parallelize(1 to 4, 2) val b = sc.parallelize(2 to 4, 1) (a ++ b).collect res4: Array[Int] = Array(1, 2, 3, 4, 2, 3, 4)如圖3-5可見,這個例子先創建2個RDD變量a和b,然后對a與b使用union方法,返回兩個RDD并集的結果。
圖3-5? union方法應用樣例
9.cartesian
計算兩個RDD中每個對象的笛卡爾積(例如第一個RDD中的每一個對象與第二個RDD中的對象join連接),但使用該方法時要注意可能出現內存不夠的情況。
方法源碼實現:
def cartesian[U:ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)class CartesianRDD[T: ClassTag,U: ClassTag](sc: SparkContext,var rdd1 : RDD[T],var rdd2 : RDD[U])extends RDD[Pair[T, U]](sc, Nil)with Serializable {valnumPartitionsInRdd2 = rdd2.partitions.sizeoverridedef getPartitions: Array[Partition] = {//create the cross product split val array = newArray[Partition](rdd1.partitions.size * rdd2.partitions.size)for(s1 <- rdd1.partitions; s2 <- rdd2.partitions) {val idx = s1.index * numPartitionsInRdd2+ s2.indexarray(idx) = new CartesianPartition(idx,rdd1, rdd2, s1.index, s2.index)}array}overridedef getPreferredLocations(split: Partition): Seq[String] = {valcurrSplit = split.asInstanceOf[CartesianPartition](rdd1.preferredLocations(currSplit.s1)++ rdd2.preferredLocations(currSplit.s2)).distinct}overridedef compute(split: Partition, context: TaskContext) = {valcurrSplit = split.asInstanceOf[CartesianPartition]for (x <- rdd1.iterator(currSplit.s1,context);y <- rdd2.iterator(currSplit.s2,context)) yield (x, y)}overridedef getDependencies: Seq[Dependency[_]] = List(new NarrowDependency(rdd1) {def getParents(id: Int): Seq[Int] = List(id/ numPartitionsInRdd2)},new NarrowDependency(rdd2) {def getParents(id: Int): Seq[Int] =List(id % numPartitionsInRdd2)})override def clearDependencies() {super.clearDependencies()rdd1 = nullrdd2 = null}}【例3-11】cartesian方法應用樣例
val x =sc.parallelize(List(1,2,3),1) val y =sc.parallelize(List(4,5),1) x.cartesian(y).collect res0: Array[(Int, Int)] =Array((1,4),(1,5),(2,4),(2,5),(3,4),(3,5))例子中x是第一個RDD,其中的每個元素都跟y中元素進行連接,如果第一個RDD有m個元素,第二個RDD中元素n個,則求笛卡爾積后總元素為m×n個,本例結果為6個,如圖3-6所示。
圖3-6? cartesian方法應用樣例
10.groupBy
groupBy方法有三個重載方法,功能是將元素通過map函數生成Key-Value格式,然后使用reduceByKey方法對Key-Value對進行聚合,具體可參考源碼實現。
方法源碼實現:
def groupBy[K](f: T => K, p:Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] = {valcleanF = sc.clean(f) //對用戶函數預處理this.map(t=> (cleanF(t), t)).groupByKey(p) //對數據進行map操作,生成Key-Value對,再聚合 }def groupBy[K](f: T =>K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =groupBy[K](f, defaultPartitioner(this)) //使用默認分區器 def groupBy[K](f: T => K,numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =groupBy(f, new HashPartitioner(numPartitions)) //使用hash分區器,分區數自定義【例3-12】groupBy方法應用樣例
(1)val a = sc.parallelize(1 to 9, 3) a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect res42: Array[(String, Seq[Int])] = Array((even,ArrayBuffer(2, 4, 6, 8)), (odd,ArrayBuffer(1, 3, 5, 7, 9))) (2)val a = sc.parallelize(1 to 9, 3) def myfunc(a: Int) : Int = { a % 2 } a.groupBy(myfunc).collect res3: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9))) (3)val a = sc.parallelize(1 to 9, 3) def myfunc(a: Int) : Int = { a % 2 } a.groupBy(myfunc(_), 1).collect res7: Array[(Int, Seq[Int])] = Array((0,ArrayBuffer(2, 4, 6, 8)), (1,ArrayBuffer(1, 3, 5, 7, 9)))第一個例子中是單個參數,調用groupBy方法,結果集的key只有兩種,即even和odd,然后對相同的key進行聚合得到最終結果。第二個例子和第三個例子本質一樣,只是使用的重載方法不同。
11.filter
filter方法通過名稱就能猜出來功能,其實就是對輸入元素進行過濾,參數是一個返回值為boolean的函數,如果函數對輸入元素運算結果為true,則通過該元素,否則將該元素過濾,不能進入結果集。
方法源碼實現:
def filter(f: T => Boolean):RDD[T] = new FilteredRDD(this, sc.clean(f))【例3-13】filter方法應用樣例
(1)val a = sc.parallelize(1 to 10, 3) val b = a.filter(x => x % 3 == 0) b.collect res3: Array[Int] = Array(3, 6, 9) (2)val b = sc.parallelize(1 to 8) b.filter(x => x < 4).collect res15: Array[Int] = Array(1, 2, 3) (3)val a = sc.parallelize(List("cat", "horse", 4.0, 3.5, 2, "dog")) a.filter(_ < 4).collect <console>:15: error: value < is not a member of Any第一個和第二個例子比較好理解,因為a中元素都是整型,可以順利進行比較,但第三個例子會報錯,因為a中有部分對象不能與整數比較,如果使用scala中的偏函數就可以解決混合數據類型的問題。
12.distinct
將RDD中重復的元素去掉,只留下唯一的RDD元素。
方法源碼實現:
def distinct(): RDD[T] =distinct(partitions.size)def distinct(numPartitions:Int)(implicit ord: Ordering[T] = null): RDD[T] =map(x => (x,null)).reduceByKey((x, y) => x, numPartitions).map(_._1)【例3-14】distinct方法應用樣例
(1)val c = sc.parallelize(List("Gnu", "Cat", "Rat", "Dog", "Gnu", "Rat"), 2) c.distinct.collect res6: Array[String] = Array(Dog, Gnu, Cat, Rat) (2)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)) a.distinct(2).partitions.length res16: Int = 2 (3)a.distinct(3).partitions.length res17: Int = 3這個例子就是把RDD中的元素map為Key-Value對形式,然后使用reduceByKey將重復Key合并,也就是把重復元素刪除,只留下唯一的元素。此外distinct有一個重載方法需要一個參數,這個參數就是分區數numPartitions,從例子中看出使用帶參的distinct方法不僅能刪除重復元素,而且還能對結果重新分區。
13.subtract
subtract的含義就是求集合A-B的差,即把集合A中包含集合B的元素都刪除,結果是剩下的元素。
方法源碼實現:
def subtract(other: RDD[T], p:Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = {if (partitioner == Some(p)) {val p2 = newPartitioner() {override def numPartitions = p.numPartitionsoverridedef getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)}// Unfortunately, since we're making a new p2,we'll get ShuffleDependencies//anyway, and when calling .keys, will not have a partitioner set, even though// the SubtractedRDD will, thanks to p2'sde-tupled partitioning, already be//partitioned by the right/real keys (e.g. p).this.map(x=> (x, null)).subtractByKey(other.map((_, null)), p2).keys} else {this.map(x=> (x, null)).subtractByKey(other.map((_, null)), p).keys}}【例3-15】subtract方法應用樣例
val a = sc.parallelize(1 to 9, 3) val b = sc.parallelize(1 to 3, 3) val c = a.subtract(b) c.collect res3: Array[Int] = Array(6, 9, 4, 7, 5, 8)這個例子就是把a中包含b中的元素都刪除掉,底層實現使用subtractByKey,也就是根據鍵值對中的Key來刪除a包含的b元素。
14.persist,cache
cache方法顧名思義,是緩存數據,其作用是把RDD緩存到內存中,以方便下一次計算被再次調用。
方法源碼實現:
def cache(): this.type = persist()【例3-16】cache方法應用樣例
val c = sc.parallelize(List("a", "b", "c", "d", "e", "f"),1) c.cache res11: c.type = ParallelCollectionRDD[10] at parallelize at <console>:21這個例子就是直接把RDD緩存在內存中。
15.persist
persist方法的作用是把RDD根據不同的級別進行持久化,通過使用帶參數方法能指定持久化級別,如果不帶參數則為默認持久化級別,即只保存到內存,與cache等價。
【例3-17】persist方法應用樣例
val a = sc.parallelize(1 to 9, 3) a.persist(StorageLevel.MEMORY_ONLY)這個例子使用persist方法,指定持久化級別為MEMORY_ONLY,該級別等價于cache方法。
16.sample
sample的作用是隨機的對RDD中的元素采樣,獲得一個新的子集RDD,根據參數能指定是否又放回采樣、子集占總數的百分比和隨機種子。
方法源碼實現:
def sample(withReplacement:Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T] = {require(fraction >= 0.0, "Negativefraction value: " + fraction)if (withReplacement) {new PartitionwiseSampledRDD[T, T](this,new PoissonSampler[T](fraction), true, seed)} else {new PartitionwiseSampledRDD[T, T](this,new BernoulliSampler[T](fraction), true, seed)}}【例3-18】sample方法應用樣例
(1)val a = sc.parallelize(1 to 1000, 2) a.sample(false, 0.1, 0).collect res4: Array[Int] = Array(3, 21, 22, 27, 48, 50, 57, 80, 88, 90, 97, 113, 126, 130, 135, 145, 162, 169, 182, 230, 237, 242, 267, 271, 287, 294, 302, 305, 324, 326, 330, 351, 361, 378, 383, 384, 409, 412, 418, 432, 433, 485, 493, 497, 502, 512, 514, 521, 522, 531, 536, 573, 585, 595, 615, 617, 629, 640, 642, 647, 651, 664, 671, 673, 684, 692, 707, 716, 718, 721, 723, 736, 738, 756, 759, 788, 799, 827, 828, 833, 872, 898, 899, 904, 915, 916, 919, 927, 929, 951, 969, 980) (2)val a = sc.parallelize(1 to 100, 2) a.sample(true, 0.3, 0).collect res5: Array[Int] = Array(1, 1, 9, 18, 18, 24, 26, 29, 32, 34, 37, 38, 42, 43, 45, 51, 54, 56, 60, 65, 67, 70, 73, 74, 74, 75, 85, 86, 95, 99)上述例子中第一個參數withReplacement為true時使用放回抽樣(泊松抽樣[1]),為false時使用不放回抽樣(伯努利抽樣),第二個參數fraction是百分比,第三個參數seed是種子,也就是隨機取值的起源數字。從例子中還看出當選擇放回抽樣時,取出的元素中會出現重復值。
3.5.2 鍵值對型Transformation算子
RDD的操作算子除了單值型還有鍵值對(Key-Value)型。這里開始介紹鍵值對型的算子,主要包括groupByKey、combineByKey、reduceByKey、sortByKey、cogroup和join,如表3-5所示。
表3-5?鍵值對型Transformation算子
| 方法名 | 方法定義 |
| groupByKey | def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] |
| combineByKey | def combineByKey[C](createCombiner: V => C, mergeValue: (C,V) => C, mergeCombiners: (C, C) => C) : RDD[(K, C)] |
| reduceByKey | def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] |
| sortByKey | def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] |
| cogroup | def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD |
| join | def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] |
?
1.groupByKey
類似groupBy方法,作用是把每一個相同Key值的的Value聚集起來形成一個序列,可以使用默認分區器和自定義分區器,但是這個方法開銷比較大,如果想對同一Key進行Value的聚合或求平均,則推薦使用aggregateByKey或者reduceByKey。
方法源碼實現:
def groupByKey(numPartitions:Int): RDD[(K, Iterable[V])] = {groupByKey(new HashPartitioner(numPartitions))}def groupByKey(partitioner:Partitioner): RDD[(K, Iterable[V])] = {//groupByKey不應該使用map端的combine操作,因為map端并不會減少shuffle的數據,還要求//所有map端的數據都插入hash表中,導致很多對象進入內存中的老年代。 val createCombiner = (v: V) =>CompactBuffer(v)val mergeValue = (buf: CompactBuffer[V], v:V) => buf += vval mergeCombiners = (c1: CompactBuffer[V],c2: CompactBuffer[V]) => c1 ++= c2val bufs = combineByKey[CompactBuffer[V]](createCombiner, mergeValue,mergeCombiners, partitioner, mapSideCombine=false)bufs.asInstanceOf[RDD[(K, Iterable[V])]]}【例3-19】groupByKey方法應用樣例
val a = sc.parallelize(List("mk", "zq", "xwc", "fjg", "dcp", "snn"), 2) val b = a.keyBy(x => x.length) // keyBy方法調用map(x => (f(x),x))生成鍵值對 b.groupByKey.collect res6: Array[(Int, Iterable[String])] = Array((2,CompactBuffer(mk, zq)), (3,CompactBuffer(xwc, fjg, dcp, snn)))這個例子先創建包含List集合對象的RDD,然后使用keyBy方法生成Key-Value鍵值對,然后調用groupByKey方法將相同Key的Value聚合,最后調用collect方法以數組形式輸出。
?
圖3-7? groupByKey方法應用樣例
2.combineByKey
combineByKey方法能高效的將鍵值對形式的RDD按相同的Key把Value合并成序列形式,用戶能自定義RDD的分區器和是否在map端進行聚合操作。
方法源碼實現:
defcombineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = {combineByKey(createCombiner, mergeValue,mergeCombiners, defaultPartitioner(self))}defcombineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C, //輸入2個不同類型參數,返回其中一個類型參數 mergeCombiners: (C, C) => C, //輸入2個同類型參數,返回一個參數 numPartitions: Int): RDD[(K, C)] = {combineByKey(createCombiner,mergeValue, mergeCombiners, new HashPartitioner(numPartitions))}def combineByKey[C](createCombiner:V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null): RDD[(K, C)]= {require(mergeCombiners!= null, "mergeCombiners must be defined") // required as of Spark0.9.0if(keyClass.isArray) {if(mapSideCombine) {throw new SparkException("Cannot usemap-side combining with array keys.")}if(partitioner.isInstanceOf[HashPartitioner]) {throw new SparkException("Defaultpartitioner cannot partition array keys.")}}val aggregator = new Aggregator[K, V, C](self.context.clean(createCombiner),self.context.clean(mergeValue),self.context.clean(mergeCombiners))if (self.partitioner == Some(partitioner)){self.mapPartitions(iter => {val context = TaskContext.get()new InterruptibleIterator(context,aggregator.combineValuesByKey(iter, context))}, preservesPartitioning = true)} else {new ShuffledRDD[K, V, C](self,partitioner).setSerializer(serializer).setAggregator(aggregator).setMapSideCombine(mapSideCombine)}}【例3-20】combineByKey方法應用樣例
val a = sc.parallelize(List("xwc", "fjg", "wc", "dcp", "zq", "snn", "mk", "zl", "hk", "lp"), 2) val b = sc.parallelize(List(1,2,2,3,2,1,2,2,2,3), 2) val c = b.zip(a) //把a和b中對應元素組合成鍵值對,如Array((1,xwc), (3,fjg), (2,wc), (3,dcp)... val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String] = x ::: y) d.collect res13: Array[(Int, List[String])] = Array((2,List(zq, wc, fjg, hk, zl, mk)), (1,List(xwc, snn)), (3,List(dcp, lp)))在使用zip方法得到鍵值對序列c后調用combineByKey,把相同Key的value進行合并到List中。這個例子中使用三個參數的重載方法,該方法第一個參數是createCombiner,作用是把元素V轉換到另一類型元素C,該例子中使用的是List(_),表示將輸入的元素放在list集合中;mergeValue的含義是把元素V合并到元素C中,在該例子中使用的是函數是x:List[String],y:String) => y :: x,表示將y字符串合并到x鏈表集合中;mergeCombiners含義是將兩個C元素合并,在該例子中使用的是x:List[String], y:List[String]= x ::: y,表示把x鏈表集合中的內容合并到y鏈表集合中。
3.reduceByKey
使用一個reduce函數來實現對相同Key的Value的聚集操作,在發送結果給reduce前會在map端的執行本地merge操作。該方法的底層實現就是調用combineByKey方法的一個重載方法。
方法源碼實現:
def reduceByKey(partitioner:Partitioner, func: (V, V) => V): RDD[(K, V)] = {combineByKey[V]((v: V) => v, func, func,partitioner)}def reduceByKey(func: (V, V)=> V, numPartitions: Int): RDD[(K, V)] = {reduceByKey(newHashPartitioner(numPartitions), func)}def reduceByKey(func: (V, V)=> V): RDD[(K, V)] = {reduceByKey(defaultPartitioner(self), func)}【例3-21】reduceByKey方法應用樣例
(1)val a = sc.parallelize(List("dcp", "fjg", "snn", "wc", "zq"), 2) val b = a.map(x => (x.length, x)) b.reduceByKey((a,b) => a + b).collect res22: Array[(Int, String)] = Array((2,wczq), (3,dcpfjgsnn)) (2)val a = sc.parallelize(List(3,12,124,32,5 ), 2) val b = a.map(x => (x.toString.length, x)) b.reduceByKey(_ + _).collect res24: Array[(Int, Int)] = Array((2,44), (1,8), (3,124))這個例子先用map方法映射出鍵值對,然后調用reduceByKey方法對相同Key的Value值進行累加。例子中第一個是使用字符串,故使用聚合相加后是字符串的合并;第二個例子使用的是數字,結果是對應Key的Value數字相加。
4.sortByKey
這個函數會根據Key值對鍵值對進行排序,如果Key是字母,則按字典順序排序,如果Key是數字,則從小到大排序(或從大到小),該方法的第一個參數控制是否為升序排序,當為true時是升序,反之為降序。
方法源碼實現:
def sortByKey(ascending: Boolean = true,numPartitions: Int = self.partitions.size) : RDD[(K, V)] ={val part = new RangePartitioner(numPartitions,self, ascending)new ShuffledRDD[K, V, V](self, part).setKeyOrdering(if (ascending) orderingelse ordering.reverse)}【例3-22】sortByKey方法應用樣例
val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2) val b = sc.parallelize(1 to a.count.toInt, 2) //a.count得到單詞的字母個數 val c = a.zip(b) c.sortByKey(true).collect res74: Array[(String, Int)] = Array((ant,5), (cat,2), (dog,1), (gnu,4), (owl,3)) c.sortByKey(false).collect res75: Array[(String, Int)] = Array((owl,3), (gnu,4), (dog,1), (cat,2), (ant,5))這個例子先通過zip方法得到包含鍵值對的變量c,然后演示了sortByKey方法中參數為true和false時的計算結果。本例中的key是字符串,故可以看出當Key為true時,結果是按Key的字典順序升序輸出,反之則為降序輸出結果;當key為數字的時候,則按大小排列。
5.cogroup
cogroup是一個比較高效的函數,能根據Key值聚集最多3個鍵值對的RDD,把相同Key值對應的Value聚集起來。
方法源碼實現:
//參數為一個RDD情況 def cogroup[W](other: RDD[(K,W)], partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W]))] = {if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {throw new SparkException("Defaultpartitioner cannot partition array keys.")}val cg = new CoGroupedRDD[K](Seq(self,other), partitioner)cg.mapValues { case Array(vs, w1s) =>(vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W]])}}//參數為兩個RDD情況 def cogroup[W1, W2](other1:RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner): RDD[(K,(Iterable[V], Iterable[W1], Iterable[W2]))] = {if(partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {throw new SparkException("Defaultpartitioner cannot partition array keys.")}val cg = new CoGroupedRDD[K](Seq(self,other1, other2), partitioner)cg.mapValues { case Array(vs, w1s, w2s)=>(vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W1]],w2s.asInstanceOf[Iterable[W2]])}}//參數為3個RDD情況 def cogroup[W1, W2, W3](other1:RDD[(K, W1)],other2: RDD[(K, W2)],other3: RDD[(K, W3)],partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1],Iterable[W2], Iterable[W3]))] = {if (partitioner.isInstanceOf[HashPartitioner]&& keyClass.isArray) {throw new SparkException("Defaultpartitioner cannot partition array keys.")}val cg = new CoGroupedRDD[K](Seq(self,other1, other2, other3), partitioner)cg.mapValues { case Array(vs, w1s, w2s, w3s)=>(vs.asInstanceOf[Iterable[V]],w1s.asInstanceOf[Iterable[W1]],w2s.asInstanceOf[Iterable[W2]],w3s.asInstanceOf[Iterable[W3]])}}【例3-23】cogroup方法應用樣例
(1)val a =sc.parallelize(List(1,2,2 ,3, 1, 3), 1) val b =a.map(x => (x, "b")) val c =a.map(y => (y, "c")) b.cogroup(c).collect res25:Array[(Int, (Iterable[String], Iterable[String]))] = Array((1,(CompactBuffer(b,b), CompactBuffer(c,c))), (3,(CompactBuffer(b, b),CompactBuffer(c, c))), (2,(CompactBuffer(b, b),CompactBuffer(c, c)))) (2)val d = a.map(m => (m,"x")) b.cogroup(c,d).collect res26:Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] =Array((1,(CompactBuffer(b,b) ,CompactBuffer(c,c),CompactBuffer(x, x))), (3,(CompactBuffer(b, b),CompactBuffer(c, c), CompactBuffer(x,x))), (2,(CompactBuffer(b, b),CompactBuffer(c, c),CompactBuffer(x, x))))例子中有兩個個小例子,依次是單個參數和兩個參數的情況,使用cogroup方法對單個RDD和2個RDD進行聚集操作。
6.join
對鍵值對的RDD進行cogroup操作,然后對每個新的RDD下Key的值進行笛卡爾積操作,再對返回結果使用flatMapValues方法,最后返回結果。
方法源碼實現:
def join[W](other: RDD[(K, W)],partitioner: Partitioner): RDD[(K, (V, W))] = {this.cogroup(other,partitioner).flatMapValues( pair =>for (v <- pair._1.iterator; w <-pair._2.iterator) yield (v, w))}【例3-24】join方法應用樣例
val a = sc.parallelize(List("fjg", "wc", "xwc","dcp"), 2) val b = a.keyBy(_.length) //得到諸如(3,"fjg"),(2,"wc")的鍵值對序列 val c = sc.parallelize(List("fjg", "wc", "snn", "zq", "xwc","dcp"), 2) val d = c.keyBy(_.length) b.join(d).collect res29: Array[(Int, (String, String))] = Array((2,(wc,wc)), (2,(wc,zq)), (3,(fjg,fjg)), (3,(fjg,snn)), (3,(fjg,xwc)), (3,(fjg,dcp)), (3,(xwc,fjg)), (3,(xwc,snn)), (3,(xwc,xwc)), (3,(xwc,dcp)), (3,(dcp,fjg)), (3,(dcp,snn)), (3,(dcp,xwc)), (3,(dcp,dcp)))這個例子先構造兩個包含鍵值對元素的變量b和d,然后調用join方法,得到join后的結果,根據源碼實現,join方法本質是cogroup方法和flatMapValues方法的組合,其中cogroup方法得到聚合值,flatMapValues方法實現的是笛卡爾積,笛卡爾積的過程是在各個分區內進行,如例子中的Key等于2分區,wc與(wc,zq)求笛卡爾積,得到(2,(wc,wc))和(2,(wc,zq))的結果。
?
圖3-8? join方法應用樣例
3.5.3 Action算子
當Spark的計算模型中出現Action算子時才會執行提交作業的runJob動作,這時會觸發后續的DAGScheduler和TaskScheduler工作。這里主要講解常用的Action算子,有collect、reduce、take、top、count、takeSample、saveAsTextFile、countByKey、aggregate,具體方法和定義如表3-6所示。
表3-6? Action算子
| 方法名 | 方法定義 |
| collect | def collect(): Array[T] |
| reduce | def reduce(f: (T, T) => T): T |
| take | def take(num: Int): Array[T] |
| top | def top(num: Int)(implicit ord: Ordering[T]): Array[T] |
| count | def count(): Long |
| takeSample | def takeSample(withReplacement: Boolean,num: Int,seed: Long = Utils.random.nextLong): Array[T] |
| saveAsTextFile | def saveAsTextFile(path: String) |
| countByKey | def countByKey(): Map[K, Long] |
| aggregate | def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U |
1.collect
collect方法的作用是把RDD中的元素以數組的方式返回。
方法源碼實現:
def collect(): Array[T] = {val results = sc.runJob(this, (iter:Iterator[T]) => iter.toArray)Array.concat(results: _*)}【例3-25】collect方法應用樣例
val c = sc.parallelize(List("a", "b", "c", "d", "e", "f"), 2) c.collect res29: Array[String] = Array(a, b, c, d, e, f)這個例子直接把RDD中的元素轉換成數組返回。
2.reduce
reduce方法使用一個帶兩個參數的函數把元素進行聚集,返回一個元素結果,注意該函數中的二元操作應該滿足交換律和結合律,這樣才能在并行系統中正確計算。
方法源碼實現:
def reduce(f: (T, T) => T): T = { //輸入是兩個參數的函數,返回一個值 val cleanF = sc.clean(f)val reducePartition: Iterator[T] =>Option[T] = iter => {if (iter.hasNext) {Some(iter.reduceLeft(cleanF))} else {None}}var jobResult: Option[T] = Noneval mergeResult = (index: Int, taskResult:Option[T]) => {if (taskResult.isDefined) {jobResult = jobResult match {case Some(value) => Some(f(value,taskResult.get))case None => taskResult}}}sc.runJob(this, reducePartition, mergeResult)//獲得Option的最后結果,或者當RDD為空時拋出異常 jobResult.getOrElse(throw newUnsupportedOperationException("empty collection"))}【例3-26】reduce方法應用樣例
val a = sc.parallelize(1 to 10) a.reduce((a,b)=> a + b) res41: Int = 55這個例子使用簡單的函數將輸入的元素相加,過程是先輸入前兩個元素相加,然后將得到的結果與下一個輸入元素相加,依次規則計算出所有元素的和。
3.take
take方法會從RDD中取出前n[1]個元素。方法是先掃描一個分區并后從分區中得到結果,然后評估得到的結果是否達到取出元素個數,如果沒達到則繼續從其他分區中掃描獲取。
方法源碼實現:
def take(num: Int): Array[T] = {if (num == 0) {return new Array[T](0)}val buf = new ArrayBuffer[T]val totalParts = this.partitions.lengthvar partsScanned = 0while (buf.size < num &&partsScanned < totalParts) {// numPartsToTry表示在這個迭代中嘗試的分區數,這個數可以比總分區數大,因為在runJob中的總分區會限定它的值。 var numPartsToTry = 1if (partsScanned > 0) {//如果沒有在之前的迭代中找到記錄,則會重復尋找(次數翻四倍),此外還會調整分 區數,最多調整漲幅不超過50%if (buf.size == 0) {numPartsToTry = partsScanned * 4} else {// the left side of max is >=1whenever partsScanned >= 2 numPartsToTry = Math.max((1.5 * num *partsScanned / buf.size).toInt - partsScanned, 1)numPartsToTry =Math.min(numPartsToTry, partsScanned * 4)}}val left = num - buf.sizeval p = partsScanned untilmath.min(partsScanned + numPartsToTry, totalParts)val res = sc.runJob(this, (it:Iterator[T]) => it.take(left).toArray, p, allowLocal = true)res.foreach(buf ++= _.take(num -buf.size))partsScanned += numPartsToTry}buf.toArray}【例3-27】take方法應用樣例
(1) val b = sc.parallelize(List("a", "b", "c", "d", "e"), 2) b.take(2) res18: Array[String] = Array(a, b) (2) val b = sc.parallelize(1 to 100, 5) b.take(30) res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21,22, 23, 24, 25, 26, 27, 28, 29, 30)這里例子分別演示了字母和數字情況,其實工作原理都相同,即從分區中按先后順序拿元素出來。
4.top
top方法會利用隱式排序轉換方法(見實現源碼中implicit修飾的方法)來獲取最大的前n個元素。
方法源碼實現:
def top(num: Int)(implicit ord:Ordering[T]): Array[T] = takeOrdered(num)(ord.reverse)def takeOrdered(num:Int)(implicit ord: Ordering[T]): Array[T] = {if (num == 0) {Array.empty} else {val mapRDDs = mapPartitions { items =>// Priority keeps the largest elements,so let's reverse the ordering. val queue = newBoundedPriorityQueue[T](num)(ord.reverse)queue ++=util.collection.Utils.takeOrdered(items, num)(ord)Iterator.single(queue)}if (mapRDDs.partitions.size == 0) {Array.empty} else {mapRDDs.reduce { (queue1, queue2) =>queue1 ++= queue2queue1}.toArray.sorted(ord) }}}【例3-28】top方法應用樣例
val c = sc.parallelize(Array(1, 3, 2,4, 9, 2,11,5), 3) c.top(3) res10:Array[Int] = Array(11, 9, 5)例子顯示了top的使用方法,很簡潔,直接輸入元素個數作為參數就能得到前n個元素的值。
5.count
count方法計算并返回RDD中元素的個數。
方法源碼實現:
def count(): Long =sc.runJob(this, Utils.getIteratorSize _).sumdef runJob[T, U: ClassTag](rdd:RDD[T], func: Iterator[T] => U): Array[U] = {runJob(rdd, func, 0 until rdd.partitions.size,false)}【例3-29】count方法應用樣例
val c = sc.parallelize(Array(1,3, 2,4, 9, 2,11,5), 2) c.count res3: Long = 86.takeSample
takeSample方法返回一個固定大小的數組形式的采樣子集,此外還把返回的元素順序隨機打亂,方法的三個參數含義依次是否放回數據、返回取樣的大小和隨機數生成器的種子。
方法源碼實現:
def takeSample(withReplacement:Boolean,num: Int,seed: Long = Utils.random.nextLong):Array[T] = {val numStDev = 10.0if (num < 0) {throw newIllegalArgumentException("Negative number of elements requested")} else if (num == 0) {return new Array[T](0)}val initialCount = this.count()if (initialCount == 0) {return new Array[T](0)}val maxSampleSize = Int.MaxValue -(numStDev * math.sqrt(Int.MaxValue)).toIntif (num > maxSampleSize) {throw new IllegalArgumentException("Cannotsupport a sample size > Int.MaxValue - " +s"$numStDev *math.sqrt(Int.MaxValue)")}val rand = new Random(seed)if (!withReplacement && num >=initialCount) {returnUtils.randomizeInPlace(this.collect(), rand)}valfraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,withReplacement)var samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()//如果采樣容量不夠大,則繼續采樣 var numIters = 0while (samples.length < num) {logWarning(s"Needed to re-sample dueto insufficient sample size. Repeat #$numIters")samples = this.sample(withReplacement,fraction, rand.nextInt()).collect()numIters += 1}Utils.randomizeInPlace(samples,rand).take(num)}【例3-30】takeSample方法應用樣例
val x = sc.parallelize(1 to 100, 2) x.takeSample(true, 30, 1) res13: Array[Int] = Array(72, 37, 96, 47, 40, 96, 57, 100, 8, 44, 82, 11, 32, 47, 99, 94, 37, 97,52, 41, 100, 78, 93, 11, 6, 100, 75, 14, 47, 16)這個例子直接使用takeSample方法,得到30個固定數字的樣本,采取有放回抽樣的方式。
7.saveAsTextFile
把RDD存儲為文本文件,一次存一行。
方法源碼實現:
def saveAsTextFile(path:String) { val nullWritableClassTag =implicitly[ClassTag[NullWritable]]val textClassTag =implicitly[ClassTag[Text]]val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)}def saveAsTextFile(path:String, codec: Class[_ <: CompressionCodec]) { //參數可選擇壓縮方式// 參考https://issues.apache.org/jira/browse/SPARK-2075 val nullWritableClassTag =implicitly[ClassTag[NullWritable]]val textClassTag =implicitly[ClassTag[Text]]val r = this.map(x =>(NullWritable.get(), new Text(x.toString)))rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)}【例3-31】saveAsTextFile方法應用樣例
val a = sc.parallelize(1 to 100, 3) a.saveAsTextFile("BIT_Spark") //控制臺打印出的部分日志 15/08/04 10:27:58 INFO FileOutputCommitter: Saved output of task 'attempt_201508041027_0001 _m_000002_5' to file:/home/hadoop/spark/bin/BIT_Spark //在當前路徑下可以看到輸出3個文件part-***,原因是RDD有3個分區,每個分區默認輸出一個文件,SUCCESS文件執行表示成功。 hadoop@master:~/spark/bin/BIT_Spark$ ls part-00000 part-00001 part-00002 _SUCCESS hadoop@master:~/spark/bin/BIT_Spark$ vim part-00000 //查看第一個分區文件的內容 1 2 3 4 58.countByKey
類似count方法,不同的是countByKey方法會根據相同的Key計算其對應的Value個數,返回的是map類型的結果。
方法源碼實現:
def countByKey(): Map[K, Long]= self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap【例3-32】countByKey方法應用樣例
val a = sc.parallelize(List((1, "bit"), (2, "xwc"), (2, "fjg"), (3, "wc"),(3, "wc"),(3, "wc")), 2) a.countByKey res3: scala.collection.Map[Int,Long] = Map(1 -> 1, 2 -> 2,3 -> 3)這個例子先構造鍵值對變量a,然后使用countByKey方法對相同Key的Value進行統計,過程是先調用mapValue方法把Value映射為1,再reduceByKey到Key和其對應Value的個數。
9.aggregate
aggregate方法先將每個分區里面的元素進行聚合,然后用combine函數將每個分區的結果和初始值(zeroValue)進行combine操作。這個函數最終返回的類型不需要和RDD中元素類型一致。
aggregate有兩個函數seqOp和combOp,這兩個函數都是輸入兩個參數,輸出一個參數,其中seqOp函數可以看成是reduce操作,combOp函數可以看成是第二個reduce操作(一般用于combine各分區結果到一個總體結果),由定義,combOp操作的輸入和輸出類型必須一致。
方法源碼實現:
def aggregate[U:ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {//克隆zero值,因為這個值也會被序列化 var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance()) val cleanSeqOp = sc.clean(seqOp)val cleanCombOp = sc.clean(combOp)val aggregatePartition = (it: Iterator[T])=> it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)val mergeResult = (index: Int, taskResult:U) => jobResult = combOp(jobResult, taskResult)sc.runJob(this, aggregatePartition,mergeResult)jobResult}【例3-33】aggregate方法應用樣例
/ 分區0的reduce操作是max(0, 2,3) = 3 // 分區1的reduce操作是max(0, 4,5) = 5 // 分區2的reduce操作是max(0, 6,7) = 7 // 最后的combine操作是0 + 3 + 5 + 7 = 15 //注意最后的reduce操作包含初始值 (1)val z = sc.parallelize(List(2,3,4,5,6,7), 3) z.aggregate(0)((a,b) => math.max(a, b), (c,d) => c + d ) res6: Int = 15 // 分區0的reduce操作是max(3, 2,3) = 3 // 分區1的reduce操作是max(3, 4,5) = 5 // 分區2的reduce操作是max(3, 6,7) = 7 // 最后的combine操作是3 + 3 + 5 + 7 = 18 (2)val z = sc.parallelize(List(2,3,4,5,6,7), 3) z.aggregate(3)((a,b) => math.max(a, b), (c,d) => c + d ) res7: Int = 18 (3)val z = sc.parallelize(List("a","b","c","d","e","f"),2) z.aggregate("")(_ + _, _+_) res8: String = defabc (4)val z = sc.parallelize(List("a","b","c","d","e","f"),2) z.aggregate("x")(_ + _, _+_) res9: String = xxdefxabc在spark中一個分區對應一個task,從源碼來看,zeroValue參與每個分區的seqOp(reduce)方法和最后的combOp(第二個reduce)方法,先對每個分區求reduce,在該例子中是對3個分區分別求Max操作,得到分區最大值,得到的結果參與combOp方法,即把各分區的結果和zeroValue相加最后得到結果值,從前兩個例子可以看出這個操作特點,體現先分后總的思想。
對于后面兩個例子使用的是字符串,aggregate方法的思路一樣,先對各分區求seqOp方法然后再使用combOp方法把各分區的結果聚合相加,得到最終結果。
10.fold
fold方法與aggregate方法原理類似,區別就是少了一個seqOp方法。fold方法是把每個分區的元素進行聚合,然后調用reduce(op)方法處理。
方法源碼實現:
def fold(zeroValue: T)(op: (T,T) => T): T = {//克隆zero值,因為這個值也會被序列化 var jobResult = Utils.clone(zeroValue,sc.env.closureSerializer.newInstance())val cleanOp = sc.clean(op)val foldPartition = (iter: Iterator[T])=> iter.fold(zeroValue)(cleanOp)val mergeResult = (index: Int, taskResult:T) => jobResult = op(jobResult, taskResult)sc.runJob(this, foldPartition, mergeResult)jobResult}【例3-34】fold方法應用樣例
// 分區0的reduce操作是0 + 1 + 2 + 3 = 6 // 分區1的reduce操作是0 + 4 + 5 + 6 = 15 // 分區2的reduce操作是0 + 7 + 8 + 9 = 24 // 最后的combine操作是0 + 6 + 15 + 24 = 45 (1)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) a.fold(0)(_ + _) res11:: Int = 45 // 分區0的reduce操作是1 + 1 + 2 + 3 = 7 // 分區1的reduce操作是1 + 4 + 5 + 6 = 16 // 分區2的reduce操作是1 + 7 + 8 + 9 = 25 // 最后的combine操作是1 + 7 + 16 + 25 = 53 (2)val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3) a.fold(1)(_ + _) res12: Int = 53?這個例子中的使用方式與aggregate方法非常相似,注意zeroValue參與所有分區計算。fold計算是保證每個分區能獨立計算,它與aggregate最大的區別是aggregate對不同分區提交的最終結果定義了一個專門的comOp函數來處理,而fold方法是采用一個方法來處理aggregate的兩個方法過程。
3.6 本章小結
? ??? ? 本章主要為讀者講述了Spark核心開發部分,其中講述了SparkContext的作用與創建過程,還對RDD的概念模型進行介紹,說明了RDD的Transformation和Action操作的內涵意義。在基本介紹Spark編程模型后在實踐環節列出了主要的Transformation和Action方法的使用范例,同時結合了方法源碼說明范例計算過程。本章為Spark應用基礎,第六章將繼續集合源碼深入介紹RDD的運行機制和Spark調度機制。下一章將逐一介紹Spark的四大編程模型,讓讀者進一步學習并掌握Spark在不同業務場景下的應用。
?
轉自http://blog.csdn.net/xwc35047/article/details/51146622
總結
以上是生活随笔為你收集整理的【核心API开发】Spark入门教程[3]的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 访问兄弟节点
- 下一篇: kotlin for android--