spark将rdd转为string_八、Spark之详解Tranformation算子
生活随笔
收集整理的這篇文章主要介紹了
spark将rdd转为string_八、Spark之详解Tranformation算子
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
RDD中的所有轉(zhuǎn)換(Transformation)算子都是延遲加載的,也就是說,它們并不會直接計算結(jié)果。相反的,它們只是記住這些應用到基礎數(shù)據(jù)集(例如一個文件)上的轉(zhuǎn)換動作。只有當發(fā)生一個要求返回結(jié)果給Driver的動作時,這些轉(zhuǎn)換才會真正運行。這種設計讓Spark更加有效率地運行。
常用Transformation類算子列表
常用Transformation類算子列表
常用Transformation類算子實例
- map(func): 返回一個新的RDD,該RDD由每一個輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成, map操作是一對一操作,每進去一個元素,就出來一個元素
- filter(func): 過濾。返回一個新的RDD,該RDD由經(jīng)過func函數(shù)計算后返回值為true的輸入元素組成, RDD元素的類型不會改變。
- flatMap(func): 壓平。類似于map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
集合類Transformation算子實例
- union(otherRDD): 對源RDD和參數(shù)RDD求并集后返回一個新的RDD, 需要兩個RDD
- subtract(otherRDD): 對源RDD和參數(shù)RDD求差集后返回一個新的RDD, 需要兩個RDD
- intersection(otherRDD): 對源RDD和參數(shù)RDD求交集后返回一個新的RDD, 需要有兩個RDD
- distinct(): 對源RDD進行去重后返回一個新的RDD, 只需要一個RDD
其底層的實現(xiàn)原理(如下面Java代碼所示)是:mapToPair+reduceByKey+mapToPair =>
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.*;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;/** * distinct: 對RDD中的元素去重 */public class distinctOperator { public static void main(String[] args) { SparkConf conf = new SparkConf() .setMaster("local") .setAppName("distinct"); JavaSparkContext sc = new JavaSparkContext(conf); sc.setLogLevel("WARN"); JavaRDD rdd1 = sc.parallelize(Arrays.asList( "a", "a", "a", "a", "b", "b", "b", "b" )); /** * 傳統(tǒng)方式實現(xiàn)RDD元素去重需要三步 * 第一步:把RDD轉(zhuǎn)換成K,V格式的RDD, K為元素,V為1 * 每二步:對K,V格式的RDD的Key進行分組計算 * 第三步:對得到的RDD只取第一位鍵 */ // [(a,1),(a,1),(a,1),(a,1),(b,1),b,1),b,1),b,1)] JavaPairRDD mapToPairRDD = rdd1.mapToPair(new PairFunction() { @Override public Tuple2 call(String s) throws Exception { return new Tuple2(s, 1); } }); //對每個key進行聚合 //[(a,4),(b,4)] JavaPairRDD reduceRDD = mapToPairRDD.reduceByKey(new Function2() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); //只取鍵,不要值 JavaRDD mapRDD = reduceRDD.map(new Function, String>() { @Override public String call(Tuple2 tuple) throws Exception { return tuple._1; } }); mapRDD.foreach(new VoidFunction() { @Override public void call(String s) throws Exception { System.out.println(s); } }); System.out.println("-----------------------------------"); //使用Spark提供的算子distinct實現(xiàn)RDD元素去重 JavaRDD distinctRDD = rdd1.distinct(); distinctRDD.foreach(new VoidFunction() { @Override public void call(String s) throws Exception { System.out.println(s); } }); sc.stop(); }}分組類的轉(zhuǎn)換算子
groupByKey([numTasks]): 在一個(K,V)的RDD上調(diào)用,返回一個(K, Iterator[V])的RDD。偏底層scala> val rdd1 = sc.parallelize(List(("張軍",1000),("李軍",2500),("王軍",3000),("張軍",1500)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[30] at parallelize at :24scala> val rdd2 = rdd1.groupByKey()rdd2: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[31] at groupByKey at :25scala> rdd2.collect()res11: Array[(String, Iterable[Int])] = Array((王軍,CompactBuffer(3000)), (張軍,CompactBuffer(1000, 1500)), (李軍,CompactBuffer(2500)))- reduceByKey(func, [numTasks]): 在一個(K,V)的RDD上調(diào)用,返回一個(K,V)的RDD,使用指定的reduce函數(shù),將相同key的值聚合到一起,與groupByKey類似,reduce任務的個數(shù)可以通過第二個可選的參數(shù)來設置。調(diào)用groupByKey。
- cogroup(otherRDD, [numTasks]): 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(K,(Iterable,Iterable))類型的RDD
排序類Transformation算子
sortBy(func,[ascending], [numTasks]): 與sortByKey類似,但是更靈活scala> val rdd1=sc.parallelize(Array(10,9,8,7,4,6,5,3,1,2))rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at :24scala> val rdd2=rdd1.sortBy(x=>x,true)rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[41] at sortBy at :25scala> rdd2.collect()res14: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)# K, V格式的RDDscala> val rdd1=sc.parallelize(Array(("張飛",30),("劉備",42),("關羽",32),("曹操",46),("公孫瓚",62)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at :24scala> val rdd2=rdd1.sortBy(tuple=>tuple._2, false)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[45] at sortBy at :25scala> rdd2.collect()res15: Array[(String, Int)] = Array((公孫瓚,62), (曹操,46), (劉備,42), (關羽,32), (張飛,30))sortByKey([ascending], [numTasks]): 在一個(K,V)的RDD上調(diào)用,K必須實現(xiàn)Ordered接口,返回一個按照key進行排序的(K,V)的RDDscala> val rdd1=sc.parallelize(Array(("張飛",30),("劉備",42),("關羽",32),("曹操",46),("公孫瓚",62)))rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[42] at parallelize at :24# 同樣對rdd1調(diào)用,需要進行轉(zhuǎn)換scala> val rdd2=rdd1.map(tuple=>tuple.swap).sortByKey(false).map(tuple=>tuple.swap)rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[48] at map at :25scala> rdd2.collect()res16: Array[(String, Int)] = Array((公孫瓚,62), (曹操,46), (劉備,42), (關羽,32), (張飛,30))高級類的轉(zhuǎn)換算子
- mapPartitionWithIndex(func): 類似于mapPartitions, 但是func帶有一個整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運行時,func的函數(shù)類型必須是(Int, Iterator[T]) => Iterator[U]。其功能是對RDD中的每個分區(qū)進行操作,帶有索引下標,可以取到分區(qū)號。
- func: 接收兩個參數(shù),第一個參數(shù)代表分區(qū)號,第二參數(shù)代表分區(qū)中的元素。
- aggregateByKey: 后面有單獨文章講解此算子
總結(jié)
以上是生活随笔為你收集整理的spark将rdd转为string_八、Spark之详解Tranformation算子的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 在诊断与排除汽车制动故障的操作准备前应?
- 下一篇: 广汽传祺gm6避用油能跑多少公里?