2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
目錄
案例一:花式查詢
案例二:WordCount
基于DSL編程
基于SQL編程
具體演示代碼如下:
?
案例一:花式查詢
package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Author itcast* Desc 演示SparkSQL的各種花式查詢*/
object FlowerQueryDemo {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.準備環境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加載數據val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//錯誤的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.將每一行(每一個Array)轉為樣例類(相當于添加了Schema)val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))//5.將RDD轉為DataFrame(DF)//注意:RDD的API中沒有toDF方法,需要導入隱式轉換!import spark.implicits._val personDF: DataFrame = personRDD.toDF//6.查看約束personDF.printSchema()//7.查看分布式表中的數據集personDF.show(6,false)//false表示不截斷列名,也就是列名很長的時候不會用...代替//演示SQL風格查詢//0.注冊表名//personDF.registerTempTable("t_person")//已經過時//personDF.createTempView("t_person")//創建表,如果已存在則報錯:TempTableAlreadyExistsException//personDF.createOrReplaceGlobalTempView("t_person")//創建全局表,可以夸session使用,查詢的時候使用:SELECT * FROM global_temp.表名;生命周期太大,一般不用personDF.createOrReplaceTempView("t_person")//創建一個臨時表,只有當前session可用!且表如果存在會替換!//1.查看name字段的數據spark.sql("select name from t_person").show//2.查看?name 和age字段數據spark.sql("select name,age from t_person").show//3.查詢所有的name和age,并將age+1spark.sql("select name,age,age+1 from t_person").show//4.過濾age大于等于25的spark.sql("select name,age from t_person where age >=25").show//5.統計年齡大于30的人數spark.sql("select count(age) from t_person where age >30").show//6.按年齡進行分組并統計相同年齡的人數spark.sql("select age,count(age) from t_person group by age").show//演示DSL風格查詢//1.查看name字段的數據import org.apache.spark.sql.functions._personDF.select(personDF.col("name")).showpersonDF.select(personDF("name")).showpersonDF.select(col("name")).showpersonDF.select("name").show//2.查看?name 和age字段數據personDF.select(personDF.col("name"),personDF.col("age")).showpersonDF.select("name","age").show//3.查詢所有的name和age,并將age+1//personDF.select("name","age","age+1").show//錯誤,沒有age+1這一列//personDF.select("name","age","age"+1).show//錯誤,沒有age1這一列personDF.select(col("name"),col("age"),col("age")+1).showpersonDF.select($"name",$"age",$"age"+1).show//$表示將"age"變為了列對象,先查詢再和+1進行計算personDF.select('name,'age,'age+1).show//'表示將age變為了列對象,先查詢再和+1進行計算//4.過濾age大于等于25的,使用filter方法/where方法過濾personDF.select("name","age").filter("age>=25").showpersonDF.select("name","age").where("age>=25").show//5.統計年齡大于30的人數personDF.where("age>30").count()//6.按年齡進行分組并統計相同年齡的人數personDF.groupBy("age").count().show}}
?
???????案例二:WordCount
前面使用RDD封裝數據,實現詞頻統計WordCount功能,從Spark 1.0開始,一直到Spark 2.0,建立在RDD之上的一種新的數據結構DataFrame/Dataset發展而來,更好的實現數據處理分析。DataFrame 數據結構相當于給RDD加上約束Schema,知道數據內部結構(字段名稱、字段類型),提供兩種方式分析處理數據:DataFrame API(DSL編程)和SQL(類似HiveQL編程),下面以WordCount程序為例編程實現,體驗DataFrame使用。
基于DSL編程
使用SparkSession加載文本數據,封裝到Dataset/DataFrame中,調用API函數處理分析數據(類似RDD中API函數,如flatMap、map、filter等),編程步驟:
?第一步、構建SparkSession實例對象,設置應用名稱和運行本地模式;
?第二步、讀取HDFS上文本文件數據;
?第三步、使用DSL(Dataset?API),類似RDD?API處理分析數據;
?第四步、控制臺打印結果數據和關閉SparkSession;
?
基于SQL編程
也可以實現類似HiveQL方式進行詞頻統計,直接對單詞分組group by,再進行count即可,步驟如下:
?第一步、構建SparkSession對象,加載文件數據,分割每行數據為單詞;
?第二步、將DataFrame/Dataset注冊為臨時視圖(Spark 1.x中為臨時表);
?第三步、編寫SQL語句,使用SparkSession執行獲取結果;
?第四步、控制臺打印結果數據和關閉SparkSession;
?
具體演示代碼如下:
package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Author itcast* Desc 使用SparkSQL完成WordCount---SQL風格和DSL風格*/
object WordCount {def main(args: Array[String]): Unit = {//1.準備環境val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._//2.加載數據//val rdd: RDD[String] = sc.textFile("data/input/words.txt")//可以使用該方式,然后使用昨天的知識將rdd轉為df/dsval df: DataFrame = spark.read.text("data/input/words.txt")val ds: Dataset[String] = spark.read.textFile("data/input/words.txt")//df.show()//查看分布式表數據//ds.show()//查看分布式表數據//3.做WordCount//切割//df.flatMap(_.split(" ")) //注意:直接這樣寫報錯!因為df沒有泛型,不知道_是String!//df.flatMap(row=>row.getAs[String]("value").split(" "))val wordsDS: Dataset[String] = ds.flatMap(_.split(" "))//wordsDS.show()//使用SQL風格做WordCountwordsDS.createOrReplaceTempView("t_words")val sql:String ="""|select value,count(*) as count|from t_words|group by value|order by count desc|""".stripMarginspark.sql(sql).show()//使用DSL風格做WordCountwordsDS.groupBy("value").count().orderBy($"count".desc).show()/*+-----+-----+|value|count|+-----+-----+|hello| ???4|| ?her| ???3|| ?you| ???2|| ??me| ???1|+-----+-----++-----+-----+|value|count|+-----+-----+|hello| ???4|| ?her| ???3|| ?you| ???2|| ??me| ???1|+-----+-----+*/}
}
?
無論使用DSL還是SQL編程方式,底層轉換為RDD操作都是一樣,性能一致,查看WEB UI監控中Job運行對應的DAG圖如下:
?
從上述的案例可以發現將數據封裝到Dataset/DataFrame中,進行處理分析,更加方便簡潔,這就是Spark框架中針對結構化數據處理模:Spark SQL模塊。
官方文檔:http://spark.apache.org/sql/
總結
以上是生活随笔為你收集整理的2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(二十六):S
- 下一篇: 2021年大数据Spark(二十八):S