dataframe 如何选中某列的一行_PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码)
作者:Pinar Ersoy
翻譯:孫韜淳
校對(duì):陳振東
本文約2500字,建議閱讀10分鐘
本文通過介紹Apache Spark在Python中的應(yīng)用來講解如何利用PySpark包執(zhí)行常用函數(shù)來進(jìn)行數(shù)據(jù)處理工作。
Apache Spark是一個(gè)對(duì)開發(fā)者提供完備的庫和API的集群計(jì)算系統(tǒng),并且支持多種語言,包括Java,Python,R和Scala。SparkSQL相當(dāng)于Apache Spark的一個(gè)模塊,在DataFrame API的幫助下可用來處理非結(jié)構(gòu)化數(shù)據(jù)。
通過名為PySpark的Spark Python API,Python實(shí)現(xiàn)了處理結(jié)構(gòu)化數(shù)據(jù)的Spark編程模型。
這篇文章的目標(biāo)是展示如何通過PySpark運(yùn)行Spark并執(zhí)行常用函數(shù)。
Python編程語言要求一個(gè)安裝好的IDE。最簡單的方式是通過Anaconda使用Python,因其安裝了足夠的IDE包,并附帶了其他重要的包。
1、下載Anaconda并安裝PySpark
通過這個(gè)鏈接,你可以下載Anaconda。你可以在Windows,macOS和Linux操作系統(tǒng)以及64位/32位圖形安裝程序類型間選擇。我們推薦安裝Python的最新版本。
Anaconda的安裝頁面(https://www.anaconda.com/distribution/)
下載好合適的Anaconda版本后,點(diǎn)擊它來進(jìn)行安裝,安裝步驟在Anaconda Documentation中有詳細(xì)的說明。
安裝完成時(shí),Anaconda導(dǎo)航主頁(Navigator Homepage)會(huì)打開。因?yàn)橹皇鞘褂肞ython,僅需點(diǎn)擊“Notebook”模塊中的“Launch”按鈕。
Anaconda導(dǎo)航主頁
為了能在Anaconda中使用Spark,請(qǐng)遵循以下軟件包安裝步驟。
第一步:從你的電腦打開“Anaconda Prompt”終端。
第二步:在Anaconda Prompt終端中輸入“conda install pyspark”并回車來安裝PySpark包。
第三步:在Anaconda Prompt終端中輸入“conda install pyarrow”并回車來安裝PyArrow包。
當(dāng)PySpark和PyArrow包安裝完成后,僅需關(guān)閉終端,回到Jupyter Notebook,并在你代碼的最頂部導(dǎo)入要求的包。
import pandas as pdfrom pyspark.sql import SparkSessionfrom pyspark.context import SparkContextfrom pyspark.sql.functionsimport *from pyspark.sql.typesimport *from datetime import date, timedelta, datetimeimport time2、初始化SparkSession
首先需要初始化一個(gè)Spark會(huì)話(SparkSession)。通過SparkSession幫助可以創(chuàng)建DataFrame,并以表格的形式注冊(cè)。其次,可以執(zhí)行SQL表格,緩存表格,可以閱讀parquet/json/csv/avro數(shù)據(jù)格式的文檔。
sc = SparkSession.builder.appName("PysparkExample") ???.config ("spark.sql.shuffle.partitions", "50") ???.config("spark.driver.maxResultSize","5g") ???.config ("spark.sql.execution.arrow.enabled", "true") ???.getOrCreate()想了解SparkSession每個(gè)參數(shù)的詳細(xì)解釋,請(qǐng)?jiān)L問pyspark.sql.SparkSession。
3、創(chuàng)建數(shù)據(jù)框架
一個(gè)DataFrame可被認(rèn)為是一個(gè)每列有標(biāo)題的分布式列表集合,與關(guān)系數(shù)據(jù)庫的一個(gè)表格類似。在這篇文章中,處理數(shù)據(jù)集時(shí)我們將會(huì)使用在PySpark API中的DataFrame操作。
你可以從https://www.kaggle.com/cmenca/new-york-times-hardcover-fiction-best-sellers中下載Kaggle數(shù)據(jù)集。
3.1、從Spark數(shù)據(jù)源開始
DataFrame可以通過讀txt,csv,json和parquet文件格式來創(chuàng)建。在本文的例子中,我們將使用.json格式的文件,你也可以使用如下列舉的相關(guān)讀取函數(shù)來尋找并讀取text,csv,parquet文件格式。
#Creates a spark data frame called as raw_data.#JSONdataframe = sc.read.json('dataset/nyt2.json')#TXT FILES#dataframe_txt = sc.read.text('text_data.txt')#CSV FILES#dataframe_csv = sc.read.csv('csv_data.csv')#PARQUET FILES#dataframe_parquet = sc.read.load('parquet_data.parquet')4、重復(fù)值
表格中的重復(fù)值可以使用dropDuplicates()函數(shù)來消除。
dataframe = sc.read.json('dataset/nyt2.json')dataframe.show(10)使用dropDuplicates()函數(shù)后,我們可觀察到重復(fù)值已從數(shù)據(jù)集中被移除。
dataframe_dropdup = dataframe.dropDuplicates() dataframe_dropdup.show(10)5、查詢
查詢操作可被用于多種目的,比如用“select”選擇列中子集,用“when”添加條件,用“l(fā)ike”篩選列內(nèi)容。接下來將舉例一些最常用的操作。完整的查詢操作列表請(qǐng)看Apache Spark文檔。
5.1、“Select”操作
可以通過屬性(“author”)或索引(dataframe[‘a(chǎn)uthor’])來獲取列。
#Show all entries in title columndataframe.select("author").show(10)#Show all entries in title, author, rank, price columnsdataframe.select("author", "title", "rank", "price").show(10)第一個(gè)結(jié)果表格展示了“author”列的查詢結(jié)果,第二個(gè)結(jié)果表格展示多列查詢。
5.2、“When”操作
在第一個(gè)例子中,“title”列被選中并添加了一個(gè)“when”條件。
# Show title and assign 0 or 1 depending on titledataframe.select("title",when(dataframe.title != 'ODD HOURS',1).otherwise(0)).show(10)展示特定條件下的10行數(shù)據(jù)
在第二個(gè)例子中,應(yīng)用“isin”操作而不是“when”,它也可用于定義一些針對(duì)行的條件。
# Show rows with specified authors if in the given optionsdataframe [dataframe.author.isin("John Sandford","Emily Giffin")].show(5)5行特定條件下的結(jié)果集
5.3、“Like”操作
在“Like”函數(shù)括號(hào)中,%操作符用來篩選出所有含有單詞“THE”的標(biāo)題。如果我們尋求的這個(gè)條件是精確匹配的,則不應(yīng)使用%算符。
# Show author and title is TRUE if title has " THE " word in titlesdataframe.select("author", "title",dataframe.title.like("% THE %")).show(15)title列中含有單詞“THE”的判斷結(jié)果集
5.4、“startswith”-“endswith”
StartsWith指定從括號(hào)中特定的單詞/內(nèi)容的位置開始掃描。類似的,EndsWith指定了到某處單詞/內(nèi)容結(jié)束。兩個(gè)函數(shù)都是區(qū)分大小寫的。
dataframe.select("author", "title",dataframe.title.startswith("THE")).show(5)dataframe.select("author", "title",dataframe.title.endswith("NT")).show(5)對(duì)5行數(shù)據(jù)進(jìn)行startsWith操作和endsWith操作的結(jié)果。
5.5、“substring”操作
Substring的功能是將具體索引中間的文本提取出來。在接下來的例子中,文本從索引號(hào)(1,3),(3,6)和(1,6)間被提取出來。
dataframe.select(dataframe.author.substr(1, 3).alias("title")).show(5)dataframe.select(dataframe.author.substr(3, 6).alias("title")).show(5)dataframe.select(dataframe.author.substr(1, 6).alias("title")).show(5)分別顯示子字符串為(1,3),(3,6),(1,6)的結(jié)果
6、增加,修改和刪除列
在DataFrame API中同樣有數(shù)據(jù)處理函數(shù)。接下來,你可以找到增加/修改/刪除列操作的例子。
6.1、增加列
# Lit() is required while we are creating columns with exactvalues.dataframe = dataframe.withColumn('new_column',F.lit('This is a new column'))display(dataframe)在數(shù)據(jù)集結(jié)尾已添加新列
6.2、修改列
對(duì)于新版DataFrame API,withColumnRenamed()函數(shù)通過兩個(gè)參數(shù)使用。
# Update column 'amazon_product_url' with 'URL'dataframe = dataframe.withColumnRenamed('amazon_product_url', 'URL')dataframe.show(5)“Amazon_Product_URL”列名修改為“URL”
6.3、刪除列
列的刪除可通過兩種方式實(shí)現(xiàn):在drop()函數(shù)中添加一個(gè)組列名,或在drop函數(shù)中指出具體的列。兩個(gè)例子展示如下。
dataframe_remove = dataframe.drop("publisher","published_date").show(5)dataframe_remove2=dataframe?.drop(dataframe.publisher).drop(dataframe.published_date).show(5)“publisher”和“published_date”列用兩種不同的方法移除。
7、數(shù)據(jù)審閱
存在幾種類型的函數(shù)來進(jìn)行數(shù)據(jù)審閱。接下來,你可以找到一些常用函數(shù)。想了解更多則需訪問Apache Spark doc。
# Returns dataframe column names and data typesdataframe.dtypes# Displays the content of dataframedataframe.show()# Return first n rowsdataframe.head()# Returns first rowdataframe.first()# Return first n rowsdataframe.take(5)# Computes summary statisticsdataframe.describe().show()# Returns columns of dataframedataframe.columns# Counts the number of rows in dataframedataframe.count()# Counts the number of distinct rows in dataframedataframe.distinct().count()# Prints plans including physical and logicaldataframe.explain(4)8、“GroupBy”操作
通過GroupBy()函數(shù),將數(shù)據(jù)列根據(jù)指定函數(shù)進(jìn)行聚合。
# Group by author, count the books of the authors in the groupsdataframe.groupBy("author").count().show(10)作者被以出版書籍的數(shù)量分組
9、“Filter”操作
通過使用filter()函數(shù),在函數(shù)內(nèi)添加條件參數(shù)應(yīng)用篩選。這個(gè)函數(shù)區(qū)分大小寫。
# Filtering entries of title# Only keeps records having value 'THE HOST'dataframe.filter(dataframe["title"] == 'THE HOST').show(5)標(biāo)題列經(jīng)篩選后僅存在有“THE HOST”的內(nèi)容,并顯示5個(gè)結(jié)果。
10、缺失和替換值
對(duì)每個(gè)數(shù)據(jù)集,經(jīng)常需要在數(shù)據(jù)預(yù)處理階段將已存在的值替換,丟棄不必要的列,并填充缺失值。pyspark.sql.DataFrameNaFunction庫幫助我們?cè)谶@一方面處理數(shù)據(jù)。舉例如下。
# Replacing null valuesdataframe.na.fill()dataFrame.fillna()dataFrameNaFunctions.fill()# Returning new dataframe restricting rows with null valuesdataframe.na.drop()dataFrame.dropna()dataFrameNaFunctions.drop()# Return new dataframe replacing one value with anotherdataframe.na.replace(5, 15)dataFrame.replace()dataFrameNaFunctions.replace()11、重分區(qū)
在RDD(彈性分布數(shù)據(jù)集)中增加或減少現(xiàn)有分區(qū)的級(jí)別是可行的。使用repartition(self,numPartitions)可以實(shí)現(xiàn)分區(qū)增加,這使得新的RDD獲得相同/更高的分區(qū)數(shù)。分區(qū)縮減可以用coalesce(self, numPartitions, shuffle=False)函數(shù)進(jìn)行處理,這使得新的RDD有一個(gè)減少了的分區(qū)數(shù)(它是一個(gè)確定的值)。請(qǐng)?jiān)L問Apache Spark doc獲得更多信息。
# Dataframe with 10 partitionsdataframe.repartition(10).rdd.getNumPartitions()# Dataframe with 1 partitiondataframe.coalesce(1).rdd.getNumPartitions()12、嵌入式運(yùn)行SQL查詢
原始SQL查詢也可通過在我們SparkSession中的“sql”操作來使用,這種SQL查詢的運(yùn)行是嵌入式的,返回一個(gè)DataFrame格式的結(jié)果集。請(qǐng)?jiān)L問Apache Spark doc獲得更詳細(xì)的信息。
# Registering a tabledataframe.registerTempTable("df")sc.sql("select * from df").show(3)sc.sql("select ??????????????CASE WHEN description LIKE '%love%' THEN 'Love_Theme' ??????????????WHEN description LIKE '%hate%' THEN 'Hate_Theme' ??????????????WHEN description LIKE '%happy%' THEN 'Happiness_Theme' ??????????????WHEN description LIKE '%anger%' THEN 'Anger_Theme' ??????????????WHEN description LIKE '%horror%' THEN 'Horror_Theme' ??????????????WHEN description LIKE '%death%' THEN 'Criminal_Theme' ??????????????WHEN description LIKE '%detective%' THEN 'Mystery_Theme' ??????????????ELSE 'Other_Themes' ??????????????END Themes ??????from df").groupBy('Themes').count().show()13、輸出
13.1、數(shù)據(jù)結(jié)構(gòu)
DataFrame API以RDD作為基礎(chǔ),把SQL查詢語句轉(zhuǎn)換為低層的RDD函數(shù)。通過使用.rdd操作,一個(gè)數(shù)據(jù)框架可被轉(zhuǎn)換為RDD,也可以把Spark Dataframe轉(zhuǎn)換為RDD和Pandas格式的字符串同樣可行。
# Converting dataframe into an RDDrdd_convert = dataframe.rdd# Converting dataframe into a RDD of stringdataframe.toJSON().first()# Obtaining contents of df as PandasdataFramedataframe.toPandas()不同數(shù)據(jù)結(jié)構(gòu)的結(jié)果
13.2、寫并保存在文件中
任何像數(shù)據(jù)框架一樣可以加載進(jìn)入我們代碼的數(shù)據(jù)源類型都可以被輕易轉(zhuǎn)換和保存在其他類型文件中,包括.parquet和.json。請(qǐng)?jiān)L問Apache Spark doc尋求更多保存、加載、寫函數(shù)的細(xì)節(jié)。
# Write & Save File in .parquet formatdataframe.select("author", "title", "rank", "description") .write .save("Rankings_Descriptions.parquet")當(dāng).write.save()函數(shù)被處理時(shí),可看到Parquet文件已創(chuàng)建。
#?Write & Save File in .json formatdataframe.select("author", "title") .write .save("Authors_Titles.json",format="json")當(dāng).write.save()函數(shù)被處理時(shí),可看到JSON文件已創(chuàng)建。
13.3、停止SparkSession
Spark會(huì)話可以通過運(yùn)行stop()函數(shù)被停止,如下。
# End Spark Sessionsc.stop()代碼和Jupyter Notebook可以在我的GitHub上找到。
歡迎提問和評(píng)論!
參考文獻(xiàn):
1. http://spark.apache.org/docs/latest/
2. https://docs.anaconda.com/anaconda/
原文標(biāo)題:
PySpark and SparkSQL Basics
How to implement Spark with Python Programming
原文鏈接:
https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53
編輯:于騰凱
校對(duì):洪舒越
譯者簡介
孫韜淳,首都師范大學(xué)大四在讀,主修遙感科學(xué)與技術(shù)。目前專注于基本知識(shí)的掌握和提升,期望在未來有機(jī)會(huì)探索數(shù)據(jù)科學(xué)在地學(xué)應(yīng)用的眾多可能性。愛好之一為翻譯創(chuàng)作,在業(yè)余時(shí)間加入到THU數(shù)據(jù)派平臺(tái)的翻譯志愿者小組,希望能和大家一起交流分享,共同進(jìn)步。
—完—
關(guān)注清華-青島數(shù)據(jù)科學(xué)研究院官方微信公眾平臺(tái)“ THU數(shù)據(jù)派 ”及姊妹號(hào)“ 數(shù)據(jù)派THU ”獲取更多講座福利及優(yōu)質(zhì)內(nèi)容。
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的dataframe 如何选中某列的一行_PySpark和SparkSQL基础:如何利用Python编程执行Spark(附代码)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java变位词_GoLang 字符串变位
- 下一篇: arcgis怎么用python重新排序_