Spark基础脚本入门实践2:基础开发
?
1、最基本的Map用法
val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
val result = distData.map(x=>x*x)
println(result.collect().mkString(","))
其中最關鍵的操作就是:從分布式數據集 --轉換--> 并行數據集
from a distributed dataset to Parallelized collections
Spark分布式數據集包含:
- local file system
- HDFS
- Cassandra
- HBase
- Amazon S3
Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.
- 比如文件:val distFile = sc.textFile("data.txt")
- 比如hdfs:hdfs://
- 比如s3:s3n://
讀取文件時需要注意的是:
- 如果使用的是本地文件路徑,那么worker節點一定是有訪問權限的.
- 文本文件的訪問方式: textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").
- hdfs系統會把文件按128MB進行分區
2、從外部文件系統獲取數據
val lines = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
val lineLengths = lines.map(s => s.length)
val totalLength = lineLengths.reduce((a, b) => a + b)
3、flatMap用法
flatMap的做法有點象把迭代器拍扁拍碎,比如以下代碼
val lines = sc.parallelize(List("hi man","ha girl"))
val wordsFlatmap = lines.flatMap(line=>line.split(" "))
val wordsMap = lines.map(line=>line.split(" "))
看看區別:
scala> wordsFlatmap.first
res9: String = hi
scala> wordsMap.first
res10: Array[String] = Array(hi, man)
實際上wordsFlatmap被拆成了4個string,而wordsMap是對輸入的list每個元素進行了split操作,所以說flatMap的做法有點象把迭代器拍扁拍碎。比如說分詞就容易用flatMap
4、笛卡爾積
在推薦系統中,要計算各用戶對多個產品的興趣度,就可以制作一個笛卡爾積,用于比較用戶的的喜愛產品的相似度。
val man = sc.parallelize(List("Tom","Cat"))
val product = sc.parallelize(List("car","iphone","android","surfacePro"))
val result = man.cartesian(product)
result.collect
運行結果:
res0: Array[(String, String)] = Array((Tom,car), (Tom,iphone), (Tom,android), (Tom,surfacePro), (Cat,car), (Cat,iphone), (Cat,android), (Cat,surfacePro))
笛卡兒計算是很恐怖的,它會迅速消耗大量的內存,所以在使用這個函數的時候請小心
5、cache操作
在spark中使用cache是非常重要的,因為行動操作都是惰性求值,每次都會重新計算所有的依賴,如果有大量迭代,代價巨大。
緩存就可以從內容讀取,無需再次計算
scala> var data = sc.parallelize(List(1,2,3,4))
data: org.apache.spark.rdd.RDD[Int] =
ParallelCollectionRDD[44] at parallelize at <console>:12
scala> data.getStorageLevel
res65: org.apache.spark.storage.StorageLevel =
StorageLevel(false, false, false, false, 1)
scala> data.cache
res66: org.apache.spark.rdd.RDD[Int] =
ParallelCollectionRDD[44] at parallelize at <console>:12
scala> data.getStorageLevel
res67: org.apache.spark.storage.StorageLevel =
StorageLevel(false, true, false, true, 1)
我們先是定義了一個RDD,然后通過getStorageLevel函數得到該RDD的默認存儲級別,這里是NONE。然后我們調用cache函數,將RDD的存儲級別改成了MEMORY_ONLY(看StorageLevel的第二個參數)
6、檢查點
將生成的RDD保存到外部可靠的存儲當中,對于一些數據跨度為多個bactch的有狀態tranformation操作來說,checkpoint非常有必要,因為在這些transformation操作生成的RDD對前一RDD有依賴,隨著時間的增加,依賴鏈可能會非常長,checkpoint機制能夠切斷依賴鏈,將中間的RDD周期性地checkpoint到可靠存儲當中,從而在出錯時可以直接從checkpoint點恢復。
val data = sc.parallelize(1 to 100 , 5)
sc.setCheckpointDir("/myCheckPoint")
data.checkpoint
data.count
7、cogroup組合
將多個RDD中同一個Key對應的Value組合到一起。
scala> val data1 = sc.parallelize(List((1, "www"), (2, "bbs")))
scala> val data2 = sc.parallelize(List((1, "iteblog"), (2, "iteblog"), (3, "very")))
scala> val data3 = sc.parallelize(List((1, "com"), (2, "com"), (3, "good")))
scala> val result = data1.cogroup(data2, data3)
scala> result.collect
res30: Array[(Int, (Iterable[String], Iterable[String], Iterable[String]))] =
Array((1,(CompactBuffer(www),CompactBuffer(iteblog),CompactBuffer(com))),
(2,(CompactBuffer(bbs),CompactBuffer(iteblog),CompactBuffer(com))),
(3,(CompactBuffer(),CompactBuffer(very),CompactBuffer(good))))
8、廣播變量
廣播變量是通過調用sparkcontext從變量v創建。廣播變量是V的包裝器,它的值可以通過調用值方法來訪問。下面的代碼顯示了這一點:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
在創建廣播變量之后,應該使用它在集群上運行的任何函數中代替V值,這樣v就不會不止一次地發送到節點。此外,對象v在廣播之后不應該被修改,以確保所有節點獲得相同的廣播變量值(例如,如果變量稍后被運送到新節點)。
9、累加器
累加器一般用來累計和計數
val accum = sc.longAccumulator("My Accumulator")
//計數
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(1))
accum.value
res1: Long = 4
//累加
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
accum.value
res2: Long = 10
總結
以上是生活随笔為你收集整理的Spark基础脚本入门实践2:基础开发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux ed 命令的用法
- 下一篇: 解决MVN install一直处于下载j