SparkStreaming - 无状态与有状态 updataStateByKey
生活随笔
收集整理的這篇文章主要介紹了
SparkStreaming - 无状态与有状态 updataStateByKey
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
無狀態(tài)與有狀態(tài)
簡單來說,無狀態(tài)就是每個采集周期分別采集,并不會把前面的采集周期的數(shù)據(jù)一起計算
有狀態(tài)就是:把前面采集周期的也算進來,
比如wordcount,無狀態(tài)統(tǒng)計的就是每個采集周期內的個數(shù),有狀態(tài)的話是統(tǒng)計所有采集周期內的個數(shù)。
有狀態(tài)就是把前面的采集周期采集的數(shù)據(jù)存到緩存中,想要安全一些就設置檢查點存儲到磁盤,然后當前的DStream去和磁盤交互,一起統(tǒng)計出來。
package date_10_17_SparkStreamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}object upState {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("wordCount").setMaster("local[*]")val streamingContext = new StreamingContext(conf,Seconds(5))streamingContext.checkpoint("cp")//連接kafkaval kafkaStream = KafkaUtils.createStream(streamingContext,"chun1:2181","chun",Map("chun"->3))//wordcount運算val mapDStream = kafkaStream.flatMap(_._2.split(" ")).map((_,1))//有狀態(tài)val resultDStream:DStream[(String,Int)] = mapDStream.updateStateByKey {case (seq, buffer) => {val sum = buffer.getOrElse(0) + seq.sumOption(sum)}}resultDStream.print()//啟動采集器streamingContext.start()//等待采集器關閉才關閉DriverstreamingContext.awaitTermination()} }
總結
以上是生活随笔為你收集整理的SparkStreaming - 无状态与有状态 updataStateByKey的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 支持任亏券!《异度之刃3》公布全新预告:
- 下一篇: 四边等宽的Nothing Phone 1