弹性式分布数据集RDD——Pyspark基础 (二)
連載中:http://ihoge.cn/tags/pyspark/
title: 彈性式分布數據集RDD——Pyspark基礎 (二)
date: 2018-04-15 17:59:21
comments: true
categories:
- Spark
tags:
- pyspark
RDD的內部運行方式
RDD不僅是一組不可變的JVM(Java虛擬機)對象的分布集,而且是Spark的核心,可以讓任務執行高速運算。
RDD將跟蹤(計入日記)應用于每個快的所有轉換,以加速計算速度,并在發生錯誤和部分數據丟失時提供回退(容錯機制)。
RDD采用并行的運行方式,也就是每個轉換操作并行執行,從而提高速度。
RDD有兩種并行操作:
- 轉換操作(返回指向新的RDD的指針)
- 動作操作(在運行計算后向驅動程序返回值)
數據集的轉換通常是惰性的,這也意味著任何轉換操作僅在調用數據集上的操作時才執行。該延遲執行會產生風多的精細查詢:針對性能進行優化查詢。這種優化始于Spark的DAGScheduler——面向階段的調度器。DAGScheduler負責Stage級的調度詳見:Spark運行原理剖析
由于具有單獨的RDD轉換和動作,DAGScheduler可以在查詢中執行優化。包括但不限于避免shuffle數據(最耗費資源的任務)
創建RDD
方式一: 用.parallelize(...)集合(元素list或array)
data = sc.parallelize([('a',1),('b',2),('c',3),('d',5),('e',5)])方式二: 讀入外部文件
- 支持多文件系統中讀取:如NTFS、FAT、HFS+(Mac OS Extended),或者如HDFS、S3、Cassandra這類的分布式文件系統,還有其他類文件系統。
- 指出多種數據格式:如文本、parquet、JSON、Hive tables(Hive表)以及使用JDBC驅動程序可讀取的關系數據庫中的數據。(注意:Spark可以自動處理壓縮數據集)
��Tip1:讀取的方式不同,持有對象表達方式也不同。從文件中讀取的數據表示為MapPartitionsRDD;使用集合方法的數據表示為ParallelCollectionRDD
��**Tip2:**RDD是無schema的數據結構(和DataFrame不同),所以我們幾乎可以混用任何數據結構:tuple、dict、list和spark等都能支持。如果對數據集使用.collect()方法,將把RDD對所有元素返回給驅動程序,驅動程序將其序列化成了一個列表。
data_from_file = sc.textFile("hdfs://master:9000/pydata/VS14MORT.txt.gz",4) # 這里表示4個分區 def extractInformation(row):import reimport numpy as npselected_indices = [2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,19,21,22,23,24,25,27,28,29,30,32,33,34,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,58,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,81,82,83,84,85,87,89]'''Input record schemaschema: n-m (o) -- xxxn - position fromm - position too - number of charactersxxx - description1. 1-19 (19) -- reserved positions2. 20 (1) -- resident status3. 21-60 (40) -- reserved positions4. 61-62 (2) -- education code (1989 revision)5. 63 (1) -- education code (2003 revision)6. 64 (1) -- education reporting flag7. 65-66 (2) -- month of death8. 67-68 (2) -- reserved positions9. 69 (1) -- sex10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated11. 71-73 (3) -- number of units (years, months etc)12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)13. 75-76 (2) -- age recoded into 52 categories14. 77-78 (2) -- age recoded into 27 categories15. 79-80 (2) -- age recoded into 12 categories16. 81-82 (2) -- infant age recoded into 22 categories17. 83 (1) -- place of death18. 84 (1) -- marital status19. 85 (1) -- day of the week of death20. 86-101 (16) -- reserved positions21. 102-105 (4) -- current year22. 106 (1) -- injury at work23. 107 (1) -- manner of death24. 108 (1) -- manner of disposition25. 109 (1) -- autopsy26. 110-143 (34) -- reserved positions27. 144 (1) -- activity code28. 145 (1) -- place of injury29. 146-149 (4) -- ICD code30. 150-152 (3) -- 358 cause recode31. 153 (1) -- reserved position32. 154-156 (3) -- 113 cause recode33. 157-159 (3) -- 130 infant cause recode34. 160-161 (2) -- 39 cause recode35. 162 (1) -- reserved position36. 163-164 (2) -- number of entity-axis conditions37-56. 165-304 (140) -- list of up to 20 conditions57. 305-340 (36) -- reserved positions58. 341-342 (2) -- number of record axis conditions59. 343 (1) -- reserved position60-79. 344-443 (100) -- record axis conditions80. 444 (1) -- reserve position81. 445-446 (2) -- race82. 447 (1) -- bridged race flag83. 448 (1) -- race imputation flag84. 449 (1) -- race recode (3 categories)85. 450 (1) -- race recode (5 categories)86. 461-483 (33) -- reserved positions87. 484-486 (3) -- Hispanic origin88. 487 (1) -- reserved89. 488 (1) -- Hispanic origin/race recode'''record_split = re\.compile(r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')try:rs = np.array(record_split.split(row))[selected_indices]except:rs = np.array(['-99'] * len(selected_indices))return rsdata_file = data_from_file.map(extractInformation) data_file.map(lambda row: row).take(1) data_file.cache() data_file.is_cached True全局作用域和局部作用域
Spark可以在兩種模式下運行:本地和集群。本地運行Spark代碼時和目前使用的python沒有說明不同。然而他如果將相同的代碼部署到集群,便可能會導致大量的困擾,這就需要了解Spark是怎么在集群上執行工作的。這里有一篇文章介紹的很詳細。參考:Spark運行原理詳解
在集群模式下,提交任務時任務發送給了Master節點。該驅動程序節點為任務創建DAG,并且決定哪一個執行者(Worker)節點運行特定的任務。然后該驅動程序知識工作者執行它們的任務,并且在結束時將結果返回給驅動程序。然而在這之前,驅動程序為每一個任務的終止做準備:驅動程序中有一組變量和方法,以變工作者在RDD上執行任務。
這組變量和方法在執行者的上下問本質上是靜態的,每個執行器從驅動程序中獲取的一份變量和方法的副本。這意味著運行任務時,如果執行者改變這些變量或覆蓋這些方法,它不影響任何其他執行者的副本或者驅動程序的變量和方法。這可能會導致一些意想不到的行為和運行錯誤,這些行為和錯誤通常都很難被追蹤到。
轉換
轉換操作可以調整數據集。包括映射、篩選、鏈接、轉換數據集中的值。
.map()轉換
data_2014 = data_file.map(lambda x: x[16]) data_2014.take(10) ['2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '-99'].filter()轉換
data_filter = data_file.filter(lambda x: x[16] == '2014' and x[21] == '0') print(data_filter.count()) data_file.take(2) 22[array(['1', ' ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',' ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ','238', '070', ' ', '24', '01', '11I64 ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', '01','I64 ', ' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ', '01', ' ',' ', '1', '1', '100', '6'], dtype='<U40'),array(['1', ' ', '2', '1', '01', 'M', '1', '058', ' ', '37', '17', '08',' ', '4', 'D', '3', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I250','214', '062', ' ', '21', '03', '11I250 ', '61I272 ', '62E669 ',' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', '03','I250 ', 'E669 ', 'I272 ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ', '01', ' ',' ', '1', '1', '100', '6'], dtype='<U40')].flatMap()轉換
.flatMap()方法和.map()工作類似,不同的是flatMap()返回一個扁平的結果而不是一個列表。
data_flat = data_file.flatMap(lambda x: (x[16], int(x[16])+1)) data_flat.take(10) ['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015].flatMap()可以用于過濾一些格式不正確的記錄。在這個機制下,.flatMap()方法吧每一行看作一個列表對待,然后將所有記錄簡單的加入到一起,通過傳遞一個空列表可以丟棄格式不正確的記錄。
.distinct()轉換
這里用該方法檢查性別列表是否只包含了男性和女性驗證我們是否準確解釋了數據集。
distinct_gender = data_file.map(lambda x: x[5]).distinct() distinct_gender.collect() ['M', 'F', '-99'].sample() 轉換
該方法返回數據集的隨機樣本。第一個參數withReplacement指定采樣是否應該替換,第二個參數fraction定義返回數據量的百分比,第三個參數是偽隨機數產生器的種子seed。
為了節省運算時間,這里選取愿數據千分之一的隨機數據作為下面的練習數據。
data_sample = data_file.sample(False, 0.001, 666) data_sample.cache() PythonRDD[25] at RDD at PythonRDD.scala:48.leftOuterJoin()轉換
- .leftOuterJoin(): 根據兩個數據集中都有得值來連接兩個RDD,并返回左側的RDD記錄,而右邊的記錄副加載兩個RDD匹配的地方。
- .join() :只返回兩個RDD之間的關聯數值
- .intersection():返回兩個RDD中相等的記錄
.repartition()轉換
重新對數據集進行分區,改變數據集分賽區的數量。此功能應該謹慎并且僅當真正需要的時候使用,因為它會充足數據,導致性能產生巨大的影響。
print(len(rdd2.glom().collect())) rdd2 = rdd2.repartition(4) print(len(rdd2.glom().collect())) 3 4動作
.collect() 動作
返回所有RDD的元素給驅動程序
��同時常用的還有: .collectAsMap()方法
.take() 動作
可以說這事最有用的方法,返回單個數據分區的前n行。
rdd.take(1) #等同于: rdd.first().reduce() 動作
該方法使用指定的方法減少RDD中的元素。可以用該方法計算RDD總的元素之和:
rdd1.map(lambda x: x[1]).reduce(lambda x, y: x + y)在每一個分區里,reduce()方法運行求和方法,將改總和返回給最終聚合所在的程序節點。
??警告:
要謹慎注意的是,reduce傳遞的函數需要時關聯的,既滿足元素順序改變結果不變,操作符順序改變結果不變。如:
這里我們希望輸出結果是10.0,第一個只把RDD放在一個分區,輸出結果符合預期。但是在第二個例子中,分了2個區,結果就不對了。因為該方法是在每個分區并行計算的。
.reduceByKey() 動作
該方法和.reduce()方法類似,但是實在key-key基礎上運行:
data_key = sc.parallelize([('a',3), ('a',1), ('b',6), ('d',1), ('b',6), ('d',15), ('d',3), ('a',7), ('b', 8)],4) data_key.reduceByKey(lambda x, y: x+y).collect() [('b', 20), ('a', 11), ('d', 19)].count() 動作
.count() 方法統計出了RDD里所有的元素數量。
rdd.count().count() 方法產生入戲方法同樣的結果,但不需要把整個數據集移動到驅動程序:
len(rdd.collect()). # ??警告:不要這樣做!!.countByKey() 動作
如果數據集是Ket-Value形式,可以使用.countByKey()方法
data_key.countByKey().items() dict_items([('a', 3), ('b', 3), ('d', 3)]).saveAsTextFile() 動作
該方法將RDD保存為文本文件:每個文件一個分區
data_key.saveAsTextFile('hdfs://master:9000/out/data_key.txt')要讀取它的時候需要解析,因為所有行都被視為字符串:
def parseInput(row):import repattern = re.compile(r"\(\'([a-z]+)\',.([0-9]+)\)") # 這里“+”號代表匹配一個或多個匹配字符,否則針對雙位數動作操作會報錯row_split = pattern.split(row)return (row_split[1], row_split[2]) data_key_read = sc.textFile('hdfs://master:9000/out/data_key.txt') data_key_read.map(parseInput).collect() [('a', '3'),('a', '1'),('b', '6'),('d', '1'),('b', '6'),('d', '15'),('d', '3'),('a', '7'),('b', '8')]��同時還有:
- rdd.saveAsHadoopDataset
- rdd.saveAsSequenceFile
- …
等方法
.foreach() 動作
這個方法對RDD里的每個元素,用迭代方法應用相同的函數;和.map()相比,.foreach()方法按照一個接一個的方式,對每一條記錄應用一個定義好的函數。當希望將數據曹村道PySpark本身不支持的數據庫是,該方法很有用。
def f(x):print(x)rdd.foreach(f)小結:
- RDD是Spark的核心;這些無schema數據結構早Spark中處理的最基本的數據結構。
- RDD的兩種創建方式: parallelize 和 文件讀取
- Spark中的轉化是惰性的,只在操作被調用時應用。
- Scala 和 Python RDD之間一個主要的區別是速度: Python RDD 比 Scala 慢很多!
總結
以上是生活随笔為你收集整理的弹性式分布数据集RDD——Pyspark基础 (二)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark运行原理剖析
- 下一篇: 免安装免配置 还免费的Spark 集群