2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
生活随笔
收集整理的這篇文章主要介紹了
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
目錄
案例三:電影評分數據分析
代碼實現
Shuffle分區數
案例三:電影評分數據分析
?????使用電影評分數據進行數據分析,分別使用DSL編程和SQL編程,熟悉數據處理函數及SQL使用,業務需求說明:
對電影評分數據進行統計分析,獲取Top10電影(電影評分平均值最高,并且每個電影被評分的次數大于200)。
數據格式如下,每行數據各個字段之間使用雙冒號分開:
?
數據處理分析步驟如下:
- 第一步、讀取電影評分數據,從本地文件系統讀取
- ?第二步、轉換數據,指定Schema信息,封裝到DataFrame
- ?第三步、基于SQL方式分析
- ?第四步、基于DSL方式分析
?
代碼實現
?????電影評分數據分析,經過數據ETL、數據分析(SQL分析和DSL分析)及最終保存結果,整套數據處理分析流程,其中涉及到很多數據細節,完整代碼如下:
package cn.itcast.sqlimport java.util.Propertiesimport org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel/*** 需求:對電影評分數據進行統計分析,獲取Top10電影(電影評分平均值最高,并且每個電影被評分的次數大于2000)*/
object SparkTop10Movie {def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// TODO: 設置shuffle時分區數目.config("spark.sql.shuffle.partitions", "4").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._// 1. 讀取電影評分數據,從本地文件系統讀取val rawRatingsDS: Dataset[String] = spark.read.textFile("data/input/rating_100k.data")// 2. 轉換數據val ratingsDF: DataFrame = rawRatingsDS// 過濾數據.filter(line => null != line && line.trim.split("\t").length == 4)// 提取轉換數據.mapPartitions{iter =>iter.map{line =>// 按照分割符分割,拆箱到變量中val Array(userId, movieId, rating, timestamp) = line.trim.split("\t")// 返回四元組(userId, movieId, rating.toDouble, timestamp.toLong)}}// 指定列名添加Schema.toDF("userId", "movieId", "rating", "timestamp")/*root|-- userId: string (nullable = true)|-- movieId: string (nullable = true)|-- rating: double (nullable = false)|-- timestamp: long (nullable = false)*/ratingsDF.printSchema()/*+------+-------+------+---------+|userId|movieId|rating|timestamp|+------+-------+------+---------+| ????1| ??1193| ??5.0|978300760|| ????1| ???661| ??3.0|978302109|| ????1| ???594| ??4.0|978302268|| ????1| ???919| ??4.0|978301368|+------+-------+------+---------+*/ratingsDF.show(4)// TODO: 基于SQL方式分析// 第一步、注冊DataFrame為臨時視圖ratingsDF.createOrReplaceTempView("view_temp_ratings")// 第二步、編寫SQLval top10MovieDF: DataFrame = spark.sql("""|SELECT| ?movieId, ROUND(AVG(rating), 2) AS avg_rating, COUNT(movieId) AS cnt_rating|FROM| ?view_temp_ratings|GROUP BY| ?movieId|HAVING| ?cnt_rating > 200|ORDER BY| ?avg_rating DESC, cnt_rating DESC|LIMIT| ?10""".stripMargin)//top10MovieDF.printSchema()top10MovieDF.show(10, truncate = false)println("===============================================================")// TODO: 基于DSL=Domain Special Language(特定領域語言) 分析import org.apache.spark.sql.functions._val resultDF: DataFrame = ratingsDF// 選取字段.select($"movieId", $"rating")// 分組:按照電影ID,獲取平均評分和評分次數.groupBy($"movieId").agg(round(avg($"rating"), 2).as("avg_rating"),count($"movieId").as("cnt_rating"))// 過濾:評分次數大于200.filter($"cnt_rating" > 200)// 排序:先按照評分降序,再按照次數降序.orderBy($"avg_rating".desc, $"cnt_rating".desc)// 獲取前10.limit(10)//resultDF.printSchema()resultDF.show(10)/*// TODO: 將分析的結果數據保存MySQL數據庫和CSV文件// 結果DataFrame被使用多次,緩存resultDF.persist(StorageLevel.MEMORY_AND_DISK)// 1. 保存MySQL數據庫表匯總resultDF.coalesce(1).write.mode("overwrite").option("driver", "com.mysql.jdbc.Driver").option("user", "root").option("password", "root").jdbc("jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","top10_movies",new Properties())// 2. 保存CSV文件:每行數據中個字段之間使用逗號隔開resultDF.coalesce(1).write.mode("overwrite").csv("data/output/top10-movies")// 釋放緩存數據resultDF.unpersist()*/spark.stop()}
}
?
???????Shuffle分區數
運行上述程序時,查看WEB UI監控頁面發現,某個Stage中有200個Task任務,也就是說RDD有200分區Partition。
?
原因:在SparkSQL中當Job中產生Shuffle時,默認的分區數(spark.sql.shuffle.partitions )為200,在實際項目中要合理的設置。可以在構建SparkSession實例對象時進行設置
val spark = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]")// TODO: 設置shuffle時分區數目.config("spark.sql.shuffle.partitions", "4").getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(二十七):S
- 下一篇: 2021年大数据Spark(三十一):S