spark算子大全glom_2小时入门Spark之RDD编程
公眾號(hào)后臺(tái)回復(fù)關(guān)鍵字:pyspark,獲取本項(xiàng)目github地址。
本節(jié)將介紹RDD數(shù)據(jù)結(jié)構(gòu)的常用函數(shù)。包括如下內(nèi)容:
創(chuàng)建RDD
常用Action操作
常用Transformation操作
常用PairRDD的轉(zhuǎn)換操作
緩存操作
共享變量
分區(qū)操作
這些函數(shù)中,我最常用的是如下15個(gè)函數(shù),需要認(rèn)真掌握其用法。
map
flatMap
mapPartitions
filter
count
reduce
take
saveAsTextFile
collect
join
union
persist
repartition
reduceByKey
aggregateByKey
import?findspark
#指定spark_home為剛才的解壓路徑,指定python路徑
spark_home?=?"/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"
python_path?=?"/Users/liangyun/anaconda3/bin/python"
findspark.init(spark_home,python_path)
import?pyspark
from?pyspark?import?SparkContext,?SparkConf
conf?=?SparkConf().setAppName("rdd_tutorial").setMaster("local[4]")
sc?=?SparkContext(conf=conf)
print(pyspark.__version__)
3.0.1
一,創(chuàng)建RDD
創(chuàng)建RDD主要有兩種方式,一個(gè)是textFile加載本地或者集群文件系統(tǒng)中的數(shù)據(jù),
第二個(gè)是用parallelize方法將Driver中的數(shù)據(jù)結(jié)構(gòu)并行化成RDD。
#從本地文件系統(tǒng)中加載數(shù)據(jù)
file?=?"./data/hello.txt"
rdd?=?sc.textFile(file,3)
rdd.collect()
['hello?world',
'hello?spark',
'spark?love?jupyter',
'spark?love?pandas',
'spark?love?sql']
#從集群文件系統(tǒng)中加載數(shù)據(jù)
#file?=?"hdfs://localhost:9000/user/hadoop/data.txt"
#也可以省去hdfs://localhost:9000
#rdd?=?sc.textFile(file,3)
#parallelize將Driver中的數(shù)據(jù)結(jié)構(gòu)生成RDD,第二個(gè)參數(shù)指定分區(qū)數(shù)
rdd?=?sc.parallelize(range(1,11),2)
rdd.collect()
[1,?2,?3,?4,?5,?6,?7,?8,?9,?10]
二,常用Action操作
Action操作將觸發(fā)基于RDD依賴關(guān)系的計(jì)算。
collect
rdd?=?sc.parallelize(range(10),5)
#collect操作將數(shù)據(jù)匯集到Driver,數(shù)據(jù)過(guò)大時(shí)有超內(nèi)存風(fēng)險(xiǎn)
all_data?=?rdd.collect()
all_data
[0,?1,?2,?3,?4,?5,?6,?7,?8,?9]
take
#take操作將前若干個(gè)數(shù)據(jù)匯集到Driver,相比collect安全
rdd?=?sc.parallelize(range(10),5)
part_data?=?rdd.take(4)
part_data
[0,?1,?2,?3]
takeSample
#takeSample可以隨機(jī)取若干個(gè)到Driver,第一個(gè)參數(shù)設(shè)置是否放回抽樣
rdd?=?sc.parallelize(range(10),5)
sample_data?=?rdd.takeSample(False,10,0)
sample_data
[7,?8,?1,?5,?3,?4,?2,?0,?9,?6]
first
#first取第一個(gè)數(shù)據(jù)
rdd?=?sc.parallelize(range(10),5)
first_data?=?rdd.first()
print(first_data)
0
count
#count查看RDD元素?cái)?shù)量
rdd?=?sc.parallelize(range(10),5)
data_count?=?rdd.count()
print(data_count)
10
reduce
#reduce利用二元函數(shù)對(duì)數(shù)據(jù)進(jìn)行規(guī)約
rdd?=?sc.parallelize(range(10),5)
rdd.reduce(lambda?x,y:x+y)
45
foreach
#foreach對(duì)每一個(gè)元素執(zhí)行某種操作,不生成新的RDD
#累加器用法詳見(jiàn)共享變量
rdd?=?sc.parallelize(range(10),5)
accum?=?sc.accumulator(0)
rdd.foreach(lambda?x:accum.add(x))
print(accum.value)
45
countByKey
#countByKey對(duì)Pair?RDD按key統(tǒng)計(jì)數(shù)量
pairRdd?=?sc.parallelize([(1,1),(1,4),(3,9),(2,16)])
pairRdd.countByKey()
defaultdict(int,?{1:?2,?3:?1,?2:?1})
saveAsTextFile
#saveAsTextFile保存rdd成text文件到本地
text_file?=?"./data/rdd.txt"
rdd?=?sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
#重新讀入會(huì)被解析文本
rdd_loaded?=?sc.textFile(file)
rdd_loaded.collect()
['2',?'3',?'4',?'1',?'0']
三,常用Transformation操作
Transformation轉(zhuǎn)換操作具有懶惰執(zhí)行的特性,它只指定新的RDD和其父RDD的依賴關(guān)系,只有當(dāng)Action操作觸發(fā)到該依賴的時(shí)候,它才被計(jì)算。
map
#map操作對(duì)每個(gè)元素進(jìn)行一個(gè)映射轉(zhuǎn)換
rdd?=?sc.parallelize(range(10),3)
rdd.collect()
[0,?1,?2,?3,?4,?5,?6,?7,?8,?9]
rdd.map(lambda?x:x**2).collect()
[0,?1,?4,?9,?16,?25,?36,?49,?64,?81]
filter
#filter應(yīng)用過(guò)濾條件過(guò)濾掉一些數(shù)據(jù)
rdd?=?sc.parallelize(range(10),3)
rdd.filter(lambda?x:x>5).collect()
[6,?7,?8,?9]
flatMap
#flatMap操作執(zhí)行將每個(gè)元素生成一個(gè)Array后壓平
rdd?=?sc.parallelize(["hello?world","hello?China"])
rdd.map(lambda?x:x.split("?")).collect()
[['hello',?'world'],?['hello',?'China']]
rdd.flatMap(lambda?x:x.split("?")).collect()
['hello',?'world',?'hello',?'China']
sample
#sample對(duì)原rdd在每個(gè)分區(qū)按照比例進(jìn)行抽樣,第一個(gè)參數(shù)設(shè)置是否可以重復(fù)抽樣
rdd?=?sc.parallelize(range(10),1)
rdd.sample(False,0.5,0).collect()
[1,?4,?9]
distinct
#distinct去重
rdd?=?sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()
[4,?1,?5,?2,?3]
subtract
#subtract找到屬于前一個(gè)rdd而不屬于后一個(gè)rdd的元素
a?=?sc.parallelize(range(10))
b?=?sc.parallelize(range(5,15))
a.subtract(b).collect()
[0,?1,?2,?3,?4]
union
#union合并數(shù)據(jù)
a?=?sc.parallelize(range(5))
b?=?sc.parallelize(range(3,8))
a.union(b).collect()
[0,?1,?2,?3,?4,?3,?4,?5,?6,?7]
intersection
#intersection求交集
a?=?sc.parallelize(range(1,6))
b?=?sc.parallelize(range(3,9))
a.intersection(b).collect()
[3,?4,?5]
cartesian
#cartesian笛卡爾積
boys?=?sc.parallelize(["LiLei","Tom"])
girls?=?sc.parallelize(["HanMeiMei","Lily"])
boys.cartesian(girls).collect()
[('LiLei',?'HanMeiMei'),
('LiLei',?'Lily'),
('Tom',?'HanMeiMei'),
('Tom',?'Lily')]
sortBy
#按照某種方式進(jìn)行排序
#指定按照第3個(gè)元素大小進(jìn)行排序
rdd?=?sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
rdd.sortBy(lambda?x:x[2]).collect()
[(4,?1,?1),?(3,?2,?2),?(1,?2,?3)]
zip
#按照拉鏈方式連接兩個(gè)RDD,效果類似python的zip函數(shù)
#需要兩個(gè)RDD具有相同的分區(qū),每個(gè)分區(qū)元素?cái)?shù)量相同
rdd_name?=?sc.parallelize(["LiLei","Hanmeimei","Lily"])
rdd_age?=?sc.parallelize([19,18,20])
rdd_zip?=?rdd_name.zip(rdd_age)
print(rdd_zip.collect())
[('LiLei',?19),?('Hanmeimei',?18),?('Lily',?20)]
zipWithIndex
#將RDD和一個(gè)從0開(kāi)始的遞增序列按照拉鏈方式連接。
rdd_name?=??sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
rdd_index?=?rdd_name.zipWithIndex()
print(rdd_index.collect())
[('LiLei',?0),?('Hanmeimei',?1),?('Lily',?2),?('Lucy',?3),?('Ann',?4),?('Dachui',?5),?('RuHua',?6)]
四,常用PairRDD的轉(zhuǎn)換操作
PairRDD指的是數(shù)據(jù)為長(zhǎng)度為2的tuple類似(k,v)結(jié)構(gòu)的數(shù)據(jù)類型的RDD,其每個(gè)數(shù)據(jù)的第一個(gè)元素被當(dāng)做key,第二個(gè)元素被當(dāng)做value.
reduceByKey
#reduceByKey對(duì)相同的key對(duì)應(yīng)的values應(yīng)用二元?dú)w并操作
rdd?=?sc.parallelize([("hello",1),("world",2),
("hello",3),("world",5)])
rdd.reduceByKey(lambda?x,y:x+y).collect()
[('hello',?4),?('world',?7)]
groupByKey
#groupByKey將相同的key對(duì)應(yīng)的values收集成一個(gè)Iterator
rdd?=?sc.parallelize([("hello",1),("world",2),("hello",3),("world",5)])
rdd.groupByKey().collect()
[('hello',?),
('world',?)]
sortByKey
#sortByKey按照key排序,可以指定是否降序
rdd?=?sc.parallelize([("hello",1),("world",2),
("China",3),("Beijing",5)])
rdd.sortByKey(False).collect()
[('world',?2),?('hello',?1),?('China',?3),?('Beijing',?5)]
join
#join相當(dāng)于根據(jù)key進(jìn)行內(nèi)連接
age?=?sc.parallelize([("LiLei",18),
("HanMeiMei",16),("Jim",20)])
gender?=?sc.parallelize([("LiLei","male"),
("HanMeiMei","female"),("Lucy","female")])
age.join(gender).collect()
[('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]
leftOuterJoin和rightOuterJoin
#leftOuterJoin相當(dāng)于關(guān)系表的左連接
age?=?sc.parallelize([("LiLei",18),
("HanMeiMei",16)])
gender?=?sc.parallelize([("LiLei","male"),
("HanMeiMei","female"),("Lucy","female")])
age.leftOuterJoin(gender).collect()
[('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]
#rightOuterJoin相當(dāng)于關(guān)系表的右連接
age?=?sc.parallelize([("LiLei",18),
("HanMeiMei",16),("Jim",20)])
gender?=?sc.parallelize([("LiLei","male"),
("HanMeiMei","female")])
age.rightOuterJoin(gender).collect()
[('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]
cogroup
#cogroup相當(dāng)于對(duì)兩個(gè)輸入分別goupByKey然后再對(duì)結(jié)果進(jìn)行g(shù)roupByKey
x?=?sc.parallelize([("a",1),("b",2),("a",3)])
y?=?sc.parallelize([("a",2),("b",3),("b",5)])
result?=?x.cogroup(y).collect()
print(result)
print(list(result[0][1][0]))
[('a',?(,?)),?('b',?(,?))]
[1,?3]
subtractByKey
#subtractByKey去除x中那些key也在y中的元素
x?=?sc.parallelize([("a",1),("b",2),("c",3)])
y?=?sc.parallelize([("a",2),("b",(1,2))])
x.subtractByKey(y).collect()
[('c',?3)]
foldByKey
#foldByKey的操作和reduceByKey類似,但是要提供一個(gè)初始值
x?=?sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
x.foldByKey(1,lambda?x,y:x*y).collect()
[('a',?3),?('b',?10)]
五,緩存操作
如果一個(gè)rdd被多個(gè)任務(wù)用作中間量,那么對(duì)其進(jìn)行cache緩存到內(nèi)存中對(duì)加快計(jì)算會(huì)非常有幫助。
聲明對(duì)一個(gè)rdd進(jìn)行cache后,該rdd不會(huì)被立即緩存,而是等到它第一次被計(jì)算出來(lái)時(shí)才進(jìn)行緩存。
可以使用persist明確指定存儲(chǔ)級(jí)別,常用的存儲(chǔ)級(jí)別是MEMORY_ONLY和EMORY_AND_DISK。
如果一個(gè)RDD后面不再用到,可以用unpersist釋放緩存,unpersist是立即執(zhí)行的。
緩存數(shù)據(jù)不會(huì)切斷血緣依賴關(guān)系,這是因?yàn)榫彺鏀?shù)據(jù)某些分區(qū)所在的節(jié)點(diǎn)有可能會(huì)有故障,例如內(nèi)存溢出或者節(jié)點(diǎn)損壞。
這時(shí)候可以根據(jù)血緣關(guān)系重新計(jì)算這個(gè)分區(qū)的數(shù)據(jù)。
#cache緩存到內(nèi)存中,使用存儲(chǔ)級(jí)別 MEMORY_ONLY。
#MEMORY_ONLY意味著如果內(nèi)存存儲(chǔ)不下,放棄存儲(chǔ)其余部分,需要時(shí)重新計(jì)算。
a?=?sc.parallelize(range(10000),5)
a.cache()
sum_a?=?a.reduce(lambda?x,y:x+y)
cnt_a?=?a.count()
mean_a?=?sum_a/cnt_a
print(mean_a)
#persist緩存到內(nèi)存或磁盤(pán)中,默認(rèn)使用存儲(chǔ)級(jí)別MEMORY_AND_DISK
#MEMORY_AND_DISK意味著如果內(nèi)存存儲(chǔ)不下,其余部分存儲(chǔ)到磁盤(pán)中。
#persist可以指定其它存儲(chǔ)級(jí)別,cache相當(dāng)于persist(MEMORY_ONLY)
from??pyspark.storagelevel?import?StorageLevel
a?=?sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a?=?a.reduce(lambda?x,y:x+y)
cnt_a?=?a.count()
mean_a?=?sum_a/cnt_a
a.unpersist()?#立即釋放緩存
print(mean_a)
六,共享變量
當(dāng)spark集群在許多節(jié)點(diǎn)上運(yùn)行一個(gè)函數(shù)時(shí),默認(rèn)情況下會(huì)把這個(gè)函數(shù)涉及到的對(duì)象在每個(gè)節(jié)點(diǎn)生成一個(gè)副本。
但是,有時(shí)候需要在不同節(jié)點(diǎn)或者節(jié)點(diǎn)和Driver之間共享變量。
Spark提供兩種類型的共享變量,廣播變量和累加器。
廣播變量是不可變變量,實(shí)現(xiàn)在不同節(jié)點(diǎn)不同任務(wù)之間共享數(shù)據(jù)。
廣播變量在每個(gè)機(jī)器上緩存一個(gè)只讀的變量,而不是為每個(gè)task生成一個(gè)副本,可以減少數(shù)據(jù)的傳輸。
累加器主要是不同節(jié)點(diǎn)和Driver之間共享變量,只能實(shí)現(xiàn)計(jì)數(shù)或者累加功能。
累加器的值只有在Driver上是可讀的,在節(jié)點(diǎn)上不可見(jiàn)。
#廣播變量?broadcast?不可變,在所有節(jié)點(diǎn)可讀
broads?=?sc.broadcast(100)
rdd?=?sc.parallelize(range(10))
print(rdd.map(lambda?x:x+broads.value).collect())
print(broads.value)
[100,?101,?102,?103,?104,?105,?106,?107,?108,?109]
100
#累加器?只能在Driver上可讀,在其它節(jié)點(diǎn)只能進(jìn)行累加
total?=?sc.accumulator(0)
rdd?=?sc.parallelize(range(10),3)
rdd.foreach(lambda?x:total.add(x))
total.value
45
#?計(jì)算數(shù)據(jù)的平均值
rdd?=?sc.parallelize([1.1,2.1,3.1,4.1])
total?=?sc.accumulator(0.1)
count?=?sc.accumulator(0)
def?func(x):
total.add(x)
count.add(1)
rdd.foreach(func)
total.value/count.value
2.625
七,分區(qū)操作
分區(qū)操作包括改變分區(qū)操作,以及針對(duì)分區(qū)執(zhí)行的一些轉(zhuǎn)換操作。
glom:將一個(gè)分區(qū)內(nèi)的數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表作為一行。
coalesce:shuffle可選,默認(rèn)為False情況下窄依賴,不能增加分區(qū)。repartition和partitionBy調(diào)用它實(shí)現(xiàn)。
repartition:按隨機(jī)數(shù)進(jìn)行shuffle,相同key不一定在同一個(gè)分區(qū)
partitionBy:按key進(jìn)行shuffle,相同key放入同一個(gè)分區(qū)
HashPartitioner:默認(rèn)分區(qū)器,根據(jù)key的hash值進(jìn)行分區(qū),相同的key進(jìn)入同一分區(qū),效率較高,key不可為Array.
RangePartitioner:只在排序相關(guān)函數(shù)中使用,除相同的key進(jìn)入同一分區(qū),相鄰的key也會(huì)進(jìn)入同一分區(qū),key必須可排序。
TaskContext: ?獲取當(dāng)前分區(qū)id方法 TaskContext.get.partitionId
mapPartitions:每次處理分區(qū)內(nèi)的一批數(shù)據(jù),適合需要分批處理數(shù)據(jù)的情況,比如將數(shù)據(jù)插入某個(gè)表,每批數(shù)據(jù)只需要開(kāi)啟一次數(shù)據(jù)庫(kù)連接,大大減少了連接開(kāi)支
mapPartitionsWithIndex:類似mapPartitions,提供了分區(qū)索引,輸入?yún)?shù)為(i,Iterator)
foreachPartition:類似foreach,但每次提供一個(gè)Partition的一批數(shù)據(jù)
glom
#glom將一個(gè)分區(qū)內(nèi)的數(shù)據(jù)轉(zhuǎn)換為一個(gè)列表作為一行。
a?=?sc.parallelize(range(10),2)
b?=?a.glom()
b.collect()
[[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]
coalesce
#coalesce?默認(rèn)shuffle為False,不能增加分區(qū),只能減少分區(qū)
#如果要增加分區(qū),要設(shè)置shuffle?=?true
#parallelize等許多操作可以指定分區(qū)數(shù)
a?=?sc.parallelize(range(10),3)
print(a.getNumPartitions())
print(a.glom().collect())
3
[[0,?1,?2],?[3,?4,?5],?[6,?7,?8,?9]]
b?=?a.coalesce(2)
print(b.glom().collect())
[[0,?1,?2],?[3,?4,?5,?6,?7,?8,?9]]
repartition
#repartition按隨機(jī)數(shù)進(jìn)行shuffle,相同key不一定在一個(gè)分區(qū),可以增加分區(qū)
#repartition實(shí)際上調(diào)用coalesce實(shí)現(xiàn),設(shè)置了shuffle?=?True
a?=?sc.parallelize(range(10),3)
c?=?a.repartition(4)
print(c.glom().collect())
[[6,?7,?8,?9],?[3,?4,?5],?[],?[0,?1,?2]]
#repartition按隨機(jī)數(shù)進(jìn)行shuffle,相同key不一定在一個(gè)分區(qū)
a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])
c?=?a.repartition(2)
print(c.glom().collect())
[[('a',?1),?('a',?2),?('c',?3)],?[('a',?1)]]
partitionBy
#partitionBy按key進(jìn)行shuffle,相同key一定在一個(gè)分區(qū)
a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])
c?=?a.partitionBy(2)
print(c.glom().collect())
mapPartitions
#mapPartitions可以對(duì)每個(gè)分區(qū)分別執(zhí)行操作
#每次處理分區(qū)內(nèi)的一批數(shù)據(jù),適合需要按批處理數(shù)據(jù)的情況
#例如將數(shù)據(jù)寫(xiě)入數(shù)據(jù)庫(kù)時(shí),可以極大的減少連接次數(shù)。
#mapPartitions的輸入分區(qū)內(nèi)數(shù)據(jù)組成的Iterator,其輸出也需要是一個(gè)Iterator
#以下例子查看每個(gè)分區(qū)內(nèi)的數(shù)據(jù),相當(dāng)于用mapPartitions實(shí)現(xiàn)了glom的功能。
a?=?sc.parallelize(range(10),2)
a.mapPartitions(lambda?it:iter([list(it)])).collect()
[[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]
mapPartitionsWithIndex
#mapPartitionsWithIndex可以獲取兩個(gè)參數(shù)
#即分區(qū)id和每個(gè)分區(qū)內(nèi)的數(shù)據(jù)組成的Iterator
a?=?sc.parallelize(range(11),2)
def?func(pid,it):
s?=?sum(it)
return(iter([str(pid)?+?"|"?+?str(s)]))
[str(pid)?+?"|"?+?str]
b?=?a.mapPartitionsWithIndex(func)
b.collect()
#利用TaskContext可以獲取當(dāng)前每個(gè)元素的分區(qū)
from?pyspark.taskcontext?import?TaskContext
a?=?sc.parallelize(range(5),3)
c?=?a.map(lambda?x:(TaskContext.get().partitionId(),x))
c.collect()
[(0,?0),?(1,?1),?(1,?2),?(2,?3),?(2,?4)]
foreachPartitions
#foreachPartition對(duì)每個(gè)分區(qū)分別執(zhí)行操作
#范例:求每個(gè)分區(qū)內(nèi)最大值的和
total?=?sc.accumulator(0.0)
a?=?sc.parallelize(range(1,101),3)
def?func(it):
total.add(max(it))
a.foreachPartition(func)
total.value
199.0
aggregate
#aggregate是一個(gè)Action操作
#aggregate比較復(fù)雜,先對(duì)每個(gè)分區(qū)執(zhí)行一個(gè)函數(shù),再對(duì)每個(gè)分區(qū)結(jié)果執(zhí)行一個(gè)合并函數(shù)。
#例子:求元素之和以及元素個(gè)數(shù)
#三個(gè)參數(shù),第一個(gè)參數(shù)為初始值,第二個(gè)為分區(qū)執(zhí)行函數(shù),第三個(gè)為結(jié)果合并執(zhí)行函數(shù)。
rdd?=?sc.parallelize(range(1,21),3)
def?inner_func(t,x):
return((t[0]+x,t[1]+1))
def?outer_func(p,q):
return((p[0]+q[0],p[1]+q[1]))
rdd.aggregate((0,0),inner_func,outer_func)
(210,?20)
aggregateByKey
#aggregateByKey的操作和aggregate類似,但是會(huì)對(duì)每個(gè)key分別進(jìn)行操作
#第一個(gè)參數(shù)為初始值,第二個(gè)參數(shù)為分區(qū)內(nèi)歸并函數(shù),第三個(gè)參數(shù)為分區(qū)間歸并函數(shù)
a?=?sc.parallelize([("a",1),("b",1),("c",2),
("a",2),("b",3)],3)
b?=?a.aggregateByKey(0,lambda?x,y:max(x,y),
lambda?x,y:max(x,y))
b.collect()
[('b',?3),?('a',?2),?('c',?2)]
總結(jié)
以上是生活随笔為你收集整理的spark算子大全glom_2小时入门Spark之RDD编程的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 军官领花和士兵领花区别是什么?
- 下一篇: 乌钢镗刀杆可以焊接吗?