RDD持久化、广播、累加器
1、持久化
RDD的持久化包括兩個方面:①操作RDD的時候怎么保存結果,這個部分屬于action算子的部分②在實現算法的時候要進行cache、persist,還有checkpoint進行持久化。
1.1 persist和cache
Spark稍微復雜一點的算法里面都會有persit的身影,因為spark默認情況下是放在內存中,比較適合高速的迭代,如一個Stage有步驟非常多,中間不會產生臨時數據,對于高速迭代是非常好的事情,但是對于分布式文件系統風險非常高,容易出錯,這個時候就涉及到容錯,由于RDD有血統繼承關系,后面的RDD如果數據分片出錯或者RDD本身出錯之后可以根據前面的依賴血統關系算出來,但是如果沒有對父RDD進行persist或cache還是要從頭開始做。
首先先看下StorageLevel類,里面設置了RDD的各種緩存級別,總共有12種,其實是它的多個構造參數的組合形成的。
cache源碼如下:
由源碼可知,cache方法實際上調用了無參數的persist方法,緩存級別為僅在內存中。
persist源碼如下:
無參的persist方法,默認緩存級別為僅在內存中
其實有2種情況:①是我們之前對RDD調用了checkpoint方法,這個方法是把RDD存儲到disk上,之后我們再調用persist(newLevel)方法也是不會報錯的,他會做檢查你是否執行過checkpoint方法(即isLocallyCheckpointed),如果是的話就會調用persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true),而這里LocalRDDCheckpointData.transformStorageLevel(newLevel)返回的緩存級別是disk級別,故不會報錯②如果我們之前設置過RDD的緩存級別,現在再次調用此方法進行緩存級別設置,但是緩存級別與之前一樣,程序也是不會報錯的,因為里面調用了persist(newLevel, allowOverride = false)方法
這個persist方法,適用于之前我們設置過了RDD的緩存級別,現在想要修改RDD的緩存級別的情況,只需要把allowOverride設置為true
這一段程序也解釋了上面第一種方法的第一個特殊情況為什么不會報錯
總結:①cache方法其實是persist方法的一個特例:調用的是無參數的persist(),代表緩存級別是僅內存的情況②persist方法有三種,分為默認無參數僅內存級別的persist(),還有persist(newLevel):這個方法需要之前對RDD沒有設置過緩存級別,persist(newLevel,allowOverride):這個方法適用于之前對RDD設置過緩存級別,但是想更改緩存級別的情況。③取消緩存統一使用unpersist()方法④persist是lazy級別的(前面的算子都是lazy的每執行,所以他肯定也要是lazy級別的),unpersist是eager級別的(即調用的時候會立即清除)。
注意:①cache之后一定不能立即有其他算子,cache后有算子的話,它每次都會重新觸發這個計算過程,cache不是一個action;②cache被gc清除的兩種方式:unpersist強制銷毀數據;會被后續的計算結果擠掉
緩存實現的原理:DiskStore磁盤存儲和MemoryStore內存存儲
DiskStore磁盤存儲:spark會在磁盤上創建spark文件夾,命名為(spark-local-x年x月x日時分秒-隨機數),block塊都會存在這里,然后把block id映射成相應的文件路徑,就可以存取文件了
MemoryStore內存存儲:使用hashmap管理block就行了,block id作為key,MemoryEntry為value
1.2?做緩存的時機
(1)計算特別耗時;
(2)計算鏈條很長,失敗的時候會有很大的代價:假設900個步驟在第800個步驟緩存,801的步驟失敗了就會在800個步驟開始恢復;
(3)shuffle之后:shuffle是進行分發數據,緩存之后假設后面失敗就不需要重新shuffle;
(4)checkpoint之前:checkpoint是把整個數據放到分布是文件系統中或磁盤,checkpoint是在當前作業執行之后,再觸發一個作業,恢復時前面的步驟就不需要計算
緩存是不一定可靠的,緩存在內存中不一定是可靠的,把數據緩存在內存中有可能會丟失,例如只緩存在內存中,而不同時放在內存和磁盤上,可能內存crash(奔潰),crash內存現在有一種辦法就是用Tachyon做底層存儲,但是使用checkpoint的數數據一定放在文件系統上,這個時候數據就不會丟失。假設緩存了100萬個數據分片,開始緩存是成功的,由于內存的緊張在一些機器上把一些數據分片清理掉了,那這時候就需要重新計
(5)shuffle之前persist(不過框架已經默認幫我們把數據持久化到本地磁盤)
//cache實例 val?cached=sc.textFile("G:\\Scala\\data\\README.md").flatMap(_.split("?")).map(_=>(_,1)).reduceByKey(_+_,1).cache cached.count2?廣播BroadCast
為什么需要廣播?每個task運行,讀取全局數據的時候每個task每次都要拷貝一個數據副本,他的好處就是狀態一致性,不好的就是耗大量的內存,變量大就容易OOM,這種是非常嚴重的,必須全部放在內存上,不能一部分放磁盤上。
廣播過去的全局只讀不能修改的,廣播到worker的executor內存中。廣播變量不需要銷毀,應用程序存在他就存在,sc銷毀它也就銷毀了。
總結:廣播是由Driver發送給當前Application分配的所有Executor內存級別的全局只讀變量,Executor中的線程池中的線程共享該全局變量,極大減少網絡傳輸(否則每個task都要傳輸一次該變量),并極大的節省內存,減少OOM的可能,當然也隱形的提高CPU的有效工作(因為每次傳CPU也很忙)
//廣播實例: val?number?=6 val broadcastVar = sc.broadcast(number?) //廣播只能由broadcast廣播 val?data=sc.parallelize(1?to?10)//創建RDD,由Task使用廣播變量 val?bn=data.map(_*broadcastVar .value)3?累加器Accumulator
為什么需要累加器?累加器的特征:是全局級別的,且Executor中的task只能修改(增加內容),只有driver可讀,因為我們通過driver控制整個集群有必要知道整個集群的狀態。(對于Executor只能修改但不可讀,只對driver可讀)。Executor修改一定不會彼此覆蓋相當于加鎖了。
因為他的特性,在記錄集群狀態的時候,尤其是全局唯一狀態的時候至關重要,可以保存唯一的全局變量。
累加器原理:由于被driver控制,在實際task運行的時候,每次都可以保證只對driver可讀獲取全局唯一的狀態對象。
總結:①累加器是全局唯一的,每次操作只增不減②在executor中只能修改他,也就是只能增加他的值。
可以認為Broadcast是線程級別全局共享,累加器是executor全局共享
//累加器實例: val acc = sc.accumulator(0)? val?data=sc.parallelize(1?to?10) val?result=data.foreach(item=>acc +=item) println(result)?
總結
以上是生活随笔為你收集整理的RDD持久化、广播、累加器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ADS1.2开发环境创建与简要介绍
- 下一篇: Python基础(五)--函数