PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解
目錄
前言
一、PySpark基礎功能
?1.Spark SQL 和DataFrame
2.Pandas API on Spark
3.Streaming
4.MLBase/MLlib
5.Spark Core
二、PySpark依賴
Dependencies
三、DataFrame
1.創建
創建不輸入schema格式的DataFrame
創建帶有schema的DataFrame
從Pandas DataFrame創建
通過由元組列表組成的RDD創建
2.查看
DataFrame.show()
spark.sql.repl.eagerEval.enabled
縱向顯示
?查看DataFrame格式和列名
查看統計描述信息
PySpark DataFrame轉換為Pandas DataFrame
?3.查詢
添加新列實例:
條件查詢DataFrame.filter()
?4.運算
Pandas_udf
DataFrame.mapInPandas
5.分組
?聯合分組和應用函數
?6.獲取數據輸入/輸出
CSV
?Parquet
?ORC
?四、結合Spark SQL
點關注,防走丟,如有紕漏之處,請留言指教,非常感謝
前言
要想了解PySpark能夠干什么可以去看看我之前寫的文章,里面很詳細介紹了Spark的生態:
Spark框架深度理解一:開發緣由及優缺點
Spark框架深度理解二:生態圈
Spark框架深度理解三:運行架構、核心數據集RDD
PySpark只是通過JVM轉換使得Python代碼能夠在Spark集群上識別運行。故Spark的絕大多數功能都可以被Python程序使用。
上篇文章:一文速學-PySpark數據分析基礎:PySpark原理詳解
已經把PySpark運行原理講的很清楚了,現在我們需要了解PySpark語法基礎來逐漸編寫PySpark程序實現分布式數據計算。
已搭建環境:
Spark:3.3.0
Hadoop:3.3.3
Scala:2.11.12
JDK:1.8.0_201
PySpark:3.1.2
一、PySpark基礎功能
PySpark是Python中Apache Spark的接口。它不僅可以使用Python API編寫Spark應用程序,還提供了PySpark shell,用于在分布式環境中交互分析數據。PySpark支持Spark的大多數功能,如Spark SQL、DataFrame、Streaming、MLlib(機器學習)和Spark Core。
?1.Spark SQL 和DataFrame
Spark SQL是用于結構化數據處理的Spark模塊。它提供了一種稱為DataFrame的編程抽象,是由SchemaRDD發展而來。不同于SchemaRDD直接繼承RDD,DataFrame自己實現了RDD的絕大多數功能。可以把Spark SQL DataFrame理解為一個分布式的Row對象的數據集合。
Spark SQL已經集成在spark-shell中,因此只要啟動spark-shell就可以使用Spark SQL的Shell交互接口。如果在spark-shell中執行SQL語句,需要使用SQLContext對象來調用sql()方法。Spark SQL對數據的查詢分成了兩個分支:SQLContext和HiveContext,其中HiveContext繼承了SQLContext,因此HiveContext除了擁有SQLContext的特性之外還擁有自身的特性。
Spark SQL允許開發人員直接處理RDD,同時也可查詢例如在 Apache Hive上存在的外部數據。Spark SQL的一個重要特點是其能夠統一處理關系表和RDD,使得開發人員可以輕松地使用SQL命令進行外部查詢,同時進行更復雜的數據分析。
2.Pandas API on Spark
Spark上的pandas API可以擴展使用 python pandas庫。
- 輕松切換到pandas API和PySpark API上下文,無需任何開銷。
- 有一個既適用于pandas(測試,較小的數據集)又適用于Spark(分布式數據集)的代碼庫。
- 熟練使用pandas的話很快上手
3.Streaming
Apache Spark中的Streaming功能運行在Spark之上,支持跨Streaming和歷史數據的強大交互和分析應用程序,同時繼承了Spark的易用性和容錯特性。Spark Streaming是將流式計算分解成一系列短小的批處理作業。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分成一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),然后將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。
4.MLBase/MLlib
MLlib構建在Spark之上,是一個可擴展的機器學習庫,它提供了一組統一的高級API,幫助用戶創建和調整實用的機器學習管道。MLBase分為四部分:MLlib、MLI、ML Optimizer和MLRuntime。
- ML Optimizer會選擇它認為最適合的已經在內部實現好了的機器學習算法和相關參數,來處理用戶輸入的數據,并返回模型或別的幫助分析的結果;
- MLI 是一個進行特征抽取和高級ML編程抽象的算法實現的API或平臺;
- MLlib是Spark實現一些常見的機器學習算法和實用程序,包括分類、回歸、聚類、協同過濾、降維以及底層優化,該算法可以進行可擴充; MLRuntime 基于Spark計算框架,將Spark的分布式計算應用到機器學習領域。
?
5.Spark Core
Spark Core是Spark平臺的底層通用執行引擎,所有其他功能都構建在其之上。它提供了RDD(彈性分布式數據集)和內存計算能力。
二、PySpark依賴
Dependencies
| Package | 最低版本限制 | Note |
| pandas | 1.0.5 | 支撐Spark SQL |
| Numpy | 1.7 | 滿足支撐MLlib基礎API |
| pyarrow | 1.0.0 | 支撐Spark SQL |
| Py4j | 0.10.9.5 | 要求 |
| pandas | 1.0.5 | pandas API on Spark需要 |
| pyarrow | 1.0.0 | pandas API on Spark需要 |
| Numpy | 1.14 | pandas API on Spark需要 |
請注意,PySpark需要Java 8或更高版本,并正確設置Java_HOME。如果使用JDK 11,請設置Dio.netty.tryReflectionSetAccessible=true?以獲取與箭頭相關的功能。
AArch64(ARM64)用戶注意:PyArrow是PySpark SQL所必需的,但PyArrow 4.0.0中引入了對AArch64的PyArrow支持。如果由于PyArrow安裝錯誤導致PyArrow安裝在AArch64上失敗,可以按如下方式安裝PyArrow>=4.0.0:
pip install "pyarrow>=4.0.0" --prefer-binary三、DataFrame
PySpark應用程序從初始化SparkSession開始,SparkSession是PySpark的入口點,如下所示。如果通過PySpark可執行文件在PySpark shell中運行它,shell會自動在變量spark中為用戶創建會話。
from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()1.創建
PySpark DataFrame能夠通過pyspark.sql.SparkSession.createDataFrame創建,通常通過傳遞列表(list)、元組(tuples)和字典(dictionaries)的列表和pyspark.sql.Rows,Pandas DataFrame,由此類列表組成的RDD轉換。pyspark.sql.SparkSession.createDataFrame接收schema參數指定DataFrame的架構(優化可加速)。省略時,PySpark通過從數據中提取樣本來推斷相應的模式。
創建不輸入schema格式的DataFrame
from datetime import datetime, date import pandas as pd from pyspark.sql import Rowdf = spark.createDataFrame([Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)) ]) df DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]創建帶有schema的DataFrame
df = spark.createDataFrame([(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)) ], schema='a long, b double, c string, d date, e timestamp') df DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]從Pandas DataFrame創建
pandas_df = pd.DataFrame({'a': [1, 2, 3],'b': [2., 3., 4.],'c': ['string1', 'string2', 'string3'],'d': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)] }) df = spark.createDataFrame(pandas_df) df DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]通過由元組列表組成的RDD創建
rdd = spark.sparkContext.parallelize([(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)) ]) df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e']) df DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]?以上的DataFrame格式創建的都是一樣的。
df.printSchema() root|-- a: long (nullable = true)|-- b: double (nullable = true)|-- c: string (nullable = true)|-- d: date (nullable = true)|-- e: timestamp (nullable = true)2.查看
DataFrame.show()
使用格式:
df.show(<int>) df.show(1) +---+---+-------+----------+-------------------+ | a| b| c| d| e| +---+---+-------+----------+-------------------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00| +---+---+-------+----------+-------------------+ only showing top 1 rowspark.sql.repl.eagerEval.enabled
spark.sql.repl.eagerEval.enabled用于在notebooks(如Jupyter)中快速生成PySpark DataFrame的配置。控制行數可以使用spark.sql.repl.eagerEval.maxNumRows。
spark.conf.set('spark.sql.repl.eagerEval.enabled', True) df?
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows',1) df?
縱向顯示
行也可以垂直顯示。當行太長而無法水平顯示時,縱向顯示就很明顯。
df.show(1, vertical=True) -RECORD 0------------------a | 1b | 2.0c | string1d | 2000-01-01e | 2000-01-01 12:00:00 only showing top 1 row?查看DataFrame格式和列名
df.columns ['a', 'b', 'c', 'd', 'e'] df.printSchema() root|-- a: long (nullable = true)|-- b: double (nullable = true)|-- c: string (nullable = true)|-- d: date (nullable = true)|-- e: timestamp (nullable = true)查看統計描述信息
df.select("a", "b", "c").describe().show() +-------+---+---+-------+ |summary| a| b| c| +-------+---+---+-------+ | count| 3| 3| 3| | mean|2.0|3.0| null| | stddev|1.0|1.0| null| | min| 1|2.0|string1| | max| 3|4.0|string3| +-------+---+---+-------+DataFrame.collect()將分布式數據收集到驅動程序端,作為Python中的本地數據。請注意,當數據集太大而無法容納在驅動端時,這可能會引發內存不足錯誤,因為它將所有數據從執行器收集到驅動端。
df.collect() [Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]?為了避免引發內存不足異常可以使用DataFrame.take()或者是DataFrame.tail():
df.take(1) [Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))] df.tail(1) [Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]PySpark DataFrame轉換為Pandas DataFrame
?PySpark DataFrame還提供了到pandas DataFrame的轉換,以利用pandas API。注意,toPandas還將所有數據收集到driver端,當數據太大而無法放入driver端時,很容易導致內存不足錯誤。
df.toPandas()?
?3.查詢
PySpark DataFrame是惰性計算的,僅選擇一列不會觸發計算,但它會返回一個列實例:
df.a Column<'a'>大多數按列操作都返回列:
from pyspark.sql import Column from pyspark.sql.functions import uppertype(df.c) == type(upper(df.c)) == type(df.c.isNull()) True上述生成的Column可用于從DataFrame中選擇列。例如,DataFrame.select()獲取返回另一個DataFrame的列實例:
df.select(df.c).show() +-------+ | c| +-------+ |string1| |string2| |string3| +-------+添加新列實例:
df.withColumn('upper_c', upper(df.c)).show() +---+---+-------+----------+-------------------+-------+ | a| b| c| d| e|upper_c| +---+---+-------+----------+-------------------+-------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1| | 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2| | 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3| +---+---+-------+----------+-------------------+-------+條件查詢DataFrame.filter()
df.filter(df.a == 1).show() +---+---+-------+----------+-------------------+ | a| b| c| d| e| +---+---+-------+----------+-------------------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00| +---+---+-------+----------+-------------------+?4.運算
Pandas_udf
PySpark支持各種UDF和API,允許用戶執行Python本機函數。另請參閱最新的Pandas UDF(?Pandas UDFs)和Pandas Function API(?Pandas Function APIs)。例如,下面的示例允許用戶在Python本機函數中直接使用pandas Series中的API。
Apache Arrow in PySpark
import pandas as pd from pyspark.sql.functions import pandas_udf@pandas_udf('long') def pandas_plus_one(series: pd.Series) -> pd.Series:# Simply plus one by using pandas Series.return series + 1df.select(pandas_plus_one(df.a)).show() +------------------+ |pandas_plus_one(a)| +------------------+ | 2| | 3| | 4| +------------------+DataFrame.mapInPandas
DataFrame.mapInPandas允許用戶在pandas DataFrame中直接使用API,而不受結果長度等任何限制。
def pandas_filter_func(iterator):for pandas_df in iterator:yield pandas_df[pandas_df.a == 1]df.mapInPandas(pandas_filter_func, schema=df.schema).show() +---+---+-------+----------+-------------------+ | a| b| c| d| e| +---+---+-------+----------+-------------------+ | 1|2.0|string1|2000-01-01|2000-01-01 12:00:00| +---+---+-------+----------+-------------------+5.分組
PySpark DataFrame還提供了一種使用常見方法,即拆分-應用-合并策略來處理分組數據的方法。它根據特定條件對數據進行分組,對每個組應用一個函數,然后將它們組合回DataFrame。
df = spark.createDataFrame([['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2']) df.show() +-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ | red|banana| 1| 10| | blue|banana| 2| 20| | red|carrot| 3| 30| | blue| grape| 4| 40| | red|carrot| 5| 50| |black|carrot| 6| 60| | red|banana| 7| 70| | red| grape| 8| 80| +-----+------+---+---+?分組,然后將avg()函數應用于結果組。
df.groupby('color').avg().show() +-----+-------+-------+ |color|avg(v1)|avg(v2)| +-----+-------+-------+ | red| 4.8| 48.0| | blue| 3.0| 30.0| |black| 6.0| 60.0| +-----+-------+-------+還可以使用pandas API對每個組應用Python自定義函數。
def plus_mean(pandas_df):return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show() +-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ |black|carrot| 0| 60| | blue|banana| -1| 20| | blue| grape| 1| 40| | red|banana| -3| 10| | red|carrot| -1| 30| | red|carrot| 0| 50| | red|banana| 2| 70| | red| grape| 3| 80| +-----+------+---+---+?聯合分組和應用函數
df1 = spark.createDataFrame([(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],('time', 'id', 'v1'))df2 = spark.createDataFrame([(20000101, 1, 'x'), (20000101, 2, 'y')],('time', 'id', 'v2'))def asof_join(l, r):return pd.merge_asof(l, r, on='time', by='id')df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(asof_join, schema='time int, id int, v1 double, v2 string').show() +--------+---+---+---+ | time| id| v1| v2| +--------+---+---+---+ |20000101| 1|1.0| x| |20000102| 1|3.0| x| |20000101| 2|2.0| y| |20000102| 2|4.0| y| +--------+---+---+---+?6.獲取數據輸入/輸出
CSV簡單易用。Parquet和ORC是高效緊湊的文件格式,讀寫速度更快。
PySpark中還有許多其他可用的數據源,如JDBC、text、binaryFile、Avro等。另請參閱Apache Spark文檔中最新的Spark SQL、DataFrames和Datasets指南。Spark SQL, DataFrames and Datasets Guide
CSV
df.write.csv('foo.csv', header=True) spark.read.csv('foo.csv', header=True).show()這里記錄一個報錯:
java.lang.UnsatisfiedLinkError:org.apache.hadoop.io.nativeio.NativeIO$Windows.access0?將Hadoop安裝目錄下的 bin 文件夾中的 hadoop.dll 和 winutils.exe 這兩個文件拷貝到 C:\Windows\System32 下,問題解決。
+---+---+-------+----------+--------------------+ | a| b| c| d| e| +---+---+-------+----------+--------------------+ | 1|2.0|string1|2000-01-01|2000-01-01T12:00:...| | 2|3.0|string2|2000-02-01|2000-01-02T12:00:...| | 3|4.0|string3|2000-03-01|2000-01-03T12:00:...| +---+---+-------+----------+--------------------+?Parquet
df.write.parquet('bar.parquet') spark.read.parquet('bar.parquet').show() +-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ |black|carrot| 6| 60| | blue|banana| 2| 20| | blue| grape| 4| 40| | red|carrot| 5| 50| | red|banana| 7| 70| | red|banana| 1| 10| | red|carrot| 3| 30| | red| grape| 8| 80| +-----+------+---+---+?ORC
df.write.orc('zoo.orc') spark.read.orc('zoo.orc').show() +-----+------+---+---+ |color| fruit| v1| v2| +-----+------+---+---+ | red|banana| 7| 70| | red| grape| 8| 80| |black|carrot| 6| 60| | blue|banana| 2| 20| | red|banana| 1| 10| | red|carrot| 5| 50| | red|carrot| 3| 30| | blue| grape| 4| 40| +-----+------+---+---+?四、結合Spark SQL
DataFrame和Spark SQL共享同一個執行引擎,因此可以無縫地互換使用。例如,可以將數據幀注冊為表,并按如下方式輕松運行SQL:
df.createOrReplaceTempView("tableA") spark.sql("SELECT count(*) from tableA").show() +--------+ |count(1)| +--------+ | 8| +--------+?此外UDF也可在現成的SQL中注冊和調用
@pandas_udf("integer") def add_one(s: pd.Series) -> pd.Series:return s + 1spark.udf.register("add_one", add_one) spark.sql("SELECT add_one(v1) FROM tableA").show()?
這些SQL表達式可以直接混合并用作PySpark列。
from pyspark.sql.functions import exprdf.selectExpr('add_one(v1)').show() df.select(expr('count(*)') > 0).show()點關注,防走丟,如有紕漏之處,請留言指教,非常感謝
以上就是本期全部內容。我是fanstuck ,有問題大家隨時留言討論 ,我們下期見。
總結
以上是生活随笔為你收集整理的PySpark数据分析基础:PySpark基础功能及DataFrame操作基础语法详解的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于流量套利你需要知道的一切
- 下一篇: python热力图——“星空代码”