Spark SQL: Custom AVG Aggregate Functions, Strongly Typed and Weakly Typed (Summary)
AVG computes an average, so the function's output type is Double.
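One subtlety behind that choice: the aggregation buffer typically holds an integer sum and count, so the final division must be performed in Double, or the fractional part is silently truncated. A tiny illustrative snippet (plain Scala, no Spark needed):

```scala
// Integer division truncates; converting first preserves the fraction.
val sum = 70
val count = 3
println(sum / count)          // Int division: prints 23, fraction lost
println(sum.toDouble / count) // ≈ 23.3333, fraction preserved
```

This is exactly why the implementations below call `.toDouble` before dividing.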
1) Create the weakly typed aggregate function class by extending UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyAgeFunction extends UserDefinedAggregateFunction {

  // Schema of the function's input: create a StructType and add the fields to it
  override def inputSchema: StructType = new StructType().add("age", LongType)

  // Schema of the aggregation buffer used during computation
  override def bufferSchema: StructType = new StructType().add("sum", LongType).add("count", LongType)

  // Return type of the function
  override def dataType: DataType = DoubleType

  // Whether the function is deterministic (same input always yields the same output)
  override def deterministic: Boolean = true

  // Initialize the buffer before computation: buffer(0) is the first field, sum;
  // buffer(1) is the second field, count
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  // Update the buffer for each input row: sum += input age, count += 1
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    buffer(1) = buffer.getLong(1) + 1
  }

  // Merge buffers from multiple nodes
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Compute the final result
  override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1)
}

Using the aggregate function:
def main(args: Array[String]): Unit = {
  // Create the configuration object and session
  val conf = new SparkConf().setAppName("Spark01_Custom").setMaster("local[*]")
  val spark = SparkSession.builder().config(conf).getOrCreate()

  val rdd1 = spark.sparkContext.makeRDD(List(("chun", 21), ("chun1", 23), ("chun3", 22)))

  // Implicit conversions (required to convert an RDD to a DF/DS)
  import spark.implicits._

  // Convert the RDD to a DataFrame
  val frame = rdd1.toDF("name", "age")

  // Create a global temporary view
  frame.createGlobalTempView("people")

  // Create the aggregate function object and register it
  val udaf = new MyAgeFunction
  spark.udf.register("avgAge", udaf)

  // frame.select("age").show()
  // In SQL, a global view must be qualified with the global_temp database name
  spark.sql("select avgAge(age) from global_temp.people").show
}

2) Create the strongly typed AVG aggregate function (extends Aggregator[input type, buffer type, output type])
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Declare the custom aggregate function (strongly typed)
// Aggregator[IN, BUF, OUT] takes three type parameters
class MyAgeClassFuction extends Aggregator[UserBean, AvgBuffer, Double] {

  // Initialize the buffer
  override def zero: AvgBuffer = AvgBuffer(0, 0)

  // Fold an input value into the buffer (sum and count must be declared as var)
  override def reduce(b: AvgBuffer, a: UserBean): AvgBuffer = {
    b.sum += a.age
    b.count += 1
    b
  }

  // Merge two buffers
  override def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count
    b1
  }

  // Compute the final result (convert to Double to avoid integer division)
  override def finish(reduction: AvgBuffer): Double = reduction.sum.toDouble / reduction.count

  // The last two methods define how the buffer and output types are encoded.
  // For a custom (case class) type, use Encoders.product
  override def bufferEncoder: Encoder[AvgBuffer] = Encoders.product
  // For primitive types, use the matching Encoders.scalaXxx (here Encoders.scalaDouble)
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

// Case classes
case class UserBean(name: String, age: Int)
case class AvgBuffer(var sum: Int, var count: Int)

Usage:
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("Spark02_Custom2").setMaster("local[*]")
  val spark = SparkSession.builder().config(conf).getOrCreate()

  val rdd = spark.sparkContext.makeRDD(List(("chun1", 23), ("chun2", 24), ("chun3", 25)))
  import spark.implicits._

  // Create the strongly typed aggregate function
  val udaf = new MyAgeClassFuction

  // It cannot be registered and called as avgAge(column) the way the weakly typed
  // function is: it takes a UserBean object, not a column value.
  // Instead, convert the aggregate function into a query column:
  val avgColumn = udaf.toColumn.name("avgAge")

  val userRDD = rdd.map { case (name, age) => UserBean(name, age) }

  // It cannot be used in SQL; use the DSL-style select function instead
  val ds = userRDD.toDS
  val rdd1 = ds.rdd

  ds.show()
  // Result:
  // +-----+---+
  // | name|age|
  // +-----+---+
  // |chun1| 23|
  // |chun2| 24|
  // |chun3| 25|
  // +-----+---+

  rdd1.foreach(println)
  // Result:
  // UserBean(chun1,23)
  // UserBean(chun3,25)
  // UserBean(chun2,24)

  // Apply the aggregate through the DSL-style select
  ds.select(avgColumn).show()

  spark.stop()
}

As the output shows, each row of the strongly typed Dataset is a UserBean, i.e. a case class instance, rather than a Row as in the weakly typed case.
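Stripped of Spark, the zero/reduce/merge/finish contract above is just a sum-and-count algebra: each partition folds its values into a buffer, the partial buffers are merged, and the final division happens once at the end. A minimal plain-Scala sketch of that contract (illustrative only; AvgBuffer is redeclared locally, and no SparkSession is needed):

```scala
// Minimal sketch of the Aggregator contract without Spark.
case class AvgBuffer(var sum: Int, var count: Int)

def zero: AvgBuffer = AvgBuffer(0, 0)

// Fold one input value into a buffer
def reduce(b: AvgBuffer, age: Int): AvgBuffer = { b.sum += age; b.count += 1; b }

// Combine two partial buffers
def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = {
  b1.sum += b2.sum; b1.count += b2.count; b1
}

// Final division, in Double to keep the fraction
def finish(b: AvgBuffer): Double = b.sum.toDouble / b.count

// Simulate two "partitions" being reduced independently, then merged
val part1 = List(23, 24).foldLeft(zero)(reduce)
val part2 = List(25).foldLeft(zero)(reduce)
println(finish(merge(part1, part2))) // prints 24.0
```

This is the same computation Spark performs, except Spark runs reduce per partition and merge across executors.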
Summary

That concludes this summary of custom strongly typed and weakly typed AVG aggregate functions in Spark SQL; hopefully it helps you resolve any problems you run into.