Spark _27_ Custom Functions: UDF and UDAF
UDF: user-defined function.
You can define your own class that implements a UDFX interface (UDF1, UDF2, ... chosen by the number of input arguments), as sketched just below.
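For illustration, here is a minimal Scala sketch of that pattern: a named class implementing the one-argument UDF1 interface. The class name StrLenUdf and the registration shown in the comment are assumptions for this sketch, not part of the original examples; the complete Java and Scala programs follow.

import org.apache.spark.sql.api.java.UDF1
import org.apache.spark.sql.types.DataTypes

// Hypothetical named class implementing the one-argument UDF1 interface
class StrLenUdf extends UDF1[String, Integer] {
  override def call(s: String): Integer = Integer.valueOf(s.length)
}

// Registration against a SparkSession assumed to be in scope as `spark`:
// spark.udf.register("StrLen", new StrLenUdf(), DataTypes.IntegerType)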
Java API:
package com.udf;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * @author George
 * @description UDF: user-defined function
 */
public class Udf {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("udf");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> rdd = sc.parallelize(Arrays.asList("George", "GeorgeDage", "kangkang"));
        JavaRDD<Row> map = rdd.map(new Function<String, Row>() {
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1);
            }
        });
        List<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> df = sqlContext.createDataFrame(map, schema);
        df.show();
        /**
         * +----------+
         * |      name|
         * +----------+
         * |    George|
         * |GeorgeDage|
         * |  kangkang|
         * +----------+
         */
        df.registerTempTable("user");

        /**
         * The number of UDF arguments determines which interface to implement:
         * UDF1, UDF2, ... and so on.
         */
        sqlContext.udf().register("StrLen", new UDF1<String, Integer>() {
            public Integer call(String s) throws Exception {
                return s.length();
            }
        }, DataTypes.IntegerType);

        sqlContext.sql("select name,StrLen(name) as length from user").show();
        /**
         * +----------+------+
         * |      name|length|
         * +----------+------+
         * |    George|     6|
         * |GeorgeDage|    10|
         * |  kangkang|     8|
         * +----------+------+
         */

        sqlContext.udf().register("StrLen", new UDF2<String, Integer, Integer>() {
            public Integer call(String s, Integer integer) throws Exception {
                return s.length() + integer;
            }
        }, DataTypes.IntegerType);

        sqlContext.sql("select name,StrLen(name,10) as length from user").show();
        /**
         * +----------+------+
         * |      name|length|
         * +----------+------+
         * |    George|    16|
         * |GeorgeDage|    20|
         * |  kangkang|    18|
         * +----------+------+
         */
        sc.stop();
    }
}
Scala API:

package com.udf

import org.apache.spark.sql.SparkSession

/**
 * UDF: user-defined function
 */
object UdfScalaDemo {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().master("local").appName("udf").getOrCreate()
    val list = List[String]("George", "lucy", "kk", "lmdhk")
    import sparkSession.implicits._
    val frame = list.toDF("name")
    frame.createOrReplaceTempView("students")
    frame.show()
    /**
     * +------+
     * |  name|
     * +------+
     * |George|
     * |  lucy|
     * |    kk|
     * | lmdhk|
     * +------+
     */
    sparkSession.udf.register("STRLEN", (n: String) => {
      n.length
    })
    sparkSession.sql("select name,STRLEN(name) as length from students sort by length desc").show(100)
    /**
     * +------+------+
     * |  name|length|
     * +------+------+
     * |George|     6|
     * | lmdhk|     5|
     * |  lucy|     4|
     * |    kk|     2|
     * +------+------+
     */
    sparkSession.stop()
  }
}
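Registering the function for SQL is not the only way to use a UDF: the same logic can be wrapped with org.apache.spark.sql.functions.udf and applied directly through the DataFrame column API, with no temp view involved. A minimal sketch of that variant follows; the object name UdfColumnDemo is made up for illustration, and Spark 2.x or later is assumed.

package com.udf

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object UdfColumnDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").appName("udf-column").getOrCreate()
    import spark.implicits._

    val frame = List("George", "lucy", "kk", "lmdhk").toDF("name")

    // functions.udf wraps a plain Scala function into a column expression
    val strLen = udf((n: String) => n.length)

    // Apply the UDF as a column, without any SQL registration
    frame.withColumn("length", strLen(col("name"))).show()

    spark.stop()
  }
}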
UDAF: user-defined aggregate function.
- To implement a UDAF, define a class that extends the UserDefinedAggregateFunction class.
Java API:
package com.udf;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;

/**
 * @author George
 * @description
 * UDAF: user-defined aggregate function.
 * To implement a UDAF, define a class that extends UserDefinedAggregateFunction.
 */
public class Udaf {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("udaf");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("error");
        SQLContext sqlContext = new SQLContext(sc);

        JavaRDD<String> parallelize = sc.parallelize(
                Arrays.asList("George", "kangkang", "GeorgeDage", "limu", "George", "GeorgeDage"));
        JavaRDD<Row> map = parallelize.map(new Function<String, Row>() {
            public Row call(String v1) throws Exception {
                return RowFactory.create(v1);
            }
        });
        ArrayList<StructField> fields = new ArrayList<StructField>();
        fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> frame = sqlContext.createDataFrame(map, schema);
        frame.show();
        /**
         * +----------+
         * |      name|
         * +----------+
         * |    George|
         * |  kangkang|
         * |GeorgeDage|
         * |      limu|
         * |    George|
         * |GeorgeDage|
         * +----------+
         */
        frame.registerTempTable("user");

        /**
         * Register a UDAF that counts how many times each value occurs.
         * Note: defining a separate class that extends UserDefinedAggregateFunction works just as well.
         */
        sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
            /**
             * The input field(s) and their types.
             */
            @Override
            public StructType inputSchema() {
                return DataTypes.createStructType(
                        Arrays.asList(DataTypes.createStructField("name", DataTypes.StringType, true)));
            }

            @Override
            public DataType dataType() {
                return DataTypes.IntegerType;
            }

            @Override
            public boolean deterministic() {
                return true;
            }

            /**
             * Update: the values within a group are passed in one at a time and folded into the buffer.
             * buffer.getInt(0) is the value accumulated so far.
             * This is analogous to a map-side combiner: the combiner does a small aggregation over each
             * map task's output, while the large aggregation happens on the reduce side.
             * In other words, whenever a new value arrives for a group, this defines how the partial
             * aggregate is computed.
             */
            @Override
            public void update(MutableAggregationBuffer buffer, Row input) {
                buffer.update(0, buffer.getInt(0) + 1);
            }

            @Override
            public StructType bufferSchema() {
                return DataTypes.createStructType(
                        Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, true)));
            }

            /**
             * Merge: an update may cover only part of a group's data on one node, because the data for a
             * single group can be processed on multiple nodes. merge combines the partial results built
             * on each node into one.
             * buffer1.getInt(0): the value accumulated so far in the global aggregation.
             * buffer2.getInt(0): the partial result produced by update that is being merged in.
             * This is the global-level merge that runs after the distributed partial aggregations finish.
             */
            @Override
            public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
                buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
            }

            /**
             * Initializes the internal buffer value: each group's starting value before aggregation.
             */
            @Override
            public void initialize(MutableAggregationBuffer buffer) {
                buffer.update(0, 0);
            }

            /**
             * Returns the final result of the UDAF; its type must match dataType().
             */
            @Override
            public Object evaluate(Row buffer) {
                return buffer.getInt(0);
            }
        });

        sqlContext.sql("select name ,StringCount(name) from user group by name").show();
        /**
         * +----------+------+
         * |      name|(name)|
         * +----------+------+
         * |      limu|     1|
         * |    George|     2|
         * |GeorgeDage|     2|
         * |  kangkang|     1|
         * +----------+------+
         */
        sc.stop();
    }
}
Scala API:

package com.udf

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyUDAF extends UserDefinedAggregateFunction {
  // Type of the data held in the aggregation buffer
  def bufferSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("aaa", IntegerType, true)))
  }

  // Type of the function's final return value
  def dataType: DataType = {
    DataTypes.IntegerType
  }

  def deterministic: Boolean = {
    true
  }

  // Returns the final aggregated value; its type must match dataType
  def evaluate(buffer: Row): Any = {
    buffer.getAs[Int](0)
  }

  // Initializes the buffer value for each group
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0
  }

  // Type of the input data
  def inputSchema: StructType = {
    DataTypes.createStructType(Array(DataTypes.createStructField("input", StringType, true)))
  }

  // At the end, the partial aggregates produced on the individual nodes are merged together
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)
  }

  // Whenever a new value arrives for a group, updates that group's aggregate
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Int](0) + 1
  }
}
package com.udf

import org.apache.spark.sql.SparkSession

object UdafScalaDemo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("udaf").getOrCreate()
    val list = List[String]("George", "lucy", "kk", "lmdhk", "kk")
    import session.implicits._
    val frame = list.toDF("name")
    frame.createOrReplaceTempView("students")

    /**
     * Register the UDAF
     */
    session.udf.register("NAMECOUNT", new MyUDAF())
    session.sql("select name,NAMECOUNT(name) as count from students group by name").show(100)
    /**
     * +------+-----+
     * |  name|count|
     * +------+-----+
     * |  lucy|    1|
     * |    kk|    2|
     * |George|    1|
     * | lmdhk|    1|
     * +------+-----+
     */
    session.stop()
  }
}
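Note that UserDefinedAggregateFunction was deprecated in Spark 3.0; the recommended replacement is a typed Aggregator registered through functions.udaf. The sketch below reimplements the same counting logic under that newer API. It assumes Spark 3.0 or later, and the object names NameCount and AggregatorDemo are illustrative, not from the original examples.

package com.udf

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions

// Counts how many rows each group contains, mirroring MyUDAF above
object NameCount extends Aggregator[String, Int, Int] {
  def zero: Int = 0                                         // initial buffer value per group
  def reduce(buffer: Int, name: String): Int = buffer + 1   // fold one input value into the buffer
  def merge(b1: Int, b2: Int): Int = b1 + b2                // combine partial buffers from different partitions
  def finish(reduction: Int): Int = reduction               // final result
  def bufferEncoder: Encoder[Int] = Encoders.scalaInt
  def outputEncoder: Encoder[Int] = Encoders.scalaInt
}

object AggregatorDemo {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().master("local").appName("udaf-aggregator").getOrCreate()
    import session.implicits._

    val frame = List("George", "lucy", "kk", "lmdhk", "kk").toDF("name")
    frame.createOrReplaceTempView("students")

    // functions.udaf turns the Aggregator into a SQL-registrable UDAF (Spark 3.0+)
    session.udf.register("NAMECOUNT", functions.udaf(NameCount, Encoders.STRING))
    session.sql("select name, NAMECOUNT(name) as count from students group by name").show()

    session.stop()
  }
}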
總結(jié)
以上是生活随笔為你收集整理的Spark _27_自定义函数UDF和UDAF的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark _26_Spark On H
- 下一篇: 一些 Linux 系统故障修复和修复技巧