2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
目錄
SparkStreaming實戰案例二 UpdateStateByKey
需求
1.updateStateByKey
2.mapWithState
代碼實現
SparkStreaming實戰案例二 UpdateStateByKey
需求
對從Socket接收的數據做WordCount并要求能夠和歷史數據進行累加!
如:
先發了一個spark,得到spark,1
然后不管隔多久再發一個spark,得到spark,2
也就是說要對數據的歷史狀態進行維護!
?
?
注意:可以使用如下API對狀態進行維護
1.updateStateByKey
統計全局的key的狀態,但是就算沒有數據輸入,他也會在每一個批次的時候返回之前的key的狀態。假設5s產生一個批次的數據,那么5s的時候就會更新一次的key的值,然后返回。
這樣的缺點就是,如果數據量太大的話,而且我們需要checkpoint數據,這樣會占用較大的存儲。
如果要使用updateStateByKey,就需要設置一個checkpoint目錄,開啟checkpoint機制。因為key的state是在內存維護的,如果宕機,則重啟之后之前維護的狀態就沒有了,所以要長期保存它的話需要啟用checkpoint,以便恢復數據。
?
2.mapWithState
也是用于全局統計key的狀態,但是它如果沒有數據輸入,便不會返回之前的key的狀態,有一點增量的感覺。
這樣做的好處是,我們可以只是關心那些已經發生的變化的key,對于沒有數據輸入,則不會返回那些沒有變化的key的數據。這樣的話,即使數據量很大,checkpoint也不會像updateStateByKey那樣,占用太多的存儲。
?
代碼實現
package cn.itcast.streamingimport org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** 使用SparkStreaming接收Socket數據,node01:9999* 對從Socket接收的數據做WordCount并要求能夠和歷史數據進行累加!* 如:* 先發了一個spark,得到spark,1* 然后不管隔多久再發一個spark,得到spark,2* 也就是說要對數據的歷史狀態進行維護!*/
object SparkStreamingDemo02_UpdateStateByKey {def main(args: Array[String]): Unit = {//1.創建環境val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(conf)sc.setLogLevel("WARN")//batchDuration the time interval at which streaming data will be divided into batches//流數據將被劃分為批的時間間隔,就是每隔多久對流數據進行一次微批劃分!val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))// The checkpoint directory has not been set. Please set it by StreamingContext.checkpoint()//注意:因為涉及到歷史數據/歷史狀態,也就是需要將歷史數據/狀態和當前數據進行合并,作為新的Value!//那么新的Value要作為下一次的歷史數據/歷史狀態,那么應該搞一個地方存起來!//所以需要設置一個Checkpoint目錄!ssc.checkpoint("./ckp")//2.接收socket數據val linesDS: ReceiverInputDStream[String] = ssc.socketTextStream("node1",9999)//3.做WordCount//======================updateStateByKey=======================//val 函數名稱?:(參數類型)=>函數返回值類型?= (參數名稱:參數類型)=>{函數體}//參數1:Seq[Int]:當前批次的數據,如發送了2個spark,那么key為spark,參數1為:Seq[1,1]//參數2:Option[Int]:上一次該key的歷史值!注意:歷史值可能有可能沒有!如果沒有默認值應該為0,如果有就取出來//返回值:Option[Int]:當前批次的值+歷史值!//Option表示:可能有Some可能沒有Noneval updateFunc= (currentValues:Seq[Int],historyValue:Option[Int])=>{//將當前批次的數據和歷史數據進行合并作為這一次新的結果!if (currentValues.size > 0) {val newValue: Int = currentValues.sum + historyValue.getOrElse(0)//getOrElse(默認值)Option(newValue)}else{historyValue}}val resultDS: DStream[(String, Int)] = linesDS.flatMap(_.split(" ")).map((_, 1))//.reduceByKey(_ + _)// updateFunc: (Seq[V], Option[S]) => Option[S].updateStateByKey(updateFunc)//======================mapWithState=======================//Spark 1.6提供新的狀態更新函數【mapWithState】,mapWithState函數也會統計全局的key的狀態,//但是如果沒有數據輸入,便不會返回之前的key的狀態,只是關心那些已經發生的變化的key,對于沒有數據輸入,則不會返回那些沒有變化的key的數據。val mappingFunc = (word: String, current: Option[Int], state: State[Int]) => {val newCount = current.getOrElse(0) + state.getOption.getOrElse(0)val output = (word, newCount)state.update(newCount)output}val resultDS2 = linesDS.flatMap(_.split(" ")).map((_, 1)).mapWithState(StateSpec.function(mappingFunc))//4.輸出resultDS.print()resultDS2.print()//5.啟動并等待程序停止// 對于流式應用來說,需要啟動應用ssc.start()// 流式應用啟動以后,正常情況一直運行(接收數據、處理數據和輸出數據),除非人為終止程序或者程序異常停止ssc.awaitTermination()// 關閉流式應用(參數一:是否關閉SparkContext,參數二:是否優雅的關閉)ssc.stop(stopSparkContext = true, stopGracefully = true)}
}
?
總結
以上是生活随笔為你收集整理的2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(三十六):S
- 下一篇: 2021年大数据Spark(三十九):S