【机器学习】3万字长文,PySpark入门级学习教程,框架思维
為什么要學習Spark?作為數據從業者多年,個人覺得Spark已經越來越走進我們的日常工作了,無論是使用哪種編程語言,Python、Scala還是Java,都會或多或少接觸到Spark,它可以讓我們能夠用到集群的力量,可以對BigData進行高效操作,實現很多之前由于計算資源而無法輕易實現的東西。網上有很多關于Spark的好處,這里就不做過多的贅述,我們直接進入這篇文章的正文!
關于PySpark,我們知道它是Python調用Spark的接口,我們可以通過調用Python API的方式來編寫Spark程序,它支持了大多數的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我們了解Python的基本語法,那么在Python里調用Spark的力量就顯得十分easy了。下面我將會從相對宏觀的層面介紹一下PySpark,讓我們對于這個神器有一個框架性的認識,知道它能干什么,知道去哪里尋找問題解答,爭取看完這篇文章可以讓我們更加絲滑地入門PySpark。話不多說,馬上開始!
???? 目錄
???? 安裝指引
安裝這塊本文就不展開具體的步驟了,畢竟大家的機子環境都不盡相同。不過可以簡單說幾點重要的步驟,然后節末放上一些安裝示例供大家參考。
1)要使用PySpark,機子上要有Java開發環境
2)環境變量記得要配置完整
3)Mac下的/usr/local/ 路徑一般是隱藏的,PyCharm配置py4j和pyspark的時候可以使用 shift+command+G 來使用路徑訪問。
4)Mac下如果修改了 ~/.bash_profile 的話,記得要重啟下PyCharm才會生效的哈
5)版本記得要搞對,保險起見Java的jdk版本選擇低版本(別問我為什么知道),我選擇的是Java8.
下面是一些示例demo,可以參考下:
1)Mac下安裝spark,并配置pycharm-pyspark完整教程
https://blog.csdn.net/shiyutianming/article/details/99946797
2)virtualBox里安裝開發環境
https://www.bilibili.com/video/BV1i4411i79a?p=3
3)快速搭建spark開發環境,云哥項目
https://github.com/lyhue1991/eat_pyspark_in_10_days
???? 基礎概念
關于Spark的基礎概念,我在先前的文章里也有寫過,大家可以一起來回顧一下 《想學習Spark?先帶你了解一些基礎的知識》。作為補充,今天在這里也介紹一些在Spark中會經常遇見的專有名詞。
?????♀? Q1: 什么是RDD
RDD的全稱是 Resilient Distributed Datasets,這是Spark的一種數據抽象集合,它可以被執行在分布式的集群上進行各種操作,而且有較強的容錯機制。RDD可以被分為若干個分區,每一個分區就是一個數據集片段,從而可以支持分布式計算。
?????♀? Q2: RDD運行時相關的關鍵名詞
簡單來說可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,這幾個東西在調優的時候也會經常遇到的。
Client:指的是客戶端進程,主要負責提交job到Master;
Job:Job來自于我們編寫的程序,Application包含一個或者多個job,job包含各種RDD操作;
Master:指的是Standalone模式中的主控節點,負責接收來自Client的job,并管理著worker,可以給worker分配任務和資源(主要是driver和executor資源);
Worker:指的是Standalone模式中的slave節點,負責管理本節點的資源,同時受Master管理,需要定期給Master回報heartbeat(心跳),啟動Driver和Executor;
Driver:指的是 job(作業)的主進程,一般每個Spark作業都會有一個Driver進程,負責整個作業的運行,包括了job的解析、Stage的生成、調度Task到Executor上去執行;
Stage:中文名 階段,是job的基本調度單位,因為每個job會分成若干組Task,每組任務就被稱為 Stage;
Task:任務,指的是直接運行在executor上的東西,是executor上的一個線程;
Executor:指的是 執行器,顧名思義就是真正執行任務的地方了,一個集群可以被配置若干個Executor,每個Executor接收來自Driver的Task,并執行它(可同時執行多個Task)。
?????♀? Q3: 什么是DAG
全稱是 Directed Acyclic Graph,中文名是有向無環圖。Spark就是借用了DAG對RDD之間的關系進行了建模,用來描述RDD之間的因果依賴關系。因為在一個Spark作業調度中,多個作業任務之間也是相互依賴的,有些任務需要在一些任務執行完成了才可以執行的。在Spark調度中就是有DAGscheduler,它負責將job分成若干組Task組成的Stage。
?????♀? Q4: Spark的部署模式有哪些
主要有local模式、Standalone模式、Mesos模式、YARN模式。
更多的解釋可以參考這位老哥的解釋。https://www.jianshu.com/p/3b8f85329664
?????♀? Q5: Shuffle操作是什么
Shuffle指的是數據從Map端到Reduce端的數據傳輸過程,Shuffle性能的高低直接會影響程序的性能。因為Reduce task需要跨節點去拉在分布在不同節點上的Map task計算結果,這一個過程是需要有磁盤IO消耗以及數據網絡傳輸的消耗的,所以需要根據實際數據情況進行適當調整。另外,Shuffle可以分為兩部分,分別是Map階段的數據準備與Reduce階段的數據拷貝處理,在Map端我們叫Shuffle Write,在Reduce端我們叫Shuffle Read。
?????♀? Q6: 什么是惰性執行
這是RDD的一個特性,在RDD中的算子可以分為Transform算子和Action算子,其中Transform算子的操作都不會真正執行,只會記錄一下依賴關系,直到遇見了Action算子,在這之前的所有Transform操作才會被觸發計算,這就是所謂的惰性執行。具體哪些是Transform和Action算子,可以看下一節。
???? 常用函數
從網友的總結來看比較常用的算子大概可以分為下面幾種,所以就演示一下這些算子,如果需要看更多的算子或者解釋,建議可以移步到官方API文檔去Search一下哈。
pyspark.RDD:http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD
圖來自 edureka 的pyspark入門教程下面我們用自己創建的RDD:sc.parallelize(range(1,11),4)
import?os import?pyspark from?pyspark?import?SparkContext,?SparkConfconf?=?SparkConf().setAppName("test_SamShare").setMaster("local[4]") sc?=?SparkContext(conf=conf)#?使用?parallelize方法直接實例化一個RDD rdd?=?sc.parallelize(range(1,11),4)?#?這里的?4?指的是分區數量 rdd.take(100) #?[1,?2,?3,?4,?5,?6,?7,?8,?9,?10]""" ----------------------------------------------Transform算子解析 ---------------------------------------------- """ #?以下的操作由于是Transform操作,因為我們需要在最后加上一個collect算子用來觸發計算。 #?1.?map:?和python差不多,map轉換就是對每一個元素進行一個映射 rdd?=?sc.parallelize(range(1,?11),?4) rdd_map?=?rdd.map(lambda?x:?x*2) print("原始數據:",?rdd.collect()) print("擴大2倍:",?rdd_map.collect()) #?原始數據:?[1, 2, 3, 4, 5, 6, 7, 8, 9, 10] #?擴大2倍:?[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]#?2.?flatMap:?這個相比于map多一個flat(壓平)操作,顧名思義就是要把高維的數組變成一維 rdd2?=?sc.parallelize(["hello?SamShare",?"hello?PySpark"]) print("原始數據:",?rdd2.collect()) print("直接split之后的map結果:",?rdd2.map(lambda?x:?x.split("?")).collect()) print("直接split之后的flatMap結果:",?rdd2.flatMap(lambda?x:?x.split("?")).collect()) #?直接split之后的map結果:?[['hello', 'SamShare'], ['hello', 'PySpark']] #?直接split之后的flatMap結果:?['hello', 'SamShare', 'hello', 'PySpark']#?3.?filter:?過濾數據 rdd?=?sc.parallelize(range(1,?11),?4) print("原始數據:",?rdd.collect()) print("過濾奇數:",?rdd.filter(lambda?x:?x?%?2?==?0).collect()) #?原始數據:?[1, 2, 3, 4, 5, 6, 7, 8, 9, 10] #?過濾奇數:?[2, 4, 6, 8, 10]#?4.?distinct:?去重元素 rdd?=?sc.parallelize([2,?2,?4,?8,?8,?8,?8,?16,?32,?32]) print("原始數據:",?rdd.collect()) print("去重數據:",?rdd.distinct().collect()) #?原始數據:?[2, 2, 4, 8, 8, 8, 8, 16, 32, 32] #?去重數據:?[4, 8, 16, 32, 2]#?5.?reduceByKey:?根據key來映射數據 from?operator?import?add rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)]) print("原始數據:",?rdd.collect()) print("原始數據:",?rdd.reduceByKey(add).collect()) #?原始數據:?[('a', 1), ('b', 1), ('a', 1)] #?原始數據:?[('b', 1), ('a', 2)]#?6.?mapPartitions:?根據分區內的數據進行映射操作 rdd?=?sc.parallelize([1,?2,?3,?4],?2) def?f(iterator):yield?sum(iterator) print(rdd.collect()) print(rdd.mapPartitions(f).collect()) #?[1,?2,?3,?4] #?[3,?7]#?7.?sortBy:?根據規則進行排序 tmp?=?[('a',?1),?('b',?2),?('1',?3),?('d',?4),?('2',?5)] print(sc.parallelize(tmp).sortBy(lambda?x:?x[0]).collect()) print(sc.parallelize(tmp).sortBy(lambda?x:?x[1]).collect()) #?[('1',?3),?('2',?5),?('a',?1),?('b',?2),?('d',?4)] #?[('a',?1),?('b',?2),?('1',?3),?('d',?4),?('2',?5)]#?8.?subtract:?數據集相減,?Return?each?value?in?self?that?is?not?contained?in?other. x?=?sc.parallelize([("a",?1),?("b",?4),?("b",?5),?("a",?3)]) y?=?sc.parallelize([("a",?3),?("c",?None)]) print(sorted(x.subtract(y).collect())) #?[('a',?1),?('b',?4),?('b',?5)]#?9.?union:?合并兩個RDD rdd?=?sc.parallelize([1,?1,?2,?3]) print(rdd.union(rdd).collect()) #?[1,?1,?2,?3,?1,?1,?2,?3]#?10.?interp:?取兩個RDD的交集,同時有去重的功效 rdd1?=?sc.parallelize([1,?10,?2,?3,?4,?5,?2,?3]) rdd2?=?sc.parallelize([1,?6,?2,?3,?7,?8]) print(rdd1.interp(rdd2).collect()) #?[1,?2,?3]#?11.?cartesian:?生成笛卡爾積 rdd?=?sc.parallelize([1,?2]) print(sorted(rdd.cartesian(rdd).collect())) #?[(1,?1),?(1,?2),?(2,?1),?(2,?2)]#?12.?zip:?拉鏈合并,需要兩個RDD具有相同的長度以及分區數量 x?=?sc.parallelize(range(0,?5)) y?=?sc.parallelize(range(1000,?1005)) print(x.collect()) print(y.collect()) print(x.zip(y).collect()) #?[0,?1,?2,?3,?4] #?[1000,?1001,?1002,?1003,?1004] #?[(0,?1000),?(1,?1001),?(2,?1002),?(3,?1003),?(4,?1004)]# 13. zipWithIndex:?將RDD和一個從0開始的遞增序列按照拉鏈方式連接。 rdd_name?=?sc.parallelize(["LiLei",?"Hanmeimei",?"Lily",?"Lucy",?"Ann",?"Dachui",?"RuHua"]) rdd_index?=?rdd_name.zipWithIndex() print(rdd_index.collect()) #?[('LiLei',?0),?('Hanmeimei',?1),?('Lily',?2),?('Lucy',?3),?('Ann',?4),?('Dachui',?5),?('RuHua',?6)]#?14.?groupByKey:?按照key來聚合數據 rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)]) print(rdd.collect()) print(sorted(rdd.groupByKey().mapValues(len).collect())) print(sorted(rdd.groupByKey().mapValues(list).collect())) #?[('a',?1),?('b',?1),?('a',?1)] #?[('a',?2),?('b',?1)] #?[('a',?[1,?1]),?('b',?[1])]#?15.?sortByKey: tmp?=?[('a',?1),?('b',?2),?('1',?3),?('d',?4),?('2',?5)] print(sc.parallelize(tmp).sortByKey(True,?1).collect()) #?[('1',?3),?('2',?5),?('a',?1),?('b',?2),?('d',?4)]#?16.?join: x?=?sc.parallelize([("a",?1),?("b",?4)]) y?=?sc.parallelize([("a",?2),?("a",?3)]) print(sorted(x.join(y).collect())) #?[('a',?(1,?2)),?('a',?(1,?3))]#?17.?leftOuterJoin/rightOuterJoin x?=?sc.parallelize([("a",?1),?("b",?4)]) y?=?sc.parallelize([("a",?2)]) print(sorted(x.leftOuterJoin(y).collect())) #?[('a',?(1,?2)),?('b',?(4,?None))]""" ----------------------------------------------Action算子解析 ---------------------------------------------- """ #?1.?collect:?指的是把數據都匯集到driver端,便于后續的操作 rdd?=?sc.parallelize(range(0,?5)) rdd_collect?=?rdd.collect() print(rdd_collect) #?[0,?1,?2,?3,?4]#?2.?first:?取第一個元素 sc.parallelize([2,?3,?4]).first() #?2#?3.?collectAsMap:?轉換為dict,使用這個要注意了,不要對大數據用,不然全部載入到driver端會爆內存 m?=?sc.parallelize([(1,?2),?(3,?4)]).collectAsMap() m #?{1:?2,?3:?4}#?4.?reduce:?逐步對兩個元素進行操作 rdd?=?sc.parallelize(range(10),5) print(rdd.reduce(lambda?x,y:x+y)) #?45#?5.?countByKey/countByValue: rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)]) print(sorted(rdd.countByKey().items())) print(sorted(rdd.countByValue().items())) #?[('a',?2),?('b',?1)] #?[(('a',?1),?2),?(('b',?1),?1)]#?6.?take:?相當于取幾個數據到driver端 rdd?=?sc.parallelize([("a",?1),?("b",?1),?("a",?1)]) print(rdd.take(5)) #?[('a',?1),?('b',?1),?('a',?1)]#?7.?saveAsTextFile:?保存rdd成text文件到本地 text_file?=?"./data/rdd.txt" rdd?=?sc.parallelize(range(5)) rdd.saveAsTextFile(text_file)#?8.?takeSample:?隨機取數 rdd?=?sc.textFile("./test/data/hello_samshare.txt",?4)??#?這里的?4?指的是分區數量 rdd_sample?=?rdd.takeSample(True,?2,?0)??# withReplacement 參數1:代表是否是有放回抽樣 rdd_sample#?9.?foreach:?對每一個元素執行某種操作,不生成新的RDD rdd?=?sc.parallelize(range(10),?5) accum?=?sc.accumulator(0) rdd.foreach(lambda?x:?accum.add(x)) print(accum.value) #?45???? Spark SQL使用
在講Spark SQL前,先解釋下這個模塊。這個模塊是Spark中用來處理結構化數據的,提供一個叫SparkDataFrame的東西并且自動解析為分布式SQL查詢數據。我們之前用過Python的Pandas庫,也大致了解了DataFrame,這個其實和它沒有太大的區別,只是調用的API可能有些不同罷了。
我們通過使用Spark SQL來處理數據,會讓我們更加地熟悉,比如可以用SQL語句、用SparkDataFrame的API或者Datasets API,我們可以按照需求隨心轉換,通過SparkDataFrame API 和 SQL 寫的邏輯,會被Spark優化器Catalyst自動優化成RDD,即便寫得不好也可能運行得很快(如果是直接寫RDD可能就掛了哈哈)。
創建SparkDataFrame
開始講SparkDataFrame,我們先學習下幾種創建的方法,分別是使用RDD來創建、使用python的DataFrame來創建、使用List來創建、讀取數據文件來創建、通過讀取數據庫來創建。
1. 使用RDD來創建
主要使用RDD的toDF方法。
rdd?=?sc.parallelize([("Sam",?28,?88),?("Flora",?28,?90),?("Run",?1,?60)]) df?=?rdd.toDF(["name",?"age",?"score"]) df.show() df.printSchema()#?+-----+---+-----+ #?|?name|age|score| #?+-----+---+-----+ #?|??Sam|?28|???88| #?|Flora|?28|???90| #?|??Run|??1|???60| #?+-----+---+-----+ #?root #??|--?name:?string?(nullable?=?true) #??|--?age:?long?(nullable?=?true) #??|--?score:?long?(nullable?=?true)2. 使用python的DataFrame來創建
df?=?pd.DataFrame([['Sam',?28,?88],?['Flora',?28,?90],?['Run',?1,?60]],columns=['name',?'age',?'score']) print(">>?打印DataFrame:") print(df) print("\n") Spark_df?=?spark.createDataFrame(df) print(">>?打印SparkDataFrame:") Spark_df.show() #?>>?打印DataFrame: #?????name??age??score #?0????Sam???28?????88 #?1??Flora???28?????90 #?2????Run????1?????60 #?>>?打印SparkDataFrame: #?+-----+---+-----+ #?|?name|age|score| #?+-----+---+-----+ #?|??Sam|?28|???88| #?|Flora|?28|???90| #?|??Run|??1|???60| #?+-----+---+-----+3. 使用List來創建
list_values?=?[['Sam',?28,?88],?['Flora',?28,?90],?['Run',?1,?60]] Spark_df?=?spark.createDataFrame(list_values,?['name',?'age',?'score']) Spark_df.show() #?+-----+---+-----+ #?|?name|age|score| #?+-----+---+-----+ #?|??Sam|?28|???88| #?|Flora|?28|???90| #?|??Run|??1|???60| #?+-----+---+-----+4. 讀取數據文件來創建
#?4.1?CSV文件 df?=?spark.read.option("header",?"true")\.option("inferSchema",?"true")\.option("delimiter",?",")\.csv("./test/data/titanic/train.csv") df.show(5) df.printSchema()#?4.2?json文件 df?=?spark.read.json("./test/data/hello_samshare.json") df.show(5) df.printSchema()5. 通過讀取數據庫來創建
#?5.1?讀取hive數據 spark.sql("CREATE?TABLE?IF?NOT?EXISTS?src?(key?INT,?value?STRING)?USING?hive") spark.sql("LOAD?DATA?LOCAL?INPATH?'data/kv1.txt'?INTO?TABLE?src") df?=?spark.sql("SELECT?key,?value?FROM?src?WHERE?key?<?10?ORDER?BY?key") df.show(5)#?5.2?讀取mysql數據 url?=?"jdbc:mysql://localhost:3306/test" df?=?spark.read.format("jdbc")?\.option("url",?url)?\.option("dbtable",?"runoob_tbl")?\.option("user",?"root")?\.option("password",?"8888")?\.load()\ df.show()常用的SparkDataFrame API
這里我大概是分成了幾部分來看這些APIs,分別是查看DataFrame的APIs、簡單處理DataFrame的APIs、DataFrame的列操作APIs、DataFrame的一些思路變換操作APIs、DataFrame的一些統計操作APIs,這樣子也有助于我們了解這些API的功能,以后遇見實際問題的時候可以解決。
首先我們這小節全局用到的數據集如下:
from?pyspark.sql?import?functions?as?F from?pyspark.sql?import?SparkSession # SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。 spark?=?SparkSession.builder?\.appName("sam_SamShare")?\.config("master",?"local[4]")?\.enableHiveSupport()?\.getOrCreate() sc?=?spark.sparkContext#?創建一個SparkDataFrame rdd?=?sc.parallelize([("Sam",?28,?88,?"M"),("Flora",?28,?90,?"F"),("Run",?1,?60,?None),("Peter",?55,?100,?"M"),("Mei",?54,?95,?"F")]) df?=?rdd.toDF(["name",?"age",?"score",?"sex"]) df.show() df.printSchema()#?+-----+---+-----+----+ #?|?name|age|score|?sex| #?+-----+---+-----+----+ #?|??Sam|?28|???88|???M| #?|Flora|?28|???90|???F| #?|??Run|??1|???60|null| #?|Peter|?55|??100|???M| #?|??Mei|?54|???95|???F| #?+-----+---+-----+----+ #?root #??|--?name:?string?(nullable?=?true) #??|--?age:?long?(nullable?=?true) #??|--?score:?long?(nullable?=?true) #??|--?sex:?string?(nullable?=?true)1. 查看DataFrame的APIs
#?DataFrame.collect #?以列表形式返回行 df.collect() #?[Row(name='Sam',?age=28,?score=88,?sex='M'), #?Row(name='Flora',?age=28,?score=90,?sex='F'), #?Row(name='Run',?age=1,?score=60,?sex=None), #?Row(name='Peter',?age=55,?score=100,?sex='M'), #?Row(name='Mei',?age=54,?score=95,?sex='F')]#?DataFrame.count df.count() #?5#?DataFrame.columns df.columns #?['name',?'age',?'score',?'sex']#?DataFrame.dtypes df.dtypes #?[('name',?'string'),?('age',?'bigint'),?('score',?'bigint'),?('sex',?'string')]#?DataFrame.describe #?返回列的基礎統計信息 df.describe(['age']).show() #?+-------+------------------+ #?|summary|???????????????age| #?+-------+------------------+ #?|??count|?????????????????5| #?|???mean|??????????????33.2| #?|?stddev|22.353970564532826| #?|????min|?????????????????1| #?|????max|????????????????55| #?+-------+------------------+ df.describe().show() #?+-------+-----+------------------+------------------+----+ #?|summary|?name|???????????????age|?????????????score|?sex| #?+-------+-----+------------------+------------------+----+ #?|??count|????5|?????????????????5|?????????????????5|???4| #?|???mean|?null|??????????????33.2|??????????????86.6|null| #?|?stddev|?null|22.353970564532826|15.582040944625966|null| #?|????min|Flora|?????????????????1|????????????????60|???F| #?|????max|??Sam|????????????????55|???????????????100|???M| #?+-------+-----+------------------+------------------+----+#?DataFrame.select #?選定指定列并按照一定順序呈現 df.select("sex",?"score").show()#?DataFrame.first #?DataFrame.head #?查看第1條數據 df.first() #?Row(name='Sam',?age=28,?score=88,?sex='M') df.head(1) #?[Row(name='Sam',?age=28,?score=88,?sex='M')]#?DataFrame.freqItems #?查看指定列的枚舉值 df.freqItems(["age","sex"]).show() #?+---------------+-------------+ #?|??age_freqItems|sex_freqItems| #?+---------------+-------------+ #?|[55,?1,?28,?54]|??????[M,?F,]| #?+---------------+-------------+#?DataFrame.summary df.summary().show() #?+-------+-----+------------------+------------------+----+ #?|summary|?name|???????????????age|?????????????score|?sex| #?+-------+-----+------------------+------------------+----+ #?|??count|????5|?????????????????5|?????????????????5|???4| #?|???mean|?null|??????????????33.2|??????????????86.6|null| #?|?stddev|?null|22.353970564532826|15.582040944625966|null| #?|????min|Flora|?????????????????1|????????????????60|???F| #?|????25%|?null|????????????????28|????????????????88|null| #?|????50%|?null|????????????????28|????????????????90|null| #?|????75%|?null|????????????????54|????????????????95|null| #?|????max|??Sam|????????????????55|???????????????100|???M| #?+-------+-----+------------------+------------------+----+#?DataFrame.sample #?按照一定規則從df隨機抽樣數據 df.sample(0.5).show() #?+-----+---+-----+----+ #?|?name|age|score|?sex| #?+-----+---+-----+----+ #?|??Sam|?28|???88|???M| #?|??Run|??1|???60|null| #?|Peter|?55|??100|???M| #?+-----+---+-----+----+2. 簡單處理DataFrame的APIs
#?DataFrame.distinct #?對數據集進行去重 df.distinct().show()#?DataFrame.dropDuplicates #?對指定列去重 df.dropDuplicates(["sex"]).show() #?+-----+---+-----+----+ #?|?name|age|score|?sex| #?+-----+---+-----+----+ #?|Flora|?28|???90|???F| #?|??Run|??1|???60|null| #?|??Sam|?28|???88|???M| #?+-----+---+-----+----+#?DataFrame.exceptAll #?DataFrame.subtract #?根據指定的df對df進行去重 df1?=?spark.createDataFrame([("a",?1),?("a",?1),?("b",??3),?("c",?4)],?["C1",?"C2"]) df2?=?spark.createDataFrame([("a",?1),?("b",?3)],?["C1",?"C2"]) df3?=?df1.exceptAll(df2)??#?沒有去重的功效 df4?=?df1.subtract(df2)??#?有去重的奇效 df1.show() df2.show() df3.show() df4.show() #?+---+---+ #?|?C1|?C2| #?+---+---+ #?|??a|??1| #?|??a|??1| #?|??b|??3| #?|??c|??4| #?+---+---+ #?+---+---+ #?|?C1|?C2| #?+---+---+ #?|??a|??1| #?|??b|??3| #?+---+---+ #?+---+---+ #?|?C1|?C2| #?+---+---+ #?|??a|??1| #?|??c|??4| #?+---+---+ #?+---+---+ #?|?C1|?C2| #?+---+---+ #?|??c|??4| #?+---+---+#?DataFrame.intersectAll #?返回兩個DataFrame的交集 df1?=?spark.createDataFrame([("a",?1),?("a",?1),?("b",??3),?("c",?4)],?["C1",?"C2"]) df2?=?spark.createDataFrame([("a",?1),?("b",?4)],?["C1",?"C2"]) df1.intersectAll(df2).show() #?+---+---+ #?|?C1|?C2| #?+---+---+ #?|??a|??1| #?+---+---+#?DataFrame.drop #?丟棄指定列 df.drop('age').show()#?DataFrame.withColumn #?新增列 df1?=?df.withColumn("birth_year",?2021?-?df.age) df1.show() #?+-----+---+-----+----+----------+ #?|?name|age|score|?sex|birth_year| #?+-----+---+-----+----+----------+ #?|??Sam|?28|???88|???M|??????1993| #?|Flora|?28|???90|???F|??????1993| #?|??Run|??1|???60|null|??????2020| #?|Peter|?55|??100|???M|??????1966| #?|??Mei|?54|???95|???F|??????1967| #?+-----+---+-----+----+----------+#?DataFrame.withColumnRenamed #?重命名列名 df1?=?df.withColumnRenamed("sex",?"gender") df1.show() #?+-----+---+-----+------+ #?|?name|age|score|gender| #?+-----+---+-----+------+ #?|??Sam|?28|???88|?????M| #?|Flora|?28|???90|?????F| #?|??Run|??1|???60|??null| #?|Peter|?55|??100|?????M| #?|??Mei|?54|???95|?????F| #?+-----+---+-----+------+#?DataFrame.dropna #?丟棄空值,DataFrame.dropna(how='any',?thresh=None,?subset=None) df.dropna(how='all',?subset=['sex']).show() #?+-----+---+-----+---+ #?|?name|age|score|sex| #?+-----+---+-----+---+ #?|??Sam|?28|???88|??M| #?|Flora|?28|???90|??F| #?|Peter|?55|??100|??M| #?|??Mei|?54|???95|??F| #?+-----+---+-----+---+#?DataFrame.fillna #?空值填充操作 df1?=?spark.createDataFrame([("a",?None),?("a",?1),?(None,??3),?("c",?4)],?["C1",?"C2"]) #?df2?=?df1.na.fill({"C1":?"d",?"C2":?99}) df2?=?df1.fillna({"C1":?"d",?"C2":?99}) df1.show() df2.show()#?DataFrame.filter #?根據條件過濾 df.filter(df.age>50).show() #?+-----+---+-----+---+ #?|?name|age|score|sex| #?+-----+---+-----+---+ #?|Peter|?55|??100|??M| #?|??Mei|?54|???95|??F| #?+-----+---+-----+---+ df.where(df.age==28).show() #?+-----+---+-----+---+ #?|?name|age|score|sex| #?+-----+---+-----+---+ #?|??Sam|?28|???88|??M| #?|Flora|?28|???90|??F| #?+-----+---+-----+---+ df.filter("age<18").show() #?+----+---+-----+----+ #?|name|age|score|?sex| #?+----+---+-----+----+ #?|?Run|??1|???60|null| #?+----+---+-----+----+#?DataFrame.join #?這個不用多解釋了,直接上案例來看看具體的語法即可,DataFrame.join(other,?on=None,?how=None) df1?=?spark.createDataFrame([("a",?1),?("d",?1),?("b",??3),?("c",?4)],?["id",?"num1"]) df2?=?spark.createDataFrame([("a",?1),?("b",?3)],?["id",?"num2"]) df1.join(df2,?df1.id?==?df2.id,?'left').select(df1.id.alias("df1_id"),df1.num1.alias("df1_num"),df2.num2.alias("df2_num")).sort(["df1_id"],?ascending=False)\.show()#?DataFrame.agg(*exprs) #?聚合數據,可以寫多個聚合方法,如果不寫groupBy的話就是對整個DF進行聚合 #?DataFrame.alias #?設置列或者DataFrame別名 #?DataFrame.groupBy #?根據某幾列進行聚合,如有多列用列表寫在一起,如?df.groupBy(["sex",?"age"]) df.groupBy("sex").agg(F.min(df.age).alias("最小年齡"),F.expr("avg(age)").alias("平均年齡"),F.expr("collect_list(name)").alias("姓名集合")).show() #?+----+--------+--------+------------+ #?|?sex|最小年齡|平均年齡|????姓名集合| #?+----+--------+--------+------------+ #?|???F|??????28|????41.0|[Flora,?Mei]| #?|null|???????1|?????1.0|???????[Run]| #?|???M|??????28|????41.5|[Sam,?Peter]| #?+----+--------+--------+------------+#?DataFrame.foreach #?對每一行進行函數方法的應用 def?f(person):print(person.name) df.foreach(f) #?Peter #?Run #?Sam #?Flora #?Mei#?DataFrame.replace #?修改df里的某些值 df1?=?df.na.replace({"M":?"Male",?"F":?"Female"}) df1.show()#?DataFrame.union #?相當于SQL里的union?all操作 df1?=?spark.createDataFrame([("a",?1),?("d",?1),?("b",??3),?("c",?4)],?["id",?"num"]) df2?=?spark.createDataFrame([("a",?1),?("b",?3)],?["id",?"num"]) df1.union(df2).show() df1.unionAll(df2).show() #?這里union沒有去重,不知道為啥,有知道的朋友麻煩解釋下,謝謝了。 #?+---+---+ #?|?id|num| #?+---+---+ #?|??a|??1| #?|??d|??1| #?|??b|??3| #?|??c|??4| #?|??a|??1| #?|??b|??3| #?+---+---+#?DataFrame.unionByName #?根據列名來進行合并數據集 df1?=?spark.createDataFrame([[1,?2,?3]],?["col0",?"col1",?"col2"]) df2?=?spark.createDataFrame([[4,?5,?6]],?["col1",?"col2",?"col0"]) df1.unionByName(df2).show() #?+----+----+----+ #?|col0|col1|col2| #?+----+----+----+ #?|???1|???2|???3| #?|???6|???4|???5| #?+----+----+----+3. DataFrame的列操作APIs
這里主要針對的是列進行操作,比如說重命名、排序、空值判斷、類型判斷等,這里就不展開寫demo了,看看語法應該大家都懂了。
Column.alias(*alias,?**kwargs)??#?重命名列名 Column.asc()??#?按照列進行升序排序 Column.desc()??#?按照列進行降序排序 Column.astype(dataType)??#?類型轉換 Column.cast(dataType)??#?強制轉換類型 Column.between(lowerBound,?upperBound)??#?返回布爾值,是否在指定區間范圍內 Column.contains(other)??#?是否包含某個關鍵詞 Column.endswith(other)??#?以什么結束的值,如?df.filter(df.name.endswith('ice')).collect() Column.isNotNull()??#?篩選非空的行 Column.isNull() Column.isin(*cols)??#?返回包含某些值的行?df[df.name.isin("Bob",?"Mike")].collect() Column.like(other)??#?返回含有關鍵詞的行 Column.when(condition,?value)??#?給True的賦值 Column.otherwise(value)??#?與when搭配使用,df.select(df.name,?F.when(df.age?>?3,?1).otherwise(0)).show() Column.rlike(other)??#?可以使用正則的匹配?df.filter(df.name.rlike('ice$')).collect() Column.startswith(other)??#?df.filter(df.name.startswith('Al')).collect() Column.substr(startPos,?length)??#?df.select(df.name.substr(1,?3).alias("col")).collect()4. DataFrame的一些思路變換操作APIs
#?DataFrame.createOrReplaceGlobalTempView #?DataFrame.dropGlobalTempView #?創建全局的試圖,注冊后可以使用sql語句來進行操作,生命周期取決于Spark?application本身 df.createOrReplaceGlobalTempView("people") spark.sql("select?*?from?global_temp.people?where?sex?=?'M'?").show() #?+-----+---+-----+---+ #?|?name|age|score|sex| #?+-----+---+-----+---+ #?|??Sam|?28|???88|??M| #?|Peter|?55|??100|??M| #?+-----+---+-----+---+#?DataFrame.createOrReplaceTempView #?DataFrame.dropTempView #?創建本地臨時試圖,生命周期取決于用來創建此數據集的SparkSession df.createOrReplaceTempView("tmp_people") spark.sql("select?*?from?tmp_people?where?sex?=?'F'?").show() #?+-----+---+-----+---+ #?|?name|age|score|sex| #?+-----+---+-----+---+ #?|Flora|?28|???90|??F| #?|??Mei|?54|???95|??F| #?+-----+---+-----+---+#?DataFrame.cache\DataFrame.persist #?可以把一些數據放入緩存中,default?storage?level?(MEMORY_AND_DISK). df.cache() df.persist() df.unpersist()#?DataFrame.crossJoin #?返回兩個DataFrame的笛卡爾積關聯的DataFrame df1?=?df.select("name",?"sex") df2?=?df.select("name",?"sex") df3?=?df1.crossJoin(df2) print("表1的記錄數",?df1.count()) print("表2的記錄數",?df2.count()) print("笛卡爾積后的記錄數",?df3.count()) #?表1的記錄數?5 #?表2的記錄數?5 #?笛卡爾積后的記錄數?25#?DataFrame.toPandas #?把SparkDataFrame轉為?Pandas的DataFrame df.toPandas()#?DataFrame.rdd #?把SparkDataFrame轉為rdd,這樣子可以用rdd的語法來操作數據 df.rdd5. DataFrame的一些統計操作APIs
#?DataFrame.cov #?計算指定兩列的樣本協方差 df.cov("age",?"score") #?324.59999999999997#?DataFrame.corr #?計算指定兩列的相關系數,DataFrame.corr(col1,?col2,?method=None),目前method只支持Pearson相關系數 df.corr("age",?"score",?method="pearson") #?0.9319004030498815#?DataFrame.cube #?創建多維度聚合的結果,通常用于分析數據,比如我們指定兩個列進行聚合,比如name和age,那么這個函數返回的聚合結果會 #?groupby("name",?"age") #?groupby("name") #?groupby("age") #?groupby(all) #?四個聚合結果的union?all?的結果df1?=?df.filter(df.name?!=?"Run") print(df1.show()) df1.cube("name",?"sex").count().show() #?+-----+---+-----+---+ #?|?name|age|score|sex| #?+-----+---+-----+---+ #?|??Sam|?28|???88|??M| #?|Flora|?28|???90|??F| #?|Peter|?55|??100|??M| #?|??Mei|?54|???95|??F| #?+-----+---+-----+---+ #?cube?聚合之后的結果 #?+-----+----+-----+ #?|?name|?sex|count| #?+-----+----+-----+ #?|?null|???F|????2| #?|?null|null|????4| #?|Flora|null|????1| #?|Peter|null|????1| #?|?null|???M|????2| #?|Peter|???M|????1| #?|??Sam|???M|????1| #?|??Sam|null|????1| #?|??Mei|???F|????1| #?|??Mei|null|????1| #?|Flora|???F|????1| #?+-----+----+-----+保存數據/寫入數據庫
這里的保存數據主要是保存到Hive中的栗子,主要包括了overwrite、append等方式。
1. 當結果集為SparkDataFrame的時候
import?pandas?as?pd from?datetime?import?datetime from?pyspark?import?SparkConf from?pyspark?import?SparkContext from?pyspark.sql?import?HiveContextconf?=?SparkConf()\.setAppName("test")\.set("hive.exec.dynamic.partition.mode",?"nonstrict")?#?動態寫入hive分區表 sc?=?SparkContext(conf=conf) hc?=?HiveContext(sc) sc.setLogLevel("ERROR")list_values?=?[['Sam',?28,?88],?['Flora',?28,?90],?['Run',?1,?60]] Spark_df?=?spark.createDataFrame(list_values,?['name',?'age',?'score']) print(Spark_df.show()) save_table?=?"tmp.samshare_pyspark_savedata"#?方式1:直接寫入到Hive Spark_df.write.format("hive").mode("overwrite").saveAsTable(save_table)?#?或者改成append模式 print(datetime.now().strftime("%y/%m/%d?%H:%M:%S"),?"測試數據寫入到表"?+?save_table)#?方式2:注冊為臨時表,使用SparkSQL來寫入分區表 Spark_df.createOrReplaceTempView("tmp_table") write_sql?=?""" insert?overwrite?table?{0}?partitions?(pt_date='{1}') select?*?from?tmp_table """.format(save_table,?"20210520") hc.sql(write_sql) print(datetime.now().strftime("%y/%m/%d?%H:%M:%S"),?"測試數據寫入到表"?+?save_table)2. 當結果集為Python的DataFrame的時候
如果是Python的DataFrame,我們就需要多做一步把它轉換為SparkDataFrame,其余操作就一樣了。
import?pandas?as?pd from?datetime?import?datetime from?pyspark?import?SparkConf from?pyspark?import?SparkContext from?pyspark.sql?import?HiveContextconf?=?SparkConf()\.setAppName("test")\.set("hive.exec.dynamic.partition.mode",?"nonstrict")?#?動態寫入hive分區表 sc?=?SparkContext(conf=conf) hc?=?HiveContext(sc) sc.setLogLevel("ERROR")result_df?=?pd.DataFrame([1,2,3],?columns=['a']) save_table?=?"tmp.samshare_pyspark_savedata"#?獲取DataFrame的schema c1?=?list(result_df.columns) #?轉為SparkDataFrame result?=?hc.createDataFrame(result_df.astype(str),?c1) result.write.format("hive").mode("overwrite").saveAsTable(save_table)?#?或者改成append模式 print(datetime.now().strftime("%y/%m/%d?%H:%M:%S"),?"測試數據寫入到表"?+?save_table)???? Spark調優思路
這一小節的內容算是對pyspark入門的一個ending了,全文主要是參考學習了美團Spark性能優化指南的基礎篇和高級篇內容,主體脈絡和這兩篇文章是一樣的,只不過是基于自己學習后的理解進行了一次總結復盤,而原文中主要是用Java來舉例的,我這邊主要用pyspark來舉例。文章主要會從4個方面(或者說4個思路)來優化我們的Spark任務,主要就是下面的圖片所示:
開發習慣調優
1. 盡可能復用同一個RDD,避免重復創建,并且適當持久化數據
這種開發習慣是需要我們對于即將要開發的應用邏輯有比較深刻的思考,并且可以通過code review來發現的,講白了就是要記得我們創建過啥數據集,可以復用的盡量廣播(broadcast)下,能很好提升性能。
#?最低級寫法,相同數據集重復創建。 rdd1?=?sc.textFile("./test/data/hello_samshare.txt",?4)?#?這里的?4?指的是分區數量 rdd2?=?sc.textFile("./test/data/hello_samshare.txt",?4)?#?這里的?4?指的是分區數量 print(rdd1.take(10)) print(rdd2.map(lambda?x:x[0:1]).take(10))#?稍微進階一些,復用相同數據集,但因中間結果沒有緩存,數據會重復計算 rdd1?=?sc.textFile("./test/data/hello_samshare.txt",?4)?#?這里的?4?指的是分區數量 print(rdd1.take(10)) print(rdd1.map(lambda?x:x[0:1]).take(10))#?相對比較高效,使用緩存來持久化數據 rdd?=?sc.parallelize(range(1,?11),?4).cache()??#?或者persist() rdd_map?=?rdd.map(lambda?x:?x*2) rdd_reduce?=?rdd.reduce(lambda?x,?y:?x+y) print(rdd_map.take(10)) print(rdd_reduce)下面我們就來對比一下使用緩存能給我們的Spark程序帶來多大的效率提升吧,我們先構造一個程序運行時長測量器。
import?time #?統計程序運行時間 def?time_me(info="used"):def?_time_me(fn):@functools.wraps(fn)def?_wrapper(*args,?**kwargs):start?=?time.time()fn(*args,?**kwargs)print("%s?%s?%s"?%?(fn.__name__,?info,?time.time()?-?start),?"second")return?_wrapperreturn?_time_me下面我們運行下面的代碼,看下使用了cache帶來的效率提升:
@time_me() def?test(types=0):if?types?==?1:print("使用持久化緩存")rdd?=?sc.parallelize(range(1,?10000000),?4)rdd1?=?rdd.map(lambda?x:?x*x?+?2*x?+?1).cache()??#?或者?persist(StorageLevel.MEMORY_AND_DISK_SER)print(rdd1.take(10))rdd2?=?rdd1.reduce(lambda?x,?y:?x+y)rdd3?=?rdd1.reduce(lambda?x,?y:?x?+?y)rdd4?=?rdd1.reduce(lambda?x,?y:?x?+?y)rdd5?=?rdd1.reduce(lambda?x,?y:?x?+?y)print(rdd5)else:print("不使用持久化緩存")rdd?=?sc.parallelize(range(1,?10000000),?4)rdd1?=?rdd.map(lambda?x:?x?*?x?+?2?*?x?+?1)print(rdd1.take(10))rdd2?=?rdd1.reduce(lambda?x,?y:?x?+?y)rdd3?=?rdd1.reduce(lambda?x,?y:?x?+?y)rdd4?=?rdd1.reduce(lambda?x,?y:?x?+?y)rdd5?=?rdd1.reduce(lambda?x,?y:?x?+?y)print(rdd5)test()???#?不使用持久化緩存 time.sleep(10) test(1)??#?使用持久化緩存 #?output: #?使用持久化緩存 #?[4,?9,?16,?25,?36,?49,?64,?81,?100,?121] #?333333383333334999999 #?test?used?26.36529278755188?second #?使用持久化緩存 #?[4,?9,?16,?25,?36,?49,?64,?81,?100,?121] #?333333383333334999999 #?test?used?17.49532413482666?second同時我們打開YARN日志來看看:http://localhost:4040/jobs/
因為我們的代碼是需要重復調用RDD1的,當沒有對RDD1進行持久化的時候,每次當它被action算子消費了之后,就釋放了,等下一個算子計算的時候要用,就從頭開始計算一下RDD1。代碼中需要重復調用RDD1 五次,所以沒有緩存的話,差不多每次都要6秒,總共需要耗時26秒左右,但是,做了緩存,每次就只需要3s不到,總共需要耗時17秒左右。
另外,這里需要提及一下一個知識點,那就是持久化的級別,一般cache的話就是放入內存中,就沒有什么好說的,需要講一下的就是另外一個 persist(),它的持久化級別是可以被我們所配置的:
| MEMORY_ONLY | 將數據保存在內存中。如果內存不夠存放所有的數據,則數據可能就不會進行持久化。使用cache()方法時,實際就是使用的這種持久化策略,性能也是最高的。 |
| MEMORY_AND_DISK | 優先嘗試將數據保存在內存中,如果內存不夠存放所有的數據,會將數據寫入磁盤文件中。 |
| MEMORY_ONLY_SER | 基本含義同MEMORY_ONLY。唯一的區別是,會將RDD中的數據進行序列化,RDD的每個partition會被序列化成一個字節數組。這種方式更加節省內存,從而可以避免持久化的數據占用過多內存導致頻繁GC。 |
| MEMORY_AND_DISK_SER | 基本含義同MEMORY_AND_DISK。唯一的區別是會先序列化,節約內存。 |
| DISK_ONLY | 使用未序列化的Java對象格式,將數據全部寫入磁盤文件中。一般不推薦使用。 |
| MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. | 對于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個持久化的數據,都復制一份副本,并將副本保存到其他節點上。這種基于副本的持久化機制主要用于進行容錯。假如某個節點掛掉,節點的內存或磁盤中的持久化數據丟失了,那么后續對RDD計算時還可以使用該數據在其他節點上的副本。如果沒有副本的話,就只能將這些數據從源頭處重新計算一遍了。一般也不推薦使用。 |
2. 盡量避免使用低性能算子
shuffle類算子算是低性能算子的一種代表,所謂的shuffle類算子,指的是會產生shuffle過程的操作,就是需要把各個節點上的相同key寫入到本地磁盤文件中,然后其他的節點通過網絡傳輸拉取自己需要的key,把相同key拉到同一個節點上進行聚合計算,這種操作必然就是有大量的數據網絡傳輸與磁盤讀寫操作,性能往往不是很好的。
那么,Spark中有哪些算子會產生shuffle過程呢?
| 分區操作 | repartition()、repartitionAndSortWithinPartitions()、coalesce(shuffle=true) | 重分區操作一般都會shuffle,因為需要對所有的分區數據進行打亂。 |
| 聚合操作 | reduceByKey、groupByKey、sortByKey | 需要對相同key進行操作,所以需要拉到同一個節點上。 |
| 關聯操作 | join類操作 | 需要把相同key的數據shuffle到同一個節點然后進行笛卡爾積 |
| 去重操作 | distinct等 | 需要對相同key進行操作,所以需要shuffle到同一個節點上。 |
| 排序操作 | sortByKey等 | 需要對相同key進行操作,所以需要shuffle到同一個節點上。 |
這里進一步介紹一個替代join的方案,因為join其實在業務中還是蠻常見的。
#?原則2:盡量避免使用低性能算子 rdd1?=?sc.parallelize([('A1',?211),?('A1',?212),?('A2',?22),?('A4',?24),?('A5',?25)]) rdd2?=?sc.parallelize([('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)]) #?低效的寫法,也是傳統的寫法,直接join rdd_join?=?rdd1.join(rdd2) print(rdd_join.collect()) #?[('A4',?(24,?14)),?('A2',?(22,?12)),?('A1',?(211,?11)),?('A1',?(212,?11))] rdd_left_join?=?rdd1.leftOuterJoin(rdd2) print(rdd_left_join.collect()) #?[('A4',?(24,?14)),?('A2',?(22,?12)),?('A5',?(25,?None)),?('A1',?(211,?11)),?('A1',?(212,?11))] rdd_full_join?=?rdd1.fullOuterJoin(rdd2) print(rdd_full_join.collect()) #?[('A4',?(24,?14)),?('A3',?(None,?13)),?('A2',?(22,?12)),?('A5',?(25,?None)),?('A1',?(211,?11)),?('A1',?(212,?11))]#?高效的寫法,使用廣播+map來實現相同效果 #?tips1:?這里需要注意的是,用來broadcast的RDD不可以太大,最好不要超過1G #?tips2:?這里需要注意的是,用來broadcast的RDD不可以有重復的key的 rdd1?=?sc.parallelize([('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)]) rdd2?=?sc.parallelize([('A1',?211),?('A1',?212),?('A2',?22),?('A4',?24),?('A5',?25)])# step1:?先將小表進行廣播,也就是collect到driver端,然后廣播到每個Executor中去。 rdd_small_bc?=?sc.broadcast(rdd1.collect())# step2:從Executor中獲取存入字典便于后續map操作 rdd_small_dict?=?dict(rdd_small_bc.value)# step3:定義join方法 def?broadcast_join(line,?rdd_small_dict,?join_type):k?=?line[0]v?=?line[1]small_table_v?=?rdd_small_dict[k]?if?k?in?rdd_small_dict?else?Noneif?join_type?==?'join':return?(k,?(v,?small_table_v))?if?k?in?rdd_small_dict?else?Noneelif?join_type?==?'left_join':return?(k,?(v,?small_table_v?if?small_table_v?is?not?None?else?None))else:print("not?support?join?type!")# step4:使用 map 實現?兩個表join的功能 rdd_join?=?rdd2.map(lambda?line:?broadcast_join(line,?rdd_small_dict,?"join")).filter(lambda?line:?line?is?not?None) rdd_left_join?=?rdd2.map(lambda?line:?broadcast_join(line,?rdd_small_dict,?"left_join")).filter(lambda?line:?line?is?not?None) print(rdd_join.collect()) print(rdd_left_join.collect()) #?[('A1',?(211,?11)),?('A1',?(212,?11)),?('A2',?(22,?12)),?('A4',?(24,?14))] #?[('A1',?(211,?11)),?('A1',?(212,?11)),?('A2',?(22,?12)),?('A4',?(24,?14)),?('A5',?(25,?None))]上面的RDD join被改寫為 broadcast+map的PySpark版本實現,不過里面有兩個點需要注意:
tips1: 用來broadcast的RDD不可以太大,最好不要超過1G
tips2: 用來broadcast的RDD不可以有重復的key的
3. 盡量使用高性能算子
上一節講到了低效算法,自然地就會有一些高效的算子。
| map | mapPartitions | 直接map的話,每次只會處理一條數據,而mapPartitions則是每次處理一個分區的數據,在某些場景下相對比較高效。(分區數據量不大的情況下使用,如果有數據傾斜的話容易發生OOM) |
| groupByKey | reduceByKey/aggregateByKey | 這類算子會在原節點先map-side預聚合,相對高效些。 |
| foreach | foreachPartitions | 同第一條記錄一樣。 |
| filter | filter+coalesce | 當我們對數據進行filter之后,有很多partition的數據會劇減,然后直接進行下一步操作的話,可能就partition數量很多但處理的數據又很少,task數量沒有減少,反而整體速度很慢;但如果執行了coalesce算子,就會減少一些partition數量,把數據都相對壓縮到一起,用更少的task處理完全部數據,一定場景下還是可以達到整體性能的提升。 |
| repartition+sort | repartitionAndSortWithinPartitions | 直接用就是了。 |
4. 廣播大變量
如果我們有一個數據集很大,并且在后續的算子執行中會被反復調用,那么就建議直接把它廣播(broadcast)一下。當變量被廣播后,會保證每個executor的內存中只會保留一份副本,同個executor內的task都可以共享這個副本數據。如果沒有廣播,常規過程就是把大變量進行網絡傳輸到每一個相關task中去,這樣子做,一來頻繁的網絡數據傳輸,效率極其低下;二來executor下的task不斷存儲同一份大數據,很有可能就造成了內存溢出或者頻繁GC,效率也是極其低下的。
#?原則4:廣播大變量 rdd1?=?sc.parallelize([('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)]) rdd1_broadcast?=?sc.broadcast(rdd1.collect()) print(rdd1.collect()) print(rdd1_broadcast.value) #?[('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)] #?[('A1',?11),?('A2',?12),?('A3',?13),?('A4',?14)]資源參數調優
如果要進行資源調優,我們就必須先知道Spark運行的機制與流程。
下面我們就來講解一些常用的Spark資源配置的參數吧,了解其參數原理便于我們依據實際的數據情況進行配置。
1)num-executors
指的是執行器的數量,數量的多少代表了并行的stage數量(假如executor是單核的話),但也并不是越多越快,受你集群資源的限制,所以一般設置50-100左右吧。
2)executor-memory
這里指的是每一個執行器的內存大小,內存越大當然對于程序運行是很好的了,但是也不是無節制地大下去,同樣受我們集群資源的限制。假設我們集群資源為500core,一般1core配置4G內存,所以集群最大的內存資源只有2000G左右。num-executors x executor-memory 是不能超過2000G的,但是也不要太接近這個值,不然的話集群其他同事就沒法正常跑數據了,一般我們設置4G-8G。
3)executor-cores
這里設置的是executor的CPU core數量,決定了executor進程并行處理task的能力。
4)driver-memory
設置driver的內存,一般設置2G就好了。但如果想要做一些Python的DataFrame操作可以適當地把這個值設大一些。
5)driver-cores
與executor-cores類似的功能。
6)spark.default.parallelism
設置每個stage的task數量。一般Spark任務我們設置task數量在500-1000左右比較合適,如果不去設置的話,Spark會根據底層HDFS的block數量來自行設置task數量。有的時候會設置得偏少,這樣子程序就會跑得很慢,即便你設置了很多的executor,但也沒有用。
下面說一個基本的參數設置的shell腳本,一般我們都是通過一個shell腳本來設置資源參數配置,接著就去調用我們的主函數。
#!/bin/bash basePath=$(cd?"$(dirname?)"$(cd?"$(dirname?"$0"):?pwd)")":?pwd)spark-submit?\--master?yarn?\--queue?samshare?\--deploy-mode?client?\--num-executors?100?\--executor-memory?4G?\--executor-cores?4?\--driver-memory?2G?\--driver-cores?2?\--conf?spark.default.parallelism=1000?\--conf?spark.yarn.executor.memoryOverhead=8G?\--conf?spark.sql.shuffle.partitions=1000?\--conf?spark.network.timeout=1200?\--conf?spark.python.worker.memory=64m?\--conf?spark.sql.catalogImplementation=hive?\--conf?spark.sql.crossJoin.enabled=True?\--conf?spark.dynamicAllocation.enabled=True?\--conf?spark.shuffle.service.enabled=True?\--conf?spark.scheduler.listenerbus.eventqueue.size=100000?\--conf?spark.pyspark.driver.python=python3?\--conf?spark.pyspark.python=python3?\--conf?spark.yarn.appMasterEnv.PYSPARK_PYTHON=python3?\--conf?spark.sql.pivotMaxValues=500000?\--conf?spark.hadoop.hive.exec.dynamic.partition=True?\--conf?spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict?\--conf?spark.hadoop.hive.exec.max.dynamic.partitions.pernode=100000?\--conf?spark.hadoop.hive.exec.max.dynamic.partitions=100000?\--conf?spark.hadoop.hive.exec.max.created.files=100000?\${bashPath}/project_name/main.py?$v_var1?$v_var2數據傾斜調優
相信我們對于數據傾斜并不陌生了,很多時間數據跑不出來有很大的概率就是出現了數據傾斜,在Spark開發中無法避免的也會遇到這類問題,而這不是一個嶄新的問題,成熟的解決方案也是有蠻多的,今天來簡單介紹一些比較常用并且有效的方案。
首先我們要知道,在Spark中比較容易出現傾斜的操作,主要集中在distinct、groupByKey、reduceByKey、aggregateByKey、join、repartition等,可以優先看這些操作的前后代碼。而為什么使用了這些操作就容易導致數據傾斜呢?大多數情況就是進行操作的key分布不均,然后使得大量的數據集中在同一個處理節點上,從而發生了數據傾斜。
查看Key 分布
#?針對Spark?SQL hc.sql("select?key,?count(0)?nums?from?table_name?group?by?key")#?針對RDD RDD.countByKey()Plan A: 過濾掉導致傾斜的key
這個方案并不是所有場景都可以使用的,需要結合業務邏輯來分析這個key到底還需要不需要,大多數情況可能就是一些異常值或者空串,這種就直接進行過濾就好了。
Plan B: 提前處理聚合
如果有些Spark應用場景需要頻繁聚合數據,而數據key又少的,那么我們可以把這些存量數據先用hive算好(每天算一次),然后落到中間表,后續Spark應用直接用聚合好的表+新的數據進行二度聚合,效率會有很高的提升。
Plan C: 調高shuffle并行度
#?針對Spark?SQL? --conf?spark.sql.shuffle.partitions=1000??#?在配置信息中設置參數 #?針對RDD rdd.reduceByKey(1000)?#?默認是200Plan D: 分配隨機數再聚合
大概的思路就是對一些大量出現的key,人工打散,從而可以利用多個task來增加任務并行度,以達到效率提升的目的,下面是代碼demo,分別從RDD 和 SparkSQL來實現。
#?Way1:?PySpark?RDD實現 import?pyspark from?pyspark?import?SparkContext,?SparkConf,?HiveContext from?random?import?randint import?pandas?as?pd# SparkSQL的許多功能封裝在SparkSession的方法接口中, SparkContext則不行的。 from?pyspark.sql?import?SparkSession spark?=?SparkSession.builder?\.appName("sam_SamShare")?\.config("master",?"local[4]")?\.enableHiveSupport()?\.getOrCreate()conf?=?SparkConf().setAppName("test_SamShare").setMaster("local[4]") sc?=?SparkContext(conf=conf) hc?=?HiveContext(sc)#?分配隨機數再聚合 rdd1?=?sc.parallelize([('sam',?1),?('sam',?1),?('sam',?1),?('sam',?1),?('sam',?1),?('sam',?1)])#?給key分配隨機數后綴 rdd2?=?rdd1.map(lambda?x:?(x[0]?+?"_"?+?str(randint(1,5)),?x[1])) print(rdd.take(10)) #?[('sam_5',?1),?('sam_5',?1),?('sam_3',?1),?('sam_5',?1),?('sam_5',?1),?('sam_3',?1)]#?局部聚合 rdd3?=?rdd2.reduceByKey(lambda?x,y?:?(x+y)) print(rdd3.take(10)) #?[('sam_5',?4),?('sam_3',?2)]#?去除后綴 rdd4?=?rdd3.map(lambda?x:?(x[0][:-2],?x[1])) print(rdd4.take(10)) #?[('sam',?4),?('sam',?2)]#?全局聚合 rdd5?=?rdd4.reduceByKey(lambda?x,y?:?(x+y)) print(rdd5.take(10)) #?[('sam',?6)]#?Way2:?PySpark?SparkSQL實現 df?=?pd.DataFrame(5*[['Sam',?1],['Flora',?1]],columns=['name',?'nums']) Spark_df?=?spark.createDataFrame(df) print(Spark_df.show(10))Spark_df.createOrReplaceTempView("tmp_table")?#?注冊為視圖供SparkSQl使用sql?=?""" with?t1?as?(select?concat(name,"_",int(10*rand()))?as?new_name,?name,?numsfrom?tmp_table ), t2?as?(select?new_name,?sum(nums)?as?nfrom?t1group?by?new_name ), t3?as?(select?substr(new_name,0,length(new_name)?-2)?as?name,?sum(n)?as?nums_sum?from?t2group?by?substr(new_name,0,length(new_name)?-2) ) select?* from?t3 """ tt?=?hc.sql(sql).toPandas() tt下面是原理圖。
全文終!
往期精彩回顧適合初學者入門人工智能的路線及資料下載機器學習及深度學習筆記等資料打印機器學習在線手冊深度學習筆記專輯《統計學習方法》的代碼復現專輯 AI基礎下載機器學習的數學基礎專輯黃海廣老師《機器學習課程》視頻課 本站qq群851320808,加入微信群請掃碼: 與50位技術專家面對面20年技術見證,附贈技術全景圖總結
以上是生活随笔為你收集整理的【机器学习】3万字长文,PySpark入门级学习教程,框架思维的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++17新特性之std::string
- 下一篇: 教师节,收到学生的礼物和祝福,开心