Spark的基本架构
http://ihoge.cn/2018/IntroductionToSpark.html
Spark的基本架構(gòu)
當(dāng)單機(jī)沒有足夠的能力和資源來執(zhí)行大量信息的計算(或者低延遲計算),這時就需要一個集群或一組機(jī)器將許多機(jī)器的資源集中在一起,使我們可以使用全部累積的在一起的計算和存儲資源。現(xiàn)在只有一組機(jī)器不夠強(qiáng)大,你需要一個框架來協(xié)調(diào)他們之間的工作。 Spark是一種工具,可以管理和協(xié)調(diào)跨計算機(jī)集群執(zhí)行數(shù)據(jù)任務(wù)。
Spark用于執(zhí)行任務(wù)的機(jī)器集群可以由Spark的Standalone,YARN或Mesos等集群管理器進(jìn)行管理。然后,我們向這些集群管理器提交Spark應(yīng)用程序,這些集群管理器將資源授予我們的應(yīng)用程序,以便我們完成我們的工作。
1. Spark Application
Spark應(yīng)用程序由一個驅(qū)動程序進(jìn)程和一組執(zhí)行程序進(jìn)程組成。Driver進(jìn)程運(yùn)行main()函數(shù),位于集群中的一個節(jié)點上,它負(fù)責(zé)三件事:維護(hù)Spark應(yīng)用程序的相關(guān)信息;回應(yīng)用戶的程序或輸入;分配和安排Executors之間的工作。驅(qū)動程序過程是絕對必要的 - 它是Spark應(yīng)用程序的核心,并在應(yīng)用程序的生命周期中保留所有相關(guān)信息。
Executor負(fù)責(zé)實際執(zhí)行Driver分配給他們的工作。這意味著,每個Executor只有兩個任務(wù):執(zhí)行由驅(qū)動程序分配給它的代碼,并將該執(zhí)行程序的計算狀態(tài)報告給驅(qū)動程序節(jié)點。
群集管理器控制物理機(jī)器并為Spark應(yīng)用程序分配資源。這可以是幾個核心集群管理員之一:Spark的Standalone,YARN或Mesos。這意味著可以同時在群集上運(yùn)行多個Spark應(yīng)用程序。
在前面的插圖中,左側(cè)是我們的driver,右側(cè)是四個executors。在該圖中,我們刪除了群集節(jié)點的概念。用戶可以通過配置指定有多少執(zhí)行者應(yīng)該落在每個節(jié)點上。
- Spark有一些集群管理器,負(fù)責(zé)調(diào)度可用資源。
- 驅(qū)動程序進(jìn)程負(fù)責(zé)執(zhí)行執(zhí)行程序中的驅(qū)動程序命令,以完成我們的任務(wù)。
2. Spark’s APIs
盡管我們的executor大多會一直運(yùn)行Spark代碼。但我們?nèi)匀豢梢酝ㄟ^Spark的語言API用多種不同語言運(yùn)行Spark代碼。大多數(shù)情況下,Spark會在每種語言中提供一些核心“concepts”,并將不同語言的代碼譯成運(yùn)行在機(jī)器集群上的Spark代碼。
Spark有兩套基本的API:低級非結(jié)構(gòu)化(Unstructured)API和更高級別的結(jié)構(gòu)化(Structured)API。
3. SparkSession
我們通過驅(qū)動程序來控制Spark應(yīng)用程序。該驅(qū)動程序進(jìn)程將自身作為名為SparkSession并作為唯一的接口API對象向用戶開放。 SparkSession實例是Spark在群集中執(zhí)行用戶定義操作的方式。 SparkSession和Spark應(yīng)用程序之間有一對一的對應(yīng)關(guān)系。在Scala和Python中,變量在啟動控制臺時可用作spark。讓我們看下簡單的Scala和/或Python中的SparkSession。
4. Dataframe
DataFrame是最常見的Structured API(結(jié)構(gòu)化API),只是表示有類型的包含行和列的數(shù)據(jù)表。一個簡單的比喻就是一個帶有命名列的電子表格。其根本區(qū)別在于,當(dāng)電子表格位于一臺計算機(jī)上某個特定位置時,Spark DataFrame可以跨越數(shù)千臺計算機(jī)。將數(shù)據(jù)放在多臺計算機(jī)上的原因無非有兩種:數(shù)據(jù)太大而無法放在一臺計算機(jī)上,或者在一臺計算機(jī)上執(zhí)行計算所需的時間太長。
DataFrame概念并不是Spark獨有的。 R和Python都有相似的概念。但是,Python / R DataFrame(有一些例外)存在于一臺機(jī)器上,而不是多臺機(jī)器上。這限制了您可以對python和R中給定的DataFrame執(zhí)行的操作與該特定機(jī)器上存在的資源進(jìn)行對比。但是,由于Spark具有適用于Python和R的Spark’s Language APIs,因此將Pandas(Python)DataFrame轉(zhuǎn)換為Spark DataFrame和R DataFrame轉(zhuǎn)換為Spark DataFrame(R)非常容易。
注意
Spark有幾個核心抽象:Datasets, DataFrames, SQL Tables,和彈性分布式數(shù)據(jù)集(RDD)。這些抽象都表示分布式數(shù)據(jù)集合,但它們有不同的接口來處理這些數(shù)據(jù)。最簡單和最有效的是DataFrames,它可以用于所有語言。
5. Partitions
為了允許每個執(zhí)行者并行執(zhí)行工作,Spark將數(shù)據(jù)分解成稱為分區(qū)的塊。分區(qū)是位于集群中的一臺物理機(jī)上的一組行。 DataFrame的分區(qū)表示數(shù)據(jù)在執(zhí)行過程中如何在整個機(jī)器群中物理分布。如果你有一個分區(qū),即使你有數(shù)千個執(zhí)行者,Spark也只會有一個分區(qū)。如果有多個分區(qū),但只有一個執(zhí)行程序Spark仍然只有一個并行性,因為只有一個計算資源。
值得注意的是,使用DataFrames,我們不會(大部分)操作 手動分區(qū)(基于個人)。我們只需指定物理分區(qū)中數(shù)據(jù)的高級轉(zhuǎn)換,并且Spark確定此工作將如何在集群上實際執(zhí)行。較低級別的API確實存在(通過彈性分布式數(shù)據(jù)集接口)。
6. Transformations
在Spark中,核心數(shù)據(jù)結(jié)構(gòu)是不可改變的,這意味著一旦創(chuàng)建它們就不能更改。起初,這可能看起來像一個奇怪的概念,如果你不能改變它,你應(yīng)該如何使用它?為了“更改”DataFrame,您必須指示Spark如何修改您所需的DataFrame。這些說明被稱為轉(zhuǎn)換。
轉(zhuǎn)換操作沒有返回輸出,這是因為我們只指定了一個抽象轉(zhuǎn)換,并且Spark不會在轉(zhuǎn)換之前采取行動,直到我們執(zhí)行一個動作。Transformations是如何使用Spark來表達(dá)業(yè)務(wù)邏輯的核心。Spark有兩種類型的Transformations,一種是窄依賴轉(zhuǎn)換關(guān)系,一種是寬依賴轉(zhuǎn)換關(guān)系。
寬依賴指輸入分區(qū)對多輸出分區(qū)起作用(多個孩子)。這被稱為shuffle,Spark將在群集之間交換分區(qū)。對于窄依賴轉(zhuǎn)換,Spark將自動執(zhí)行稱為流水線的操作,這意味著如果我們在DataFrame上指定了多個過濾器,它們將全部在內(nèi)存中執(zhí)行。當(dāng)我們執(zhí)行shuffle時,Spark會將結(jié)果寫入磁盤。
7. Lazy Evaluation
Lazy Evaluation意味著Spark將等到執(zhí)行計算指令圖的最后時刻。在Spark中,我們不是在表達(dá)某些操作時立即修改數(shù)據(jù),而是建立起來應(yīng)用于源數(shù)據(jù)的轉(zhuǎn)換計劃。Spark將把原始DataFrame轉(zhuǎn)換計劃編譯為一個高效的物理計劃,該計劃將在群集中盡可能高效地運(yùn)行。這為最終用戶帶來了巨大的好處,因為Spark可以優(yōu)化整個數(shù)據(jù)流從端到端。這方面的一個例子就是所謂的“predicate pushdown” DataFrames。如果我們構(gòu)建一個大的Spark作業(yè),但在最后指定了一個過濾器,只需要我們從源數(shù)據(jù)中獲取一行,則執(zhí)行此操作的最有效方法就是訪問我們需要的單個記錄。 Spark實際上會通過自動推低濾波器來優(yōu)化這一點。
8. Actions
轉(zhuǎn)換使我們能夠建立我們的邏輯計劃。為了觸發(fā)計算,我們需要一個動作操作。一個動作指示Spark計算一系列轉(zhuǎn)換的結(jié)果。
在指定我們的操作時,我們開始了一個Spark作業(yè),它運(yùn)行我們的過濾器轉(zhuǎn)換(一個窄依賴轉(zhuǎn)換),然后是一個聚合(一個寬依賴轉(zhuǎn)換),它在每個分區(qū)的基礎(chǔ)上執(zhí)行計數(shù),然后一個collect將我們的結(jié)果帶到各自語言的本地對象。我們可以通過檢查Spark UI(http://localhost:4040)來看到所有這些,Spark UI是一個包含在Spark中的工具,它允許我們監(jiān)視集群上運(yùn)行的Spark作業(yè)。
9. Dataframe & SQL
Spark SQL是Spark為結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù)處理設(shè)計的最受歡迎的模塊之一。 Spark SQL允許用戶使用SQL或可在Java,Scala,Python和R中使用的DataFrame和Dataset API來查詢Spark程序中的structured data。由于DataFrame API提供了一種統(tǒng)一的方法來訪問各種的數(shù)據(jù)源(包括Hive datasets,Avro,Parquet,ORC,JSON和JDBC),用戶能夠以相同方式連接到任何數(shù)據(jù)源,并將這些多個數(shù)據(jù)源連接在一起。 Spark SQL使用Hive meta store為用戶提供了與現(xiàn)有Hive數(shù)據(jù),查詢和UDF完全兼容的功能。用戶可以無縫地 在Spark上無需修改即可運(yùn)行其當(dāng)前的Hive工作負(fù)載。
Spark SQL也可以通過spark-sql shell來訪問,現(xiàn)有的業(yè)務(wù)工具可以通過標(biāo)準(zhǔn)的JDBC和ODBC接口進(jìn)行連接。
現(xiàn)在我們通過一個示例并在DataFrame和SQL中進(jìn)行跟蹤。不管語言如何,以完全相同的方式啟動相同的轉(zhuǎn)換。您可以在SQL或DataFrames(R,Python,Scala或Java)中表達(dá)業(yè)務(wù)邏輯,并且在實際執(zhí)行代碼之前,Spark會將該邏輯編譯計劃優(yōu)化并最終生成最優(yōu)的物理計劃。 Spark SQL允許您作為用戶將任何DataFrame注冊為表或視圖(臨時表),并使用純SQL查詢它。編寫SQL查詢或編寫DataFrame代碼之間沒有性能差異 都“編譯”到我們在DataFrame代碼中指定的相同底層計劃。
通過一個簡單的方法調(diào)用就可以將任何DataFrame制作成表格或視圖。
With SQl
With DataFrame
現(xiàn)在有7個步驟將我們帶回源數(shù)據(jù)。您可以在這些DataFrame的解釋計劃中看到這一點。以上圖解說明了我們在“代碼”中執(zhí)行的一系列步驟。真正的執(zhí)行計劃(解釋中可見的執(zhí)行計劃)將與下面的執(zhí)行計劃有所不同,因為在物理執(zhí)行方面進(jìn)行了優(yōu)化,然而,該執(zhí)行計劃與任何計劃一樣都是起點。這個執(zhí)行計劃是一個有向無環(huán)圖(DAG)的轉(zhuǎn)換,每個轉(zhuǎn)換產(chǎn)生一個新的不可變DataFrame,我們在這個DataFrame上調(diào)用一個動作來產(chǎn)生一個結(jié)果。
1. 第一步是讀取數(shù)據(jù)。但是Spark實際上并沒有讀取它(Lazy Evaluation)
2. 第二步是我們的分組,在技術(shù)上,當(dāng)我們調(diào)用groupBy時,我們最終得到了一個RelationalGroupedDataset,它是DataFrame的一個奇特名稱,該DataFrame具有指定的分組,但需要用戶在可以進(jìn)一步查詢之前指定聚合。
3. 因此第三步是指定聚合。我們使用總和聚合方法。這需要輸入一列 表達(dá)式或簡單的列名稱。 sum方法調(diào)用的結(jié)果是一個新的dataFrame。你會看到它有一個新的模式,但它知道每個列的類型。(再次強(qiáng)調(diào)!)這里沒有執(zhí)行計算是非常重要的。這只是我們表達(dá)的另一種轉(zhuǎn)換,Spark僅僅能夠跟蹤我們提供的類型信息。
4. 第四步是簡化語言,我們使用withColumnRename給原始列重新定義新名稱。當(dāng)然,這不會執(zhí)行計算 - 這只是另一種轉(zhuǎn)換!
5. 第五步導(dǎo)入一個函數(shù)對數(shù)據(jù)進(jìn)行排序,即desc函數(shù)。從destination_total列中找到的最大值。
6. 第六步,我們將指定一個限制。這只是說明我們只需要五個值。這就像一個過濾器,只是它按位置而不是按值過濾。可以肯定地說,它基本上只是指定了一定大小的DataFrame。
7. 最后一步是我們的行動!現(xiàn)在我們實際上開始收集上面的DataFrame結(jié)果的過程,Spark將以我們正在執(zhí)行的語言返回一個列表或數(shù)組。現(xiàn)在我們看下它的解釋計劃。
雖然這個解釋計劃與我們確切的“概念計劃”不符,但所有的部分都在那里。可以看到limit語句以及orderBy(在第一行)。你也可以看到我們的聚合是如何在partial_sum調(diào)用中的兩個階段發(fā)生的。這是因為數(shù)字列表是可交換的,并且Spark可以執(zhí)行sum()并按分區(qū)進(jìn)行劃分。當(dāng)然,我們也可以看到我們?nèi)绾卧贒ataFrame中讀取數(shù)據(jù)。同時我們也可以將它寫出到Spark支持的任何數(shù)據(jù)源中。例如,假設(shè)我們想要將這些信息存儲在PostgreSQL等數(shù)據(jù)庫中,或者將它們寫入另一個文件。
本文永久地址,轉(zhuǎn)載注明出處!
http://ihoge.cn/2018/IntroductionToSpark.html
總結(jié)
以上是生活随笔為你收集整理的Spark的基本架构的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 免安装免配置 还免费的Spark 集群
- 下一篇: Spark ML - 协同过滤