Spark 分布式计算原理
生活随笔
收集整理的這篇文章主要介紹了
Spark 分布式计算原理
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Spark 分布式計算原理
Spark Shuffle
1)在數據之間重新分配數據
2)(將父RDD重新定義進入子RDD)每一個分區里面的數據要重新進入新的分區
3)每一個shuffle階段盡量保存在內存里面,如果保存不下到磁盤
4)在每個shuffle階段不會改變分區的數量
RDD的依賴關系-1(lineage)
1) 寬依賴:一個夫RDD的分區被子RDD的多個分區使用
發生寬依賴一定shuffle()
(相當于超生)
2) 窄依賴:一個父RDD的分區被子RDD的一個分區使用
?
RDD的依賴關系-2(lineage)?? 寬依賴對比窄依賴
?
DAG工作原理
- 根據RDD之間的依賴關系,形成一個DAG(有向無環)
1)從后往前,遇到寬依賴切割為新的Stage
2)每個Stage由分區一組并行的Task組成
每個Task共享歸類內存,堆外內存Task數據在進行交換
提前聚合,避免shuffle,將數據先進行去重
RDD持久化-1
cache:
- 間數據寫入緩存
- cache()不能再有其他的算子
val rdd=sc.makeRDD(1 to 10)
val rdd2=rdd.map(x=>{println(x);x}
rdd2.cache
rdd2.collect
RDD共享變量-1
- 廣播變量(要定義的是Array)
val rdd=sc.makeRDD(1 to 10)
val j=sc.broadcast(Array(0))
rdd.map(x=>{j.value(0)=j.value(0)+1;println(j.value(0));x}).collect
RDD共享變量-2
- 累加器:只允許added操作,常用于實現計數
val accum = sc.accumulator(0,"My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
accum.value
RDD分區設計
- 分區大小限制為2GB
分區太小
1)分區承擔的責任越大,內存壓力越大
分區過多
1)shuffle開銷越大
2)創建任務開銷越大
裝載CSV數據源
方法一:使用SparkContext
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3")//防止hadoop報錯val conf=new SparkConf().setMaster("local[2]").setAppName("hello")val sc=SparkContext.getOrCreate(conf)
val lines = sc.textFile("file:///d:/users.csv")
val fields = lines.mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter.drop(1) else iter).map(l => l.split(",")).foreach(x=>println(x.toList))
————————————————————————————————————————————————————————————————————————————————————————————————
方法二:使用SparkSession
val df = spark.read.format("csv").option("header", "true").load("file:///home/kgc/data/users.csv")
裝載JSON數據源
方法一:使用SparkContext
val lines = sc.textFile("file:///home/kgc/data/users.json")
//scala內置的JSON庫
import scala.util.parsing.json.JSON
val result=lines.map(l=>JSON.parseFull(l))
————————————————————————————————————————————————————————————————————————————————————————————————
還有一種使用SparkSession方法:API
var spark=SparkSession.builder().master("local[2]").appName("hello").getOrCreate();val rdd=spark.read.json("file:///d:/date.json")print(rdd)
————————————————————————————————————————————————————————————————————————————————————————————————
方法二:使用SparkSession
val df = spark.read.format("json").option("header", "true").load("file:///home/kgc/data/users.json")
RDD數據傾斜*
- 數據分配的不均勻
- 通常發生在groupBy,join等之后
1)在執行shuffle操作的時候,是按照key,來進行values的輸出、拉取和聚合2)同一個key的values,一定是分配到一個reduce task進行處理的
3)如果是很多相同的key對應的values被分配到了一個task上面去執行,而另外的task,可能只分配了一些
4)這樣就會出現數據傾斜問題
解決方法:
方案一:聚合源數據
通過一些聚合的操作,比如grouByKey、reduceByKey就是拿到每個key對應的value,對每個key對應的values執行一定的計算
方案二:過濾導致傾斜的key
在sql中用where條件,過濾某幾個會導致數據傾斜的key
——————————————————————————————————————————————————
為什么要劃分Stage?
spark劃分stage思路:
1)從后往前推,一個job拆分為多組task,每組的任務被稱為一個stage
2)stage里面的Task的數量對應一個partition,而stage又分為兩類,
一類是shuffleMapTask,一類是resltTask,DAG的最后一個階段為每個partition生成一個resultask,
其余階段都會生成ShuffleMapTask,他將自己的計算結果通過shuffle傳到下一個stage中。
——————————————————————————————————————————————————
?
轉載于:https://www.cnblogs.com/tudousiya/p/11285866.html
總結
以上是生活随笔為你收集整理的Spark 分布式计算原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Introduction to Djan
- 下一篇: iOS开发8:使用Tool Bar切换视