SparkRDD——行动算子
一、行動算子定義
spark的算子可以分為trans action算子 以及 action算子 ,即變換/轉(zhuǎn)換 算子。如果執(zhí)行一個(gè)RDD算子并不觸發(fā)作業(yè)的提交,僅僅只是記錄作業(yè)中間處理過程,那么這就是trans action算子 ,相反如果執(zhí)行這個(gè) RDD 時(shí)會觸發(fā) Spark Context 提交 Job 作業(yè),那么它就是 action算子及行動算子。
總結(jié)來說就是在Spark中,轉(zhuǎn)換算子并不會馬上進(jìn)行運(yùn)算的,即所謂的“惰性運(yùn)算”,而是在遇到行動算子時(shí)才會執(zhí)行相應(yīng)的語句的,觸發(fā)Spark的任務(wù)調(diào)度并開始進(jìn)行計(jì)算。
我們可以將行動算子分為兩類:
- 1,數(shù)據(jù)運(yùn)算類:主要用于觸發(fā)RDD計(jì)算,并得到計(jì)算結(jié)果返回給Spark程序或Shell界面;
- 2,數(shù)據(jù)存儲類:用于觸發(fā)RDD計(jì)算后,將結(jié)果保存到外部存儲系統(tǒng)中,如HDFS文件系統(tǒng)或數(shù)據(jù)庫。
二、總覽
一、數(shù)據(jù)運(yùn)算類: 1、reduce 將rdd中的數(shù)據(jù)進(jìn)行聚合,先進(jìn)行分區(qū)內(nèi)聚合,在進(jìn)行分區(qū)間聚合 2、collect 將rdd中的數(shù)據(jù)按分區(qū)號采集,并以數(shù)組的形式返回所有數(shù)據(jù) 3、collectAsMap 收集Key/Value型RDD中的元素,并以map的形式返回?cái)?shù)據(jù) 4、foreach 循環(huán)遍歷分區(qū)內(nèi)數(shù)據(jù),該算子執(zhí)行位置是在Executor端 5、count 計(jì)算rdd中數(shù)據(jù)個(gè)數(shù) 6、first 取rdd中數(shù)據(jù)的第一個(gè) 7、take 取rdd中數(shù)據(jù)的前num個(gè) 8、takeOrdered 將rdd中的數(shù)據(jù)進(jìn)行排序后取前num個(gè) 9、aggregate 類似于aggregateByKey算子,同樣兩個(gè)參數(shù)列表,分別傳遞初始值和分區(qū)內(nèi)計(jì)算規(guī)則和分區(qū)間計(jì)算規(guī)則。 10、fold 簡化版的aggregate,分區(qū)內(nèi)計(jì)算規(guī)則和分區(qū)間計(jì)算規(guī)則一樣。 11、countByKey 根據(jù)鍵值對中的key進(jìn)行計(jì)數(shù),返回一個(gè)map,對應(yīng)了每個(gè)key在rdd中出現(xiàn)的次數(shù)。 12、countByValue 根據(jù)rdd中數(shù)據(jù)的數(shù)據(jù)值進(jìn)行計(jì)數(shù),注不是鍵值對中的value,同樣返回一個(gè)map,對應(yīng)每個(gè)數(shù)據(jù)出現(xiàn)的次數(shù)。 13、max 求rdd中數(shù)據(jù)的最大值 14、min 求rdd中數(shù)據(jù)的最小值 二、數(shù)據(jù)存儲類: 1、saveAsTextFile 存儲為文本文件 2、saveAsObjectFile 存儲為二進(jìn)制文件 3、saveAsSequenceFile 要求數(shù)據(jù)必須為<k,v>類型, 保存為 Sequencefile文件注:sequenceFile文件是Hadoop用來存儲二進(jìn)制形式的 (Key,Value) 對而設(shè)計(jì)的一種平面文件。詳細(xì)可以看這篇文章了解:鏈接
三、數(shù)據(jù)運(yùn)算類action算子
1、reduce
通過傳入的方法聚集rdd中所有的元素,先聚合分區(qū)內(nèi)的數(shù)據(jù),再聚合分區(qū)間的數(shù)據(jù)
def reduce(f: (T, T) => T): T
2、collect
數(shù)據(jù)采集,以數(shù)組Array的形式按分區(qū)順序返回?cái)?shù)據(jù)集中的所有元素
def collect(): Array[T]
3、collectAsMap
收集Key/Value型RDD中的元素,并以map的形式返回?cái)?shù)據(jù)
注:只有key/value類型的RDD才有這個(gè)方法
def collectAsMap(): Map[K, V]
4、foreach
循環(huán)遍歷分區(qū)內(nèi)數(shù)據(jù),該算子執(zhí)行位置是在Executor端
def foreach(f: T => Unit): Unit
5、count
返回rdd中元素的個(gè)數(shù),即collect返回的數(shù)組的長度
def count(): Long
6、first
返回rdd中的第一個(gè)元素,即collect返回的數(shù)組的第一個(gè)元素
def first(): T
7、take
返回rdd中的前n個(gè)元素,即collect返回的數(shù)組的前n個(gè)元素
def take(num: Int): Array[T]
8、takeOrdered
返回rdd中排序后的前n個(gè)元素
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
9、aggregate
與aggregateByKey類似,需要傳入兩個(gè)參數(shù)列表,列表元素意義也相同
- 第一個(gè)列表,傳入初始的比較值
- 第二個(gè)參數(shù)列表傳入兩個(gè)函數(shù),分別表示分區(qū)內(nèi)計(jì)算規(guī)則和分區(qū)間計(jì)算規(guī)則
aggregateByKey:初始值只會參與分區(qū)內(nèi)計(jì)算
aggregate:初始值既會參與分區(qū)內(nèi)計(jì)算也會參與分區(qū)間計(jì)算
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
10、fold
類似于foldByKey,即當(dāng)aggregate的分區(qū)內(nèi)和分區(qū)間計(jì)算規(guī)則相同時(shí)可以簡化使用fold,只需要傳入一個(gè)計(jì)算規(guī)則
def fold(zeroValue: T)(op: (T, T) => T): T
11、countByKey
用于統(tǒng)計(jì)鍵值對類型的數(shù)據(jù)中每個(gè)key出現(xiàn)的個(gè)數(shù)
def countByKey(): Map[K, Long]
12、countByValue
根據(jù)rdd中數(shù)據(jù)的數(shù)據(jù)值進(jìn)行計(jì)數(shù),注不是鍵值對中的value,同樣返回一個(gè)map,對應(yīng)每個(gè)數(shù)據(jù)出現(xiàn)的次數(shù)。
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
13、max && min
返回rdd數(shù)據(jù)集中的最大值/最小值
def max()(implicit ord: Ordering[T]): T = withScope {this.reduce(ord.max) } def min()(implicit ord: Ordering[T]): T = withScope {this.reduce(ord.min) } val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4)) val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("c", 1))) println(rdd.max()) println(rdd2.max()) println(rdd.min()) println(rdd2.min())總結(jié)
以上是生活随笔為你收集整理的SparkRDD——行动算子的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 前瞻:数据科学中的探索性数据分析(DEA
- 下一篇: 一个学员去了互联网大厂一个笔试题分享