RDD编程 下(Spark自学四)
3.5 常見的轉化操作和行動操作
3.5.1 基本RDD
1. 針對各個元素的轉化操作
兩個最常用的轉化操作是map()和filter()。轉化操作map()接受一個函數,把這個函數用于RDD中的每個元素,將函數的返回結果作為結果RDD中對應元素的值。而轉化操作filter()則接收一個函數,并將RDD中滿足該函數的元素放入新的RDD中返回。
inputRDD{1,2,3,4} >>>map x=>x*x >>> Mapped RDD{1,4,9,16}
inputRDD{1,2,3,4} >>>filter x=>x!=1 >>> Filtered RDD{2,3,4}
例 3-26:Python版計算RDD中各值的平方
nums = sc.parallelize([1,2,3,4]) squared = nums.map(lambda x : x*x) for num in squared:print "%i "%(num)例 3-27:Scala版計算RDD中各值的平方
val input = sc.parallelize(List(1,2,3,4)) val result = input.map(x => x*x) println(result.collect().mkString(","))例 3-29: Python中的flatMap()將行數據切分為單詞
>>> lines = sc.parallelize(["hello world","hi"]) >>> words1 = lines.flatMap(lambda line: line.split(" ")) >>> words2 = lines.map(lambda line : line.split(" ")) >>> words1.first() 'hello' >>> words2.first() ['hello', 'world']例 3-30: Scala中的flatMap()將行數據切分為單詞
val lines = sc.parallelize(["hello world", "hi"]) words = lines.flatMap(line=> line.split(" ")) words.first()2. 偽集合操作
RDD支持許多數學上的集合操作,比如合并和相交。注意,這些操作都要求操作的RDD、是相同數據類型的。
RDD.distinct()生成一個只包含不同元素的新RDD。
union(other)操作返回一個包含兩個RDD中所有元素的RDD。
intersection(other)方法只返回兩個RDD中都有的元素。
subtract(other)函數接受另一個RDD作為參數,返回一個由只存在于第一個RDD中而不存在第二個RDD中的所有元素組成的RDD。
cartesian(other)轉化操作會返回所有可能的(a,b)對,其中a是源RDD中的元素,b來自另一個RDD。
3. 行動操作
例 3-32:Python中的reduce()
>>> nums = sc.parallelize([1,2,3,4]) >>> sums = nums.reduce(lambda x, y:x + y) >>> sums 10例3-33:Scala中的reduce()
val sum = rdd.reduce((x, y) => x+y)RDD的一些行動操作會以普通集合或者值的形式將RDD的部分或全部數據返回驅動器程序中。
collect()操作會將整個RDD的內容返回。
take(n)返回RDD中的n個元素。
top(n)從RDD中獲取前n個元素。
foreach(func)對RDD中的每個元素使用給定的函數。
count()用來返回元素個數。
3.5.2 在不同RDD類型間轉換
略
3.6 持久化(緩存)
Spark RDD是惰性求值的,而有時我們希望能多次使用同一個RDD。如果簡單的對RDD調用行動操作,Spark每次都會重算RDD以及它的所有依賴。
例3-39: Scala中的兩次執行
val result = input.map(x => x*x) println(result.count()) println(result.collect().mkString(","))為了避免多次計算同一個RDD, 可以讓Spark對數據進行持久化。當我們讓Spark持久化存儲一個RDD時,計算出RDD的節點會分別保存它們所求出的分區數據。如果一個有持久化數據的節點發生故障,Spark會在需要用到緩存數據時重算丟失的數據分區。
例3-40: 在Scala中使用persist()
val result = input.map(x => x*x) result.persist(StorageLevel.DISK_ONLY) println(result.count()) println(result.collect().mkString(","))注意,persist()調用本身不會引發強制求值。
RDD還有一個方法叫做unpersist(),調用該方法可以手動把持久化的RDD從緩存中移除。
?
轉載于:https://www.cnblogs.com/zhangtianyuan/p/7688104.html
總結
以上是生活随笔為你收集整理的RDD编程 下(Spark自学四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 树网的核 Vijos1362 NOIP2
- 下一篇: Linux系统学习----前言