Spark Streaming 实战案例(二) Transformation操作
生活随笔
收集整理的這篇文章主要介紹了
Spark Streaming 实战案例(二) Transformation操作
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
本節(jié)主要內(nèi)容
本節(jié)部分內(nèi)容來(lái)自官方文檔:http://spark.apache.org/docs/latest/streaming-programming-guide.html
1. Transformation操作
| map(func) | 對(duì)DStream中的各個(gè)元素進(jìn)行func函數(shù)操作,然后返回一個(gè)新的DStream. |
| flatMap(func) | 與map方法類(lèi)似,只不過(guò)各個(gè)輸入項(xiàng)可以被輸出為零個(gè)或多個(gè)輸出項(xiàng) |
| filter(func) | 過(guò)濾出所有函數(shù)func返回值為true的DStream元素并返回一個(gè)新的DStream |
| repartition(numPartitions) | 增加或減少DStream中的分區(qū)數(shù),從而改變DStream的并行度 |
| union(otherStream) | 將源DStream和輸入?yún)?shù)為otherDStream的元素合并,并返回一個(gè)新的DStream. |
| count() | 通過(guò)對(duì)DStreaim中的各個(gè)RDD中的元素進(jìn)行計(jì)數(shù),然后返回只有一個(gè)元素的RDD構(gòu)成的DStream |
| reduce(func) | 對(duì)源DStream中的各個(gè)RDD中的元素利用func進(jìn)行聚合操作,然后返回只有一個(gè)元素的RDD構(gòu)成的新的DStream. |
| countByValue() | 對(duì)于元素類(lèi)型為K的DStream,返回一個(gè)元素為(K,Long)鍵值對(duì)形式的新的DStream,Long對(duì)應(yīng)的值為源DStream中各個(gè)RDD的key出現(xiàn)的次數(shù) |
| reduceByKey(func, [numTasks]) | 利用func函數(shù)對(duì)源DStream中的key進(jìn)行聚合操作,然后返回新的(K,V)對(duì)構(gòu)成的DStream |
| join(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)類(lèi)型的DStream,返回一個(gè)新的(K,(V,W)類(lèi)型的DStream |
| cogroup(otherStream, [numTasks]) | 輸入為(K,V)、(K,W)類(lèi)型的DStream,返回一個(gè)新的 (K, Seq[V], Seq[W]) 元組類(lèi)型的DStream |
| transform(func) | 通過(guò)RDD-to-RDD函數(shù)作用于源碼DStream中的各個(gè)RDD,可以是任意的RDD操作,從而返回一個(gè)新的RDD |
| updateStateByKey(func) | 根據(jù)于key的前置狀態(tài)和key的新值,對(duì)key進(jìn)行更新,返回一個(gè)新?tīng)顟B(tài)的DStream |
具體示例:
//讀取本地文件~/streaming文件夾val lines = ssc.textFileStream(args(0))val words = lines.flatMap(_.split(" "))val wordMap = words.map(x => (x, 1))val wordCounts=wordMap.reduceByKey(_ + _)val filteredWordCounts=wordCounts.filter(_._2>1)val numOfCount=filteredWordCounts.count()val countByValue=words.countByValue()val union=words.union(word1)val transform=words.transform(x=>x.map(x=>(x,1))) //顯式原文件lines.print() //打印flatMap結(jié)果words.print() //打印map結(jié)果wordMap.print() //打印reduceByKey結(jié)果wordCounts.print() //打印filter結(jié)果filteredWordCounts.print() //打印count結(jié)果numOfCount.print() //打印countByValue結(jié)果countByValue.print() //打印union結(jié)果union.print() //打印transform結(jié)果transform.print()- 1
下面的代碼是運(yùn)行時(shí)添加的文件內(nèi)容
root@sparkmaster:~/streaming# echo "A B C D" >> test12.txt; echo "A B" >> test12.txt- 1
下面是前面各個(gè)函數(shù)的結(jié)果
------------------------------------------- lines.print() ------------------------------------------- A B C D A B------------------------------------------- flatMap結(jié)果 ------------------------------------------- A B C D A B------------------------------------------- map結(jié)果 ------------------------------------------- (A,1) (B,1) (C,1) (D,1) (A,1) (B,1)------------------------------------------- reduceByKey結(jié)果 ------------------------------------------- (B,2) (D,1) (A,2) (C,1)------------------------------------------- filter結(jié)果 ------------------------------------------- (B,2) (A,2)------------------------------------------- count結(jié)果 ------------------------------------------- 2------------------------------------------- countByValue結(jié)果 ------------------------------------------- (B,2) (D,1) (A,2) (C,1)------------------------------------------- union結(jié)果 ------------------------------------------- A B C D A B A B C D ...------------------------------------------- transform結(jié)果 ------------------------------------------- (A,1) (B,1) (C,1) (D,1) (A,1) (B,1)- 1
示例2:
上節(jié)課中演示的WordCount代碼并沒(méi)有只是對(duì)輸入的單詞進(jìn)行分開(kāi)計(jì)數(shù),沒(méi)有記錄前一次計(jì)數(shù)的狀態(tài),如果想要連續(xù)地進(jìn)行計(jì)數(shù),則可以使用updateStateByKey方法來(lái)進(jìn)行。下面的代碼主要給大家演示如何updateStateByKey的方法。
下圖是初始時(shí)的值:
使用下列命令啟動(dòng)netcat server
然后輸入
root@sparkmaster:~/streaming# nc -lk 9999 hello將得到下圖的結(jié)果
然后再輸入world,
root@sparkmaster:~/streaming# nc -lk 9999 hello world- 1
則將得到下列結(jié)果
總結(jié)
以上是生活随笔為你收集整理的Spark Streaming 实战案例(二) Transformation操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: arcgis api for jav
- 下一篇: Scala入门到精通——第九节 继承与组