[大数据]PySpark原理与基本操作
一 PySpark
Spark運(yùn)行時架構(gòu)
首先我們先回顧下Spark的基本運(yùn)行時架構(gòu),如下圖所示,其中橙色部分表示為JVM,Spark應(yīng)用程序運(yùn)行時主要分為Driver和Executor,Driver負(fù)載總體調(diào)度及UI展示,Executor負(fù)責(zé)Task運(yùn)行,Spark可以部署在多種資源管理系統(tǒng)中,例如Yarn、Mesos等,同時Spark自身也實(shí)現(xiàn)了一種簡單的Standalone(獨(dú)立部署)資源管理系統(tǒng),可以不用借助其他資源管理系統(tǒng)即可運(yùn)行。
更多細(xì)節(jié)請參考 Spark Scheduler內(nèi)部原理剖析。
用戶的Spark應(yīng)用程序運(yùn)行在Driver上(某種程度上說,用戶的程序就是Spark Driver程序),經(jīng)過Spark調(diào)度封裝成一個個Task,再將這些Task信息發(fā)給Executor執(zhí)行,Task信息包括代碼邏輯以及數(shù)據(jù)信息,Executor不直接運(yùn)行用戶的代碼。
?
PySpark運(yùn)行時架構(gòu)
為了不破壞Spark已有的運(yùn)行時架構(gòu),Spark在外圍包裝一層Python API,借助Py4j實(shí)現(xiàn)Python和Java的交互,進(jìn)而實(shí)現(xiàn)通過Python編寫Spark應(yīng)用程序,其運(yùn)行時架構(gòu)如下圖所示。
其中白色部分是新增的Python進(jìn)程,在Driver端,通過Py4j實(shí)現(xiàn)在Python中調(diào)用Java的方法,即將用戶寫的PySpark程序”映射”到JVM中,例如,用戶在PySpark中實(shí)例化一個Python的SparkContext對象,最終會在JVM中實(shí)例化Scala的SparkContext對象;在Executor端,則不需要借助Py4j,因?yàn)镋xecutor端運(yùn)行的Task邏輯是由Driver發(fā)過來的,那是序列化后的字節(jié)碼,雖然里面可能包含有用戶定義的Python函數(shù)或Lambda表達(dá)式,Py4j并不能實(shí)現(xiàn)在Java里調(diào)用Python的方法,為了能在Executor端運(yùn)行用戶定義的Python函數(shù)或Lambda表達(dá)式,則需要為每個Task單獨(dú)啟一個Python進(jìn)程,通過socket通信方式將Python函數(shù)或Lambda表達(dá)式發(fā)給Python進(jìn)程執(zhí)行。語言層面的交互總體流程如下圖所示,實(shí)線表示方法調(diào)用,虛線表示結(jié)果返回。
PySpark官方文檔:https://spark.apache.org/docs/latest/api/python/index.html
pyspark編程指南(英文):https://www.datacamp.com/community/tutorials/apache-spark-python#PySpark
二? PySpark 的操作
備忘清單:https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf
?
?
三 spark-submit提交任務(wù)模板示例
spark-submit提交方式官網(wǎng):http://spark.apache.org/docs/latest/submitting-applications.html
spark-submit \--master yarn \--deploy-mode client \--num-executors 10 \--executor-memory 10g \--executor-cores 8 \--driver-memory 10g \--conf spark.pyspark.python=python3 \--conf spark.pyspark.driver.python=python3 \--py-files depend.zip \demo.py 2020-08-23說明:
1、depend.zip是demo.py的依賴包,注:depend.zip不包含demo.py
2、demo.py中可以直接使用import depend.xx.xx 或 from depend.xx.xx import xx類似語句引入依賴包
3、上述示例中的2020-08-23是傳給demo.py的參數(shù)
參考:
pyspark spark-submit 集群提交任務(wù)以及引入虛擬環(huán)境依賴包攻略:https://www.cnblogs.com/piperck/p/10121097.html
?
四 代碼示例
1 WordCount詞頻統(tǒng)計(jì)
# -*- coding: utf-8 -*- import sys import os import datetime from pyspark import SparkConf,SparkContextsc = SparkConf().setAppName("wordcount") spark = SparkContext(conf=sc)text_file = spark.textFile("hdfs://examples/pyspark/words.txt") word_cnt_rdd = text_file.flatMap(lambda line : line.split(' ')).map(lambda word : (word, 1)).reduceByKey(lambda x, y: x + y) word_cnt_rdd.saveAsTextFile('hdfs://user/wordcount_result')?
#spark-cluster-mode ./spark-submit \ --verbose \ --master yarn \ --deploy-mode cluster \ --num-executors 10 \ --executor-cores 1 \ --executor-memory 8G \ --driver-memory 4G \ --conf spark.pyspark.python=python3 \ wordcount.py#spark-client-mode ./spark-submit \ --verbose \ --master yarn \ --deploy-mode client \ --num-executors 10 \ --executor-cores 1 \ --executor-memory 8G \ --driver-memory 4G \ --conf spark.pyspark.python=python3 \ --conf spark.pyspark.driver.python=python3 \ wordcount.py?
五、RDD操作示例
1 flatMap, map
flatMap有著一對多的表現(xiàn),輸入一輸出多。并且會將每一個輸入對應(yīng)的多個輸出整合成一個大的集合,當(dāng)然不用擔(dān)心這個集合會超出內(nèi)存的范圍,因?yàn)閟park會自覺地將過多的內(nèi)容溢寫到磁盤。當(dāng)然如果對運(yùn)行的機(jī)器的內(nèi)存有著足夠的信心,也可以將內(nèi)容存儲到內(nèi)存中。
用同樣的方法來展示map操作,與flatMap不同的是,map通常是一對一,即輸入一個,對應(yīng)輸出一個。但是輸出的結(jié)果可以是一個元組,一個元組則可能包含多個數(shù)據(jù),但是一個元組是一個整體,因此算是一個元素。
2 spark的filter
3 reduce和reduceByKey的區(qū)別
educe和reduceByKey是spark中使用地非常頻繁的。那么reduce和reduceBykey的區(qū)別在哪呢?
reduce處理數(shù)據(jù)時有著一對一的特性,而reduceByKey則有著多對一的特性。比如reduce中會把數(shù)據(jù)集合中每一個元素都處理一次,并且每一個元素都對應(yīng)著一個輸出。而reduceByKey則不同,它會把所有key相同的值處理并且進(jìn)行歸并,其中歸并的方法可以自己定義。
在單詞統(tǒng)計(jì)中,我們采用的就是reduceByKey,對于每一個單詞我們設(shè)置成一個鍵值對(key,value),我們把單詞作為key,即key=word,而value=1,因?yàn)楸闅v過程中,每個單詞的出現(xiàn)一次,則標(biāo)注1。那么reduceByKey則會把key相同的進(jìn)行歸并,然后根據(jù)我們定義的歸并方法即對value進(jìn)行累加處理,最后得到每個單詞出現(xiàn)的次數(shù)。而reduce則沒有相同Key歸并的操作,而是將所有值統(tǒng)一歸并,一并處理。
?
參考:
http://sharkdtu.com/posts/pyspark-internal.html
?
?
總結(jié)
以上是生活随笔為你收集整理的[大数据]PySpark原理与基本操作的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 服务器出现宕机的原因有哪些
- 下一篇: DedeCMS的织梦专题功能如何实现