2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
目錄
案例-SogouQ日志分析
業(yè)務(wù)需求
準(zhǔn)備工作
HanLP?中文分詞
樣例類 SogouRecord
業(yè)務(wù)實現(xiàn)
???????搜索關(guān)鍵詞統(tǒng)計
???????用戶搜索點擊統(tǒng)計
???????搜索時間段統(tǒng)計
???????完整代碼
案例-SogouQ日志分析
使用搜狗實驗室提供【用戶查詢?nèi)罩?SogouQ)】數(shù)據(jù),使用Spark框架,將數(shù)據(jù)封裝到RDD中進行業(yè)務(wù)數(shù)據(jù)處理分析。數(shù)據(jù)網(wǎng)址:http://www.sogou.com/labs/resource/q.php
?1)、數(shù)據(jù)介紹:搜索引擎查詢?nèi)罩編煸O(shè)計為包括約1個月(2008年6月)Sogou搜索引擎部分網(wǎng)頁查詢需求及用戶點擊情況的網(wǎng)頁查詢?nèi)罩緮?shù)據(jù)集合。
?2)、數(shù)據(jù)格式
訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結(jié)果中的排名\t用戶點擊的順序號\t用戶點擊的URL
?
?
?
?
用戶ID是根據(jù)用戶使用瀏覽器訪問搜索引擎時的Cookie信息自動賦值,即同一次使用瀏覽器輸入的不同查詢對應(yīng)同一個用戶ID
?3)、數(shù)據(jù)下載:分為三個數(shù)據(jù)集,大小不一樣
迷你版(樣例數(shù)據(jù), 376KB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.mini.zip
精簡版(1天數(shù)據(jù),63MB):http://download.labs.sogou.com/dl/sogoulabdown/SogouQ/SogouQ.reduced.zip
完整版(1.9GB):http://www.sogou.com/labs/resource/ftp.php?dir=/Data/SogouQ/SogouQ.zip
?
業(yè)務(wù)需求
針對SougoQ用戶查詢?nèi)罩緮?shù)據(jù)中不同字段,不同業(yè)務(wù)進行統(tǒng)計分析:
?
使用SparkContext讀取日志數(shù)據(jù),封裝到RDD數(shù)據(jù)集中,調(diào)用Transformation函數(shù)和Action函數(shù)處理分析,靈活掌握Scala語言編程。
?
準(zhǔn)備工作
?????在編程實現(xiàn)業(yè)務(wù)功能之前,首先考慮如何對【查詢詞】進行中文分詞及將日志數(shù)據(jù)解析封裝。
HanLP?中文分詞
????使用比較流行好用中文分詞:HanLP,面向生產(chǎn)環(huán)境的自然語言處理工具包,HanLP 是由一系列模型與算法組成的 Java 工具包,目標(biāo)是普及自然語言處理在生產(chǎn)環(huán)境中的應(yīng)用。
官方網(wǎng)站:http://www.hanlp.com/,添加Maven依賴
<dependency><groupId>com.hankcs</groupId><artifactId>hanlp</artifactId><version>portable-1.7.7</version></dependency>
演示范例:HanLP 入門案例,基本使用
package cn.itcast.coreimport java.utilimport com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import com.hankcs.hanlp.tokenizer.StandardTokenizerimport scala.collection.JavaConverters._/*** HanLP 入門案例,基本使用*/
object HanLPTest {def main(args: Array[String]): Unit = {// 入門Demoval terms: util.List[Term] = HanLP.segment("杰克奧特曼全集視頻")println(terms)println(terms.asScala.map(_.word.trim))// 標(biāo)準(zhǔn)分詞val terms1: util.List[Term] = StandardTokenizer.segment("放假++端午++重陽")println(terms1)println(terms1.asScala.map(_.word.replaceAll("\\s+", "")))val words: Array[String] ="""00:00:00 2982199073774412 ???[360安全衛(wèi)士] ??8 3 download.it.com.cn/softweb/software/firewall/antivirus/20067/17938.html""".split("\\s+")println(words(2).replaceAll("\\[|\\]", ""))//將"["和"]"替換為空""}}
?
???????樣例類 SogouRecord
將每行日志數(shù)據(jù)封裝到CaseClass樣例類SogouRecord中,方便后續(xù)處理:
/*** 用戶搜索點擊網(wǎng)頁記錄Record* @param queryTime ?訪問時間,格式為:HH:mm:ss* @param userId ????用戶ID* @param queryWords 查詢詞* @param resultRank 該URL在返回結(jié)果中的排名* @param clickRank ?用戶點擊的順序號* @param clickUrl ??用戶點擊的URL*/
case class SogouRecord(queryTime: String, userId: String, queryWords: String, resultRank: Int, clickRank: Int, clickUrl: String )
?
???????業(yè)務(wù)實現(xiàn)
先讀取數(shù)據(jù),封裝到SougoRecord類中,再按照業(yè)務(wù)處理數(shù)據(jù)。
最后也可以將分析的結(jié)果存儲到MySQL表中。
???????讀取數(shù)據(jù)
?????構(gòu)建SparkContext實例對象,讀取本次SogouQ.sample數(shù)據(jù),封裝到SougoRecord中 。
object SogouQueryAnalysis {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// TODO: 1. 本地讀取SogouQ用戶查詢?nèi)罩緮?shù)據(jù)val rawLogsRDD: RDD[String] = sc.textFile("data/input/SogouQ.sample")//println(s"Count = ${rawLogsRDD.count()}")// TODO: 2. 解析數(shù)據(jù),封裝到CaseClass樣例類中val recordsRDD: RDD[SogouRecord] = rawLogsRDD// 過濾不合法數(shù)據(jù),如null,分割后長度不等于6.filter(log => log != null && log.trim.split("\\s+").length == 6)// 對每個分區(qū)中數(shù)據(jù)進行解析,封裝到SogouRecord.mapPartitions(iter => {iter.map(log => {val arr: Array[String] = log.trim.split("\\s+")SogouRecord(arr(0),arr(1),arr(2).replaceAll("\\[|\\]", ""),arr(3).toInt,arr(4).toInt,arr(5))})})println(s"Count = ${recordsRDD.count()},\nFirst = ${recordsRDD.first()}")// 數(shù)據(jù)使用多次,進行緩存操作,使用count觸發(fā)recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()
?
???????搜索關(guān)鍵詞統(tǒng)計
獲取用戶【查詢詞】,使用HanLP進行分詞,按照單詞分組聚合統(tǒng)計出現(xiàn)次數(shù),類似WordCount程序,具體代碼如下:
// =================== 3.1 搜索關(guān)鍵詞統(tǒng)計?===================
// a. 獲取搜索詞,進行中文分詞
val wordsRDD: RDD[String] = recordsRDD.mapPartitions(iter => {iter.flatMap(record => {// 使用HanLP中文分詞庫進行分詞val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)// 將Java中集合對轉(zhuǎn)換為Scala中集合對象import scala.collection.JavaConverters._terms.asScala.map(_.word)})
})
println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")// b. 統(tǒng)計搜索詞出現(xiàn)次數(shù),獲取次數(shù)最多Top10
val top10SearchWords: Array[(Int, String)] = wordsRDD.map((_, 1)) // 每個單詞出現(xiàn)一次.reduceByKey(_ + _) // 分組統(tǒng)計次數(shù).map(_.swap).sortByKey(ascending = false) // 詞頻降序排序.take(10) // 獲取前10個搜索詞
top10SearchWords.foreach(println)
運行結(jié)果如下:
?
?
???????用戶搜索點擊統(tǒng)計
統(tǒng)計出每個用戶每個搜索詞點擊網(wǎng)頁的次數(shù),可以作為搜索引擎搜索效果評價指標(biāo)。先按照用戶ID分組,再按照【查詢詞】分組,最后統(tǒng)計次數(shù),求取最大次數(shù)、最小次數(shù)及平均次數(shù)。
// =================== 3.2 用戶搜索點擊次數(shù)統(tǒng)計?===================
/*每個用戶在搜索引擎輸入關(guān)鍵詞以后,統(tǒng)計點擊網(wǎng)頁數(shù)目,反應(yīng)搜索引擎準(zhǔn)確度先按照用戶ID分組,再按照搜索詞分組,統(tǒng)計出每個用戶每個搜索詞點擊網(wǎng)頁個數(shù)*/
val clickCountRDD: RDD[((String, String), Int)] = recordsRDD.map(record => {// 獲取用戶ID和搜索詞val key = (record.userId, record.queryWords)(key, 1)})// 按照用戶ID和搜索詞組合的Key分組聚合.reduceByKey(_ + _)clickCountRDD.sortBy(_._2, ascending = false).take(10).foreach(println)println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")
println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")
println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")
程序運行結(jié)果如下:
?
???????搜索時間段統(tǒng)計
按照【訪問時間】字段獲取【小時:分鐘】,分組統(tǒng)計各個小時段用戶查詢搜索的數(shù)量,進一步觀察用戶喜歡在哪些時間段上網(wǎng),使用搜狗引擎搜索,代碼如下:
// =================== 3.3 搜索時間段統(tǒng)計?===================
/*從搜索時間字段獲取小時,統(tǒng)計個小時搜索次數(shù)*/
val hourSearchRDD: RDD[(String, Int)] = recordsRDD// 提取小時和分鐘.map(record => {// 03:12:50record.queryTime.substring(0, 5)})// 分組聚合.map((_, 1)) // 每個單詞出現(xiàn)一次.reduceByKey(_ + _) // 分組統(tǒng)計次數(shù).sortBy(_._2, ascending = false)
hourSearchRDD.foreach(println)
??程序運行結(jié)果如下:
?
???????完整代碼
業(yè)務(wù)實現(xiàn)完整代碼SogouQueryAnalysis如下所示:
package cn.itcast.coreimport java.utilimport com.hankcs.hanlp.HanLP
import com.hankcs.hanlp.seg.common.Term
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}/*** 用戶查詢?nèi)罩?SogouQ)分析,數(shù)據(jù)來源Sogou搜索引擎部分網(wǎng)頁查詢需求及用戶點擊情況的網(wǎng)頁查詢?nèi)罩緮?shù)據(jù)集合。* ????1. 搜索關(guān)鍵詞統(tǒng)計,使用HanLP中文分詞* ????2. 用戶搜索次數(shù)統(tǒng)計* ????3. 搜索時間段統(tǒng)計* 數(shù)據(jù)格式:* 訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結(jié)果中的排名\t用戶點擊的順序號\t用戶點擊的URL* 其中,用戶ID是根據(jù)用戶使用瀏覽器訪問搜索引擎時的Cookie信息自動賦值,即同一次使用瀏覽器輸入的不同查詢對應(yīng)同一個用戶ID*/
object SogouQueryAnalysis {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// TODO: 1. 本地讀取SogouQ用戶查詢?nèi)罩緮?shù)據(jù)val rawLogsRDD: RDD[String] = sc.textFile("data/input/SogouQ.sample")//val rawLogsRDD: RDD[String] = sc.textFile("D:/data/sogou/SogouQ.reduced")//println(s"Count = ${rawLogsRDD.count()}")// TODO: 2. 解析數(shù)據(jù),封裝到CaseClass樣例類中val recordsRDD: RDD[SogouRecord] = rawLogsRDD// 過濾不合法數(shù)據(jù),如null,分割后長度不等于6.filter(log => log != null && log.trim.split("\\s+").length == 6)// 對每個分區(qū)中數(shù)據(jù)進行解析,封裝到SogouRecord.mapPartitions(iter => {iter.map(log => {val arr: Array[String] = log.trim.split("\\s+")SogouRecord(arr(0),arr(1),arr(2).replaceAll("\\[|\\]", ""),arr(3).toInt,arr(4).toInt,arr(5))})})println("====解析數(shù)據(jù)===")println(s"Count = ${recordsRDD.count()},\nFirst = ${recordsRDD.first()}")// 數(shù)據(jù)使用多次,進行緩存操作,使用count觸發(fā)recordsRDD.persist(StorageLevel.MEMORY_AND_DISK).count()// TODO: 3. 依據(jù)需求統(tǒng)計分析/*1. 搜索關(guān)鍵詞統(tǒng)計,使用HanLP中文分詞2. 用戶搜索次數(shù)統(tǒng)計3. 搜索時間段統(tǒng)計*/println("====3.1 搜索關(guān)鍵詞統(tǒng)計===")// =================== 3.1 搜索關(guān)鍵詞統(tǒng)計?===================// a. 獲取搜索詞,進行中文分詞val wordsRDD: RDD[String] = recordsRDD.mapPartitions(iter => {iter.flatMap(record => {// 使用HanLP中文分詞庫進行分詞val terms: util.List[Term] = HanLP.segment(record.queryWords.trim)// 將Java中集合對轉(zhuǎn)換為Scala中集合對象import scala.collection.JavaConverters._terms.asScala.map(_.word)})})//println(s"Count = ${wordsRDD.count()}, Example = ${wordsRDD.take(5).mkString(",")}")// b. 統(tǒng)計搜索詞出現(xiàn)次數(shù),獲取次數(shù)最多Top10val top10SearchWords: Array[(Int, String)] = wordsRDD.map((_, 1)) // 每個單詞出現(xiàn)一次.reduceByKey(_ + _) // 分組統(tǒng)計次數(shù).map(_.swap).sortByKey(ascending = false) // 詞頻降序排序.take(10) // 獲取前10個搜索詞top10SearchWords.foreach(println)println("====3.2 用戶搜索點擊次數(shù)統(tǒng)計===")// =================== 3.2 用戶搜索點擊次數(shù)統(tǒng)計?===================/*每個用戶在搜索引擎輸入關(guān)鍵詞以后,統(tǒng)計點擊網(wǎng)頁數(shù)目,反應(yīng)搜索引擎準(zhǔn)確度先按照用戶ID分組,再按照搜索詞分組,統(tǒng)計出每個用戶每個搜索詞點擊網(wǎng)頁個數(shù)*/val clickCountRDD: RDD[((String, String), Int)] = recordsRDD.map(record => {// 獲取用戶ID和搜索詞val key = (record.userId, record.queryWords)(key, 1)})// 按照用戶ID和搜索詞組合的Key分組聚合.reduceByKey(_ + _)clickCountRDD.sortBy(_._2, ascending = false).take(10).foreach(println)println(s"Max Click Count = ${clickCountRDD.map(_._2).max()}")println(s"Min Click Count = ${clickCountRDD.map(_._2).min()}")println(s"Avg Click Count = ${clickCountRDD.map(_._2).mean()}")println("====3.3 搜索時間段統(tǒng)計===")// =================== 3.3 搜索時間段統(tǒng)計?===================/*從搜索時間字段獲取小時,統(tǒng)計個小時搜索次數(shù)*/val hourSearchRDD: RDD[(String, Int)] = recordsRDD// 提取小時和分鐘.map(record => {// 03:12:50record.queryTime.substring(0, 5)})// 分組聚合.map((_, 1)) // 每個單詞出現(xiàn)一次.reduceByKey(_ + _) // 分組統(tǒng)計次數(shù).sortBy(_._2, ascending = false)hourSearchRDD.foreach(println)// 釋放緩存數(shù)據(jù)recordsRDD.unpersist()// 應(yīng)用結(jié)束,關(guān)閉資源sc.stop()}/*** 用戶搜索點擊網(wǎng)頁記錄Record** @param queryTime ?訪問時間,格式為:HH:mm:ss* @param userId ????用戶ID* @param queryWords 查詢詞* @param resultRank 該URL在返回結(jié)果中的排名* @param clickRank ?用戶點擊的順序號* @param clickUrl ??用戶點擊的URL*/case class SogouRecord(queryTime: String,userId: String,queryWords: String,resultRank: Int,clickRank: Int,clickUrl: String)}
???????
總結(jié)
以上是生活随笔為你收集整理的2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(二十):Sp
- 下一篇: 2021年大数据Spark(二十二):内