reduceByKeyAndWindow基于滑动窗口的热点搜索词实时统计(Scala版本)
生活随笔
收集整理的這篇文章主要介紹了
reduceByKeyAndWindow基于滑动窗口的热点搜索词实时统计(Scala版本)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
package SparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
? *
? * 基于滑動窗口的熱點搜索詞實時統計
? * 每隔5秒鐘,統計最近20秒鐘的搜索詞的搜索頻次,
? * 并打印出排名最靠前的3個搜索詞以及出現次數
? *
? */
object WindowDemo {
? def main(args: Array[String]): Unit = {
? ? val conf=new SparkConf().setAppName("WindowDemo")
? ? ? ? ? ? ? ? .setMaster("local[2]")
? ? val ssc=new StreamingContext(conf,Seconds(5))
? ? //從nc服務中獲取數據,數據格式:name word,比如:張三 大數據
? ? val linesDStream=ssc.socketTextStream("tgmaster",9999)
? ? //將數據中的搜索詞取出
? ? val wordsDStream=linesDStream.map(_.split(" ")(1))
? ? //通過map算子,將搜索詞形成鍵值對(word,1),將搜索詞記錄為1次
? ? val searchwordDStream=wordsDStream.map(searchword=>(searchword,1))
? ? //通過reduceByKeyAndWindow算子,每隔5秒統計最近20秒的搜索詞出現的次數
? ? val reduceDStream=searchwordDStream.reduceByKeyAndWindow(
? ? ? (v1:Int,v2:Int)=>
? ? ? ? v1+v2,Seconds(20),Seconds(5)
? ? )
? ? //調用DStream中的transform算子,可以進行數據轉換
? ? val transformDStream=reduceDStream.transform(searchwordRDD=>{
? ? ? val result=searchwordRDD.map(m=>{ ?//將key與value互換位置
? ? ? ? (m._2,m._1)
? ? ? }).sortByKey(false) //根據key進行降序排列
? ? ? ? .map(m=>{ //將key與value互換位置
? ? ? ? (m._2,m._1)
? ? ? }).take(3) //取前3名
? ? ? for(elem<-result){
? ? ? ? println(elem._1+" ?"+elem._2)
? ? ? }
? ? ? searchwordRDD //注意返回值
? ? })
? ? transformDStream.print()
? ? ssc.start()
? ? ssc.awaitTermination()
? }
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
? *
? * 基于滑動窗口的熱點搜索詞實時統計
? * 每隔5秒鐘,統計最近20秒鐘的搜索詞的搜索頻次,
? * 并打印出排名最靠前的3個搜索詞以及出現次數
? *
? */
object WindowDemo {
? def main(args: Array[String]): Unit = {
? ? val conf=new SparkConf().setAppName("WindowDemo")
? ? ? ? ? ? ? ? .setMaster("local[2]")
? ? val ssc=new StreamingContext(conf,Seconds(5))
? ? //從nc服務中獲取數據,數據格式:name word,比如:張三 大數據
? ? val linesDStream=ssc.socketTextStream("tgmaster",9999)
? ? //將數據中的搜索詞取出
? ? val wordsDStream=linesDStream.map(_.split(" ")(1))
? ? //通過map算子,將搜索詞形成鍵值對(word,1),將搜索詞記錄為1次
? ? val searchwordDStream=wordsDStream.map(searchword=>(searchword,1))
? ? //通過reduceByKeyAndWindow算子,每隔5秒統計最近20秒的搜索詞出現的次數
? ? val reduceDStream=searchwordDStream.reduceByKeyAndWindow(
? ? ? (v1:Int,v2:Int)=>
? ? ? ? v1+v2,Seconds(20),Seconds(5)
? ? )
? ? //調用DStream中的transform算子,可以進行數據轉換
? ? val transformDStream=reduceDStream.transform(searchwordRDD=>{
? ? ? val result=searchwordRDD.map(m=>{ ?//將key與value互換位置
? ? ? ? (m._2,m._1)
? ? ? }).sortByKey(false) //根據key進行降序排列
? ? ? ? .map(m=>{ //將key與value互換位置
? ? ? ? (m._2,m._1)
? ? ? }).take(3) //取前3名
? ? ? for(elem<-result){
? ? ? ? println(elem._1+" ?"+elem._2)
? ? ? }
? ? ? searchwordRDD //注意返回值
? ? })
? ? transformDStream.print()
? ? ssc.start()
? ? ssc.awaitTermination()
? }
}
總結
以上是生活随笔為你收集整理的reduceByKeyAndWindow基于滑动窗口的热点搜索词实时统计(Scala版本)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 音视频OSD——修改叠加信息的位置
- 下一篇: 反素数概念及理解