pyspark性能调优参数
20220311
參數調節
把executor數量調小,其他參數值調大,不容易報錯
一.指定spark executor 數量的公式
executor 數量 = spark.cores.max/spark.executor.cores
spark.cores.max 是指你的spark程序需要的總核數
spark.executor.cores 是指每個executor需要的核數
二.指定并行的task數量
spark.default.parallelism
參數說明:該參數用于設置每個stage的默認task數量。這個參數極為重要,如果不設置可能會直接影響你的Spark作業性能。
參數調優建議:Spark作業的默認task數量為500~1000個較為合適。很多同學常犯的一個錯誤就是不去設置這個參數,那么此時就會導致Spark自己根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。通常來說,Spark默認設置的數量是偏少的(比如就幾十個task),如果task數量偏少的話,就會導致你前面設置好的Executor的參數都前功盡棄。試想一下,無論你的Executor進程有多少個,內存和CPU有多大,但是task只有1個或者10個,那么90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!因此Spark官網建議的設置原則是,設置該參數為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數量為300個,那么設置1000個task是可以的,此時可以充分地利用Spark集群的資源。
三. 命令示例
1
spark-submit --class com.cjh.test.WordCount --conf spark.default.parallelism=12 --conf spark.executor.memory=800m --conf spark.executor.cores=2 --conf spark.cores.max=6 my.jar
四.其他調優參數
spark.storage.memoryFraction
參數說明:該參數用于設置RDD持久化數據在Executor內存中能占的比例,默認是0.6。也就是說,默認Executor 60%的內存,可以用來保存持久化的RDD數據。根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。
參數調優建議:如果Spark作業中,有較多的RDD持久化操作,該參數的值可以適當提高一些,保證持久化的數據能夠容納在內存中。避免內存不夠緩存所有的數據,導致數據只能寫入磁盤中,降低了性能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那么這個參數的值適當降低一些比較合適。此外,如果發現作業由于頻繁的gc導致運行緩慢(通過spark web ui可以觀察到作業的gc耗時),意味著task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。
spark.shuffle.memoryFraction
參數說明:該參數用于設置shuffle過程中一個task拉取到上個stage的task的輸出后,進行聚合操作時能夠使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操作。shuffle操作在進行聚合時,如果發現使用的內存超出了這個20%的限制,那么多余的數據就會溢寫到磁盤文件中去,此時就會極大地降低性能。
參數調優建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內存占比,提高shuffle操作的內存占比比例,避免shuffle過程中數據過多時內存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發現作業由于頻繁的gc導致運行緩慢,意味著task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。
spark.spark.default.parallelism與sql.shuffle.partitions的設置:
Spark中RDD對應有partition的概念,每個partition都會對應一個task,task越多,在處理大規模數據的時候,就會越有效率。但是并不是task越多越好,如果平時測試,或者數據量沒有那么大,則沒有必要task數量太多。
我的第一個query程序,有200個task,我改成了50個,節約了1s左右。
參數可以通過spark_home/conf/spark-default.conf配置文件設置:
spark.sql.shuffle.partitions 50
spark.default.parallelism 10
上邊兩個參數,第一個是針對spark sql的task數量,我的程序邏輯是將rdd首先轉換成dataframe,然后進行query,所以對應第一個參數。
而如果程序是非sql則第二個參數生效。
SparkContext中默認有defaultMinPartitions指定最小的分區數;
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
SparkContext中生成RDD的接口中往往指定的是minPartitions最小分區數目。
https://www.cnblogs.com/hadoop-dev/p/6669232.html
spark分區數,task數目,core數,worker節點個數,excutor數量梳理
作者:王燚光
https://www.zhihu.com/question/33270495/answer/934
輸入可能以多個文件的形式存儲在HDFS上,每個File都包含了很多塊,稱為Block。
當Spark讀取這些文件作為輸入時,會根據具體數據格式對應的InputFormat進行解析,一般是將若干個Block合并成一個輸入分片,稱為InputSplit,注意InputSplit不能跨越文件。
隨后將為這些輸入分片生成具體的Task。InputSplit與Task是一一對應的關系。
隨后這些具體的Task每個都會被分配到集群上的某個節點的某個Executor去執行。
每個節點可以起一個或多個Executor。
每個Executor由若干core組成,每個Executor的每個core一次只能執行一個Task。
每個Task執行的結果就是生成了目標RDD的一個partiton。
注意: 這里的core是虛擬的core而不是機器的物理CPU核,可以理解為就是Executor的一個工作線程。
而 Task被執行的并發度 = Executor數目 * 每個Executor核數。
至于partition的數目:
對于數據讀入階段,例如sc.textFile,輸入文件被劃分為多少InputSplit就會需要多少初始Task。
在Map階段partition數目保持不變。
在Reduce階段,RDD的聚合會觸發shuffle操作,聚合后的RDD的partition數目跟具體操作有關,例如repartition操作會聚合成指定分區數,還有一些算子是可配置的。
RDD在計算的時候,每個分區都會起一個task,所以rdd的分區數目決定了總的的task數目。
申請的計算節點(Executor)數目和每個計算節點核數,決定了你同一時刻可以并行執行的task。
比如的RDD有100個分區,那么計算的時候就會生成100個task,你的資源配置為10個計算節點,每個兩2個核,同一時刻可以并行的task數目為20,計算這個RDD就需要5個輪次。
如果計算資源不變,你有101個task的話,就需要6個輪次,在最后一輪中,只有一個task在執行,其余核都在空轉。
如果資源不變,你的RDD只有2個分區,那么同一時刻只有2個task運行,其余18個核空轉,造成資源浪費。這就是在spark調優中,增大RDD分區數目,增大任務并行度的做法。
引用https://www.cnblogs.com/hadoop-dev/p/6669232.html
引用https://blog.csdn.net/wangguohe/article/details/80645978
總結
以上是生活随笔為你收集整理的pyspark性能调优参数的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pyspark汇总小结
- 下一篇: 联邦学习fate笔记小结