Pyspark:DataFrame的转化操作及行动操作
Spark版本:V3.2.1
持續補充
序言
??Spark DataFrame中的創建及常用的列操作可以參考博客:https://blog.csdn.net/yeshang_lady/article/details/89528090
正文
??因為Spark DataFrame是基于RDD創建的,所以DataFrame的操作也可以分為兩種類型:轉化操作和行動操作。轉化操作可以將Spark DataFrame轉化為新的DataFrame,而不改變原有數據。轉化操作都是惰性的,不會立即計算出來。而行動操作會觸發所有轉化操作的實際求值。
1. 行動操作
1.1 show展示數據
show方法作可以以表格的形式展示DataFrame中的數據,該方法主要有以下幾個參數:
- n:要展示的數據行數;
- truncate: 是否對字符串截斷,也可以直接使用該字段指定顯示的字符串長度數;
- vertical: 是否垂直顯示DataFrame中的行;
其用法舉例如下:
from pyspark.sql.types import * from pyspark.sql import SparkSession from pyspark.sql import functions as func import pandas as pd from pyspark.sql.functions import pandas_udf spark=SparkSession.builder.appName("jsonRDD").getOrCreate() data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) df.show(2,truncate=3) df.show(vertical=True)其結果如下:
1.2 獲取所有數據到數組
show方法只能將DataFrame中的數據展示出來,但無法使用變量接收DataFrame。為了獲取數據,可以使用collect方法將DataFrame中的數據保存到List對象中。具體用法如下:
data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) df_array=df.collect() print(df_array)其結果如下:
需要注意一點,collect方法會將集群中DataFrame的所有數據取回到一個節點當中,所以單臺節點的內存不足以保存整個DataFrame中的所有數據時就會報內存溢出錯誤。
1.3 獲取若干行記錄
first、head、take、tail這四個方法可以獲取DataFrame中的若干行記錄。這四個方法比較類似,其中:
- first: 該方法獲取DataFrame的第1行記錄。
- head: 該方法獲取DataFrame的前nnn行記錄。該方法只適用于DataFrame數據量較小,且所有數據都保存在內存中的情況。
- take: 獲取DataFrame的前nnn行記錄。
- tail: 獲取DataFrame的后nnn行記錄。
僅以take為例進行用法說明(結果不再展示):
data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) res_1=df.take(2) print(res_1)1.4 將DataFrame轉化pandas.DataFrame
toPandas方法可以將spark DataFrame轉化為Pandas DataFrame。用法如下:
data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) dt=df.toPandas() print(dt)其結果如下:
2. 轉化操作
在具體介紹轉化操作之前,需要說明以下幾點:
- Spark DataFrame中的轉化操作方法中的字段名參數的數據類型一般為:String類型及Column對象,或者這兩種對象組成的List對象。當方法能同時接收多個字段名時,String類型和Column不能混用。
- Spark DataFrame的轉化操作返回的結果均為Spark DataFrame類型,所以這些方法可以連續使用。
2.1 decribe獲取指定字段的統計信息
describe方法接收一個或多個String類型(Column類型不可以)的字段名,返回對應字段的統計值,包括:Count、Mean、Stddev、Min、Max。該方法也可以不指定字段名,此時則返回所有字段的統計信息。具體用法如下:
data=[['Alice',26],['Jessica',23],['Shirely',33]] df=spark.createDataFrame(data,['Name','age']) df.describe(['Name','age']).show()其結果如下:
2.2 where/filter篩選數據
where和filter的作用相同,都可以對DataFrame的數據進行篩選。這里僅以where方法為例進行說明。where(condition)中的condition可以接收兩種參數類型。具體如下:
- 當接收的參數為String型時,其寫法參照SQL語言中where子句;
- 當接收的參數為Column類型時,對于每一個字段的篩選要求需要分別描述,然后使用邏輯運算組合起來即可(與或非寫法為:&、|、~)。
其用法舉例如下:
data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) #條件: C2不為空且C1大于25 df.where('C2 is not null and C1>25').show() #條件: C2為空 df.where(func.isnull('C2')).show() #條件: C2不為空且C3長度大于3 df.where(~func.isnull('C2')).where(func.length('C3')>3).show() #條件: C2不為空或C1大于25 df.filter((~func.isnull('C2'))|(func.col('C1')>25)).show()其結果如下:
2.3 select/selectExpr查詢指定列
select和selectExpr的作用相同,區別在于這兩個方法接收的參數類型不同。具體如下:
- select:該操作接收Sting類型(列名)、Column或List型的參數。如果想要查詢所有列,也可以使用?*?;
- selectExpr:該操作接收SQL表達式,可以同時對特定字段進行函數處理;
其用法舉例如下:
data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) #查詢所有列 df.select('*').show() #多個Column列組成的List df.select([func.length('C3'),func.lower('C3')]).show() #接收SQL表達式 df.selectExpr('length(C3)','C1>25').show()其結果如下:
2.4 drop刪除指定列
drop方法中既可以接String型的參數,也可以使用Column型參數。使用前者時,可以同時刪除多列,使用后者時一次只能刪除一列。其用法舉例如下:
data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) df.drop('C1').show() df.drop('C1','C2').show() #注意這里不能寫成List df.drop(df.C1).show()其結果如下:
2.5 limit獲取前n行記錄
limit方法獲取指定DataFrame的前n行記錄,其用法如下:
data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) df.limit(2).show()其結果如下:
2.6 orderBy/sort/sortWithinPartitions按指定字段排序
orderBy和sort方法使用方法相同,而sortWithinPartitions方法可以對每個Parition排序,這里僅以sort為例進行說明。sort方法可以接受兩種類型的參數,不同的參數類型,排序的說明方式不同。具體如下:
- 在使用Column型參數時,在Column后面加.desc()表示降序,.asc()表示升序(也可以使用functions包中的asc和desc方法)。
- 當使用String型變量時,使用ascending參數來指定排序方向。
具體用法舉例如下:
data=[[34,None,'Shirely'],[24,89,'Alice'],[30,90,'Mark']] df=spark.createDataFrame(data,['C1','C2','C3']) df.sort(['C1','C2'],ascending=[0,1]).show() df.sort([func.desc('C2'),func.asc('C3')]).show()其結果如下:
2.7 groupBy對字段進行分組聚合
groupBy方法可以對數據進行分組,其得到的是GroupedData類型對象。該對象的API提供了一些聚合操作。具體如下:
- avg、max、min、mean、count、sum方法
這里方法只能接String類型的變量名,并且會自動忽略非數值型的字段。用法如下:
其結果如下:
- agg:自定義聚合函數。
agg方法中的自定義函數既可以使用avg、max等內置的聚合函數,也可以使用pyspark.sql.functions.pandas_udf定義的GROUPED_AGG類型的函數。舉例如下:
其結果如下:
- apply/applyInPandas:使用Pandas中的函數
applyInPandas和apply方法作用相同,但apply在未來的Spark版本中會被廢棄掉,且相比applyInPandas方法,apply中函數的定義稍嫌麻煩。這里僅介紹applyInPandas的用法。具體如下:
其結果如下:
注意applyInPandas方法中的schema參數中指定的是自定義函數的返回值的類型信息,這個參數可以使用DDL格式的字符串也可以使用pyspark.sql.types.DataType類型對象。
- pivot: 透視表
pivot方法返回的對象類型仍為GroupedData類型,所以agg、avg等方法仍然可以繼續使用。舉例如下:
其結果如下:
2.8 去重操作
Spark DataFrame中提供了兩種去重操作,具體如下:
- distinct:返回一個不包含重復記錄的DataFrame。
- drop_duplicates:根據指定字段去重。
具體用法如下:
data=[[1,2],[1,2],[3,4],[1,3]] df=spark.createDataFrame(data,['A','B']) df.distinct().show() df.drop_duplicates(['A']).show()其結果如下:
2.9 合并操作
Spark DataFrame中提供的對兩個DataFrame進行合并的方法有:union、unionAll和unionByName。具體如下:
- union、unionAll對兩個字段數目一致的DataFrame進行合并。該兩個方法在合并時并不會檢查DataFrame的字段類型及字段名,只會按照字段的位置進行合并。該方法與SQL中Union all作用相同。舉例如下:
其結果如下:
- unionByName:該方法會按照兩個DataFrame中同名的字段名進行合并,該方法不要求兩個DataFrame的字段數目相同。具體如下:
其結果如下:
2.10 join操作
join的作用與SQL中的join操作作用類似,這里不贅述。用法舉例如下;
data1=[[1,2],[5,4],[7,3]] df1=spark.createDataFrame(data1,['A','B']) data2=[[5,6],[7,8]] df2=spark.createDataFrame(data2,['A','B']) df1.join(df2,df1['A']==df2['A'],how='outer').select(df1.A,df1.B,df2.B.alias('B_1')).show()其結果如下:
2.11 stat獲取指定字段統計信息
stat方法可計算指定字段或指定字段之間的統計信息,比如方差、協方差、頻繁出現的元素集合等。DataFrame.stat下的子調用接口如下表:
| approxQuantile | 計算數值列的近似百分位數( 關于該函數還有一些問題沒解決,以后補充) |
| corr | 計算兩個字段之的相關性 |
| cov | 計算兩個字段的協方差 |
| crosstab | 交叉表 |
| freqItems | 計算某一列或某幾列中出現頻繁的值的集合,support參數規定頻繁項的最低支持度 |
| sampleBy | 對指定列數據進行采樣,需要指定某列具體數值的抽樣比例 |
舉例如下:
df=spark.read.csv('/data/mnm_dataset.csv',schema=schema,header=True)df.stat.freqItems(cols=['State','Color'],support=0.9).show() df.stat.sampleBy('Color',fractions={'Yellow':0.001}).show() df.stat.crosstab('State','Color').show()其結果如下:
2.12 集合類操作
Spark DataFramet提供的集合類操作如下:
- intersect\intersectAll:獲取兩個DataFrame中共有的記錄;
- exceptAll:獲取一個DataFrame中有另一個DataFrame中沒有的數據記錄;
其具體用法如下:
data1=[[1,2],[3,4],[4,5]] df1=spark.createDataFrame(data1,['A','B']) data2=[[1,2]] df2=spark.createDataFrame(data2,['A','B']) data3=[[2,1]] df3=spark.createDataFrame(data3,['B','A'])df1.intersect(df2).show() df1.intersectAll(df3).show() df1.exceptAll(df2).show()其結果如下:
這里需要注意,兩個DataFrame進行集合操作時并不檢查列名,而是依靠列的位置進行判斷的,所以df1和df3的交集結果為空。
2.13 操作字段名
Spark DataFrame常用的操作字段名方法主要包括以下兩種:
- withColumnRenamed: 對DataFrame中的某列改名;
- withColumn: 對DataFrame中新增一列;
其用法如下:
data1=[[1,2],[3,4],[4,5]] df1=spark.createDataFrame(data1,['A','B']) df1.withColumn('C',(df1['A']>3).alias('C')).show() df1.withColumnRenamed('A','A_1').show()其結果如下:
2.14 處理空值列
na方法可以對具有空值列的行數據進行處理,其提供了三種處理方法,具體如下:
- drop: 刪除指定列的空值行,也可以直接使用dropna方法。
- fill: 使用指定的值替換指定空值列的值。通過傳入指定空值列列名以及該空值列替換值組成的Map對象傳入fill方法來替換指定空值列的值。該方法與fillna方法作用相同。
- replace:對值進行替換;
舉例如下:
data=[[1,None,None],[2,4,8],[9,None,11]] df=spark.createDataFrame(data,['A','B','C']) df.show() #刪除數據-只有有一個空值該行數據就刪除 df.na.drop(how='any').show() #刪除數據-若一行數據中的空值數>=thresh,則該行記錄刪除 df.na.drop(thresh=2).show() #用0進行空值填充 df.na.fill(0).show() #不同的列用不同的值填充 df.na.fill({'B':0,'C':1}).show() #數據替換-將A列中的2替換成4,9替換成8 df.na.replace([2,9],[4,8],'A').show()其結果如下:
總結
以上是生活随笔為你收集整理的Pyspark:DataFrame的转化操作及行动操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: JavaSript入门_案例01_ 实现
- 下一篇: php加入购物车怎样实现_PHP实现添加