谈一谈RDD 持久化的三个算子:cache、persist、checkpoint
這段偽代碼的瑕疵:
lines = sc.textFile(“hdfs://...”) errors = lines.filter(_.startsWith(“ERROR”)) mysql_errors = errors.filter(_.contain(“MySQL”)).count http_errors = errors.filter(_.contain(“Http”)).counterrors 是一個 RDD,mysql_errors 這個 RDD 執行時,會先讀文件,然后獲取數據,通過計算 errors,把數據傳給 mysql_errors,再進行計算,因為 RDD 中是不存儲數據的,所以 http_errors 計算的時候會重新讀數據,計算 errors 后把數據傳給 http_errors 進行計算,重復使用 errors 這個 RDD 很有必須,這就需要把 errors 這個 RDD 持久化,以便其他 RDD使用。
RDD 持久化有三個算子:cache、persist、checkpoint
- cache:把 RDD 持久化到內存
使用方法:
var rdd = sc.textFile("test") rdd = rdd.cache() val count = rdd.count() //或者其他操作- persist:可以選擇多種持久化方式
使用方法:
var rdd = sc.textFile("test") rdd = rdd.persist(StorageLevel.MEMORY_ONLY) val count = rdd.count() //或者其他操作
Persist StorageLevel 說明:
初始化 StorageLevel 可以傳入 5 個參數,分別對應是否存入磁盤、是否存入內存、是否使用堆外內存、是否不進行序列化,副本數(默認為 1)
使用不同參數的組合構造的實例被預先定義為一些值,比如 MEMORY_ONLY 代表著不存入磁盤,存入內存,不使用堆外內存,不進行序列化,副本數為 1,使用 persisit()方法時把這些持久化的級別作為參數傳入即可,cache()與 persist( StorageLevel. MEMORY_ONLY)是等價的。
cache 和 persist 的注意事項
1. ?cache 和 persist 是懶執行算子,需要有一個 action 類的算子觸發執行
2. ?cache 和 persist 算子的返回執行必須賦值給一個變量,在接下來的 job 中直接使用這
個變量,那么就是使用了持久化的數據了,如果 application 中只有一個 job,沒有必要
使用 RDD 持久化
3. ?cache 和 persist 算子后不能立即緊跟 action 類算子,比如 count 算子,但是在下一行
可以有 action 類算子
error : cache().count()
right : rdd = rdd.cache() rdd.count()
4. ?cache() = persist(StorageLevel.MEMORY_ONLY)
- checkpoint : 可以把 RDD 持久化到 HDFS,同時切斷 RDD 之間的依賴
使用方法:
sc.setCheckpointDir("hdfs://...") var rdd = sc.textFile("test") rdd.checkpoint() val count = rdd.count() //或者其他操作對于切斷 RDD 之間的依賴的說明:
當業務邏輯很復雜時,RDD 之間頻繁轉換,RDD 的血統很長,如果中間某個 RDD 的數據丟失,還需要重新從頭計算,如果對中間某個 RDD 調用了 checkpoint()方法,把這個RDD 上傳到 HDFS,同時讓后面的 RDD 不再依賴于這個 RDD,而是依賴于 HDFS 上的數據,那么下次計算會方便很多。
checkpoint()執行原理:
1. ?當 RDD 的 job 執行完畢后,會從 finalRDD 從后往前回溯
2. ?當回溯到調用了 checkpoint()方法的 RDD 后,會給這個 RDD 做一個標記
3. ?Spark 框架自動啟動一個新的 job,計算這個 RDD 的數據,然后把數據持久化到 HDFS上
4. ?優化:對某個 RDD 執行 checkpoint()之前,對該 RDD 執行 cache(),這樣的話,新啟動的 job 只需要把內存中的數據上傳到 HDFS 中即可,不需要重新計算
總結
以上是生活随笔為你收集整理的谈一谈RDD 持久化的三个算子:cache、persist、checkpoint的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 简单说一下kafka 与其他消息队列
- 下一篇: Spark对Kafka两种连接方式的对比