电影推荐系统 python简书_【记录|Spark】简单的电影推荐系统
為了學習spark,在實驗樓上找到的一個spark入門課程,在此記錄一下學習過程。
我使用的Spark版本為Spark 2.2.0, 實驗樓教程使用的是Spark 1.6.1
流程和算法介紹
這個簡單的電影推薦系統是根據已有用戶對電影的評價系統,針對特定用戶輸出其可能會感興趣的電影,構成一個簡單的電影推薦系統。
主要步驟
加載數據集,解析成特定格式
劃分數據集,分為訓練集和測試集
利用交替最小二乘法(ALS)算法,訓練用戶與電影之間的矩陣模型
基于訓練集進行預測,利用測試集來驗證預測結果是否有效。
實際上,上述步驟的第三四步是使用了協同過濾算法來推薦電影。
引用知乎上的回答解釋協同過濾
舉個簡單的小例子
我們已知道用戶u1喜歡的電影是A,B,C
用戶u2喜歡的電影是A, C, E, F
用戶u3喜歡的電影是B,D
我們需要解決的問題是:決定對u1是不是應該推薦F這部電影。
基于內容的做法:要分析F的特征和u1所喜歡的A、B、C的特征,需要知道的信息是A(戰爭片),B(戰爭片),C(劇情片),如果F(戰爭片),那么F很大程度上可以推薦給u1,這是基于內容的做法,你需要對item進行特征建立和建模。
協同過濾的辦法:那么你完全可以忽略item的建模,因為這種辦法的決策是依賴user和item之間的關系,也就是這里的用戶和電影之間的關系。我們不再需要知道ABCF哪些是戰爭片,哪些是劇情片,我們只需要知道用戶u1和u2按照item向量表示,他們的相似度比較高,那么我們可以把u2所喜歡的F這部影片推薦給u1。
在Spark MLlib中,協同過濾算法是通過交替最小二乘法(ALS)實現的,具體算法實現在此并不關注。
數據集
數據集來自GroupLens,是一個名為MovieLens的數據集的數據,在此處選擇數據量為一百萬條的數據集,下載地址
具體代碼和分析
1.導入包
我們需要導入以下包
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
mllib包是Spark中的機器學習包,我們這次導入的有ALS,MatrixFactorizationModel,Rating。ALS即為上文提到的交替最小二乘算法,在Spark中ALS算法的返回結果為MatrixFactorizationModel類,最后的Rating是Spark定義的評價Model,對應于我們數據中的Rating.dat中的內容,不用用戶再自行定義
然后,我們還需要導入implicits包,這個是Spark中的隱式轉換包,可以自動地對一些數據類型進行轉換,但是這個包需要在代碼中動態導入
val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()
import spark.implicits._
其中spark為SparkSession類,在Spark 2.2.0中用來代替SparkContext,作為整個程序的入口點
2.數據處理
定義電影、用戶數據實體類,用來映射對應的數據
case class Movie(movieId: Int, title: String)
case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)
定義解析函數,將數據從文件中解析出來
def parseMovieData(data: String): Movie = {
val dataField = data.split("::")
assert(dataField.size == 3)
Movie(dataField(0).toInt, dataField(1))
}
def parseUserData(data: String): User = {
val dataField = data.split("::")
assert(dataField.size == 5)
User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)
}
def parseRatingData(data: String): Rating = {
val dataField = data.split("::")
Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)
}
導入數據
var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData).cache()
var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData).cache()
var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData).cache()
3. 訓練模型
// convert to DataFrame
val moviesDF = moviesData.toDF()
val usersDF = usersData.toDF()
val ratingsDF = ratingsData.toDF()
// split to data set and test set
val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)
val trainingSetOfRatingsData = tempPartitions(0).cache().rdd
val testSetOfRatingData = tempPartitions(1).cache().rdd
// training model
val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)
按7:3的比例將數據集分為訓練集和驗證集,由于劃分出來的數據集為DataSet類型,而ALS算法的run函數接收的參數為RDD類型,所以需要將DataSet轉換為RDD,方法很簡單,就加上”.rdd"就可以了,如果不轉換會報錯
spark_error_5.PNG
訓練完之后可以調用模型進行推薦,比如要給用戶ID為1000的用戶推薦適合TA看的10部電影,就可以執行
val recomResult = recomModel.recommendProducts(1000, 10)
結果如下
運行結果
返回的結果包括用戶ID,電影ID,和對應的相關性
如果我們要顯示電影名,可以執行以下代碼
val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()
val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))
println(recommendMoviesWithTitle.mkString("\n"))
在Spark老版本中,可以直接使用
val movieTitles = moviesDF.map(array => (array(0), array(1))).collectAsMap()
將moviesDF轉換為key為電影ID,value為電影名的map,但是在2.2.0中,如果這樣寫會提示DataSet沒有collectAsMap()方法,錯誤截圖如下
錯誤截圖
經過一番搜索后,在StackOverflow上有人提到RDD有collectAsMap()方法,于是就要將moviesDF轉換為RDD類型,即上文用到的方法
打印出來的結果如圖
轉換結果
4.驗證模型
如何知道模型是否正確呢?可以用之前從數據集里面劃分出來的驗證集,通過調用模型得出預測結果,與驗證集中的原數據進行對比,可以判斷模型的效果如何
val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{
case Rating(user, product, rating) => (user, product)
})
val formatResultOfTestSet = testSetOfRatingData.map{
case Rating(user, product, rating) => ((user, product), rating)
}
val formatResultOfPredictionResult = predictResultOfTestSet.map {
case Rating(user, product, rating) => ((user, product), rating)
}
val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)
val MAE = finalResultForComparison.map {
case ((user, product), (ratingOfTest, ratingOfPrediction)) =>
val error = (ratingOfTest - ratingOfPrediction)
Math.abs(error)
}.mean()
在得到測試集的預測評分結果之后,我們用 map 操作和 join 操作將它與測試集的原始數據組合成為 ((用戶ID, 電影ID), (測試集原有評分, 預測評分))的格式。這個格式是 Key-Value 形式的,Key 為 (user, product)。我們是要把這里的測試集原有評分與預測時得到的評分相比較,二者的聯系就是 user 和 product 相同。
上述代碼中首先調用模型進行預測,然后將在測試集上的預測結果和測試集本身的數據都轉換為 ((user,product), rating) 的格式,之后將兩個數據組合在一起,計算兩者之間的評價的差值的絕對值,然后求平均值,這種方法叫做計算平均絕對誤差
平均絕對誤差( Mean Absolute Error )是所有單個觀測值與算術平均值偏差的絕對值的平均。
與平均誤差相比,平均絕對誤差由于離差被絕對值化,不會出現正負相抵消的情況,所以平均絕對誤差能更好地反映預測值誤差的實際情況。
最終算出的結果為
image.png
效果還算可以,如果想繼續優化可以通過增加ALS的迭代次數和特征矩陣的秩來提高準確率
完整代碼
import org.apache.spark.rdd._
import org.apache.spark.sql._
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
object PredictMovie {
case class Movie(movieId: Int, title: String)
case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)
def parseMovieData(data: String): Movie = {
val dataField = data.split("::")
assert(dataField.size == 3)
Movie(dataField(0).toInt, dataField(1))
}
def parseUserData(data: String): User = {
val dataField = data.split("::")
assert(dataField.size == 5)
User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)
}
def parseRatingData(data: String): Rating = {
val dataField = data.split("::")
Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)
}
def main(args: Array[String]){
val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()
import spark.implicits._
var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData _).cache()
var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData _).cache()
var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData _).cache()
// convert to DataFrame
val moviesDF = moviesData.toDF()
val usersDF = usersData.toDF()
val ratingsDF = ratingsData.toDF()
// split to data set and test set
val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)
val trainingSetOfRatingsData = tempPartitions(0).cache().rdd
val testSetOfRatingData = tempPartitions(1).cache().rdd
// training model
val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)
val recomResult = recomModel.recommendProducts(1000, 10)
println(s"Recommend Movie to User ID 1000")
println(recomResult.mkString("\n"))
val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()
val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))
println(recommendMoviesWithTitle.mkString("\n"))
val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{
case Rating(user, product, rating) => (user, product)
})
val formatResultOfTestSet = testSetOfRatingData.map{
case Rating(user, product, rating) => ((user, product), rating)
}
val formatResultOfPredictionResult = predictResultOfTestSet.map {
case Rating(user, product, rating) => ((user, product), rating)
}
val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)
val MAE = finalResultForComparison.map {
case ((user, product), (ratingOfTest, ratingOfPrediction)) =>
val error = (ratingOfTest - ratingOfPrediction)
Math.abs(error)
}.mean()
println(s"mean error: $MAE")
spark.stop()
}
}
總結
以上是生活随笔為你收集整理的电影推荐系统 python简书_【记录|Spark】简单的电影推荐系统的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: firefly游戏服务器学习笔记 6——
- 下一篇: 暑假训练---三棱锥内切球公式及海伦公式