pyspark基础学习——数据处理
目錄
- 前言
- 一、準備工作和數(shù)據(jù)的導(dǎo)入選擇
- 1.1 導(dǎo)入數(shù)據(jù)
- 1.2 選擇數(shù)據(jù)子集:
- 1.3 列名重命名
- 二、數(shù)據(jù)清洗
- 2.1 檢測空值數(shù)量
- 2.2 刪除存在空值的行
- 2.3 forward,backward填充
- 三、 數(shù)據(jù)處理
- 3.1 數(shù)據(jù)篩選
- 3.2 數(shù)據(jù)統(tǒng)計
- 3.3 數(shù)據(jù)類型轉(zhuǎn)換
- 3.4 采用SQL語法進行處理
- 四、數(shù)據(jù)導(dǎo)出
- 總結(jié)
前言
上一篇文章中講了如何在windows下安裝和檢測: pyspark,同時簡單介紹了運行的環(huán)境。本文想就我的一些學(xué)習(xí)經(jīng)驗,分享一下使用pyspark來處理csv文件上的一些常用的pyspark語法。
一、準備工作和數(shù)據(jù)的導(dǎo)入選擇
運行python代碼,第一件事當(dāng)然是導(dǎo)入對應(yīng)的包,同時我們要為spark先創(chuàng)建好相應(yīng)的環(huán)境,并且,spark中支持SQL,而且在SQL中有眾多的函數(shù),因此我們可以創(chuàng)建SparkSession對象,為了后續(xù)SQL函數(shù)的調(diào)用,我們要導(dǎo)入functions包,以及數(shù)據(jù)類型轉(zhuǎn)換的時候,我們要導(dǎo)入types的包。
import pyspark from pyspark.sql import SparkSession from pyspark.sql.types import * from pyspark.sql.functions import * from pyspark.sql.types import TimestampType spark = SparkSession.builder.getOrCreate() sc = spark.sparkContext1.1 導(dǎo)入數(shù)據(jù)
將csv文件導(dǎo)入為Dataframe樣式:
header表示是否需要導(dǎo)入表頭;inferSchema表示是否需要推導(dǎo)出數(shù)據(jù)的類型(false默認為string);delimiter表示指定分隔符進行讀取。file對應(yīng)文件的位置。
1.2 選擇數(shù)據(jù)子集:
drop中填入不需要的列的列名。
df2 = df1.drop('列名')1.3 列名重命名
df3=df2.withColumnRenamed("original name", "modified name")如果有多個列的列名要進行修改,可以直接在后面再加上withColumnRenamed()進行修改
二、數(shù)據(jù)清洗
因為數(shù)據(jù)本身的問題,在處理的過程中需要我們對一些空值、異常值等進行處理。但是此次作業(yè)獲取到的數(shù)據(jù)中主要是對空值的處理,因此對于異常值的處理不進行討論
2.1 檢測空值數(shù)量
df3.toPandas().isnull().sum()2.2 刪除存在空值的行
對于一些關(guān)鍵列的數(shù)據(jù)丟失、或是該行的缺失值占比較高的情況下,我們很難將人工將其彌補,因此直接對該行進行刪除。
df_clear=df3.dropna(subset='列名')2.3 forward,backward填充
forward: 前面一個值填充后面
backward:后面一個值填充前面
代碼示例:
df = spark.createDataFrame([(1, 'd1',None),(1, 'd2',10),(1, 'd3',None),(1, 'd4',30),(1, 'd5',None),(1, 'd6',None), ],('id', 'day','temperature')) df.show()運行結(jié)果如下:
| 1 | d1 | null |
| 1 | d2 | 10 |
| 1 | d3 | null |
| 1 | d4 | 30 |
| 1 | d5 | null |
| 1 | d6 | null |
填充后的結(jié)果如下表所示:
| 1 | d1 | null | null | 10 |
| 1 | d2 | 10 | 10 | 10 |
| 1 | d3 | null | 10 | 30 |
| 1 | d4 | 30 | 30 | 30 |
| 1 | d5 | null | 30 | null |
| 1 | d6 | null | 30 | null |
Window.unboundedPreceding:分區(qū)的開始位置
Window.currentRow:分區(qū)計算到現(xiàn)在的位置
Window.unboundedFollowing:分區(qū)的最后位置。
負數(shù):表示若前面有元素,范圍向前延申幾個元素
0:表示當(dāng)前位置,等價于Window.currentRow
正數(shù):表示若后面有元素,范圍向后延申幾個元素
三、 數(shù)據(jù)處理
3.1 數(shù)據(jù)篩選
data1= df_clear.filter(df_clear['column'] == 'attribute') # 條件過濾 data2 = df_clear.select('column') # 選擇某一列的數(shù)據(jù)3.2 數(shù)據(jù)統(tǒng)計
# 輸出樹狀結(jié)構(gòu)(輸出列名、數(shù)據(jù)類型和是否能為空值) df_clear.printSchema() # 將該列數(shù)據(jù)進行匯總統(tǒng)計 df_clear.select('column').describe().show() # 求平均,按照id的方式進行統(tǒng)計 ave_column = df_clear.groupBy('id').agg({'column': 'mean'})agg({“列名”,“函數(shù)名”})為聚合函數(shù),其中有:
| avg | 求均值 |
| count | 計數(shù) |
| max | 求最大值 |
| mean | 求均值 |
| min | 求最小值 |
| sum | 求和 |
3.3 數(shù)據(jù)類型轉(zhuǎn)換
from pyspark.sql.functions import * # 轉(zhuǎn)換為Int類型 df_clear.withColumn("column",df.age.cast('int'))# 轉(zhuǎn)換為String類型 df_clear.withColumn("column",df.age.cast('string'))# 轉(zhuǎn)換為Data類型 df_clear= df_clear.withColumn('column', to_date(df_clear['column']))# 轉(zhuǎn)換為TimestampType類型 dfTime=df_clear.withColumn('column',F.col('column').cast(TimestampType()))3.4 采用SQL語法進行處理
df_sql_cf=df_clear.createOrReplaceTempView("carflow") spark.sql("select * from carflow\where sum_Total_CF=\(select max(sum_Total_CF) from carflow)").show()四、數(shù)據(jù)導(dǎo)出
# ascending表示是否為升序,默認為True df_clear_asc= df_clear.orderBy("column",ascending=False) # 將對應(yīng)的數(shù)據(jù)類型轉(zhuǎn)化為list,再導(dǎo)出為csv文件 df_asc= df_clear_asc.select(F.collect_list('column')).first()[0] df_asc.select("col1","col2","col3").toPandas().to_csv("total.csv")總結(jié)
由于此次學(xué)習(xí)僅用于完成課堂大作業(yè),因此有不足之處還望各位大佬在評論區(qū)制指正,若是能夠為你們提供一點小小的幫助,希望各位大佬們能動動手指,給小弟一個贊!感謝各位大佬們!
該作業(yè)的處理的源代碼和相關(guān)數(shù)據(jù)已經(jīng)傳至github
總結(jié)
以上是生活随笔為你收集整理的pyspark基础学习——数据处理的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Ubuntu系统USDT、LTC、DAS
- 下一篇: jass 脚本bug