(1)spark核心RDD的概念解析、创建、以及相关操作
spark核心之RDD
什么是RDD
RDD指的是彈性分布式數(shù)據(jù)集(Resilient Distributed Dataset),它是spark計(jì)算的核心。盡管后面我們會(huì)使用DataFrame、Dataset進(jìn)行編程,但是它們的底層依舊是依賴于RDD的。我們來解釋一下RDD(Resilient Distributed Dataset)的這幾個(gè)單詞含義。
彈性:在計(jì)算上具有容錯(cuò)性,spark是一個(gè)計(jì)算框架,如果某一個(gè)節(jié)點(diǎn)掛了,可以自動(dòng)進(jìn)行計(jì)算之間血緣關(guān)系的跟蹤
分布式:很好理解,hdfs上數(shù)據(jù)是跨節(jié)點(diǎn)的,那么spark的計(jì)算也是要跨節(jié)點(diǎn)的。
數(shù)據(jù)集:可以將數(shù)組、文件等一系列數(shù)據(jù)的集合轉(zhuǎn)換為RDD
RDD是spark的一個(gè)最基本的抽象(如果你看一下源碼的話,你會(huì)發(fā)現(xiàn)RDD在底層是一個(gè)抽象類,抽象類顯然不能直接使用,必須要繼承它然后實(shí)現(xiàn)它內(nèi)部的一些方法才可以使用,它代表了不可變的、元素的分區(qū)(partition)集合,這些分區(qū)可以被并行操作。假設(shè)我們有一個(gè)300萬元素的數(shù)組,那么我們就可以將300萬個(gè)元素分成3份,每一個(gè)份就是一個(gè)分區(qū),每個(gè)分區(qū)都可以在不同的機(jī)器上進(jìn)行運(yùn)算,這樣就能提高運(yùn)算效率。
RDD支持很多操作,比如:map、filter等等,我們后面會(huì)慢慢介紹。當(dāng)然,RDD在scala的底層是一個(gè)類,但是我們后面有時(shí)候會(huì)把RDD和RDD實(shí)例對(duì)象都叫做RDD,沒有刻意區(qū)分,心里面清楚就可以啦。
RDD特性
RDD有如下五大特性:
RDD是一系列分區(qū)的集合。我們說了對(duì)于大的數(shù)據(jù)集我們可以切分成多份,每一份就是一個(gè)分區(qū),可以每一個(gè)分區(qū)單獨(dú)計(jì)算,所以RDD就是這些所有分區(qū)的集合。就類似于hdfs中的block,一個(gè)大文件也可以切分成多個(gè)block
RDD計(jì)算會(huì)對(duì)每一個(gè)分區(qū)進(jìn)行計(jì)算。假設(shè)我們對(duì)RDD做一個(gè)map操作,顯然是對(duì)RDD內(nèi)部的每一個(gè)分區(qū)都進(jìn)行相同的map操作。
RDD會(huì)依賴于一系列其它的RDD。假設(shè)我們對(duì)RDD1進(jìn)行操作得到了RDD2,然后對(duì)RDD2操作得到了RDD3,同理再得到RDD4。而我們說RDD是不可變的,對(duì)RDD進(jìn)行操作會(huì)形成新的RDD,所以RDD2依賴于RDD1,RDD3依賴于RDD2,RDD4依賴于RDD3,RDD1 => RDD2 => RDD3 => RDD4,所以RDD在轉(zhuǎn)換期間就如同流水線一樣,RDD之間是存在依賴關(guān)系的。這些依賴關(guān)系是非常重要的,假設(shè)RDD1有五個(gè)分區(qū),那么顯然RDD2、3、4也是有五個(gè)分區(qū)的,假設(shè)在計(jì)算RDD3的時(shí)候RDD2的第三個(gè)分區(qū)數(shù)據(jù)丟失了,那么spark會(huì)通過RDD之間血緣關(guān)系,知道RDD2依賴于RDD1,那么會(huì)通過RDD1重新進(jìn)行之前的計(jì)算得到RDD2第三個(gè)分區(qū)的數(shù)據(jù),注意:這種情況只會(huì)計(jì)算丟失的分區(qū)的數(shù)據(jù)。所以我們說RDD具有容錯(cuò)性,如果第n個(gè)操作失敗了,那么會(huì)從第n-1個(gè)操作重新開始。
可選,針對(duì)于key-value類型的RDD,會(huì)有一個(gè)partitioner,來表示這個(gè)RDD如何進(jìn)行分區(qū),比如:基于哈希進(jìn)行分區(qū)。如果不是這種類型的RDD,那么這個(gè)partitioner顯然就是空了。
可選,用于計(jì)算每一個(gè)分區(qū)最好位置。怎么理解呢?我們說數(shù)據(jù)和計(jì)算都是分布式的,如果該分區(qū)對(duì)應(yīng)的數(shù)據(jù)在A機(jī)器上,那么顯然計(jì)算該分區(qū)的最好位置就是A機(jī)器。如果計(jì)算和數(shù)據(jù)不在同一個(gè)機(jī)器或者說是節(jié)點(diǎn)上,那么我們會(huì)把計(jì)算移動(dòng)到相應(yīng)的節(jié)點(diǎn)上,因?yàn)樵诖髷?shù)據(jù)中是有說法的,移動(dòng)計(jì)算優(yōu)于移動(dòng)數(shù)據(jù)。所以RDD第五個(gè)特性就是具有計(jì)算每一個(gè)分區(qū)最好位置的集合。
圖解RDD
spark在運(yùn)行的時(shí)候,每一個(gè)計(jì)算任務(wù)就是一個(gè)task,另外:對(duì)于RDD而言,不是一個(gè)RDD計(jì)算對(duì)應(yīng)一個(gè)task,而是RDD內(nèi)部的每一個(gè)分區(qū)計(jì)算都會(huì)對(duì)應(yīng)一個(gè)task。假設(shè)這個(gè)RDD具有5個(gè)分區(qū),那么對(duì)這個(gè)RDD進(jìn)行一個(gè)map操作,就會(huì)生成5個(gè)task。另外,分區(qū)的數(shù)據(jù)是可以進(jìn)行persist(持久化)的,比如:內(nèi)存、磁盤、內(nèi)存+磁盤、多副本、序列化。
關(guān)于RDD計(jì)算,我們畫一下圖
SparkContext和SparkConf
在介紹RDD之前,我們需要了解一下什么SparkContext和SparkConf,因?yàn)槲覀兛隙ㄒ冗B接到spark集群,才可以創(chuàng)建RDD進(jìn)行編程。
SparkContext是pyspark的編程入口,作業(yè)的提交,任務(wù)的分發(fā),應(yīng)用的注冊(cè)都會(huì)在SparkContext中進(jìn)行。一個(gè)SparkContext實(shí)例對(duì)象代表了和spark的一個(gè)連接,只有建立的連接才可以把作業(yè)提交到spark集群當(dāng)中去。實(shí)例化了SparkContext之后才能創(chuàng)建RDD、以及我們后面會(huì)介紹的Broadcast廣播變量。
SparkConf是用來設(shè)置配置的,然后傳遞給SparkContext。
對(duì)于創(chuàng)建一個(gè)SparkContext對(duì)象,首先我們可以通過pyspark模塊來創(chuàng)建:
from pyspark import SparkContext
from pyspark import SparkConf
# setAppName是設(shè)置展示在webUI上的名字,setMaster表示運(yùn)行模式
# 但是我們目前是硬編碼,官方推薦在提交任務(wù)的時(shí)候傳遞。當(dāng)然我們后面說,現(xiàn)在有個(gè)印象即可
conf = SparkConf().setAppName("satori").setMaster("local")
# 此時(shí)我們就實(shí)例化出來一個(gè)SparkContext對(duì)象了,傳遞SparkConf對(duì)象
sc = SparkContext(conf=conf)
# 我們就可以使用sc來創(chuàng)建RDD
# 總之記住:SparkContext是用來實(shí)例化一個(gè)對(duì)象和spark集群建立連接的
# SparkConf是用來設(shè)置一些配置的,傳遞給SparkContext
其次我們通過shell進(jìn)行操作,我們直接啟動(dòng)pyspark:
當(dāng)我們啟動(dòng)之后,輸入sc,我們看到pyspark shell直接為我們創(chuàng)建了一個(gè)默認(rèn)的SparkContext實(shí)例對(duì)象,master叫做local[*](*表示使用計(jì)算機(jī)所以的核),appName叫做PySparkShell。我們?cè)诮榻BRDD相關(guān)操作的時(shí)候,會(huì)先使用shell的方式進(jìn)行演示,當(dāng)然使用py腳本編程的時(shí)候也是一樣的。另外,pyspark使用的是原生的Cpython解釋器,所以像numpy、pandas之類的包,原生python可以導(dǎo)入的,在pyspark shell里面也是可以導(dǎo)入的。
我們通過sc.getConf()也能拿到對(duì)應(yīng)的SparkConf實(shí)例對(duì)象。
那么我們可不可以在創(chuàng)建的時(shí)候手動(dòng)指定master和name呢?答案顯然是可以的。
我們看到我們?cè)趧?chuàng)建的時(shí)候手動(dòng)設(shè)置的master和name生效了,我們?cè)偻ㄟ^webUI來看一下,pyspark的webUI默認(rèn)是4040。
創(chuàng)建RDD
我們說RDD是spark的核心,那么如何創(chuàng)建一個(gè)RDD呢?答案顯然是通過SparkContext實(shí)例對(duì)象,因?yàn)樯厦嬉呀?jīng)說了。你可以通過編寫py文件的方式(我們后面會(huì)說)、手動(dòng)創(chuàng)建一個(gè)SparkContext實(shí)例對(duì)象,也可以通過啟動(dòng)pyspark shell,直接使用默認(rèn)為你創(chuàng)建好的,對(duì),就是那個(gè)sc。由于SparkContext實(shí)例對(duì)象操作方式都是一樣的,所以我們目前就先使用pyspark shell來進(jìn)行編程。后面我們會(huì)說如何通過編寫腳本的方式進(jìn)行spark編程,以及作業(yè)如何提交到spark上運(yùn)行。
通過sc(為了方便,sc就代指了SparkContext實(shí)例對(duì)象)創(chuàng)建RDD有兩種方式。
將一個(gè)已經(jīng)存在的集合轉(zhuǎn)成RDD
通過讀取存儲(chǔ)系統(tǒng)里面的文件,轉(zhuǎn)成RDD。這個(gè)存儲(chǔ)系統(tǒng)可以是本地、hdfs、hbase、s3等等,甚至可以是mysql等關(guān)系型數(shù)據(jù)庫。
下面我們就來代碼操作如何創(chuàng)建RDD,注意:現(xiàn)在我們是在pyspark shell中進(jìn)行操作的。所以sc是創(chuàng)建好的,不要看到了sc覺得納悶,為什么變量沒定義就可以使用;還有由于是交互式環(huán)境,我們也不需要print,如果是可打印的,會(huì)自動(dòng)打印。
從已經(jīng)存在的集合創(chuàng)建
>>> data = range(10)
>>> rdd1 = sc.parallelize(data) # 調(diào)用sc.parallelize方法,可以將已經(jīng)存在的集合轉(zhuǎn)為RDD
>>> data
range(0, 10)
>>> rdd1 # 輸出得到的是一個(gè)RDD對(duì)象
PythonRDD[1] at RDD at PythonRDD.scala:53
>>> rdd1.collect() # 如果想輸出的話,調(diào)用collect方法,這些后面會(huì)說。
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> # 進(jìn)行map操作得到rdd2
>>> rdd2 = rdd1.map(lambda x: x + 1)
>>> rdd2.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
>>> # 進(jìn)行reduce操作
>>> rdd2.reduce(lambda x, y: x + y)
55
>>> # 這些RDD相關(guān)的操作函數(shù)我們后面會(huì)說,但是從python的內(nèi)置函數(shù)map、reduce顯然也能明白是干什么的
我們看一下web界面
上面顯示了三個(gè)任務(wù),為什么是三個(gè),我們后面會(huì)說。另外我們通過parallelize創(chuàng)建RDD的時(shí)候是可以指定分區(qū)的。
>>> rdd3 = sc.parallelize(data, 5)
>>> rdd3.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>>
雖然結(jié)果沒有變化,但是我們來看一下web界面。
我們看到任務(wù)數(shù)量變成了5,因?yàn)橹付?個(gè)分區(qū),至于下面的2,說明默認(rèn)是兩個(gè)分區(qū)。因?yàn)榉謪^(qū)可大可小,如果每一個(gè)節(jié)點(diǎn)的cpu只執(zhí)行一個(gè)分區(qū)可能有點(diǎn)浪費(fèi),如果跑的快的、或者分區(qū)的數(shù)據(jù)集比較少的,很快就跑完了,那么容易造成資源浪費(fèi),因此spark官方建議每隔CPU對(duì)應(yīng)2到4個(gè)分區(qū),這樣可以把資源充分利用起來。至于具體設(shè)置多少個(gè),這個(gè)就取決于實(shí)際項(xiàng)目、以及規(guī)定的處理時(shí)間、節(jié)點(diǎn)對(duì)應(yīng)的機(jī)器性能等等,所以如果你根據(jù)業(yè)務(wù)找到了比較好的分區(qū)個(gè)數(shù),那么就傳遞給parallelize的第二個(gè)參數(shù)即可。
從存儲(chǔ)系統(tǒng)里面的文件創(chuàng)建
我們還可以讀取存儲(chǔ)系統(tǒng)里面的文件來創(chuàng)建RDD。我們演示一下從本地讀取文件、和從hdfs上讀取文件。
在本地創(chuàng)建一個(gè)satori.txt,內(nèi)容如下,并上傳到hdfs上面。
>>> # 讀取文件使用textFile,接收一個(gè)文件路徑,當(dāng)然同時(shí)也可以指定分區(qū)
>>> # 我們可以從本地讀取,讀取的格式為"file://文件路徑"
>>> rdd1 = sc.textFile("file:///root/satori.txt")
>>> rdd1.collect() # 我們看到默認(rèn)是以
分隔的
['hello golang', 'hello java', 'hello python', 'hello scala']
>>>
>>> # 從hdfs上讀取,格式為"hdfs://ip:port文件路徑",port就是hdfs集群上的端口,就是你在core-site.xml里面設(shè)置的
>>> rdd2 = sc.textFile("hdfs://localhost:9000/satori.txt", 4)
>>> rdd2.collect()
['hello golang', 'hello java', 'hello python', 'hello scala']
>>>
>>> rdd2.map(lambda x: len(x)).collect()
[12, 10, 12, 11]
>>> rdd2.map(lambda x: len(x)).reduce(lambda x, y: x + y)
45
>>>
我們看到通過textFile讀取外部文件的方式創(chuàng)建RDD也是沒有問題的,但是需要注意的是:如果你是spark集群,并且還是通過本地文件的方式,那么你要保證所有節(jié)點(diǎn)上相同路徑都存在該文件。
我目前都是單節(jié)點(diǎn)的,當(dāng)然對(duì)于學(xué)習(xí)來講單節(jié)點(diǎn)和多節(jié)點(diǎn)都是差不多的,不可能因?yàn)橛玫亩喙?jié)點(diǎn),語法就變了,只是多節(jié)點(diǎn)在操作的時(shí)候要考慮到通信、資源等問題。比如:我們這里讀取的是本地的/root/satori.txt,這就表示訪問本地的/root/satori.txt文件,如果你搭建的是集群,那么你要保證每個(gè)節(jié)點(diǎn)都存在/root/satori.txt,否則節(jié)點(diǎn)根本獲取不到這個(gè)數(shù)據(jù)。因此這種情況需要也別注意了,所以在學(xué)習(xí)語法的時(shí)候我個(gè)人不建議搭建spark集群(也就是所謂的standalone模式),公司生產(chǎn)上面也很少使用這種模式,當(dāng)然不是沒有,只是很少,絕大部分都是跑在yarn上面的。關(guān)于spark的運(yùn)行模式,資源管理以及調(diào)度、我們后面也會(huì)慢慢聊。
因此解決辦法就是把文件拷貝到每一個(gè)節(jié)點(diǎn)上面,或者使用網(wǎng)絡(luò)共享的文件系統(tǒng)。
另外textFile不光可以讀取文件,還可以讀取目錄:/dir、模糊匹配:/dir/*.txt、以及讀取gz壓縮包都是支持的。
除了textFile,還可以使用wholeTextFiles讀取目錄。
wholeTextFiles:接收一個(gè)目錄,會(huì)把里面所有的文件內(nèi)容讀取出來,以[("文件名", "文件內(nèi)容"), ("文件名", "文件內(nèi)容")...]的格式返回
>>> sc.wholeTextFiles("hdfs://localhost:9000/").collect()
[('hdfs://localhost:9000/satori.txt', 'hello golang
hello java
hello python
hello scala
')]
>>> # 我這里/目錄下面只有一個(gè)文件,把文件內(nèi)容全部讀取出來了
我們現(xiàn)在知道如何讀取文件轉(zhuǎn)化為RDD,那么我們?nèi)绾螌DD保存為文件呢?可以使用saveAsTextFile
>>> data = [1, 2, 3, 4, 5]
>>> rdd1 = rdd.map(lambda x: f"古明地覺{x}號(hào)")
>>> # 默認(rèn)是本地,當(dāng)然也可以指定file://
>>> rdd1.saveAsTextFile("/root/a.txt")
>>> # 保存到hdfs上面
>>> rdd1.saveAsTextFile("hdfs://localhost:9000/a.txt")
但是我們發(fā)現(xiàn)保存的a.txt并不是一個(gè)文件,并不是說把整個(gè)rdd都保存一個(gè)文件,這個(gè)是由你的分區(qū)決定的。保存的是一個(gè)目錄,文件在目錄里面,我們看到有兩部分,因?yàn)槭莾蓚€(gè)分區(qū)。
>>> data = [1, 2, 3, 4, 5]
>>> # 這里我們創(chuàng)建rdd的時(shí)候,指定5個(gè)分區(qū)
>>> rdd = sc.parallelize(data, 5)
>>> rdd1 = rdd.map(lambda x: f"古明地覺{x}號(hào)")
>>> # 保存為b.txt,顯然這個(gè)b.txt是個(gè)目錄
>>> rdd1.saveAsTextFile("/root/b.txt")
>>>
結(jié)果跟我們預(yù)想的是一樣的,有多少個(gè)分區(qū)就會(huì)有多少個(gè)part,因?yàn)閟park是把每個(gè)分區(qū)單獨(dú)寫入一個(gè)文件里面。至于hdfs我們就不用演示了,一樣的,算了還是看看吧。
spark應(yīng)用程序開發(fā)以及運(yùn)行
我們目前是通過pyspark shell進(jìn)行操作的,顯然這僅僅是用來做測(cè)試使用的,我們真正開發(fā)項(xiàng)目肯定是使用ide進(jìn)行操作的(vim、notepad你也給我當(dāng)成是ide,Σ(⊙▽⊙"a)。下面我們就來看看如何使用python開發(fā)一個(gè)spark應(yīng)用程序,并且運(yùn)行它。這里我在Windows上使用pycharm開發(fā),注意:但是python解釋器配置的我阿里云上python3,pycharm是支持這個(gè)功能的,遠(yuǎn)程連接服務(wù)器上的python環(huán)境,所以我們?cè)赪indows上操作的python是linux上的python。
import os
import platform
print(os.name) # posix
print(platform.system()) # Linux
print(os.listdir("/"))
"""
['home', 'run', 'tmp', 'opt', 'usr', 'lost+found', 'srv', 'lib', '.autorelabel',
'proc', 'mnt', 'boot', 'lib64', 'dev', 'redis6379.log', 'sbin', 'sys', 'root',
'bin', 'media', 'etc', 'var', 'data']
"""
還有一種簡(jiǎn)便的方法,你在服務(wù)器上啟動(dòng)一個(gè)jupyter notebook,然后再Windows上通過瀏覽器打開、輸入token遠(yuǎn)程連接也是可以的。當(dāng)然如果需要編寫的py文件比較多就不推薦了,如果只是學(xué)習(xí)的話還是可以的。
from pyspark import SparkContext
from pyspark import SparkConf
# 創(chuàng)建SparkConf實(shí)例:設(shè)置的是spark相關(guān)的參數(shù)信息
# 我們這里只設(shè)置appName,master默認(rèn)就好,當(dāng)然名字設(shè)置不設(shè)置也無所謂啊
conf = SparkConf().setAppName("satori")
# 傳入conf,創(chuàng)建SparkContext對(duì)象。另外master、appName也是可以在SparkContext里面單獨(dú)設(shè)置的
sc = SparkContext(conf=conf)
data = [1, 2, 3, 4, 5]
rdd1 = sc.parallelize(data)
# 不在shell里面了,我們需要print才能看到結(jié)果
print(rdd1.collect()) # [1, 2, 3, 4, 5]
# 好的習(xí)慣,編程結(jié)束之后stop掉,表示關(guān)閉與spark的連接
# 否則當(dāng)你再次創(chuàng)建相同的SparkContext實(shí)例的時(shí)候就會(huì)報(bào)錯(cuò)
# 會(huì)提示你:Cannot run multiple SparkContexts at once; existing SparkContext(app=satori, master=local[*]
sc.stop()
我們這里是通過pyspark模塊執(zhí)行是成功的,那么我們也可以編寫一個(gè)py文件提交到spark上面運(yùn)行。
提交方式:pyspark-submit --master xxx --name xxx py文件
from pyspark import SparkContext
from pyspark import SparkConf
# 這里我們不再設(shè)置master和appName(name)了,還記得我們之前說過嗎?
# 官方不推薦這種硬編碼的模式,而是通過提交任務(wù)的時(shí)候指定
conf = SparkConf()
# 既然如此,那么我們就不再需要這個(gè)SparkConf了,這里我們寫上但是不傳遞到SparkContext里
sc = SparkContext()
data = [1, 2, 3, 4, 5]
rdd1 = sc.parallelize(data)
print(rdd1.collect()) # [1, 2, 3, 4, 5]
sc.stop()
上面的代碼我們起名為test1.py,然后提交該作業(yè):spark-submit --master local[*] --name 古明地覺 /root/test1.py
我們提交之后,執(zhí)行是成功了的,但是輸出的東西灰常多,程序的結(jié)果就隱藏在中間。
那么問題來了,如果我有很多文件怎么辦?要是標(biāo)準(zhǔn)庫里面的包我們可以導(dǎo)入,但如果是我們自己寫的依賴怎么提交呢?首先多個(gè)文件(目錄)里面一定存在一個(gè)啟動(dòng)文件,用來啟動(dòng)整個(gè)程序。假設(shè)這個(gè)啟動(dòng)文件叫start.py(當(dāng)然啟動(dòng)文件一定在項(xiàng)目的最外層,如果在項(xiàng)目的包里面,那么它就不可能成為啟動(dòng)文件),那么把除了start.py的其它文件(目錄)打包成一個(gè)zip包或者egg,假設(shè)叫做dependency.egg,那么執(zhí)行的時(shí)候就可以這么執(zhí)行:
spark-submit --master xxx --name xxx --py-files dependency.egg start.py
如果我們寫的程序需要從命令行中傳遞參數(shù),那么直接跟在start.py(啟動(dòng)文件)后面就行。
關(guān)于輸出結(jié)果,我們只截取了一部分,詳細(xì)信息可以自己慢慢查看。以及spark-submit支持的其它參數(shù),也可以通過spark-submit --help來查看,不過很多都用不到,因?yàn)閟park-submit不僅可以提交python程序,還可以提交java等其它程序,里面的很多參數(shù)是為其它語言編寫的程序準(zhǔn)備的,python用不到。
RDD相關(guān)操作
我們已經(jīng)知道如何創(chuàng)建一個(gè)RDD、以及使用python開發(fā)spark程序并提交運(yùn)行,那么下面我們來看看RDD都能進(jìn)行哪些操作。我們讀取數(shù)據(jù)轉(zhuǎn)成RDD之后肯定是要進(jìn)行操作的,我們之前看到了map、reduce、collect等操作,但是除了這些,RDD還支持很多其他的操作,我們來看一下。
RDD的操作分為兩種:transformation和action。
transformation:從一個(gè)RDD轉(zhuǎn)換成新的RDD這個(gè)過程就是transformation,比如map操作
action:對(duì)RDD進(jìn)行計(jì)算得到一個(gè)值的過程叫做action,比如collect。
直接看可能不好理解,我們來舉個(gè)例子。我們對(duì)一個(gè)RDD進(jìn)行map操作得到了新的RDD,但是這個(gè)RDD它并不是具體的值。我們對(duì)RDD進(jìn)行collect操作的時(shí)候,才會(huì)把值返回回來。實(shí)際上,所有的transformation都是惰性的,意思是我們進(jìn)行map操作的時(shí)候,RDD只是記錄了這個(gè)操作,但是它并沒有具體的計(jì)算,當(dāng)我們進(jìn)行collect求值的時(shí)候才會(huì)真正的開始進(jìn)行計(jì)算。
>>> data = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(data)
>>> rdd1 = rdd.map(lambda x: str(x) + "~~~")
>>> rdd2 = rdd1.map(lambda x: "~~~" + x)
>>>
>>> rdd2.collect()
['~~~1~~~', '~~~2~~~', '~~~3~~~', '~~~4~~~', '~~~5~~~']
>>>
我們對(duì)rdd進(jìn)行操作得到rdd1,rdd1得到rdd2,像這種對(duì)一個(gè)RDD操作得到新的RDD的過程我們稱之為transformation,它是惰性的(lazy),這些過程并不會(huì)真正的開始計(jì)算,只是記錄了相關(guān)的操作。當(dāng)我們對(duì)于rdd2進(jìn)行collect操作、要獲取值的時(shí)候,才會(huì)真正的開始計(jì)算,會(huì)從最初的rdd開始計(jì)算,這個(gè)過程我們稱之為action。
下面我們就來舉例說明RDD的相關(guān)操作:
map
map:接收一個(gè)函數(shù),會(huì)對(duì)RDD里面每一個(gè)分區(qū)的每一個(gè)元素都執(zhí)行相同的操作。話說,能用pyspark的編程的,我估計(jì)這些說了都是廢話。因此如果有些函數(shù)和python的內(nèi)置函數(shù)比較類似的,我就不說那么詳細(xì)了。
>>> rdd1 = sc.parallelize([1, 2, 3, 4, 5])
>>> # 給里面每一個(gè)元素都執(zhí)行加1的操作
>>> rdd1.map(lambda x: x+1).collect()
[2, 3, 4, 5, 6]
filter
filter:類似Python中的filter,選擇出符合條件的
>>> numbers = [1, 2, 3, 4, 5, 6, 7, 8]
>>> rdd = sc.parallelize(numbers)
>>> rdd.filter(lambda x: x > 3).collect()
[4, 5, 6, 7, 8]
>>>
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4, 6, 8]
flatMap
flatMap:和map不同的是,map是輸出一個(gè)值返回一個(gè)值,而flatMap是輸入一個(gè)值,返回一個(gè)序列、然后將這個(gè)序列打開,我們舉例說明。
>>> word = ["satori"]
>>> # 函數(shù)接收什么,返回什么,所以還是原來的結(jié)果
>>> sc.parallelize(word).map(lambda x: x).collect()
['satori']
>>> # 接收一個(gè)值,返回一個(gè)序列,然后會(huì)自動(dòng)將這個(gè)序列打開
>>> sc.parallelize(word).flatMap(lambda x: x).collect()
['s', 'a', 't', 'o', 'r', 'i']
>>>
>>> # split之后是一個(gè)列表,對(duì)于map,那么返回的就是列表
>>> words = ["hello mashiro", "hello satori"]
>>> sc.parallelize(words).map(lambda x: x.split(" ")).collect()
[['hello', 'mashiro'], ['hello', 'satori']]
>>> # 但對(duì)于flatMap來說,會(huì)將這個(gè)列表打開
>>> sc.parallelize(words).flatMap(lambda x: x.split(" ")).collect()
['hello', 'mashiro', 'hello', 'satori']
>>>
所以從名字上看,flatMap相比map多了一個(gè)flat,也是很形象的,flat表示平的,操作上就是直接將列表打開,不再嵌套。另外我們看到我們將很多操作都寫在了一行,這是沒有問題的,如果操作比較多,我們鼓勵(lì)寫在一行,這叫做鏈?zhǔn)骄幊獭.?dāng)然如果為了直觀,你也可以分為多行來寫,反正transformation也是懶加載。
groupByKey
groupByKey:這個(gè)語言表達(dá)有點(diǎn)困難,我們直接看一個(gè)例子。
>>> val = [("a", "hello"), ("a", "how are you"), ("b", "who am i"), ("a", 4)]
>>> rdd = sc.parallelize(val)
>>>
>>> rdd.groupByKey().collect()
[('b', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe37b8>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3630>)]
>>> rdd.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
[('b', ['who am i']), ('a', ['hello', 'how are you', 4])]
>>>
我們看到使用groupByKey的rdd,是一個(gè)由[(x1, y1), (x2, y2), (x3, y3)...]這樣的序列(當(dāng)然里面不一定是元組、列表也是可以的)轉(zhuǎn)化得到的,然后使用groupByKey會(huì)將元組里面第一個(gè)值相同的聚合到一起,就像我們看到的那樣,只不過得到的是一個(gè)可迭代對(duì)象,我們需要轉(zhuǎn)化為list對(duì)象。這個(gè)功能特別適合word count,也就是詞頻統(tǒng)計(jì),再來看一個(gè)例子。
>>> words = ["hello mashiro", "hello world", "hello koishi"]
>>> rdd = sc.parallelize(words)
>>> # 先進(jìn)行分隔
>>> rdd1 = rdd.flatMap(lambda x: x.split(" "))
>>> rdd1.collect()
['hello', 'mashiro', 'hello', 'world', 'hello', 'koishi']
>>> # 給每個(gè)詞都標(biāo)上一個(gè)1,因?yàn)樗鼈兠總€(gè)詞都出現(xiàn)了1次
>>> rdd2 = rdd1.map(lambda x: (x, 1))
>>> rdd2.collect()
[('hello', 1), ('mashiro', 1), ('hello', 1), ('world', 1), ('hello', 1), ('koishi', 1)]
>>>
>>> # 使用groupByKey將值相同的匯聚到一起
>>> rdd3 = rdd2.groupByKey()
>>> rdd3.collect()
[('mashiro', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3828>), ('world', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3128>), ('koishi', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3c50>), ('hello', <pyspark.resultiterable.ResultIterable object at 0x7fa0aefe3470>)]
>>> # 變成list對(duì)象
>>> rdd4 = rdd3.map(lambda x: (x[0], list(x[1])))
>>> rdd4.collect()
[('mashiro', [1]), ('world', [1]), ('koishi', [1]), ('hello', [1, 1, 1])]
>>> # 進(jìn)行求和,即可得到每個(gè)詞出現(xiàn)的次數(shù)。當(dāng)然求和的話可以直接使用sum,沒必要先變成list對(duì)象
>>> rdd5 = rdd4.map(lambda x: (x[0], sum(x[1])))
>>> rdd5.collect()
[('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]
>>>
>>>
還記得之前說的鏈?zhǔn)骄幊虇幔科鋵?shí)這個(gè)詞頻統(tǒng)計(jì)很簡(jiǎn)單,工作上是沒必要寫這么多行的。
>>> words = ["hello mashiro", "hello world", "hello koishi"]
>>> rdd = sc.parallelize(words)
>>> rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).groupByKey().map(lambda x: (x[0], sum(x[1]))).collect()
[('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]
所以groupByKey非常適合詞頻統(tǒng)計(jì),這里面不接收參數(shù),調(diào)用這個(gè)方法RDD需要是一個(gè)列表或者元組、里面嵌套多個(gè)列表或者元組(包含兩個(gè)元素),然后把索引為0的值相同的聚合在一起。
reduceByKey
調(diào)用reduceByKey方法的rdd對(duì)應(yīng)的數(shù)據(jù)集和groupByKey是一樣的,我們一旦看到ByKey,就應(yīng)該想到序列里面的元素要是一個(gè)有兩個(gè)元素的序列,然后第一個(gè)元素相同的分發(fā)到一起。但是它和groupByKey不同的是,groupByKey不接收參數(shù),然后直接把第一個(gè)元素相同聚合在一起,而reduceByKey會(huì)比groupByKey多一步,因?yàn)樗枰邮芤粋€(gè)函數(shù),會(huì)自動(dòng)將分發(fā)到一起的值(原來所有序列的第二個(gè)元素)進(jìn)行一個(gè)計(jì)算。舉例說明:
>>> words = ["hello mashiro", "hello world", "hello koishi"]
>>> rdd = sc.parallelize(words)
>>> rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).groupByKey().map(lambda x: (x[0], sum(x[1]))).collect()
[('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]
>>>
>>> rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).collect()
[('mashiro', 1), ('world', 1), ('koishi', 1), ('hello', 3)]
和groupByKey對(duì)比的話,還是很清晰的。
sortByKey
sortByKey:從名字能看出來,這個(gè)是排序用的,按照索引為0的元素進(jìn)行排序。
>>> words = [('c', 2), ('a', 1), ('b', 3)]
>>> rdd = sc.parallelize(words)
>>>
>>> rdd.sortByKey().collect()
[('a', 1), ('b', 3), ('c', 2)]
>>>
>>> rdd.sortByKey(False).collect()
[('c', 2), ('b', 3), ('a', 1)]
>>> # 把元祖里面的兩個(gè)元素想象成字典的key: value,ByKey自然是根據(jù)Key來進(jìn)行操作
>>> # 可顯然我們是想根據(jù)value來進(jìn)行排序,根據(jù)出現(xiàn)次數(shù)多的進(jìn)行排序。所以我們可以先交換順序,排完了再交換回來
>>> rdd.map(lambda x: (x[1], x[0])).sortByKey().map(lambda x: (x[1], x[0])).collect()
[('a', 1), ('c', 2), ('b', 3)]
>>> rdd.map(lambda x: (x[1], x[0])).sortByKey(False).map(lambda x: (x[1], x[0])).collect()
[('b', 3), ('c', 2), ('a', 1)]
>>> # 默認(rèn)從小到大排,F(xiàn)alse則表示逆序、從大到小排
union
union:合并兩個(gè)RDD
>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd2 = sc.parallelize([11, 22, 33])
>>> # 很簡(jiǎn)單,就是將兩個(gè)RDD合并
>>> rdd1.union(rdd2).collect()
[1, 2, 3, 11, 22, 33]
>>> # 甚至和自身做union也是可以的
>>> rdd1.union(rdd1).collect()
[1, 2, 3, 1, 2, 3]
distinct
distinct:去重,我們看到這有點(diǎn)像sql啊。其實(shí)spark還支持spark sql、也就是寫sql語句的方式進(jìn)行編程。我們后面、或者下一篇博客會(huì)說。
>>> rdd = sc.parallelize([11, 11, 2, 22, 3, 33, 3]).distinct()
>>> # 不過去重之后貌似沒什么順序了
>>> rdd.collect()
[2, 22, 11, 3, 33]
join
join:熟悉sql的估計(jì)肯定不陌生,join有以下幾種:inner join、left join、right join、outer join。這個(gè)操作join的RDD和xxxByKey對(duì)應(yīng)的RDD應(yīng)該具有相同的數(shù)據(jù)格式,對(duì),就是[(x1, y1), (x2, y2)...]這種格式。
有時(shí)候光說不好理解,看例子就能很容易明白。
>>> rdd1 = sc.parallelize([("name", "古明地覺"), ("age", 16), ("gender", "female")])
>>> rdd2 = sc.parallelize([("name", "古明地戀"), ("age", 15), ("place", "東方地靈殿")])
>>>
>>> # join默認(rèn)是內(nèi)連接,還是想象成key: value,把兩個(gè)RDD的key相同的匯聚在一起
>>> # 如果不存在相同的key,那么舍棄
>>> rdd1.join(rdd2).collect()
[('name', ('古明地覺', '古明地戀')), ('age', (16, 15))]
>>>
>>> # 以左RDD為基準(zhǔn),如果右RDD沒有與之匹配的則為None,比如rdd1的"gender"在rdd2中不存在,所以置為None
>>> rdd1.leftOuterJoin(rdd2).collect()
[('name', ('古明地覺', '古明地戀')), ('gender', ('female', None)), ('age', (16, 15))]
>>>
>>> # 同理以右RDD為基準(zhǔn),當(dāng)然啦,順序還是從左到右的,里面的元素顯示rdd1的元素,再是rdd2的元素
>>> rdd1.rightOuterJoin(rdd2).collect()
[('name', ('古明地覺', '古明地戀')), ('age', (16, 15)), ('place', (None, '東方地靈殿'))]
>>>
>>> # 全連接,不用我說了
>>> rdd1.fullOuterJoin(rdd2).collect()
[('name', ('古明地覺', '古明地戀')), ('gender', ('female', None)), ('age', (16, 15)), ('place', (None, '東方地靈殿'))]
zip
zip:類似于python中的zip,但是要求兩個(gè)RDD的元素個(gè)數(shù)以及分區(qū)數(shù)必須一樣。
>>> rdd1 = sc.parallelize(['a', 'b', 'c'])
>>> rdd2 = sc.parallelize([1, 2, 3])
>>>
>>> rdd1.zip(rdd2).collect()
[('a', 1), ('b', 2), ('c', 3)]
>>>
zipWithIndex
zipWithIndex:對(duì)單個(gè)RDD操作的,會(huì)給每個(gè)元素加上一層索引,從0開始自增。
>>> rdd1 = sc.parallelize(['a', 'b', 'c'])
>>> rdd1.zipWithIndex().collect()
[('a', 0), ('b', 1), ('c', 2)]
以上就是一些常用的transformation操作,我們說RDD轉(zhuǎn)換得到新的RDD這個(gè)過程叫做transformation,它是惰性的,只是記錄了操作,但是并沒有立刻進(jìn)行計(jì)算。當(dāng)遇到action操作時(shí)(計(jì)算具體的值,比如collect、reduce、當(dāng)然還有其它action操作,我們后面會(huì)說),才會(huì)真正進(jìn)行計(jì)算。那么下面我們?cè)賮砜纯匆恍┎皇呛艹S玫膖ransformation操作。
mapPartitions
mapPartitions:這個(gè)是對(duì)每一個(gè)分區(qū)進(jìn)行map
>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> # 函數(shù)參數(shù)x不再是rdd的每一個(gè)元素,而是rdd的每一個(gè)分區(qū)
>>> # 這個(gè)不能寫return,要寫yield,或者返回一個(gè)可迭代的對(duì)象,會(huì)自動(dòng)獲取里面的所有元素
>>> def f(x): yield sum(x)
...
>>> # 三個(gè)分區(qū),顯然一個(gè)分區(qū)兩個(gè)元素,那么會(huì)把每個(gè)分區(qū)的所有元素進(jìn)行相加
>>> rdd.mapPartitions(f).collect()
[3, 7, 11]
>>> # sum(x)不是一個(gè)可迭代的,我們需要放在一個(gè)列表里面,或者定義函數(shù)使用yield也行
>>> # 會(huì)自動(dòng)遍歷返回的可迭代對(duì)象,把元素依次放到列表里面
>>> rdd.mapPartitions(lambda x: [sum(x)]).collect()
[3, 7, 11]
mapPartitionsWithIndex
mapPartitionsWithIndex:還是對(duì)每一個(gè)分區(qū)進(jìn)行map,但是會(huì)多出一個(gè)索引
>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> rdd.mapPartitionsWithIndex(lambda index, x: (index, sum(x))).collect()
[0, 3, 1, 7, 2, 11]
列表中的0 1 2表示分區(qū)索引。
intersection
intersection:union是將兩個(gè)RDD合并,其實(shí)是取兩者的并集,intersection則是取交集,subtract則是取差集。
>>> rdd1 = sc.parallelize([1, 2, 3])
>>> rdd2 = sc.parallelize([1, 22, 3])
>>> rdd1.intersection(rdd2).collect()
[1, 3]
>>> rdd1.subtract(rdd2).collect()
[2]
sortBy
sortBy:我們之前說過sortByKey會(huì)默認(rèn)按照key來排序,sortBy需要我們自己指定,可以按照key也可以按照value
>>> rdd = sc.parallelize([('a', 1), ('c', 2), ('b', 3)])
>>> rdd.sortBy(lambda x: x[0]).collect()
[('a', 1), ('b', 3), ('c', 2)]
>>> rdd.sortBy(lambda x: x[1]).collect()
[('a', 1), ('c', 2), ('b', 3)]
>>>
>>> rdd.sortBy(lambda x: x[0], False).collect()
[('c', 2), ('b', 3), ('a', 1)]
>>> rdd.sortBy(lambda x: x[1], False).collect()
[('b', 3), ('c', 2), ('a', 1)]
>>>
coalesce
coalesce:改變RDD的分區(qū)數(shù)。分區(qū)數(shù)會(huì)影響作業(yè)的并行度,因此會(huì)視作業(yè)的具體情況而定。這個(gè)方法第一個(gè)參數(shù)接收要改變的分區(qū)個(gè)數(shù),第二個(gè)參數(shù)是shuffle,默認(rèn)為False,表示重新分區(qū)的時(shí)候不進(jìn)行shuffle操作,此時(shí)效率較高;如果指定為True,表示重分區(qū)的時(shí)候進(jìn)行shuffle操作,此時(shí)效果等價(jià)于下面要介紹的repartition,效率較低。關(guān)于什么是shuffle操作,我們后面會(huì)說。
>>> rdd = sc.parallelize(range(10), 5)
>>> # 使用該函數(shù)可以查看分區(qū)數(shù)
>>> rdd.getNumPartitions()
5
>>> # 改變分區(qū)數(shù),變成3
>>> rdd1 = rdd.coalesce(3)
>>> rdd1.getNumPartitions()
3
>>> # 分區(qū)數(shù)只能變少,不能變多
>>> rdd2 = rdd1.coalesce(4)
>>> rdd2.getNumPartitions()
3
>>>
repartition
repartition:該方法也是對(duì)RDD進(jìn)行重新分區(qū),其內(nèi)部使用shuffle算法,并且分區(qū)可以變多、也可以變少,如果是減少分區(qū)數(shù),那么推薦使用coalesce。
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd1 = rdd.repartition(4)
>>> rdd1.getNumPartitions()
4
>>> rdd1.repartition(2).getNumPartitions()
2
>>>
flatMapValues
flatMapValues:和groupByKey相反,我們看個(gè)栗子就清楚了。
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("a", 3), ("b", 2)])
>>> rdd1 = rdd.groupByKey().map(lambda x: (x[0], list(x[1])))
>>> rdd1.collect()
[('b', [1, 2]), ('a', [1, 2, 3])]
>>> # 所以它個(gè)groupByKey是相反的,這里面一般寫lambda x: x
>>> rdd1.flatMapValues(lambda x: x).collect()
[('b', 1), ('b', 2), ('a', 1), ('a', 2), ('a', 3)]
groupBy
groupBy:之前的groupByKey默認(rèn)是按照相同的key進(jìn)行聚合,這里則可以單獨(dú)指定,并且里面序列里面的元素可以不再是元組,普通的整型也是可以的。
>>> rdd = sc.parallelize([12, "a", "ab", "1", 23, "xx"])
>>> # 將里面的元素變成str之后,長(zhǎng)度大于1的分為一組,小于等于1的分為一組
>>> rdd.groupBy(lambda x: len(str(x))>1).collect()
[(False, <pyspark.resultiterable.ResultIterable object at 0x7f4c4f40f5c0>), (True, <pyspark.resultiterable.ResultIterable object at 0x7f4c4f40f048>)]
>>>
>>> rdd.groupBy(lambda x: len(str(x))>1).map(lambda x: (x[0], list(x[1]))).collect()
[(False, ['a', '1']), (True, [12, 'ab', 23, 'xx'])]
keyBy
keyBy:看例子就能理解,其實(shí)很多方法我們完全可以用已經(jīng)存在的來替代。
>>> rdd = sc.parallelize([1, 2, 3])
>>> rdd.keyBy(lambda x: f"hello_{x}").collect()
[('hello_1', 1), ('hello_2', 2), ('hello_3', 3)]
>>>
>>> rdd.map(lambda x: (f"hello_{x}", x)).collect()
[('hello_1', 1), ('hello_2', 2), ('hello_3', 3)]
可以看到keyBy就是將函數(shù)返回的元素和原來的元素組合成一個(gè)二元tuple,這個(gè)我們完全可以使用map來替代,或許keyBy簡(jiǎn)單了那么一點(diǎn)點(diǎn),但是說實(shí)話我個(gè)人還是習(xí)慣用map。其實(shí)一些api如果沒有什么不可替代性、或者無法在很大程度上簡(jiǎn)化工作量的話,我覺得記太多反而是個(gè)負(fù)擔(dān)。
keys和values
keys:獲取所有的key。values:獲取所有的value。我們這里的key和value都指的是二元tuple里面的兩個(gè)元素。其實(shí)RDD對(duì)應(yīng)的數(shù)據(jù)類型無非兩種,一種是對(duì)應(yīng)的列表里面都是整型或者字符串的RDD,另一種是里面都是二元tuple(或者list)的RDD,我們基本上使用這兩種RDD。我們上面出現(xiàn)的所有的key指的都是二元tuple里面的第一個(gè)元素,把這個(gè)tuple的兩個(gè)元素想象成字典的key和value即可。
>>> rdd = sc.parallelize([("a", 1), ("b", "a"), ("c", "c")])
>>> rdd.keys().collect()
['a', 'b', 'c']
>>> rdd.values().collect()
[1, 'a', 'c']
glom
glom:將每一個(gè)分區(qū)變成一個(gè)單獨(dú)的列表
>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> rdd.glom().collect()
[[1, 2], [3, 4], [5, 6]]
>>>
pipe
pipe:將RDD里面的每一個(gè)元素都執(zhí)行相同的linux命令
>>> rdd = sc.parallelize(["hello", "hello1", "hello2"], 3)
>>> rdd.pipe("cat").collect()
['hello', 'hello1', 'hello2']
>>> # 1 1 6表示1行、1個(gè)單詞、6個(gè)字符
>>> rdd.pipe("wc").collect()
[' 1 1 6', ' 1 1 7', ' 1 1 7']
>>>
randomSplit
randomSplit:將RDD里面的元素隨機(jī)分隔
>>> rdd = sc.parallelize(range(10))
>>> rdd1 = rdd.randomSplit([1, 4])
>>> rdd1
[PythonRDD[203] at RDD at PythonRDD.scala:53, PythonRDD[204] at RDD at PythonRDD.scala:53]
>>> [_.collect() for _ in rdd1]
[[5, 7, 9], [0, 1, 2, 3, 4, 6, 8]]
>>>
sample
sample:隨機(jī)取樣
>>> rdd = sc.parallelize(range(10))
>>> # 參數(shù)一:是否有放回。參數(shù)二:抽樣比例。參數(shù)三:隨機(jī)種子
>>> rdd.sample(True, 0.2, 123).collect()
[0, 9]
foldByKey
foldByKey:針對(duì)于key: value形式的RDD,進(jìn)行聚合
>>> rdd = sc.parallelize([("a", (1, 2, 3, 4)), ("b", (11, 22, 33, 44))])
>>> rdd1 = rdd.flatMapValues(lambda x: x)
>>> rdd1.collect()
[('a', 1), ('a', 2), ('a', 3), ('a', 4), ('b', 11), ('b', 22), ('b', 33), ('b', 44)]
>>> # 參數(shù)一:起始值,參數(shù)二:操作函數(shù)
>>> rdd1.foldByKey(0, lambda x, y: x + y).collect()
[('b', 110), ('a', 10)]
>>> # 起始值指定20,那么會(huì)把20也當(dāng)成一個(gè)元素、也就是初始元素,扔到函數(shù)里面去
>>> rdd1.foldByKey(20, lambda x, y: x + y).collect()
[('b', 130), ('a', 30)]
>>> # 我們看到0確實(shí)在里面
>>> rdd1.foldByKey(0, lambda x, y: f"{x}->{y}").collect()
[('b', '0->11->22->33->44'), ('a', '0->1->2->3->4')]
>>>
以上就是一些transformation算子,有一些算子比較簡(jiǎn)單我就沒介紹,比如mapValues之類的,我們完全可以使用map來替代,也很簡(jiǎn)單,沒必要記這么多。如果有一些沒有介紹到的,可以自己通過pycharm查看RDD這個(gè)類源碼,看看它都支持哪些方法。源碼是很詳細(xì)的,都有大量的注釋。
那么下面我們來看一下action方法,action方法估計(jì)我們最一開始就見過了,沒錯(cuò)就是collect,把RDD里面的內(nèi)容以列表的形式返回,那么除了collect還有哪些action算子呢?我們來看一下。
reduce
reduce:這個(gè)應(yīng)該也早就見過了,將里面的內(nèi)容相加。
>>> rdd = sc.parallelize([1, 2, 3, 4])
>>> rdd.reduce(lambda x, y: x + y)
10
count
count:計(jì)算元素的個(gè)數(shù)。
>>> rdd = sc.parallelize([1, 2, 3, [4, 5]])
>>> rdd.count()
4
take、first
take、first:獲取指定個(gè)數(shù)的元素、獲取第一個(gè)元素。
>>> rdd = sc.parallelize([1, 2, [3, 4, 5], 6, 7, 8])
>>> # 如果指定的個(gè)數(shù)超過了元素的總個(gè)數(shù)也不會(huì)報(bào)錯(cuò),而是返回所有元素,即便RDD為空也可以。
>>> rdd.take(3)
[1, 2, [3, 4, 5]]
>>> # 注意:對(duì)于first來說,空的rdd調(diào)用的話會(huì)報(bào)錯(cuò)
>>> rdd.first()
1
max、min、mean、sum
max、min、mean、sum:獲取元素最大值、最小值、平均值、總和。
>>> rdd = sc.parallelize([11, 22, 33, 22])
>>> rdd.max()
33
>>> rdd.min()
11
>>> rdd.mean()
22.0
>>> rdd.sum()
88
當(dāng)然還有其它的數(shù)學(xué)函數(shù),比如:stdev,求標(biāo)準(zhǔn)差、variance,求方差等等。遇到相應(yīng)的需求,可以去查找。并且對(duì)于上面的數(shù)學(xué)操作,還分別對(duì)應(yīng)另一個(gè)函數(shù),比如:count -> countApprox,sum -> sumApprox等等,這些函數(shù)的特點(diǎn)是可以傳入一個(gè)timeout,單位為毫秒,要是在指定的時(shí)間內(nèi)沒有計(jì)算完畢的話,那么就直接返回當(dāng)前的計(jì)算結(jié)果。可以自己嘗試一下。
foreach
foreach:類似于map,對(duì)序列里面的每一個(gè)元素都執(zhí)行相同的操作。
>>> rdd = sc.parallelize([11, 22, 33, 22])
>>> # 但是foreach不會(huì)有任何的反應(yīng),不會(huì)跟map一樣返回新的RDD
>>> rdd.foreach(lambda x: x + 1)
>>> # 我們可以執(zhí)行打印操作
>>> rdd.foreach(lambda x: print(x, x+123))
11 134
22 145
33 156
22 145
>>>
foreachPartition
foreachPartition:會(huì)對(duì)每一個(gè)分區(qū)都執(zhí)行相同的操作。
>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> rdd.foreachPartition(lambda x: print(x))
<itertools.chain object at 0x7f9c90ca0978>
<itertools.chain object at 0x7f9c90ca0978>
<itertools.chain object at 0x7f9c90ca0978>
>>> rdd.foreachPartition(lambda x: print(list(x)))
[1, 2]
[3, 4]
[5, 6]
>>>
aggregate
aggregate:這個(gè)稍微有點(diǎn)復(fù)雜,里面接收三個(gè)參數(shù)。
參數(shù)一:起始值,這個(gè)起始值是作用在每個(gè)分區(qū)上的
參數(shù)二:每個(gè)分區(qū)進(jìn)行的操作
參數(shù)三:每個(gè)分區(qū)操作完之后的這些返回的結(jié)果進(jìn)行的操作
>>> rdd = sc.parallelize([1, 2, 3, 1, 2, 3], 3)
>>> # 指定了三個(gè)分區(qū),那么結(jié)果每個(gè)分區(qū)對(duì)應(yīng)的值應(yīng)該是這樣: [1, 2] [3, 1] [2, 3]
>>> # 每個(gè)分區(qū)按照第二個(gè)參數(shù)指定的操作進(jìn)行計(jì)算,別忘記初始值,這個(gè)是作用在每個(gè)分區(qū)上面的
>>> # 結(jié)果就是:2 * 1 * 2, 2 * 3 * 1, 2 * 2 * 3 --> 4, 6, 12
>>> # 然后每個(gè)分區(qū)返回的結(jié)果執(zhí)行第三個(gè)參數(shù)指定的操作,加在一起,所以是24
>>> rdd.aggregate(2, lambda x, y:x*y, lambda x, y: x+y)
24
aggregateByKey
aggregateByKey:這個(gè)是一個(gè)transformation方法,不是action,之所以放進(jìn)來是為了和aggregate進(jìn)行對(duì)比便于理解。這個(gè)是把相同的key分成一組,說不好說,直接看例子吧
>>> rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("c", 1), ("c", 2), ("c", 3)], 3)
>>> # 相同的分為一組,但是注意分區(qū),倒數(shù)第三個(gè)("c", 1)是和("b", 3)在一個(gè)分區(qū)里面的
>>> # [("a", [1, 2])] [("b", [3]), ("c", [1])] [("c", [2, 3])]
>>> # 初始元素和里面元素依次相乘--> [("a", 4)] [("b", 6), ("c", 2)] [("c", 12)]
>>> # 然后對(duì)分區(qū)里面相同key再次進(jìn)行參數(shù)三指定的操作--> [("a", 4)] [("b", 6)] [("c", 14)]
>>> # 上面的每一個(gè)列表看成是一個(gè)分區(qū)即可,為了清晰展示,我把每一個(gè)分區(qū)單獨(dú)寫成了一個(gè)列表
>>> rdd.aggregateByKey(2, lambda x,y:x*y, lambda x,y:x+y).collect()
[('b', 6), ('a', 4), ('c', 14)]
另外,對(duì)于很多的transformation操作,我們都是可以通過參數(shù):numPartitions指定生成的新的RDD的分區(qū)的,不過一般情況下我們不指定這個(gè)參數(shù),會(huì)和初始的RDD的分區(qū)數(shù)保持一致。當(dāng)然如果初始的RDD的分區(qū)數(shù)設(shè)置的不合理,那么是可以在transformation操作的時(shí)候進(jìn)行更改的。
fold
fold:類似于aggregateByKey,但它是action方法,而且調(diào)用的不是key、value形式的RDD、并且只需要指定一個(gè)函數(shù),會(huì)對(duì)每個(gè)分區(qū)、以及每個(gè)分區(qū)返回的結(jié)果都執(zhí)行相同的操作
>>> rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)
>>> # [1, 2] [3, 4] [5, 6] -> 2 * 1 * 2, 2 * 3 * 4, 2 * 5 * 6
>>> # 4 * 24 * 60 * 2 = 11520,并且每一個(gè)分區(qū)計(jì)算之后的結(jié)果還要乘上指定的初始值,這一點(diǎn)需要注意
>>> rdd.fold(2, lambda x,y: x*y)
11520
>>>
collectAsMap
collectAsMap:對(duì)于內(nèi)部是二元tuple的RDD,我們可以轉(zhuǎn)化為字典。
>>> rdd = sc.parallelize([("a", 1), ("a", 2), ("b", 3), ("c", 1), ("c", 2), ("c", 3)], 3)
>>> # key相同的,value就會(huì)被替換掉
>>> rdd.collectAsMap()
{'a': 2, 'b': 3, 'c': 3}
>>>
id
id:返回RDD的id值,每個(gè)RDD的id值是唯一的
>>> rdd1 = sc.parallelize([])
>>> rdd2 = sc.parallelize([])
>>> rdd3 = sc.parallelize([])
>>>
>>> rdd1.id(), rdd2.id(), rdd3.id()
(326, 327, 328)
>>>
histogram
histogram:返回一個(gè)直方圖數(shù)據(jù),看栗子
>>> rdd = sc.parallelize(range(10))
>>> # 返回0-5以及5-8中間的元素個(gè)數(shù),當(dāng)然會(huì)連同區(qū)間一起返回。注意區(qū)間是左閉右開的
>>> rdd.histogram([0, 5, 8])
([0, 5, 8], [5, 4])
>>> # 如果不指定列表,而是指定整型的話
>>> # 會(huì)自動(dòng)為我們將[min, max]等分4個(gè)區(qū)間,那么第一個(gè)列表就有5個(gè)元素
>>> rdd = sc.parallelize([0, 11, 33, 22, 44, 55, 66, 33, 100])
>>> rdd.histogram(4)
([0, 25, 50, 75, 100], [3, 3, 2, 1])
>>>
isEmpty
isEmpty:檢測(cè)一個(gè)RDD是否為空
>>> rdd1 = sc.parallelize([])
>>> rdd2 = sc.parallelize([1])
>>>
>>> rdd1.isEmpty(), rdd2.isEmpty()
(True, False)
lookup
lookup:查找指定key對(duì)應(yīng)的value,那么顯然操作的RDD要是key: value形式的
>>> rdd = sc.parallelize([("a", 1), ("a", 2), ("b", "a")])
>>> rdd.lookup("a")
[1, 2]
>>> rdd.lookup("b")
['a']
>>>
總結(jié)
以上就是RDD的一些操作,當(dāng)然我們這里沒有全部介紹完,但是也介紹挺多了,如果工作中不夠用的話,那么只能看源碼了。當(dāng)然這么多一次性肯定是無法全部背下來的,需要用的時(shí)候再去查即可,當(dāng)然還是要多動(dòng)手敲,孰能生巧。
總結(jié)
以上是生活随笔為你收集整理的(1)spark核心RDD的概念解析、创建、以及相关操作的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql数据库备份到oss_备份MyS
- 下一篇: 三六零:周鸿祎与胡欢已协商离婚,拟将 6