PySpark
一、概念
每個spark?應(yīng)用都由一個驅(qū)動器程序(driver program)來發(fā)起集群上的各種并行操作
- driver program 包含了應(yīng)用的main 函數(shù),并且定義了集群上的分布式數(shù)據(jù)集,還對這些分布式數(shù)據(jù)集應(yīng)用了相關(guān)操作
- driver program 通過一個SparkContext 對象來訪問spark
- driver program 一般要管理多個執(zhí)行器(executor) 節(jié)點
SparkContext:該對象代表了對計算集群的一個連接
- 在pyspark shell 中,當(dāng)shell 啟動時,已經(jīng)自動創(chuàng)建了一個SparkContext 對象,它叫做sc。
- 通常可以用它來創(chuàng)建RDD
二、PySpark shell與獨立應(yīng)用
1.PySpark shell
spark?帶有交互式的?shell,可以用于即時數(shù)據(jù)分析
? ? ? ?(1)spark shell 可以與分布式存儲在許多機器的內(nèi)存或者硬盤上的數(shù)據(jù)進行交互,處理過程由spark 自動控制
? (2)pyspark shell 是 spark shell 的python 版本
? ? ? ? 使用pyspark shell:進入spark?的安裝目錄,然后執(zhí)行bin/pyspark?。若已添加環(huán)境變量,可直接輸入:pyspark即可啟動pyspark shell。
2.獨立應(yīng)用
獨立應(yīng)用與pyspark shell?的主要區(qū)別在于:你需要自行初始化SparkContext,除此之外二者使用的API?完全相同。
在python?中,你需要把獨立應(yīng)用寫成python?腳本,然后使用Spark?自帶的bin/spark-submit?腳本來運行:
bin/spark-submit my_script.pyspark-submit會引入python程序的spark依賴
在獨立應(yīng)用中,通常使用下面方法初始化SparkContext:
from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster('local').setAppName('My App') sc = SparkContext(conf = conf)首先創(chuàng)建一個SparkConf?對象來配置應(yīng)用,然后基于該SparkConf?來創(chuàng)建一個SparkContext?對象。
(1)setMaster() 給出了集群的URL,告訴spark 如何連接到集群上。這里'local' 表示讓spark 運行在單機單線程上。(2)setAppName() 給出了應(yīng)用的名字。當(dāng)連接到一個集群上時,這個值可以幫助你在集群管理器的用戶界面上找到你的應(yīng)用。如果需要使用python3,則使用export PYSPARK_PYTHON=python3?來導(dǎo)出環(huán)境變量。
(1)或者在代碼中使用os.environ["PYSPARK_PYTHON"]="python3"三、RDD
1.例子:
? ? (1)單機模式
import pyspark sc = pyspark.SparkContext('local[*]')txt = sc.textFile('file:usr/share/doc/python/copyright') print(txt.count())python_lines = txt.filter(lambda line: 'python' in line.lower()) print(python_lines.count())? ? ? 任何PySpark程序的入口都是SparkContext對象。 該對象允許連接到Spark集群并創(chuàng)建RDD。 local [*]字符串是一個特殊字符串,表示正在使用本地集群,這是正在單機模式下運行的另一種表達方式。 *告訴Spark在計算機上創(chuàng)建與邏輯核心(logical cores)一樣多的工作線程。
? ? (2)分布式
? ? ? 使用群集時,創(chuàng)建SparkContext可能會涉及更多。 要連接到Spark集群,可能需要處理身份驗證以及集群特定的其他一些信息。 可以參照以下內(nèi)容設(shè)置這些詳細信息:
conf = pyspark.SparkConf() conf.setMaster('spark://head_node:56887') conf.set('spark.authenticate', True) conf.set('spark.authenticate.secret', 'secret-key') sc = SparkContext(conf=conf)? ? ? ? 四、創(chuàng)建RDD方法
? ? ? ? 一旦擁有SparkContext對象,就可以開始創(chuàng)建RDD。
? ? ? ? (1)可以通過多種方式創(chuàng)建RDD,但是一種常見的方式是PySpark parallelize()函數(shù)。 parallelize()可以將某些Python數(shù)據(jù)結(jié)構(gòu)(如列表和元組)轉(zhuǎn)換為RDD,從而使它們具有容錯性和分布式功能。
? ? ? ? 為了更好地理解RDD,請考慮另一個示例。 以下代碼創(chuàng)建一個由10,000個元素組成的迭代器,然后使用parallelize()將數(shù)據(jù)分布到2個分區(qū)中:
>>> big_list = range(10000) >>> rdd = sc.parallelize(big_list, 2) >>> odds = rdd.filter(lambda x: x % 2 != 0) >>> odds.take(5) [1, 3, 5, 7, 9]? ? ? ? parallelize()將迭代器轉(zhuǎn)換為一組分布式數(shù)字,并提供了Spark基礎(chǔ)架構(gòu)的所有功能。
? ? ? ? ?請注意,此代碼使用RDD的filter()方法,而不是Python的內(nèi)置filter()。 結(jié)果是一樣的,但是幕后發(fā)生的事情卻大不相同。 通過使用RDD filter()方法,該操作在多個CPU或計算機之間以分布式方式進行。
? ? ? ? 再次,將其想象為Spark完成multiprocessing?工作,所有這些工作都封裝在RDD數(shù)據(jù)結(jié)構(gòu)中。
? ? ? ? take()是查看RDD內(nèi)容的一種方法,但只能查看一小部分。 take()將數(shù)據(jù)子集從分布式系統(tǒng)中拉到單臺計算機上。
? ? ? ? take()對于調(diào)試很重要,因為可能無法在一臺計算機上檢查整個數(shù)據(jù)集。 RDD已針對在大數(shù)據(jù)上使用進行了優(yōu)化,因此在現(xiàn)實世界中,一臺計算機可能沒有足夠的RAM來容納整個數(shù)據(jù)集。
? ? ? ?(2) 創(chuàng)建RDD的另一種方法是使用textFile()讀取文件,在前面的示例中已經(jīng)看到過。 RDD是使用PySpark的基礎(chǔ)數(shù)據(jù)結(jié)構(gòu)之一,因此API中的許多函數(shù)都返回RDD。
? ? ? ? RDD與其他數(shù)據(jù)結(jié)構(gòu)之間的主要區(qū)別之一是處理被延遲直到結(jié)果被請求為止。這類似于Python生成器。 Python生態(tài)系統(tǒng)中通常使用懶惰評估(lazy evaluation)一詞來解釋這種行為。
? ? ? ? ?可以在同一RDD上堆疊多個轉(zhuǎn)換,而無需進行任何處理。之所以可以使用此功能,是因為Spark會維護轉(zhuǎn)換的有向無環(huán)圖。僅在最終結(jié)果被請求時才激活基礎(chǔ)圖。在前面的示例中,直到通過調(diào)用take()請求結(jié)果之后,才進行計算。
? ? ? ? 有多種方法可以向RDD請求結(jié)果。可以使用RDD上的collect()顯式請求評估結(jié)果并將其收集到單個群集節(jié)點。還可以通過各種方式隱式請求結(jié)果,如先前所見,其中之一就是使用count()。
參考:
1.https://realpython.com/pyspark-intro/
2.http://www.huaxiaozhuan.com/%E5%B7%A5%E5%85%B7/spark/chapters/01_basic.html
總結(jié)
- 上一篇: 【自动驾驶】车道线拟合算法---最小二乘
- 下一篇: 现在工作所具备的计算机操作,信息科科长职