SparkStreaming - 窗口函数(窗口操作)
生活随笔
收集整理的這篇文章主要介紹了
SparkStreaming - 窗口函数(窗口操作)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
窗口操作就是把多個采集周期設置成一個窗口,一起來計算,然后進行滑動,根據設置的滑動大小。
窗口大小和滑動大小,要是采集周期的倍數
package date_10_17_SparkStreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka.KafkaUtilsobject ss {def main(args: Array[String]): Unit = {//Scala中的窗口 // val ints = List(1,2,3,4,5) // // val ites = ints.sliding(2,2) // // for (list <- ites){ // println(list.mkString(",")) // }//SparkStreaming窗口val conf = new SparkConf().setAppName("wordCount").setMaster("local[*]")val streamingContext = new StreamingContext(conf,Seconds(3))streamingContext.checkpoint("cp")//連接kafkaval kafkaStream = KafkaUtils.createStream(streamingContext,"chun1:2181","chun",Map("chun"->3))//一個是窗口大小和滑動大小,要是采集周期的倍數val windowDStream = kafkaStream.window(Seconds(6),Seconds(3))//wordcount運算val mapDStream = windowDStream.flatMap(_._2.split(" ")).map((_,1))val resultDStream = mapDStream.reduceByKey(_+_)resultDStream.print()//啟動采集器streamingContext.start()//等待采集器關閉才關閉DriverstreamingContext.awaitTermination()}}總結
以上是生活随笔為你收集整理的SparkStreaming - 窗口函数(窗口操作)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 以目前的银行存款利率,在银行存多少钱,可
- 下一篇: 今年二季度全球贸易量大跌,我国的对外贸易