[Spark]PySpark入门学习教程---RDD介绍(2)
?一 RDD
pyspark.RDD? ? ? ? SparkRDD
RDD指的是彈性分布式數據集(Resilient Distributed Dataset),它是spark計算的核心。盡管現在都使用 DataFrame、Dataset 進行編程,但是它們的底層依舊是依賴于RDD的。我們來解釋一下 RDD 的這幾個單詞含義。
- 彈性:在計算上具有容錯性,spark是一個計算框架,如果某一個節點掛了,可以自動進行計算之間血緣關系的跟蹤
- 分布式:很好理解,hdfs上數據是跨節點的,那么spark的計算也是要跨節點的
- 數據集:可以將數組、文件等一系列數據的集合轉換為RDD
RDD 是 Spark 的一個最基本的抽象 (如果你看一下源碼的話,你會發現RDD在底層是一個抽象類,抽象類顯然不能直接使用,必須要繼承它然后實現它內部的一些方法后才可以使用),它代表了不可變的、元素的分區(partition)集合,這些分區可以被并行操作。假設我們有一個包含 300 萬個元素的數組,那么我們就可以將這個數組分成 3 份,每一份對應一個分區,每個分區都可以在不同的機器上進行運算,這樣就能提高運算效率。
RDD 支持很多操作,比如:map、filter 等等,我們后面會慢慢介紹。當然,RDD在 Spark 的源碼是一個類,但是我們后面有時候會把 RDD 和 RDD實例對象 都叫做 RDD,沒有刻意區分,心里面清楚就可以啦。
大致上可分三大類算子:
1、Value數據類型的Transformation算子,這種變換不觸發提交作業,針對處理的數據項是Value型的數據。
2、Key-Value數據類型的Transformation算子,這種變換不觸發提交作業,針對處理的數據項是Key-Value型的數據。
3、Action算子,這類算子會觸發SparkContext提交作業。
二 創建RDD
創建RDD主要有兩種方式
一個是textFile加載本地或者集群文件系統中的數據,
第二個是用parallelize方法將Driver中的數據結構并行化成RDD。
#從本地文件系統中加載數據 file = "/home/data/hello.txt" #從集群文件系統中加載數據 #file = "hdfs://localhost:9000/user/hadoop/data.txt" #也可以省去hdfs://localhost:9000 rdd = sc.textFile(file,3) rdd.collect() ['hello world','hello spark','spark love jupyter','spark love pandas','spark love sql'] #parallelize將Driver中的數據結構生成RDD,第二個參數指定分區數 rdd = sc.parallelize(range(1,11),2) rdd.collect() [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]三 常用Transformation操作
Transformation轉換操作具有懶惰執行的特性,它只指定新的RDD和其父RDD的依賴關系,只有當Action操作觸發到該依賴的時候,它才被計算。
?
3.1 Value型Transformation算子?
import pyspark conf = pyspark.SparkConf().setMaster("local[4]").setAppName("PySpark_Transformation1") sc = pyspark.SparkContext(conf=conf)[1] map
# 以下的操作由于是Transform操作,因為我們需要在最后加上一個collect算子用來觸發計算。 # 1. map: 和python差不多,map轉換就是對每一個元素進行一個映射 rdd = sc.parallelize(range(1, 11), 4) rdd_map = rdd.map(lambda x: x*2) print("原始數據:", rdd.collect()) print("擴大2倍:", rdd_map.collect()) # 原始數據: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 擴大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20][2] flatMap?
# 2. flatMap: 這個相比于map多一個flat(壓平)操作,顧名思義就是要把高維的數組變成一維 rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"]) print("原始數據:", rdd2.collect()) print("直接split之后的map結果:", rdd2.map(lambda x: x.split(" ")).collect()) print("直接split之后的flatMap結果:", rdd2.flatMap(lambda x: x.split(" ")).collect()) # 直接split之后的map結果: [['hello', 'SamShare'], ['hello', 'PySpark']] # 直接split之后的flatMap結果: ['hello', 'SamShare', 'hello', 'PySpark'][3] mapPartitions?
# mapPartitions: 根據分區內的數據進行映射操作 rdd = sc.parallelize([1, 2, 3, 4], 2) def f(iterator):yield sum(iterator) print(rdd.collect()) print(rdd.mapPartitions(f).collect()) # [1, 2, 3, 4] # [3, 7][4] union?
# 9. union: 合并兩個RDD rdd = sc.parallelize([1, 1, 2, 3]) print(rdd.union(rdd).collect()) # [1, 1, 2, 3, 1, 1, 2, 3][5] cartesian??
# cartesian: 生成笛卡爾積 rdd = sc.parallelize([1, 2]) print(sorted(rdd.cartesian(rdd).collect())) # [(1, 1), (1, 2), (2, 1), (2, 2)]?[6] groupByKey??
# groupByKey: 按照key來聚合數據 rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) print(rdd.collect()) print(sorted(rdd.groupByKey().mapValues(len).collect())) print(sorted(rdd.groupByKey().mapValues(list).collect())) # [('a', 1), ('b', 1), ('a', 1)] # [('a', 2), ('b', 1)] # [('a', [1, 1]), ('b', [1])][7] filter??
# 3. filter: 過濾數據 rdd = sc.parallelize(range(1, 11), 4) print("原始數據:", rdd.collect()) print("過濾奇數:", rdd.filter(lambda x: x % 2 == 0).collect()) # 原始數據: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # 過濾奇數: [2, 4, 6, 8, 10][8] distinct??
# distinct: 去重元素 rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32]) print("原始數據:", rdd.collect()) print("去重數據:", rdd.distinct().collect()) # 原始數據: [2, 2, 4, 8, 8, 8, 8, 16, 32, 32] # 去重數據: [4, 8, 16, 32, 2]??[9] distinct??
# subtract: 數據集相減, Return each value in self that is not contained in other. x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) y = sc.parallelize([("a", 3), ("c", None)]) print(sorted(x.subtract(y).collect())) # [('a', 1), ('b', 4), ('b', 5)][10] sample
rdd = sc.parallelize(range(1,11),2) # 這里的 2 指的是分區數量 print("原始數據:", rdd.collect()) print("Sample數據:", rdd.sample(False, 0.5, 9).collect()) # 原始數據: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # Sample數據: [3, 4, 6, 7, 8, 9]?[11] takeSample
rdd = sc.parallelize(range(1,15),2) # 這里的 2 指的是分區數量 print("原始數據:", rdd.collect()) print("taksSample數據:", rdd.takeSample(True, 4, 9)) # 原始數據: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] # takeSample數據: [3, 4, 6, 7, 8, 9]?
[12] cache、persist?
?3.2?Key-Value型Transformation算子?
[1]?mapValues
rdd1 = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]) print("mapValues:", rdd.mapValues(len).collect()) #mapValues ?len: [('a', 3), ('b', 1)] rdd2 = sc.parallelize([("a", [1,2]), ("b", [3,4,5])]) print("mapValues:", rdd2.mapValues(len).collect()) #mapValues ?sum: [('a', 3), ('b', 1)][2] reduceByKey
# reduceByKey: 根據key來映射數據 from operator import add rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) print("原始數據:", rdd.collect()) print("原始數據:", rdd.reduceByKey(add).collect()) # 原始數據: [('a', 1), ('b', 1), ('a', 1)] # 原始數據: [('b', 1), ('a', 2)]?[3] join |?leftOuterJoin |?ightOuterJoin
# 16. join: x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2), ("a", 3)]) print(sorted(x.join(y).collect())) # [('a', (1, 2)), ('a', (1, 3))]# 17. leftOuterJoin/rightOuterJoin x = sc.parallelize([("a", 1), ("b", 4)]) y = sc.parallelize([("a", 2)]) print(sorted(x.leftOuterJoin(y).collect())) # [('a', (1, 2)), ('b', (4, None))]?其他
# 7. sortBy: 根據規則進行排序 tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()) print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()) # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)] # [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]# subtractByKey 去除x中那些key也在y中的元素 x = sc.parallelize([("a",1),("b",2),("c",3)]) y = sc.parallelize([("a",2),("b",(1,2))]) x.subtractByKey(y).collect() #[('c', 3)]# 10. intersection: 取兩個RDD的交集,同時有去重的功效 rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3]) rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8]) print(rdd1.intersection(rdd2).collect()) # [1, 2, 3]# 12. zip: 拉鏈合并,需要兩個RDD具有相同的長度以及分區數量 x = sc.parallelize(range(0, 5)) y = sc.parallelize(range(1000, 1005)) print(x.collect()) print(y.collect()) print(x.zip(y).collect()) # [0, 1, 2, 3, 4] # [1000, 1001, 1002, 1003, 1004] # [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]# 13. zipWithIndex: 將RDD和一個從0開始的遞增序列按照拉鏈方式連接。 rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"]) rdd_index = rdd_name.zipWithIndex() print(rdd_index.collect()) # [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]# 15. sortByKey: tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)] print(sc.parallelize(tmp).sortByKey(True, 1).collect()) # [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]四? 常用Action操作
Action操作將觸發基于RDD依賴關系的計算。
import os import pyspark from pyspark import SparkContext, SparkConfconf = SparkConf().setAppName("test_SamShare").setMaster("local[4]") sc = SparkContext(conf=conf)# 使用 parallelize方法直接實例化一個RDD rdd = sc.parallelize(range(1,11),4) # 這里的 4 指的是分區數量 rdd.take(100) # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]----------------------------------------------Action算子解析 ---------------------------------------------- # 1. collect: 指的是把數據都匯集到driver端,便于后續的操作 rdd = sc.parallelize(range(0, 5)) rdd_collect = rdd.collect() print(rdd_collect) # [0, 1, 2, 3, 4]# 2. first: 取第一個元素 sc.parallelize([2, 3, 4]).first() # 2# 3. collectAsMap: 轉換為dict,使用這個要注意了,不要對大數據用,不然全部載入到driver端會爆內存 m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() m # {1: 2, 3: 4}# 4. reduce: 逐步對兩個元素進行操作 rdd = sc.parallelize(range(10),5) print(rdd.reduce(lambda x,y:x+y)) # 45# 5. countByKey/countByValue: rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) print(sorted(rdd.countByKey().items())) print(sorted(rdd.countByValue().items())) # [('a', 2), ('b', 1)] # [(('a', 1), 2), (('b', 1), 1)]# 6. take: 相當于取幾個數據到driver端 rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) print(rdd.take(5)) # [('a', 1), ('b', 1), ('a', 1)]# 7. saveAsTextFile: 保存rdd成text文件到本地 text_file = "./data/rdd.txt" rdd = sc.parallelize(range(5)) rdd.saveAsTextFile(text_file)# 8. takeSample: 隨機取數 rdd = sc.textFile("./test/data/hello_samshare.txt", 4) # 這里的 4 指的是分區數量 rdd_sample = rdd.takeSample(True, 2, 0) # withReplacement 參數1:代表是否是有放回抽樣 rdd_sample# 9. foreach: 對每一個元素執行某種操作,不生成新的RDD rdd = sc.parallelize(range(10), 5) accum = sc.accumulator(0) rdd.foreach(lambda x: accum.add(x)) print(accum.value)全面解析Spark,以及和Python的對接 - 古明地盆 - 博客園 (cnblogs.com)https://www.cnblogs.com/traditional/p/11724876.html
總結
以上是生活随笔為你收集整理的[Spark]PySpark入门学习教程---RDD介绍(2)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 钉钉账号多平台同时登录怎么设置
- 下一篇: 主创爆料《阿凡达3》:剧情和阵容更精彩