Spark的协同过滤.Vs.Hadoop MR
基于物品的協同過濾推薦算法案例在TDW Spark與MapReudce上的實現對比,相比于MapReduce,TDW Spark執行時間減少了66%,計算成本降低了40%。
原文鏈接:http://www.csdn.net/article/2014-11-04/2822474
算法介紹
互聯網的發展導致了信息爆炸。面對海量的信息,如何對信息進行刷選和過濾,將用戶最關注最感興趣的信息展現在用戶面前,已經成為了一個亟待解決的問題。推薦系統可以通過用戶與信息之間的聯系,一方面幫助用戶獲取有用的信息,另一方面又能讓信息展現在對其感興趣的用戶面前,實現了信息提供商與用戶的雙贏。
協同過濾推薦(Collaborative Filtering Recommendation)算法是最經典最常用的推薦算法,算法通過分析用戶興趣,在用戶群中找到指定用戶的相似用戶,綜合這些相似用戶對某一信息的評價,形成系統對該指定用戶對此信息的喜好程度預測。協同過濾可細分為以下三種:
- User-based CF: 基于User的協同過濾,通過不同用戶對Item的評分來評測用戶之間的相似性,根據用戶之間的相似性做出推薦;
- Item-based CF: 基于Item的協同過濾,通過用戶對不同Item的評分來評測Item之間的相似性,根據Item之間的相似性做出推薦;
- Model-based CF: 以模型為基礎的協同過濾(Model-basedCollaborative Filtering)是先用歷史資料得到一個模型,再用此模型進行預測推薦。
問題描述
輸入數據格式:Uid,ItemId,Rating?(用戶Uid對ItemId的評分)。
輸出數據:每個ItemId相似性最高的前N個ItemId。
由于篇幅限制,這里我們只選擇基于Item的協同過濾算法解決這個例子。
算法邏輯
基于Item的協同過濾算法的基本假設為兩個相似的Item獲得同一個用戶的好評的可能性較高。因此,該算法首先計算用戶對物品的喜好程度,然后根據用戶的喜好計算Item之間的相似度,最后找出與每個Item最相似的前N個Item。該算法的詳細描述如下:
- 計算用戶喜好:不同用戶對Item的評分數值可能相差較大,因此需要先對每個用戶的評分做二元化處理,例如對于某一用戶對某一Item的評分大于其給出的平均評分則標記為好評1,否則為差評0。
- 計算Item相似性:采用Jaccard系數作為計算兩個Item的相似性方法。狹義Jaccard相似度適合計算兩個集合之間的相似程度,計算方法為兩個集合的交集除以其并集,具體的分為以下三步。
2)Item好評鍵值對統計,統計任意兩個有關聯Item的相同好評用戶數。
3)Item相似性計算,計算任意兩個有關聯Item的相似度。
- 找出最相似的前N個Item。這一步中,Item的相似度還需要歸一化后整合,然后求出每個Item最相似的前N個Item,具體的分為以下三步。
2)Item相似性評分整合。
3)獲取每個Item相似性最高的前N個Item。
基于MapReduce的實現方案
使用 MapReduce編程模型需要為每一步實現一個MapReduce作業,一共存在包含七個MapRduce作業。每個MapReduce作業都包含Map和Reduce,其中Map從HDFS讀取數,輸出數據通過Shuffle把鍵值對發送到Reduce,Reduce階段以<key,Iterator<value>>作為輸入,輸出經過處理的鍵值對到HDFS。其運行原理如圖1 所示。
圖1
七個MapReduce作業意味著需要七次讀取和寫入HDFS,而它們的輸入輸出數據存在關聯,七個作業輸入輸出數據關系如圖2所示。
圖2
基于MapReduce實現此算法存在以下問題:
- 為了實現一個業務邏輯需要使用七個MapReduce作業,七個作業間的數據交換通過HDFS完成,增加了網絡和磁盤的開銷。
- 七個作業都需要分別調度到集群中運行,增加了Gaia集群的資源調度開銷。
- MR2和MR3重復讀取相同的數據,造成冗余的HDFS讀寫開銷。
基于Spark的實現方案
相比與MapReduce編程模型,Spark提供了更加靈活的DAG(Directed Acyclic Graph) 編程模型, 不僅包含傳統的map、reduce接口, 還增加了filter、flatMap、union等操作接口,使得編寫Spark程序更加靈活方便。使用Spark編程接口實現上述的業務邏輯如圖3所示。
圖3
相對于MapReduce,Spark在以下方面優化了作業的執行時間和資源使用。
- DAG編程模型。 通過Spark的DAG編程模型可以把七個MapReduce簡化為一個Spark作業。Spark會把該作業自動切分為八個Stage,每個Stage包含多個可并行執行的Tasks。Stage之間的數據通過Shuffle傳遞。最終只需要讀取和寫入HDFS一次。減少了六次HDFS的讀寫,讀寫HDFS減少了70%。
- Spark作業啟動后會申請所需的Executor資源,所有Stage的Tasks以線程的方式運行,共用Executors,相對于MapReduce方式,Spark申請資源的次數減少了近90%。
- Spark引入了RDD(ResilientDistributed Dataset)模型,中間數據都以RDD的形式存儲,而RDD分布存儲于slave節點的內存中,這就減少了計算過程中讀寫磁盤的次數。RDD還提供了Cache機制,例如對上圖的rdd3進行Cache后,rdd4和rdd7都可以訪問rdd3的數據。相對于MapReduce減少MR2和MR3重復讀取相同數據的問題。
效果對比
測試使用相同規模的資源,其中MapReduce方式包含200個Map和100個Reduce,每個Map和Reduce配置4G的內存; 由于Spark不再需要Reduce資源, 而MapReduce主要邏輯和資源消耗在Map端,因此使用200和400個Executor做測試,每個Executor包含4G內存。測試結果如下表所示,其中輸入記錄約38億條。| 運行模式 | 計算資源 | 運行時間(min) | 成本(Slot*秒) |
| MapReduce | 200 Map+100 Reduce(4G) | 120 | 693872 |
| Spark | 200 Executor(4G) | 33 | 396000 |
| Spark | 400 Executor(4G) | 21 | 504000 |
對比結果表的第一行和第二行,Spark運行效率和成本相對于MapReduce方式減少非常明顯,其中,DAG模型減少了70%的HDFS讀寫、cache減少重復數據的讀取,這兩個優化即能減少作業運行時間又能降低成本;而資源調度次數的減少能提高作業的運行效率。
對比結果表的第二行和第三行,增加一倍的Executor數目,作業運行時間減少約50%,成本增加約25%,從這個結果看到,增加Executor資源能有效的減少作業的運行時間,但并沒有做到完全線性增加。這是因為每個Task的運行時間并不是完全相等的, 例如某些task處理的數據量比其他task多;這可能導致Stage的最后時刻某些Task未結束而無法啟動下一個Stage,另一方面作業是一直占有Executor的,這時候會出現一些Executor空閑的狀況,于是導致了成本的增加。
小結
數據挖掘類業務大多具有復雜的處理邏輯,傳統的MapReduce/Pig類框架在應對此類數據處理任務時存在著嚴重的性能問題。針對這些任務,如果利用Spark的迭代計算和內存計算優勢,將會大幅降低運行時間和計算成本。TDW目前已經維護了千臺規模的Spark集群,并且會在資源利用率、穩定性和易用性等方面做進一步的提升和改進,為業務提供更有利的支持。
總結
以上是生活随笔為你收集整理的Spark的协同过滤.Vs.Hadoop MR的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 互联网的大数据神话——NoSQL
- 下一篇: 2014 Container技术大会:未