pyspark rdd 基本操作
生活随笔
收集整理的這篇文章主要介紹了
pyspark rdd 基本操作
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
pyspark rdd 基本操作
原文鏈接
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Fri Mar 8 17:09:44 2019@author: lg """from pyspark import SparkContext ,SparkConfconf=SparkConf().setAppName("miniProject").setMaster("local[4]") #conf=SparkConf().setAppName("lg").setMaster("spark://192.168.10.182:7077") sc = SparkContext(conf=conf)#創建RDD #接下來我們使用parallelize方法創建一個RDD: intRDD = sc.parallelize([3,1,2,5,5]) stringRDD = sc.parallelize(['Apple','Orange','Grape','Banana','Apple'])#RDD轉換為Python數據類型 #RDD類型的數據可以使用collect方法轉換為python的數據類型: print (intRDD.collect()) print (stringRDD.collect())#map運算可以通過傳入的函數,將每一個元素經過函數運算產生另外一個RDD。 #比如下面的代碼中,將intRDD中的每個元素加1之后返回,并轉換為python數組輸出: print (intRDD.map(lambda x:x+1).collect())#filter運算 #filter可以用于對RDD內每一個元素進行篩選,并產生另外一個RDD。 #下面的例子中,我們篩選intRDD中數字小于3的元素,同事篩選stringRDD中包含ra的字符串: print (intRDD.filter(lambda x: x<3).collect()) print (stringRDD.filter(lambda x:'ra' in x).collect())#distinct運算 #distinct運算會刪除重復的元素,比如我們去除intRDD中的重復元素1:print (intRDD.distinct().collect())#randomSplit運算 #randomSplit 運算將整個集合以隨機數的方式按照比例分為多個RDD,比如按照0.4和0.6的比例將intRDD分為兩個RDD,并輸出:# sRDD = intRDD.randomSplit([0.4,0.6]) print (len(sRDD)) print (sRDD[0].collect()) print (sRDD[1].collect())#groupBy運算 #groupBy運算可以按照傳入匿名函數的規則,將數據分為多個Array。比如下面的代碼將intRDD分為偶數和奇數:result = intRDD.groupBy(lambda x : x % 2).collect() print (sorted([(x, sorted(y)) for (x, y) in result]))#3、多個RDD轉換運算 #RDD也支持執行多個RDD的運算,這里,我們定義三個RDD:intRDD1 = sc.parallelize([3,1,2,5,5]) intRDD2 = sc.parallelize([5,6]) intRDD3 = sc.parallelize([2,7]) #并集運算 #可以使用union函數進行并集運算: print (intRDD1.union(intRDD2).union(intRDD3).collect()) #交集運算 #可以使用intersection進行交集運算: print (intRDD1.intersection(intRDD2).collect())#差集運算 #可以使用subtract函數進行差集運算: print (intRDD1.subtract(intRDD2).collect()) #笛卡爾積運算 #可以使用cartesian函數進行笛卡爾乘積運算: print (intRDD1.cartesian(intRDD2).collect()) #基本“動作”運算 #讀取元素 #可以使用下列命令讀取RDD內的元素,這是Actions運算,所以會馬上執#取第一條數據 print (intRDD.first()) #取前兩條數據 print (intRDD.take(2)) #升序排列,并取前3條數據 print (intRDD.takeOrdered(3)) #降序排列,并取前3條數據 print (intRDD.takeOrdered(3,lambda x:-x))#統計功能 #可以將RDD內的元素進行統計運算:#統計 print (intRDD.stats()) #最小值 print (intRDD.min()) #最大值 print (intRDD.max()) #標準差 print (intRDD.stdev()) #計數 print (intRDD.count()) #求和 print (intRDD.sum()) #平均 print (intRDD.mean())#5、RDD Key-Value基本“轉換”運算 #Spark RDD支持鍵值對運算,Key-Value運算時mapreduce運算的基礎,本節介紹RDD鍵值的基本“轉換”運算。 #初始化 #我們用元素類型為tuple元組的數組初始化我們的RDD,這里,每個tuple的第一個值將作為鍵,而第二個元素將作為值kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) #得到key和value值 #可以使用keys和values函數分別得到RDD的鍵數組和值數組:print (kvRDD1.keys().collect()) print (kvRDD1.values().collect())#篩選元素 #可以按照鍵進行元素篩選,也可以通過值進行元素篩選,和之前的一樣,使用filter函數, #這里要注意的是,雖然RDD中是以鍵值對形式存在,但是本質上還是一個二元組,二元組的第一個值代表鍵, #第二個值代表值,所以按照如下的代碼既可以按照鍵進行篩選,我們篩選鍵值小于5的數據:print (kvRDD1.filter(lambda x:x[0] < 5).collect())print (kvRDD1.filter(lambda x:x[1] < 5).collect()) #值運算 #我們可以使用mapValues方法處理value值,下面的代碼將value值進行了平方處理: print (kvRDD1.mapValues(lambda x:x**2).collect())#按照key排序 #可以使用sortByKey按照key進行排序,傳入參數的默認值為true, #是按照從小到大排序,也可以傳入參數false,表示從大到小排序:print (kvRDD1.sortByKey().collect()) print (kvRDD1.sortByKey(True).collect()) print (kvRDD1.sortByKey(False).collect())#合并相同key值的數據 #使用reduceByKey函數可以對具有相同key值的數據進行合并。比如下面的代碼, #由于RDD中存在(3,4)和(3,6)兩條key值均為3的數據,他們將被合為一條數據: print (kvRDD1.reduceByKey(lambda x,y:x+y).collect())#多個RDD Key-Value“轉換”運算初始化kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)]) kvRDD2 = sc.parallelize([(3,8)])#內連接運算 #join運算可以實現類似數據庫的內連接,將兩個RDD按照相同的key值join起來, #kvRDD1與kvRDD2的key值唯一相同的是3,kvRDD1中有兩條key值為3的數據(3,4)和(3,6), #而kvRDD2中只有一條key值為3的數據(3,8),所以join的結果是(3,(4,8)) 和(3,(6,8)): print (kvRDD1.join(kvRDD2).collect())#左外連接 #使用leftOuterJoin可以實現類似數據庫的左外連接,如果kvRDD1的key值對應不到kvRDD2,就會顯示None #使用leftOuterJoin可以實現類似數據庫的左外連接,如果kvRDD1的key值對應不到kvRDD2,就會顯示None print (kvRDD1.leftOuterJoin(kvRDD2).collect())#右外連接 #使用rightOuterJoin可以實現類似數據庫的右外連接,如果kvRDD2的key值對應不到kvRDD1,就會顯示None print (kvRDD1.rightOuterJoin(kvRDD2).collect())#刪除相同key值數據 #使用subtractByKey運算會刪除相同key值得數據:print (kvRDD1.subtractByKey(kvRDD2).collect())#Key-Value“動作”運算 #讀取數據 #可以使用下面的幾種方式讀取RDD的數據: #讀取第一條數據 print (kvRDD1.first()) #讀取前兩條數據 print (kvRDD1.take(2)) #讀取第一條數據的key值 print (kvRDD1.first()[0]) #讀取第一條數據的value值 print (kvRDD1.first()[1])#按key值統計: #使用countByKey函數可以統計各個key值對應的數據的條數:#print (kvRDD1.countByKey().collect())#lookup查找運算 #使用lookup函數可以根據輸入的key值來查找對應的Value值: print (kvRDD1.lookup(3))#持久化操作 #spark RDD的持久化機制,可以將需要重復運算的RDD存儲在內存中,以便大幅提升運算效率,有兩個主要的函數: #持久化 #使用persist函數對RDD進行持久化:kvRDD1.persist() #使用unpersist函數對RDD進行持久化: kvRDD1.unpersist()posted on 2019-03-08 18:34 luoganttcc 閱讀(...) 評論(...) 編輯 收藏
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的pyspark rdd 基本操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: pyspark rdd 数据持久化
- 下一篇: pyspark 读取本txt 构建RD