Spark — Common RDD Transformation Operators with Code Examples
Commonly used Spark RDD transformations, each with a runnable example:
1. def map[U: ClassTag](f: T => U): RDD[U]   Applies the function f to every element of the RDD and returns a new RDD.
package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest extends App {
  val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
  val sc = new SparkContext(sparkConf)
  // map
  var source = sc.parallelize(1 to 10)
  source.collect().foreach(e => print(e + " "))     // 1 2 3 4 5 6 7 8 9 10
  var sourceMap = source.map(_ * 10)
  sourceMap.collect().foreach(e => print(e + " "))  // 10 20 30 40 50 60 70 80 90 100
}
2. def filter(f: T => Boolean): RDD[T]   Returns a new RDD containing only the elements for which the predicate f evaluates to true.
// filter
var source = sc.parallelize(1 to 10)
source.collect().foreach(e => print(e + " "))     // 1 2 3 4 5 6 7 8 9 10
var sourceMap = source.filter(_ < 5)              // the original predicate was truncated; _ < 5 matches the output below
sourceMap.collect().foreach(e => print(e + " "))  // 1 2 3 4
3. def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]   Applies f to every element of the RDD; each element produces a collection, and all collections are flattened into a single new RDD.
// flatMap
var source = sc.parallelize(1 to 5)
source.collect().foreach(e => print(e + " "))     // 1 2 3 4 5
var sourceMap = source.flatMap(x => 1 to x)
sourceMap.collect().foreach(e => print(e + " "))  // 1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
4. def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]   Applies f once per partition of the RDD; f receives an Iterator over the partition's elements and must return an Iterator.
package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // mapPartitions
    var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))
    source.collect().foreach(e => print(e + " "))     // (lucy,female) (jack,male) (jams,male)
    var sourceMap = source.mapPartitions(partitionsFun)
    sourceMap.collect().foreach(e => print(e + " "))  // jams jack
  }

  // keep only the names of the "male" records in each partition
  def partitionsFun(iter: Iterator[(String, String)]): Iterator[String] = {
    var males = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "male") => males = next._1 :: males
        case _ =>
      }
    }
    males.iterator
  }
}
5. def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]   Like mapPartitions, but f also receives the index of the partition together with the Iterator over its elements, and must return an Iterator.
package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // mapPartitionsWithIndex
    var source = sc.parallelize(List(("lucy", "female"), ("jack", "male"), ("jams", "male")))
    source.collect().foreach(e => print(e + " "))     // (lucy,female) (jack,male) (jams,male)
    var sourceMap = source.mapPartitionsWithIndex(partitionsFunWithIndex)
    sourceMap.collect().foreach(e => print(e + " "))  // [1]jams [1]jack
  }

  // prefix each "male" name with the index of the partition it came from
  def partitionsFunWithIndex(index: Int, iter: Iterator[(String, String)]): Iterator[String] = {
    var males = List[String]()
    while (iter.hasNext) {
      val next = iter.next()
      next match {
        case (_, "male") => males = "[" + index + "]" + next._1 :: males
        case _ =>
      }
    }
    males.iterator
  }
}
6. def sample(withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]   Returns a sampled subset containing roughly fraction of the RDD's data, using seed as the random seed; withReplacement controls whether sampling is done with replacement.
package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // sample
    var source = sc.parallelize(1 to 10)
    source.collect().foreach(e => print(e + " "))     // 1 2 3 4 5 6 7 8 9 10
    var sourceMap = source.sample(true, 0.4, 2)
    sourceMap.collect().foreach(e => print(e + " "))  // 1 2 2
  }
}
7. def union(other: RDD[T]): RDD[T]   Merges the elements of the two RDDs and returns a new RDD.
// union
var source = sc.parallelize(1 to 3)
source.collect().foreach(e => print(e + " "))     // 1 2 3
var rdd = sc.parallelize(6 to 9)
var sourceMap = source.union(rdd)
sourceMap.collect().foreach(e => print(e + " "))  // 1 2 3 6 7 8 9
8. def intersection(other: RDD[T]): RDD[T]   Returns the intersection of the two RDDs as a new RDD.
// intersection
var source = sc.parallelize(1 to 8)
source.collect().foreach(e => print(e + " "))     // 1 2 3 4 5 6 7 8
var rdd = sc.parallelize(6 to 9)
var sourceMap = source.intersection(rdd)
sourceMap.collect().foreach(e => print(e + " "))  // 6 8 7
9. def distinct(): RDD[T]   Removes duplicate elements from the RDD and returns a new RDD.
// distinct
var source = sc.parallelize(List(1, 1, 2, 2, 3, 3, 4, 4, 5, 5))
source.collect().foreach(e => print(e + " "))     // 1 1 2 2 3 3 4 4 5 5
var sourceMap = source.distinct()
sourceMap.collect().foreach(e => print(e + " "))  // 4 2 1 3 5
10. def partitionBy(partitioner: Partitioner): RDD[(K, V)]   Repartitions the RDD according to the given partitioner and returns a new RDD.
// partitionBy (requires import org.apache.spark.HashPartitioner)
var source = sc.parallelize(List((1, "111"), (2, "222"), (3, "333"), (4, "444")), 4)
source.collect().foreach(e => print(e + " "))
print("number of partitions: " + source.partitions.size)     // (1,111) (2,222) (3,333) (4,444) number of partitions: 4
var sourceMap = source.partitionBy(new HashPartitioner(2))
sourceMap.collect().foreach(e => print(e + " "))
print("number of partitions: " + sourceMap.partitions.size)  // (2,222) (4,444) (1,111) (3,333) number of partitions: 2
11. def reduceByKey(func: (V, V) => V): RDD[(K, V)]   Merges the values of each key using func and returns a new RDD.
// reduceByKey
var source = sc.parallelize(List(("hello", 1), ("world", 1), ("hello", 1), ("world", 1)))
source.collect().foreach(e => print(e + " "))     // (hello,1) (world,1) (hello,1) (world,1)
var sourceMap = source.reduceByKey((x, y) => x + y)
sourceMap.collect().foreach(e => print(e + " "))  // (hello,2) (world,2)
12. def groupByKey(): RDD[(K, Iterable[V])]   Groups the values of each key together and returns an RDD of (K, Iterable[V]).
// groupByKey
var source = sc.parallelize(List(("hello", 1), ("world", 1), ("hello", 1), ("world", 1)))
source.collect().foreach(e => print(e + " "))     // (hello,1) (world,1) (hello,1) (world,1)
var sourceMap = source.groupByKey()
sourceMap.collect().foreach(e => print(e + " "))  // (hello,CompactBuffer(1, 1)) (world,CompactBuffer(1, 1))
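For comparison with reduceByKey above, the grouped values can then be summed with mapValues; this is a small illustrative follow-up rather than part of the original example (groupByKey shuffles every value across the network, so reduceByKey is usually preferred for plain aggregations):
var summed = sourceMap.mapValues(values => values.sum)
summed.collect().foreach(e => print(e + " "))  // (hello,2) (world,2)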
13. def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]   For each key, createCombiner builds the initial accumulator and mergeValue folds further values of that key into it within a partition; mergeCombiners then merges the per-partition accumulators into the final result.
package top.ruandb

import org.apache.spark.{SparkConf, SparkContext}

object RddTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("RddTest").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    // combineByKey: compute the average score per student
    var scores = Array(("lucy", 89), ("jack", 77), ("lucy", 100), ("james", 65), ("jack", 99), ("james", 44))
    var input = sc.parallelize(scores)
    input.collect().foreach(e => print(e + " "))   // (lucy,89) (jack,77) (lucy,100) (james,65) (jack,99) (james,44)
    var output = input.combineByKey(
      (v) => (v, 1),                                                                   // createCombiner: (sum, count) for the first value
      (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),                                // mergeValue: fold another value into the accumulator
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))  // mergeCombiners: merge per-partition accumulators
    output.collect().foreach(e => print(e + " "))  // (james,(109,2)) (jack,(176,2)) (lucy,(189,2))
    var result = output.map { case (key, value) => (key, value._1 / value._2.toDouble) }
    result.collect().foreach(e => print(e + " "))  // (james,54.5) (jack,88.0) (lucy,94.5)
  }
}
14. def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]   Starting from zeroValue, seqOp folds the values of each key within a partition into an accumulator; combOp then merges the per-partition accumulators of each key into the final result.
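The original article gives no listing for aggregateByKey, so the following is a minimal sketch in the style of the examples above (the data and variable names are illustrative); it computes, per key, the highest score and the number of scores, using the simpler overload that takes only zeroValue:
// aggregateByKey
var scores = sc.parallelize(List(("lucy", 89), ("jack", 77), ("lucy", 100), ("james", 65)), 2)
scores.collect().foreach(e => print(e + " "))       // (lucy,89) (jack,77) (lucy,100) (james,65)
var maxAndCount = scores.aggregateByKey((0, 0))(
  (acc, v) => (math.max(acc._1, v), acc._2 + 1),    // seqOp: fold each value into the per-partition accumulator
  (a, b) => (math.max(a._1, b._1), a._2 + b._2))    // combOp: merge accumulators from different partitions
maxAndCount.collect().foreach(e => print(e + " "))  // e.g. (lucy,(100,2)) (jack,(77,1)) (james,(65,1))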
Summary
The sections above cover the RDD transformation operators most commonly used in Spark programming, each with a small runnable example; hopefully they help you resolve the problems you run into.