Spark DStream相关操作
?
DStream上的操作與RDD的類似,分為Transformations(轉換)和Output Operations(輸出)兩種,此外轉換操作中還有一些比較特殊的操作,如:updateStateByKey()、transform()以及各種Window相關的操作。
1、Transformations
| map(func) | 對DStream中的各個元素進行func函數操作,然后返回一個新的DStream |
| flatMap(func) | 與map方法類似,只不過各個輸入項可以被輸出為零個或多個輸出項 |
| filter(func) | 過濾出所有函數func返回值為true的DStream元素并返回一個新的DStream |
| repartition(numPartitions) | 增加或減少DStream中的分區數,從而改變DStream的并行度 |
| union(otherStream) | 將源DStream和輸入參數為otherDStream的元素合并,并返回一個新的DStream. |
| count() | 通過對DStream中的各個RDD中的元素進行計數,然后返回只有一個元素的RDD構成的DStream |
| reduce(func) | 對源DStream中的各個RDD中的元素利用func進行聚合操作,然后返回只有一個元素的RDD構成的新的DStream. |
| countByValue() | 對于元素類型為K的DStream,返回一個元素為(K,Long)鍵值對形式的新的DStream,Long對應的值為源DStream中各個RDD的key出現的次數 |
| reduceByKey(func, [numTasks]) | 利用func函數對源DStream中的key進行聚合操作,然后返回新的(K,V)對構成的DStream |
| join(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)類型的DStream,返回一個新的(K,(V,W)類型的DStream |
| cogroup(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)類型的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組類型的DStream |
| transform(func) | 通過RDD-to-RDD函數作用于DStream中的各個RDD,可以是任意的RDD操作,從而返回一個新的RDD |
| updateStateByKey(func) | 根據于key的前置狀態和key的新值,對key進行更新,返回一個新狀態的DStream |
特殊的Transformations
-
UpdateStateByKey Operation?
UpdateStateByKey用于記錄歷史記錄,保存上次的狀態 -
Window Operations(開窗函數)?
滑動窗口轉換操作:?
滑動窗口轉換操作的計算過程如下圖所示,我們可以事先設定一個滑動窗口的長度(也就是窗口的持續時間),并且設定滑動窗口的時間間隔(每隔多長時間執行一次計算),然后,就可以讓窗口按照指定時間間隔在源DStream上滑動,每次窗口停放的位置上,都會有一部分DStream被框入窗口內,形成一個小段的DStream,這時,就可以啟動對這個小段DStream的計算。
(1)紅色的矩形就是一個窗口,窗口框住的是一段時間內的數據流。?
(2)這里面每一個time都是時間單元,在官方的例子中,每隔window size是3 time unit, 而且每隔2個單位時間,窗口會slide一次。?
所以基于窗口的操作,需要指定2個參數:
- window length - The duration of the window (3 in the figure)
- slide interval - The interval at which the window-based operation is performed (2 in the figure).?
a.窗口大小,一段時間內數據的容器。?
b.滑動間隔,每隔多久計算一次。
2、Output Operations
Output Operations可以將DStream的數據輸出到外部的數據庫或文件系統,當某個Output Operations被調用時(與RDD的Action相同),spark streaming程序才會開始真正的計算過程。
| print() | 打印到控制臺 |
| saveAsTextFiles(prefix, [suffix]) | 保存流的內容為文本文件,文件名為”prefix-TIME_IN_MS[.suffix]” |
| saveAsObjectFiles(prefix, [suffix]) | 保存流的內容為SequenceFile,文件名為 “prefix-TIME_IN_MS[.suffix]” |
| saveAsHadoopFiles(prefix, [suffix]) | 保存流的內容為hadoop文件,文件名為”prefix-TIME_IN_MS[.suffix]”. |
| foreachRDD(func) | 對Dstream里面的每個RDD執行func |
總結
以上是生活随笔為你收集整理的Spark DStream相关操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark RDD的缓存
- 下一篇: Spark RDD编程API