PySpark入门
PySpark Documentation — PySpark 3.3.0 documentationhttps://spark.apache.org/docs/latest/api/python/
?前言 - Spark
1. 運行pyspark的各種方式
1,通過pyspark進入pyspark單機交互式環境。
這種方式一般用來測試代碼。
也可以指定jupyter或者ipython為交互環境。
2,通過spark-submit提交Spark任務到集群運行。
這種方式可以提交Python腳本或者Jar包到集群上讓成百上千個機器運行任務。
這也是工業界生產中通常使用spark的方式。
3,通過zepplin notebook交互式執行。
zepplin是jupyter notebook的apache對應產品。
4, Python安裝findspark和pyspark庫。
可以在jupyter和其它Python環境中像調用普通庫一樣地調用pyspark庫。
2. Spark基本概念
- RDD:是彈性分布式數據集(Resilient Distributed Dataset)的簡稱,是分布式內存的一個抽象概念,提供了一種高度受限的共享內存模型。
- DAG:是Directed Acyclic Graph(有向無環圖)的簡稱,反映RDD之間的依賴關系。
- Driver Program:控制程序,負責為Application構建DAG圖。
- Cluster Manager:集群資源管理中心,負責分配計算資源。
- Worker Node:工作節點,負責完成具體計算。
- Executor:是運行在工作節點(Worker Node)上的一個進程,負責運行Task,并為應用程序存儲數據。
- Application:用戶編寫的Spark應用程序,一個Application包含多個Job。
- Job:作業,一個Job包含多個RDD及作用于相應RDD上的各種操作。
- Stage:階段,是作業的基本調度單位,一個作業會分為多組任務,每組任務被稱為“階段”。
- Task:任務,運行在Executor上的工作單元,是Executor中的一個線程。
- 總結:Application由多個Job組成,Job由多個Stage組成,Stage由多個Task組成。Stage是作業調度的基本單位。
3. PySpark架構設計
pyspark,為了不破壞Spark已有的運行時架構,Spark在外圍包裝一層Python API。在Driver端,借助Py4j實現Python和Java的交互,進而實現通過Python編寫Spark應用程序。在Executor端,則不需要借助Py4j,因為Executor端運行的Task邏輯是由Driver發過來的,那是序列化后的字節碼。
4.?Spark運行流程
5.?RDD數據結構
RDD,Resilient Distributed Dataset,彈性分布式數據集,它是記錄的只讀分區集合,是Spark的基本數據結構。RDD代表一個不可變、可分區、里面的元素可并行計算的集合。
一般有兩種方式創建RDD,第一種是讀取文件中的數據生成RDD,第二種則是通過將內存中的對象并行化得到RDD。
RDD的操作有兩種類型,即Transformation操作和Action操作。轉換操作是從已經存在的RDD創建一個新的RDD,而行動操作是在RDD上進行計算后返回結果到 Driver。
Transformation操作都具有 Lazy 特性,即 Spark 不會立刻進行實際的計算,只會記錄執行的軌跡,只有觸發Action操作的時候,它才會根據 DAG 圖真正執行。
操作確定了RDD之間的依賴關系。
RDD之間的依賴關系有兩種類型,即窄依賴和寬依賴。窄依賴時,父RDD的分區和子RDD的分區的關系是一對一或者多對一的關系。而寬依賴時,父RDD的分區和子RDD的分區是一對多或者多對多的關系。
寬依賴關系相關的操作一般具有shuffle過程,即通過一個Patitioner函數將父RDD中每個分區上key不同的記錄分發到不同的子RDD分區。
依賴關系確定了DAG切分成Stage的方式。
切割規則:從后往前,遇到寬依賴就切割Stage。
RDD之間的依賴關系形成一個DAG有向無環圖,DAG會提交給DAGScheduler,DAGScheduler會把DAG劃分成相互依賴的多個stage,劃分stage的依據就是RDD之間的寬窄依賴。遇到寬依賴就劃分stage,每個stage包含一個或多個task任務。然后將這些task以taskSet的形式提交給TaskScheduler運行。
?
6. 代碼例子
import pyspark from pyspark import SparkContext, SparkConfconf = SparkConf().setAppName("test").setMaster("local[4]") sc = SparkContext(conf=conf) print("spark version:",pyspark.__version__) rdd = sc.parallelize(["hello","spark"]) print(rdd.reduce(lambda x,y:x+' '+y))#wordcount例子 rdd_line = sc.textFile("/home/data/hello.txt") rdd_word = rdd_line.flatMap(lambda x:x.split(" ")) rdd_one = rdd_word.map(lambda t:(t,1)) rdd_count = rdd_one.reduceByKey(lambda x,y:x+y) rdd_count.collect()一、RDD編程
- 創建RDD
- 常用Action操作
- 常用Transformation操作
- 常用PairRDD的轉換操作
- 緩存操作
- 共享變量
- 分區操作
二、SparkSQL編程
- RDD和DataFrame的對比
- 創建DataFrame
- DataFrame保存成文件
- DataFrame的API交互
- DataFrame的SQL交互
三、Spark性能調優方法
一般來說,如果有可能,用戶應當盡可能多地使用SparkSQL以取得更好的性能。
主要原因是SparkSQL是一種聲明式編程風格,背后的計算引擎會自動做大量的性能優化工作。基于RDD的Spark的性能調優屬于坑非常深的領域,并且很容易踩到。
可以用下面三個公式來近似估計spark任務的執行時間:
可以用下面二個公式來說明spark在executor上的內存分配:
四、分布式實現代碼?
五、MLlib機器學習
- DataFrame: MLlib中數據的存儲形式,其列可以存儲特征向量,標簽,以及原始的文本,圖像。
- Transformer:轉換器。具有transform方法。通過附加一個或多個列將一個DataFrame轉換成另外一個DataFrame。
- Estimator:估計器。具有fit方法。它接受一個DataFrame數據作為輸入后經過訓練,產生一個轉換器Transformer。
- Pipeline:流水線。具有setStages方法。順序將多個Transformer和1個Estimator串聯起來,得到一個流水線模型。
Spark MLlib的分布式訓練方法有下面幾個弊端:
1. 數據操作
Sql.dataframe?
spark = SparkSession.builder \.appName("spark") \.getOrCreate()df = spark.read.csv('fraudTrain.csv',header=True) df.printSchema()數據查看
df.count() #查看計數 df.show(truncate=False) #dataframe顯示數據處理
df.take(1) df.select('col')2. 特征工程
缺失值
df.na.drop() # df.dropna() df.na.fill()編碼
from pyspark.ml.feature import Binarizer#二值化 binarizer = Binarizer(threshold=10, inputCol='humidty', outputCol= 'label') df = binarizer.transform(df) df.select('humidty','label').show(4)標準化/歸一化
#特征 from pyspark.ml.feature import VectorAssembler assemble = VectorAssembler(inputCols=featuralist, outputCol='features') df = assemble.transform(df)#標準化 from pyspark.ml.feature import StandardScaler scale = StandardScaler(inputCol='features', outputCol='standardized') scaler = scale.fit(df) df = scaler.transform(df)3. 特征選擇
Chi-Square selector
4. 機器學習模型
from pyspark.ml import Pipelinefrom pyspark.ml.clustering import KMeans from pyspark.ml.classification import LogisticRegressiondt from pyspark.ml.regression import DecisionTreeRegressor#邏輯回歸 blor = LogisticRegression() blorModel = blor.fit(df) blorModel.evaluate(df).accuracy == blorModel.summary.accuracyblorModel.predict(df) blorModel.transform(df).prediction#決策樹回歸 dt = DecisionTreeRegressor(maxDepth=2) model = dt.fit(df) model.featureImportances5. 模型評估
from pyspark.ml.evaluation import BinaryClassificationEvaluatorevaluator = BinaryClassificationEvaluator(rawPredictionCol= 'rawPrediction') evaluator.evaluate(dataset)6. 文本挖掘
詞頻統計?TF-IDF
TF是詞頻(Term Frequency),IDF是逆文本頻率指數(Inverse Document Frequency)
TF-IDF的主要思想是:如果某個詞或短語在一篇文章中出現的頻率TF高,并且在其他文章中很少出現,則認為此詞或者短語具有很好的類別區分能力,適合用來分類。
TF-IDF實際上是:TF * IDF。某一特定文件內的高詞語頻率,以及該詞語在整個文件集合中的低文件頻率,可以產生出高權重的TF-IDF。因此,TF-IDF傾向于過濾掉常見的詞語,保留重要的詞語。
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer from pyspark.ml.feature import StringIndexer#1. HashingTF + IDF + Logistic Regression. #2. CountVectorizer + IDF + Logistic Regression.#分詞 tokenizer = Tokenizer(inputCol='text',outputCol="words") df= tokenizer.transform(df)# numFeatures特征數上限,不同word的數量 hashtf = HashingTF(numFeatures=2**16, inputCol='words', outputCol="tf") hashingTF.transform(df).head().features # => SparseVector(10, {5: 1.0, 7: 1.0, 8: 1.0})# 逆向文件頻率 idf= IDF(minDocFreq=5, inputCol='tf', outputCol="features") model = idf.fit(df) model.transform(df)# 將string列映射為 label [0, numLabels) 0是頻率最高的 # 也可以將numeric列映射為帶標簽string indexer=StringIndexer(inputCol="target", outputCol="label",stringOrderType="frequencyDesc") model = indexer.fit(df) df= model.transform(df)# CountVectorizer()六、基于pySpark的Pandas API
References
PySpark | Spark框架簡述 | Spark環境搭建_跟烏龜賽跑的博客-CSDN博客_pyspark
【Pyspark教程】SQL、MLlib、Core等模塊基礎使用_山頂夕景的博客-CSDN博客_pyspark
【PySpark調優】 3萬字長文,PySpark入門級學習教程,框架思維_CSDN博客
總結
- 上一篇: WORKNC 2020.0 win1
- 下一篇: this指针介绍