spark 算子例子_Spark性能调优方法
公眾號后臺回復關鍵詞:pyspark,獲取本項目github地址。
Spark程序可以快如閃電??,也可以慢如蝸牛?。
它的性能取決于用戶使用它的方式。
一般來說,如果有可能,用戶應當盡可能多地使用SparkSQL以取得更好的性能。
主要原因是SparkSQL是一種聲明式編程風格,背后的計算引擎會自動做大量的性能優化工作。
基于RDD的Spark的性能調優屬于坑非常深的領域,并且很容易踩到。
我們將介紹Spark調優原理,Spark任務監控,以及Spark調優案例。
本文參考了以下文章:
《Spark性能優化指南——基礎篇》:https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
《Spark性能優化指南——高級篇》:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
《spark-調節executor堆外內存》:https://www.cnblogs.com/colorchild/p/12175328.html
import?findspark#指定spark_home為剛才的解壓路徑,指定python路徑
spark_home?=?"/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"
python_path?=?"/Users/liangyun/anaconda3/bin/python"
findspark.init(spark_home,python_path)
import?pyspark?
from?pyspark.sql?import?SparkSession
#SparkSQL的許多功能封裝在SparkSession的方法接口中
spark?=?SparkSession.builder?\
????????.appName("test")?\
????????.config("master","local[4]")?\
????????.enableHiveSupport()?\
????????.getOrCreate()
sc?=?spark.sparkContext
一,Spark調優原理
可以用下面三個公式來近似估計spark任務的執行時間。
可以用下面二個公式來說明spark在executor上的內存分配。
如果程序執行太慢,調優的順序一般如下:
1,首先調整任務并行度,并調整partition分區。
2,嘗試定位可能的重復計算,并優化之。
3,嘗試定位數據傾斜問題或者計算傾斜問題并優化之。
4,如果shuffle過程提示堆外內存不足,考慮調高堆外內存。
5,如果發生OOM或者GC耗時過長,考慮提高executor-memory或降低executor-core。
以下是對上述公式中涉及到的一些概念的初步解讀。
任務計算總時間:假設由一臺無限內存的同等CPU配置的單核機器執行該任務,所需要的運行時間。通過緩存避免重復計算,通過mapPartitions代替map以減少諸如連接數據庫,預處理廣播變量等重復過程,都是減少任務計算總時間的例子。
shuffle總時間:任務因為reduceByKey,join,sortBy等shuffle類算子會觸發shuffle操作產生的磁盤讀寫和網絡傳輸的總時間。shuffle操作的目的是將分布在集群中多個節點上的同一個key的數據,拉取到同一個節點上,以便讓一個節點對同一個key的所有數據進行統一處理。shuffle過程首先是前一個stage的一個shuffle write即寫磁盤過程,中間是一個網絡傳輸過程,然后是后一個stage的一個shuffle read即讀磁盤過程。shuffle過程既包括磁盤讀寫,又包括網絡傳輸,非常耗時。因此如有可能,應當避免使用shuffle類算子。例如用map+broadcast的方式代替join過程。退而求其次,也可以在shuffle之前對相同key的數據進行歸并,減少shuffle讀寫和傳輸的數據量。此外,還可以應用一些較為高效的shuffle算子來代替低效的shuffle算子。例如用reduceByKey/aggregateByKey來代替groupByKey。最后,shuffle在進行網絡傳輸的過程中會通過netty使用JVM堆外內存,spark任務中大規模數據的shuffle可能會導致堆外內存不足,導致任務掛掉,這時候需要在配置文件中調大堆外內存。
GC垃圾回收總時間:當JVM中execution內存不足時,會啟動GC垃圾回收過程。執行GC過程時候,用戶線程會終止等待。因此如果execution內存不夠充分,會觸發較多的GC過程,消耗較多的時間。在spark2.0之后excution內存和storage內存是統一分配的,不必調整excution內存占比,可以提高executor-memory來降低這種可能。或者減少executor-cores來降低這種可能(這會導致任務并行度的降低)。
任務有效并行度:任務實際上平均被多少個core執行。它首先取決于可用的core數量。當partition分區數量少于可用的core數量時,只會有partition分區數量的core執行任務,因此一般設置分區數是可用core數量的2倍以上20倍以下。此外任務有效并行度嚴重受到數據傾斜和計算傾斜的影響。有時候我們會看到99%的partition上的數據幾分鐘就執行完成了,但是有1%的partition上的數據卻要執行幾個小時。這時候一般是發生了數據傾斜或者計算傾斜。這個時候,我們說,任務實際上有效的并行度會很低,因為在后面的這幾個小時的絕大部分時間,只有很少的幾個core在執行任務。
任務并行度:任務可用core的數量。它等于申請到的executor數量和每個executor的core數量的乘積。可以在spark-submit時候用num-executor和executor-cores來控制并行度。此外,也可以開啟spark.dynamicAllocation.enabled根據任務耗時動態增減executor數量。雖然提高executor-cores也能夠提高并行度,但是當計算需要占用較大的存儲時,不宜設置較高的executor-cores數量,否則可能會導致executor內存不足發生內存溢出OOM。
partition分區數量:分區數量越大,單個分區的數據量越小,任務在不同的core上的數量分配會越均勻,有助于提升任務有效并行度。但partition數量過大,會導致更多的數據加載時間,一般設置分區數是可用core數量的2倍以上20倍以下。可以在spark-submit中用spark.default.parallelism來控制RDD的默認分區數量,可以用spark.sql.shuffle.partitions來控制SparkSQL中給shuffle過程的分區數量。
數據傾斜度:數據傾斜指的是數據量在不同的partition上分配不均勻。一般來說,shuffle算子容易產生數據傾斜現象,某個key上聚合的數據量可能會百萬千萬之多,而大部分key聚合的數據量卻只有幾十幾百個。一個partition上過大的數據量不僅需要耗費大量的計算時間,而且容易出現OOM。對于數據傾斜,一種簡單的緩解方案是增大partition分區數量,但不能從根本上解決問題。一種較好的解決方案是利用隨機數構造數量為原始key數量1000倍的中間key。大概步驟如下,利用1到1000的隨機數和當前key組合成中間key,中間key的數據傾斜程度只有原來的1/1000, 先對中間key執行一次shuffle操作,得到一個數據量少得多的中間結果,然后再對我們關心的原始key進行shuffle,得到一個最終結果。
計算傾斜度:計算傾斜指的是不同partition上的數據量相差不大,但是計算耗時相差巨大。考慮這樣一個例子,我們的RDD的每一行是一個列表,我們要計算每一行中這個列表中的數兩兩乘積之和,這個計算的復雜度是和列表長度的平方成正比的,因此如果有一個列表的長度是其它列表平均長度的10倍,那么計算這一行的時間將會是其它列表的100倍,從而產生計算傾斜。計算傾斜和數據傾斜的表現非常相似,我們會看到99%的partition上的數據幾分鐘就執行完成了,但是有1%的partition上的數據卻要執行幾個小時。計算傾斜和shuffle無關,在map端就可以發生。計算傾斜出現后,一般可以通過舍去極端數據或者改變計算方法優化性能。
堆內內存:on-heap memory, 即Java虛擬機直接管理的存儲,由JVM負責垃圾回收GC。由多個core共享,core越多,每個core實際能使用的內存越少。core設置得過大容易導致OOM,并使得GC時間增加。
堆外內存:off-heap memory, 不受JVM管理的內存, ?可以精確控制申請和釋放, 沒有GC問題。一般shuffle過程在進行網絡傳輸的過程中會通過netty使用到堆外內存。
二,Spark任務UI監控
Spark任務啟動后,可以在瀏覽器中輸入 http://localhost:4040/ 進入到spark web UI 監控界面。
該界面中可以從多個維度以直觀的方式非常細粒度地查看Spark任務的執行情況,包括任務進度,耗時分析,存儲分析,shuffle數據量大小等。
最常查看的頁面是 Stages頁面和Excutors頁面。
Jobs:每一個Action操作對應一個Job,以Job粒度顯示Application進度。有時間軸Timeline。
Stages:Job在遇到shuffle切開Stage,顯示每個Stage進度,以及shuffle數據量。
可以點擊某個Stage進入詳情頁,查看其下面每個Task的執行情況以及各個partition執行的費時統計。
Storage:
監控cache或者persist導致的數據存儲大小。
Environment:
顯示spark和scala版本,依賴的各種jar包及其版本。
Excutors : 監控各個Excutors的存儲和shuffle情況。
SQL: 顯示各種SQL命令在那些Jobs中被執行。
三,Spark調優案例
下面介紹幾個調優的典型案例:
1,資源配置優化
2,利用緩存減少重復計算
3,數據傾斜調優
4,broadcast+map代替join
5,reduceByKey/aggregateByKey代替groupByKey
1,資源配置優化
下面是一個資源配置的例子:
優化前:
#提交python寫的任務spark-submit?--master?yarn?\
--deploy-mode?cluster?\
--executor-memory?12G?\
--driver-memory?12G?\
--num-executors?100?\
--executor-cores?8?\
--conf?spark.yarn.maxAppAttempts=2?\
--conf?spark.task.maxFailures=10?\
--conf?spark.stage.maxConsecutiveAttempts=10?\
--conf?spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python?#指定excutors的Python環境
--conf?spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON?=?./anaconda3.zip/anaconda3/bin/python??#cluster模式時候設置
--archives?viewfs:///user/hadoop-xxx/yyy/anaconda3.zip?#上傳到hdfs的Python環境
--files??data.csv,profile.txt
--py-files??pkg.py,tqdm.py
pyspark_demo.py?
優化后:
#提交python寫的任務spark-submit?--master?yarn?\
--deploy-mode?cluster?\
--executor-memory?12G?\
--driver-memory?12G?\
--num-executors?100?\
--executor-cores?2?\
--conf?spark.yarn.maxAppAttempts=2?\
--conf?spark.default.parallelism=1600?\
--conf?spark.sql.shuffle.partitions=1600?\
--conf?spark.memory.offHeap.enabled=true?\
--conf?spark.memory.offHeap.size=2g\
--conf?spark.task.maxFailures=10?\
--conf?spark.stage.maxConsecutiveAttempts=10?\
--conf?spark.yarn.appMasterEnv.PYSPARK_PYTHON=./anaconda3.zip/anaconda3/bin/python?#指定excutors的Python環境
--conf?spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON?=?./anaconda3.zip/anaconda3/bin/python??#cluster模式時候設置
--archives?viewfs:///user/hadoop-xxx/yyy/anaconda3.zip?#上傳到hdfs的Python環境
--files??data.csv,profile.txt
--py-files??pkg.py,tqdm.py
pyspark_demo.py?
這里主要減小了 executor-cores數量,一般設置為1~4,過大的數量可能會造成每個core計算和存儲資源不足產生OOM,也會增加GC時間。此外也將默認分區數調到了1600,并設置了2G的堆外內存。
2, 利用緩存減少重復計算
%%time#?優化前:
import?math?
rdd_x?=?sc.parallelize(range(0,2000000,3),3)
rdd_y?=?sc.parallelize(range(2000000,4000000,2),3)
rdd_z?=?sc.parallelize(range(4000000,6000000,2),3)
rdd_data?=?rdd_x.union(rdd_y).union(rdd_z).map(lambda?x:math.tan(x))
s?=?rdd_data.reduce(lambda?a,b:a+b+0.0)
n?=?rdd_data.count()
mean?=?s/n?
print(mean)
-1.889935655259299
CPU?times:?user?40.2?ms,?sys:?12.4?ms,?total:?52.6?ms
Wall?time:?2.76?s
%%time?
#?優化后:?
import?math?
from??pyspark.storagelevel?import?StorageLevel
rdd_x?=?sc.parallelize(range(0,2000000,3),3)
rdd_y?=?sc.parallelize(range(2000000,4000000,2),3)
rdd_z?=?sc.parallelize(range(4000000,6000000,2),3)
rdd_data?=?rdd_x.union(rdd_y).union(rdd_z).map(lambda?x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)
s?=?rdd_data.reduce(lambda?a,b:a+b+0.0)
n?=?rdd_data.count()
mean?=?s/n?
rdd_data.unpersist()
print(mean)
-1.889935655259299
CPU?times:?user?40.5?ms,?sys:?11.5?ms,?total:?52?ms
Wall?time:?2.18?s
3, 數據傾斜調優
%%time?#?優化前:?
rdd_data?=?sc.parallelize(["hello?world"]*1000000+["good?morning"]*10000+["I?love?spark"]*10000)
rdd_word?=?rdd_data.flatMap(lambda?x:x.split("?"))
rdd_one?=?rdd_word.map(lambda?x:(x,1))
rdd_count?=?rdd_one.reduceByKey(lambda?a,b:a+b+0.0)
print(rdd_count.collect())?
[('good',?10000.0),?('hello',?1000000.0),?('spark',?10000.0),?('world',?1000000.0),?('love',?10000.0),?('morning',?10000.0),?('I',?10000.0)]
CPU?times:?user?285?ms,?sys:?27.6?ms,?total:?313?ms
Wall?time:?2.74?s
%%time?
#?優化后:?
import?random?
rdd_data?=?sc.parallelize(["hello?world"]*1000000+["good?morning"]*10000+["I?love?spark"]*10000)
rdd_word?=?rdd_data.flatMap(lambda?x:x.split("?"))
rdd_one?=?rdd_word.map(lambda?x:(x,1))
rdd_mid_key?=?rdd_one.map(lambda?x:(x[0]+"_"+str(random.randint(0,999)),x[1]))
rdd_mid_count?=?rdd_mid_key.reduceByKey(lambda?a,b:a+b+0.0)
rdd_count?=?rdd_mid_count.map(lambda?x:(x[0].split("_")[0],x[1])).reduceByKey(lambda?a,b:a+b+0.0)
print(rdd_count.collect())??
#作者按:此處僅示范原理,單機上該優化方案難以獲得性能優勢
[('good',?10000.0),?('hello',?1000000.0),?('spark',?10000.0),?('world',?1000000.0),?('love',?10000.0),?('morning',?10000.0),?('I',?10000.0)]
CPU?times:?user?351?ms,?sys:?51?ms,?total:?402?ms
Wall?time:?7?s
4, broadcast+map代替join
該優化策略一般限于有一個參與join的rdd的數據量不大的情況。
%%time?#?優化前:
rdd_age?=?sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender?=?sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")])
rdd_students?=?rdd_age.join(rdd_gender).map(lambda?x:(x[0],x[1][0],x[1][1]))
print(rdd_students.collect())
[('LiLy',?20,?'female'),?('LiLei',?18,?'male'),?('HanMeimei',?19,?'female'),?('Jim',?17,?'male')]
CPU?times:?user?43.9?ms,?sys:?11.6?ms,?total:?55.6?ms
Wall?time:?307?ms
%%time?
#?優化后:
rdd_age?=?sc.parallelize([("LiLei",18),("HanMeimei",19),("Jim",17),("LiLy",20)])
rdd_gender?=?sc.parallelize([("LiLei","male"),("HanMeimei","female"),("Jim","male"),("LiLy","female")],2)
ages?=?rdd_age.collect()
broads?=?sc.broadcast(ages)
def?get_age(it):
????result?=?[]
????ages?=?dict(broads.value)
????for?x?in?it:
????????name?=?x[0]
????????age?=?ages.get(name,0)
????????result.append((x[0],age,x[1]))
????return?iter(result)
rdd_students?=?rdd_gender.mapPartitions(get_age)
print(rdd_students.collect())
[('LiLei',?18,?'male'),?('HanMeimei',?19,?'female'),?('Jim',?17,?'male'),?('LiLy',?20,?'female')]
CPU?times:?user?14.3?ms,?sys:?7.43?ms,?total:?21.7?ms
Wall?time:?86.3?ms
5,reduceByKey/aggregateByKey代替groupByKey
groupByKey算子是一個低效的算子,其會產生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替,通過在每個partition內部先做一次數據的合并操作,大大減少了shuffle的數據量。
%%time?#?優化前:
rdd_students?=?sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
???????????????????????????????("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names?=?rdd_students.groupByKey().map(lambda?t:(t[0],list(t[1])))
names?=?rdd_names.collect()
print(names)
[('class1',?['LiLei',?'Lucy',?'Ann',?'Jim']),?('class2',?['HanMeimei',?'Lily'])]
CPU?times:?user?25.3?ms,?sys:?7.32?ms,?total:?32.6?ms
Wall?time:?164?ms
%%time?
#?優化后:
rdd_students?=?sc.parallelize([("class1","LiLei"),("class2","HanMeimei"),("class1","Lucy"),
???????????????????????????????("class1","Ann"),("class1","Jim"),("class2","Lily")])
rdd_names?=?rdd_students.aggregateByKey([],lambda?arr,name:arr+[name],lambda?arr1,arr2:arr1+arr2)
names?=?rdd_names.collect()
print(names)
[('class1',?['LiLei',?'Lucy',?'Ann',?'Jim']),?('class2',?['HanMeimei',?'Lily'])]
CPU?times:?user?21.6?ms,?sys:?6.63?ms,?total:?28.3?ms
Wall?time:?118?ms
公眾號后臺回復關鍵字:pyspark,獲取《eat pyspark in 10 days》 github項目地址。
總結
以上是生活随笔為你收集整理的spark 算子例子_Spark性能调优方法的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 招行账单分期影响提额吗 招行账单分期与提
- 下一篇: 比人都高!博主打造世界最大Xbox主机: