hadoop组件---spark----全面了解spark以及与hadoop的区别
Spark是什么
Spark (全稱 Apache Spark?) 是一個專門處理大數(shù)據(jù)量分析任務(wù)的通用數(shù)據(jù)分析引擎。
spark官網(wǎng)
Spark核心代碼是用scala語言開發(fā)的,不過支持使用多種語言進行開發(fā)調(diào)用比如scala,java,python。
spark github
Spark文檔2.4.4
Spark目前有比較完整的數(shù)據(jù)處理生態(tài)組件,可以部署在多種系統(tǒng)環(huán)境中,同時支持處理多種數(shù)據(jù)源。
Spark發(fā)展歷史
2009年,Spark誕生于伯克利大學(xué)AMPLab,屬于伯克利大學(xué)的研究性項目;
2010年,通過BSD 許可協(xié)議正式對外開源發(fā)布;
2012年,Spark第一篇論文發(fā)布,第一個正式版(Spark 0.6.0)發(fā)布;
2013年,成為了Aparch基金項目,進入高速發(fā)展期。第三方開發(fā)者貢獻了大量的代碼,活躍度非常高;發(fā)布Spark Streaming、Spark Mllib(機器學(xué)習(xí))、Shark(Spark on Hadoop);
2014 年,Spark 成為 Apache 的頂級項目; 5 月底 Spark1.0.0 發(fā)布;發(fā)布 Spark Graphx(圖計算)、Spark SQL代替Shark;
2015年,推出DataFrame(大數(shù)據(jù)分析);2015年至今,Spark在國內(nèi)IT行業(yè)變得愈發(fā)火爆,大量的公司開始重點部署或者使用Spark來替代MapReduce、Hive、Storm等傳統(tǒng)的大數(shù)據(jù)計算框架;
2016年,Spark 2.0.0版本發(fā)布,推出dataset(更強的數(shù)據(jù)分析手段);
2017年,structured streaming 發(fā)布;
2018年,Spark2.4.0發(fā)布,成為全球最大的開源項目。
截至 2020年1月15號 目前最穩(wěn)定的最后發(fā)布版本為 Spark 2.4.4。
還有一個 新值得期待的 預(yù)發(fā)布版本 Spark 3.0 主要 是增加了 與k8s等云結(jié)合使用的特性。
特點
1、速度快,適合實時分析場景
Spark基于內(nèi)存進行計算(當然也有部分計算基于磁盤,比如shuffle),在運算方面是hadoop運算速度的一百多倍。
2、容易上手開發(fā)
Spark的基于RDD的計算模型,比Hadoop的基于Map-Reduce的計算模型要更加易于理解,更加易于上手開發(fā),實現(xiàn)各種復(fù)雜功能,比如二次排序、topN等復(fù)雜操作時,更加便捷。
3、支持多種語言
Spark提供Java,Scala,Python和R中的高級API .Spark代碼可以用任何這些語言編寫。 它在Scala和Python中提供了一個shell。 可以通過./bin/spark-shell和Python shell通過./bin/pyspark從已安裝的目錄訪問Scala shell。
4、支持多種格式的數(shù)據(jù)來源
Spark支持多種數(shù)據(jù)源,如Parquet,JSON,HDFS、Hbase、Hive和Cassandra,Alluxio,CSV和RDBMS表,還包括通常的格式,如文本文件、CSV和RDBMS表,甚至一些云存儲比如S3等。 Data Source API提供了一種可插拔的機制,用于通過Spark SQL獲取結(jié)構(gòu)化數(shù)據(jù)。
5、超強的通用性
Spark提供了Spark RDD、Spark SQL、Spark Streaming、Spark MLlib、Spark GraphX等技術(shù)組件,可以一站式地完成大數(shù)據(jù)領(lǐng)域的離線批處理、交互式查詢、流式計算、機器學(xué)習(xí)、圖計算等常見的任務(wù)。
6、集成Hadoop
Spark并不是要成為一個大數(shù)據(jù)領(lǐng)域的“獨裁者”,一個人霸占大數(shù)據(jù)領(lǐng)域所有的“地盤”,而是與Hadoop進行了高度的集成,兩者可以完美的配合使用。Hadoop的HDFS、Hive、HBase負責存儲,YARN負責資源調(diào)度;Spark負責大數(shù)據(jù)計算。實際上,Hadoop+Spark的組合,是一種“double win”的組合。
7、可以在任何環(huán)境下搭建
spark框架可以運行在各種操作系統(tǒng)上。
最初Spark作為hadoop的一個計算框架組件而發(fā)布,現(xiàn)在慢慢長大,可以獨立運行了。意味著 我們不搭建Hadoop集群也能 獨立的安裝運行Spark。
除了運行在Hadoop集群中,
目前Spark支持
(一)local本地模式
只需要一臺機器,運行該模式非常簡單,只需要把Spark的安裝包解壓后,默認也不需修改任何配置文件,取默認值。不用啟動Spark的Master、Worker守護進程( 只有集群的Standalone方式時,才需要這兩個角色),也不用啟動Hadoop的各服務(wù)(除非你要用到HDFS)。
運行客戶端程序(可以是spark自帶的命令行程序,如spark-shell,也可以是程序員利用spark api編寫的程序),就可以完成相應(yīng)的運行。相當于這一個客戶端進程,充當了所有的角色。
這種模式,只適合開發(fā)階段使用,我們可以在該模式下開發(fā)和測試代碼,使的代碼的邏輯沒問題,后面再提交到集群上去運行和測試。
如果是學(xué)習(xí)或者做測試,為了搭建環(huán)境的簡化,可以搭建本地模式。
在實際生產(chǎn)環(huán)境,spark會采用集群模式來運行,即分布式式運行,spark可以使用多種集群資源管理器來管理自己的集群。
(二)獨立的Spark集群standalone模式
Standalone模式,即獨立模式,自帶完整的服務(wù),使用spark自帶的集群資源管理功能。可單獨部署到一個集群中,無需依賴任何其他資源管理系統(tǒng)。即每臺機器上只需部署下載的Spark版本即可。
這種模式需要提前啟動spark的master和Worker守護進程,才能運行spark客戶端程序。
因為Standalone模式不需要依賴任何第三方組件,如果數(shù)據(jù)量比較小,且不需要hadoop(如不需要訪問hdfs服務(wù)),則使用Standalone模式是一種可選的簡單方便的方案。
(三)在aws的ec2中安裝
這種模式類似于Standalone模式,不過部署的集群是aws的ec2服務(wù)器,需要有一些 權(quán)限方面的配置,在GitHub中有專門針對 ec2中部署spark的腳本項目, 可以直接根據(jù)其中的步驟進行部署。
(四)使用yarn進行管理
該模式,使用hadoop的YARN作為集群資源管理器。這種模式下因為使用yarn的服務(wù)進行資源管理,所以不需要啟動Spark的Master、Worker守護進程。
如果你的應(yīng)用不僅使用spark,還用到hadoop生態(tài)圈的其它服務(wù),從兼容性上考慮,使用Yarn作為統(tǒng)一的資源管理是更好的選擇,這樣選擇這種模式就比較適合。
目前spark on yarn的部署方式 最為常用。
(五)使用mesos進行管理
該模式,使用Mesos作為集群資源管理器。如果你的應(yīng)用還使用了docker,則選擇此模式更加通用。
(六)使用k8s進行管理
Spark本身的設(shè)計更偏向使用靜態(tài)的資源管理,雖然Spark也支持了類似Yarn等動態(tài)的資源管理器,但是這些資源管理并不是面向動態(tài)的云基礎(chǔ)設(shè)施而設(shè)計的,在速度、成本、效率等領(lǐng)域缺乏解決方案。
隨著Kubernetes的快速發(fā)展,數(shù)據(jù)科學(xué)家們開始考慮是否可以用Kubernetes的彈性與面向云原生等特點與Spark進行結(jié)合。
在Spark 2.3中,Resource Manager中添加了Kubernetes原生的支持。
意味著 我們可以使用k8s對Spark進行管理了,而且能運用云的特性,很好的進行集群伸縮,降低我們的成本以及當運算資源不足時快速增加節(jié)點。
(七) 偽分布集群模式
即在一臺機器上模擬集群下的分布式場景,會啟動多個進程。上述的集群模式都可以啟動偽分布式集群模式,當然要求機器的配置滿足要求。
這種模式主要是開發(fā)階段和學(xué)習(xí)使用。
8、極高的社區(qū)活躍度
Spark目前是Apache基金會的頂級項目,全世界有大量的優(yōu)秀工程師是Spark的committer。并且世界上很多頂級的IT公司都在大規(guī)模地使用Spark。
spark的使用場景
物聯(lián)網(wǎng)領(lǐng)域: 通過物聯(lián)網(wǎng)的設(shè)備收集到海量的數(shù)據(jù),比如環(huán)境監(jiān)控,海洋監(jiān)控,地震預(yù)測等,需要及時的處理反饋。
大健康領(lǐng)域: 用戶健康生活與遺傳信息基因等數(shù)據(jù)的分析,反饋健康方面的信息給用戶
醫(yī)療保健:醫(yī)療保健領(lǐng)域使用實時分析來持續(xù)檢查關(guān)鍵患者的醫(yī)療狀況。尋找血液和器官移植的醫(yī)院需要在緊急情況下保持實時聯(lián)系。及時就醫(yī)是患者生死攸關(guān)的問題。
政府:政府機構(gòu)主要在國家安全領(lǐng)域進行實時分析。各國需要不斷跟蹤警察和安全機構(gòu)對于威脅的更新。
電信:以電話,視頻聊天和流媒體實時分析等形式圍繞服務(wù)的公司,以減少客戶流失并保持領(lǐng)先競爭優(yōu)勢。他們還提取移動網(wǎng)絡(luò)的測量結(jié)果。
銀行業(yè)務(wù):銀行業(yè)務(wù)幾乎涉及全球所有資金。確保整個系統(tǒng)的容錯事務(wù)變得非常重要。通過銀行業(yè)務(wù)的實時分析,可以實現(xiàn)欺詐檢測。
股票市場:股票經(jīng)紀人使用實時分析來預(yù)測股票投資組合的變動。公司通過使用實時分析來推銷其品牌的市場需求,從而重新思考其業(yè)務(wù)模式。
使用spark的公司和項目也非常多,可以參考官網(wǎng)列表
Project and Product names using
hadoop和spark的關(guān)系與區(qū)別
Spark作為Hadoop生態(tài)中重要的一員,其發(fā)展速度堪稱恐怖,不過其作為一個完整的技術(shù)棧,在技術(shù)和環(huán)境的雙重刺激下,得到如此多的關(guān)注也是有依據(jù)的。
Spark核心在于內(nèi)存計算模型代替Hadoop生態(tài)的MapReduce離線計算模型,用更加豐富Transformation和Action算子來替代map,reduce兩種算子。
計算流程的區(qū)別
Hadoop這項大數(shù)據(jù)處理技術(shù)大概已有十年歷史,而且被看做是首選的大數(shù)據(jù)集合處理的解決方案。
MapReduce是單流程的優(yōu)秀解決方案,不過對于需要多流程計算和算法的用例來說,并非十分高效。
數(shù)據(jù)處理流程中的每一步都需要一個Map階段和一個Reduce階段,而且如果要利用這一解決方案,需要將所有用例都轉(zhuǎn)換成MapReduce模式。
在下一步開始之前,上一步的作業(yè)輸出數(shù)據(jù)必須要存儲到分布式文件系統(tǒng)中。因此,復(fù)制和磁盤存儲會導(dǎo)致這種方式速度變慢。
另外Hadoop解決方案中通常會包含難以安裝和管理的集群。而且為了處理不同的大數(shù)據(jù)用例,還需要集成多種不同的工具(如用于機器學(xué)習(xí)的Mahout和流數(shù)據(jù)處理的Storm)。
如果想要完成比較復(fù)雜的工作,就必須將一系列的MapReduce作業(yè)串聯(lián)起來然后順序執(zhí)行這些作業(yè)。每一個作業(yè)都是高時延的,而且只有在前一個作業(yè)完成之后下一個作業(yè)才能開始啟動。
而Spark則允許程序開發(fā)者使用有向無環(huán)圖(DAG)開發(fā)復(fù)雜的多步數(shù)據(jù)管道。而且還支持跨有向無環(huán)圖的內(nèi)存數(shù)據(jù)共享,以便不同的作業(yè)可以共同處理同一個數(shù)據(jù)。
Spark運行在現(xiàn)有的Hadoop分布式文件系統(tǒng)基礎(chǔ)之上(HDFS)提供額外的增強功能。
它支持將Spark應(yīng)用部署到現(xiàn)存的Hadoop v1集群(with SIMR – Spark-Inside-MapReduce)或Hadoop v2 YARN集群甚至是Apache Mesos之中。
我們應(yīng)該將Spark看作是Hadoop MapReduce的一個替代品而不是Hadoop的替代品。其意圖并非是替代Hadoop,而是為了提供一個管理不同的大數(shù)據(jù)用例和需求的全面且統(tǒng)一的解決方案。
關(guān)鍵區(qū)別
hadoop是批處理工具,更擅長處理離線數(shù)據(jù),而spark在內(nèi)存中處理數(shù)據(jù),可以是實時處理。
Hadoop基于大數(shù)據(jù)的批處理。 這意味著數(shù)據(jù)會在一段時間內(nèi)先存儲下來,然后使用Hadoop進行處理。
在Spark中,處理可以實時進行。
Spark中的這種實時處理能力幫助我們解決實時分析問題。
除此之外,Spark能夠比Hadoop MapReduce( Hadoop處理框架)快100倍地進行批處理。
因此,目前Apache Spark是業(yè)界大數(shù)據(jù)處理的首選工具。
hadoop和spark發(fā)展的歷史故事參考
https://www.zhihu.com/question/23036370?sort=created
組件框架的區(qū)別
針對核心關(guān)鍵的功能 ,Hadoop和Spark都發(fā)展出了相應(yīng)的組件
| 處理引擎 | Mapreduce | Spark RDD(Spark Core) |
| 交互式查詢 | Hive | Spark SQL |
| 實時流計算 | Storm | Spark Streaming |
| 機器學(xué)習(xí) | Mahout | MLlib |
| 圖計算 | Hama或者 Giraph | GraphX |
Spark相關(guān)概念
Spark Shell
Spark的shell提供了一種學(xué)習(xí)API的簡單方法,以及一種以交互方式分析數(shù)據(jù)的強大工具。
Spark Session
在早期版本的Spark中,Spark Context是Spark的入口點。 對于每個其他API,我們需要使用不同的上下文。 對于流式傳輸,我們需要StreamingContext,SQL sqlContext和hive HiveContext。 為了解決這個問題,SparkSession應(yīng)運而生。 它本質(zhì)上是SQLContext,HiveContext和StreamingContext的組合。
數(shù)據(jù)源
Data Source API提供了一種可插拔的機制,用于通過Spark SQL訪問結(jié)構(gòu)化數(shù)據(jù)。 Data Source API用于將結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù)讀取并存儲到Spark SQL中。 數(shù)據(jù)源不僅僅是簡單的管道,可以轉(zhuǎn)換數(shù)據(jù)并將其拉入Spark。
RDD
彈性分布式數(shù)據(jù)集(RDD)是Spark的基本數(shù)據(jù)結(jié)構(gòu)。 它是一個不可變的分布式對象集合。 RDD中的每個數(shù)據(jù)集被劃分為邏輯分區(qū),其可以在集群的不同節(jié)點上計算。 RDD可以包含任何類型的Python,Java或Scala對象,包括用戶定義的類。
RDD可被分發(fā)到集群各個節(jié)點上,進行并行操作。RDDs 可以通過 Hadoop InputFormats 創(chuàng)建(如 HDFS),或者從其他 RDDs 轉(zhuǎn)化而來。
獲得RDD的三種方式:
Parallelize:將一個存在的集合,變成一個RDD,這種方式試用于學(xué)習(xí)spark和做一些spark的測試
>>>sc.parallelize(['cat','apple','bat’])MakeRDD:只有scala版本才有此函數(shù),用法與parallelize類似
textFile:從外部存儲中讀取數(shù)據(jù)來創(chuàng)建 RDD
>>>sc.textFile(“file\\\usr\local\spark\README.md”)RDD的兩個特性:不可變;分布式。
RDD支持兩種操作;
Transformation(轉(zhuǎn)化操作:返回值還是RDD)如map(),filter()等。這種操作是lazy(惰性)的,即從一個RDD轉(zhuǎn)換生成另一個RDD的操作不是馬上執(zhí)行,只是記錄下來,只有等到有Action操作是才會真正啟動計算,將生成的新RDD寫到內(nèi)存或hdfs里,不會對原有的RDD的值進行改變;
Action(行動操作:返回值不是RDD)會實際觸發(fā)Spark計算,對RDD計算出一個結(jié)果,并把結(jié)果返回到內(nèi)存或hdfs中,如count(),first()等。
RDD的緩存策略
Spark最為強大的功能之一便是能夠把數(shù)據(jù)緩存在集群的內(nèi)存里。這通過調(diào)用RDD的cache函數(shù)來實現(xiàn):rddFromTextFile.cache,
調(diào)用一個RDD的cache函數(shù)將會告訴Spark將這個RDD緩存在內(nèi)存中。在RDD首次調(diào)用一個執(zhí)行操作時,這個操作對應(yīng)的計算會立即執(zhí)行,數(shù)據(jù)會從數(shù)據(jù)源里讀出并保存到內(nèi)存。因此,首次調(diào)用cache函數(shù)所需要的時間會部分取決于Spark從輸入源讀取數(shù)據(jù)所需要的時間。但是,當下一次訪問該數(shù)據(jù)集的時候,數(shù)據(jù)可以直接從內(nèi)存中讀出從而減少低效的I/O操作,加快計算。多數(shù)情況下,這會取得數(shù)倍的速度提升。
廣播變量
廣播變量(broadcast variable)為只讀變量,它由運行SparkContext的驅(qū)動程序創(chuàng)建后發(fā)送給會參與計算的節(jié)點。對那些需要讓各工作節(jié)點高效地訪問相同數(shù)據(jù)的應(yīng)用場景,比如機器學(xué)習(xí),這非常有用。Spark下創(chuàng)建廣播變量只需在SparkContext上調(diào)用一個方法即可:
>>> broadcastAList = sc.broadcast(list(["a", "b", "c", "d", "e"]))累加器Accumulator
在Spark中如果想在Task計算的時候統(tǒng)計某些事件的數(shù)量,使用filter/reduce也可以,但是使用累加器是一種更方便的方式,累加器一個比較經(jīng)典的應(yīng)用場景是用來在Spark Streaming應(yīng)用中記錄某些事件的數(shù)量。
使用累加器時需要注意只有Driver能夠取到累加器的值,Task端進行的是累加操作。
創(chuàng)建的Accumulator變量的值能夠在Spark Web UI上看到,在創(chuàng)建時應(yīng)該盡量為其命名
Spark內(nèi)置了三種類型的Accumulator,分別是LongAccumulator用來累加整數(shù)型,DoubleAccumulator用來累加浮點型,CollectionAccumulator用來累加集合元素。
后續(xù)我們會記錄累加器的用法。
Dataset
Dataset是分布式數(shù)據(jù)集合。 數(shù)據(jù)集可以從JVM對象構(gòu)造,然后使用功能轉(zhuǎn)換(map,flatMap,filter等)進行操作。 數(shù)據(jù)集API在Scala和Java中可用。
DataFrames
DataFrame是命名列組織成數(shù)據(jù)集。 它在概念上等同于關(guān)系數(shù)據(jù)庫中的表或R / Python中的數(shù)據(jù)框,但在引擎蓋下具有更豐富的優(yōu)化。 DataFrame可以從多種來源構(gòu)建,例如:結(jié)構(gòu)化數(shù)據(jù)文件,Hive中的表,外部數(shù)據(jù)庫或現(xiàn)有RDD。
RDD、Dataframe、DataSet區(qū)別
spark中 RDD、DataFrame、Dataset的關(guān)系及區(qū)別 以及相互轉(zhuǎn)換
Spark 組件
Spark組件使Apache Spark快速可靠。 構(gòu)建了很多這些Spark組件來解決使用Hadoop MapReduce時出現(xiàn)的問題。 Apache Spark具有以下組件:
Spark Core
Spark Streaming
Spark SQL
GraphX
MLlib (Machine Learning)
用戶使用的SQL、Streaming、MLib、GraphX接口最終都會轉(zhuǎn)換成Spark Core分布式運行。
Spark Core
Spark Core是大規(guī)模并行和分布式數(shù)據(jù)處理的基礎(chǔ)引擎。 核心是分布式執(zhí)行引擎,Java,Scala和Python API為分布式ETL應(yīng)用程序開發(fā)提供了一個平臺。 此外,在核心上構(gòu)建的其他庫允許用于流式傳輸,SQL和機器學(xué)習(xí)的各種工作負載。 它負責:
內(nèi)存管理和故障恢復(fù)
在群集上調(diào)度,分發(fā)和監(jiān)視作業(yè)
與存儲系統(tǒng)交互
Spark Streaming
Spark Streaming是Spark的組件,用于處理實時流數(shù)據(jù)。 因此,它是核心Spark API的補充。 它支持實時數(shù)據(jù)流的高吞吐量和容錯流處理。 基本流單元是DStream,它基本上是一系列用于處理實時數(shù)據(jù)的RDD(彈性分布式數(shù)據(jù)集)。
Spark Streaming是spark中一個非常重要的擴展庫,它是Spark核心API的一個擴展,可以實現(xiàn)高吞吐量的、具備容錯機制的實時流數(shù)據(jù)的處理。支持從多種數(shù)據(jù)源獲取數(shù)據(jù),包括Kafk、Flume、以及TCP socket等,從數(shù)據(jù)源獲取數(shù)據(jù)之后,可以使用諸如map、reduce和window等高級函數(shù)進行復(fù)雜算法的處理。最后還可以將處理結(jié)果存儲到文件系統(tǒng)和數(shù)據(jù)庫等。
但從Spark2.0開始,提出了新的實時流框架 Structured Streaming (2.0和2.1是實驗版本,從Spark2.2開始為穩(wěn)定版本)來替代Spark streaming,這時Spark streaming就進入維護模式。相比Spark Streaming,Structured Streaming的Api更加好用,功能強大。
Spark SQL
Spark SQL是Spark中的一個新模塊,它使用Spark編程API實現(xiàn)集成關(guān)系處理。 它支持通過SQL或Hive查詢查詢數(shù)據(jù)。 對于那些熟悉RDBMS的人來說,Spark SQL將很容易從之前的工具過渡到可以擴展傳統(tǒng)關(guān)系數(shù)據(jù)處理的邊界。
Spark SQL通過函數(shù)編程API集成關(guān)系處理。 此外,它為各種數(shù)據(jù)源提供支持,并且使用代碼轉(zhuǎn)換編織SQL查詢,從而產(chǎn)生一個非常強大的工具。
以下是Spark SQL的四個庫。
Data Source API
DataFrame API
Interpreter & Optimizer
SQL Service
Spark SQL是Spark用來操作結(jié)構(gòu)化數(shù)據(jù)的組件。通過Spark SQL,用戶可以使用SQL或者Apache Hive版本的SQL方言(HQL)來查詢數(shù)據(jù)。Spark SQL支持多種數(shù)據(jù)源類型,例如Hive表、Parquet以及JSON等。Spark SQL不僅為Spark提供了一個SQL接口,還支持開發(fā)者將SQL語句融入到Spark應(yīng)用程序開發(fā)過程中,無論是使用Python、Java還是Scala,用戶可以在單個的應(yīng)用中同時進行SQL查詢和復(fù)雜的數(shù)據(jù)分析。
GraphX
GraphX是用于圖形和圖形并行計算的Spark API。 因此,它使用彈性分布式屬性圖擴展了Spark RDD。
屬性圖是一個有向多圖,它可以有多個平行邊。 每個邊和頂點都有與之關(guān)聯(lián)的用戶定義屬性。 這里,平行邊緣允許相同頂點之間的多個關(guān)系。 在高層次上,GraphX通過引入彈性分布式屬性圖來擴展Spark RDD抽象:一個定向多圖,其屬性附加到每個頂點和邊。
為了支持圖形計算,GraphX公開了一組基本運算符(例如,subgraph,joinVertices和mapReduceTriplets)以及Pregel API的優(yōu)化變體。 此外,GraphX包含越來越多的圖算法和構(gòu)建器,以簡化圖形分析任務(wù)。
GraphX是Spark面向圖計算提供的框架與算法庫。GraphX中提出了彈性分布式屬性圖的概念,并在此基礎(chǔ)上實現(xiàn)了圖視圖與表視圖的有機結(jié)合與統(tǒng)一;同時針對圖數(shù)據(jù)處理提供了豐富的操作,例如取子圖操作subgraph、頂點屬性操作mapVertices、邊屬性操作mapEdges等。GraphX還實現(xiàn)了與Pregel的結(jié)合,可以直接使用一些常用圖算法,如PageRank、三角形計數(shù)等。
MlLib (Machine Learning)
MLlib代表機器學(xué)習(xí)庫。 Spark MLlib用于在Apache Spark中執(zhí)行機器學(xué)習(xí)。
MLlib是Spark提供的一個機器學(xué)習(xí)算法庫,其中包含了多種經(jīng)典、常見的機器學(xué)習(xí)算法,主要有分類、回歸、聚類、協(xié)同過濾等。MLlib不僅提供了模型評估、數(shù)據(jù)導(dǎo)入等額外的功能,還提供了一些更底層的機器學(xué)習(xí)原語,包括一個通用的梯度下降優(yōu)化基礎(chǔ)算法。所有這些方法都被設(shè)計為可以在集群上輕松伸縮的架構(gòu)。
如何運行Spark程序
在實際編程中,我們不需關(guān)心以上調(diào)度細節(jié).只需使用 Spark 提供的指定語言的編程接口調(diào)用相應(yīng)的 API 即可.
在 Spark API 中, 一個 應(yīng)用(Application) 對應(yīng)一個 SparkContext 的實例。一個 應(yīng)用 可以用于單個 Job,或者分開的多個 Job 的 session,或者響應(yīng)請求的長時間生存的服務(wù)器。與 MapReduce 不同的是,一個 應(yīng)用 的進程(我們稱之為 Executor),會一直在集群上運行,即使當時沒有 Job 在上面運行。
而調(diào)用一個Spark內(nèi)部的 Action 會產(chǎn)生一個 Spark job 來完成它。 為了確定這些job實際的內(nèi)容,Spark 檢查 RDD 的DAG再計算出執(zhí)行 plan 。這個 plan 以最遠端的 RDD 為起點(最遠端指的是對外沒有依賴的 RDD 或者 數(shù)據(jù)已經(jīng)緩存下來的 RDD),產(chǎn)生結(jié)果 RDD 的 Action 為結(jié)束 。并根據(jù)是否發(fā)生 shuffle 劃分 DAG 的 stage.
Spark原生架構(gòu)和運行原理
架構(gòu)和粗流程描述
一個完整的Spark應(yīng)用程序,在提交集群運行時,它的處理流程涉及到如下圖所示的架構(gòu):
每個Spark應(yīng)用都由一個驅(qū)動器程序(drive program)來發(fā)起集群上的各種并行操作。
驅(qū)動器程序包含應(yīng)用的main函數(shù)。
驅(qū)動器負責創(chuàng)建SparkContext。
SparkContext可以與不同種類的集群資源管理器(Cluster Manager),例如Hadoop YARN,Mesos進行通信。
獲取到集群進行所需的資源后,SparkContext將得到集群中工作節(jié)點(Worker Node)上對應(yīng)的Executor。
不同的Spark程序有不同的Executor,他們之間是相互獨立的進程,Executor為應(yīng)用程序提供分布式計算以及數(shù)據(jù)存儲功能。
之后SparkContext將應(yīng)用程序代碼發(fā)送到各Executor,將任務(wù)(Task)分配給executors執(zhí)行。
ClusterManager
在Standalone模式中即為Master節(jié)點(主節(jié)點),控制整個集群,監(jiān)控Worker.在YARN中為ResourceManager
Worker
從節(jié)點,負責控制計算節(jié)點,啟動Executor或Driver。在YARN模式中為NodeManager,負責計算節(jié)點的控制。
Driver
運行Application的main()函數(shù)并創(chuàng)建SparkContect。
Executor
執(zhí)行器,在worker node上執(zhí)行任務(wù)的組件、用于啟動線程池運行任務(wù)。每個Application擁有獨立的一組Executor。
SparkContext
整個應(yīng)用的上下文,控制應(yīng)用的生命周期。
RDD
Spark的計算單元,一組RDD可形成執(zhí)行的有向無環(huán)圖RDD Graph。
DAG Scheduler
根據(jù)作業(yè)(Job)構(gòu)建基于Stage的DAG,并提交Stage給TaskScheduler。
TaskScheduler
將任務(wù)(Task)分發(fā)給Executor。
SparkEnv
線程級別的上下文,存儲運行時的重要組件的引用。
SparkEnv內(nèi)構(gòu)建并包含如下一些重要組件的引用。
1)MapOutPutTracker:負責Shuffle元信息的存儲。
2)BroadcastManager:負責廣播變量的控制與元信息的存儲。
3)BlockManager:負責存儲管理、創(chuàng)建和查找快。
4)MetricsSystem:監(jiān)控運行時性能指標信息。
5)SparkConf:負責存儲配置信息。
詳細流程描述
使用spark-submit提交一個Spark作業(yè)之后,這個作業(yè)就會啟動一個對應(yīng)的Driver進程。
根據(jù)你使用的部署模式(deploy-mode)不同,Driver進程可能在本地啟動,也可能在集群中某個工作節(jié)點上啟動。
而Driver進程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的資源管理集群,比如使用YARN作為資源管理集群)申請運行Spark作業(yè)需要使用的資源,這里的資源指的就是Executor進程。
YARN集群管理器會根據(jù)我們?yōu)镾park作業(yè)設(shè)置的資源參數(shù),在各個工作節(jié)點上,啟動一定數(shù)量的Executor進程,每個Executor進程都占有一定數(shù)量的內(nèi)存和CPU core。
在申請到了作業(yè)執(zhí)行所需的資源之后,Driver進程就會開始調(diào)度和執(zhí)行我們編寫的作業(yè)代碼了。
Driver進程會將我們編寫的Spark作業(yè)代碼分拆為多個stage,每個stage執(zhí)行一部分代碼片段,并為每個stage創(chuàng)建一批Task,然后將這些Task分配到各個Executor進程中執(zhí)行。
Task是最小的計算單元,負責執(zhí)行一模一樣的計算邏輯(也就是我們自己編寫的某個代碼片段),只是每個Task處理的數(shù)據(jù)不同而已。
一個stage的所有Task都執(zhí)行完畢之后,會在各個節(jié)點本地的磁盤文件中寫入計算中間結(jié)果,然后Driver就會調(diào)度運行下一個stage。
下一個stage的Task的輸入數(shù)據(jù)就是上一個stage輸出的中間結(jié)果。
如此循環(huán)往復(fù),直到將我們自己編寫的代碼邏輯全部執(zhí)行完,并且計算完所有的數(shù)據(jù),得到我們想要的結(jié)果為止。
Spark是根據(jù)shuffle類算子來進行stage的劃分。
如果我們的代碼中執(zhí)行了某個shuffle類算子(比如reduceByKey、join等),那么就會在該算子處,劃分出一個stage界限來。
可以大致理解為,shuffle算子執(zhí)行之前的代碼會被劃分為一個stage,shuffle算子執(zhí)行以及之后的代碼會被劃分為下一個stage。
因此一個stage剛開始執(zhí)行的時候,它的每個Task可能都會從上一個stage的Task所在的節(jié)點,去通過網(wǎng)絡(luò)傳輸拉取需要自己處理的所有key,然后對拉取到的所有相同的key使用我們自己編寫的算子函數(shù)執(zhí)行聚合操作(比如reduceByKey()算子接收的函數(shù))。這個過程就是shuffle。
當我們在代碼中執(zhí)行了cache/persist等持久化操作時,根據(jù)我們選擇的持久化級別的不同,每個Task計算出來的數(shù)據(jù)也會保存到Executor進程的內(nèi)存或者所在節(jié)點的磁盤文件中。
因此Executor的內(nèi)存主要分為三塊:
第一塊是讓Task執(zhí)行我們自己編寫的代碼時使用,默認是占Executor總內(nèi)存的20%;
第二塊是讓Task通過shuffle過程拉取了上一個stage的Task的輸出后,進行聚合等操作時使用,默認也是占Executor總內(nèi)存的20%;
第三塊是讓RDD持久化時使用,默認占Executor總內(nèi)存的60%。
Task的執(zhí)行速度是跟每個Executor進程的CPU core數(shù)量有直接關(guān)系的。
一個CPU core同一時間只能執(zhí)行一個線程。而每個Executor進程上分配到的多個Task,都是以每個Task一條線程的方式,多線程并發(fā)運行的。
如果CPU core數(shù)量比較充足,而且分配到的Task數(shù)量比較合理,那么通常來說,可以比較快速和高效地執(zhí)行完這些Task線程。
以上就是Spark作業(yè)的基本運行原理的說明.
shuffle 和 stage
shuffle 是劃分 DAG 中 stage 的標識,同時影響 Spark 執(zhí)行速度的關(guān)鍵步驟.
RDD 的 Transformation 函數(shù)中,又分為窄依賴(narrow dependency)和寬依賴(wide dependency)的操作.
窄依賴跟寬依賴的區(qū)別在于 是否發(fā)生 shuffle(洗牌) 操作.
寬依賴會發(fā)生 shuffle 操作. 窄依賴是子 RDD的各個分片(partition)不依賴于其他分片,能夠獨立計算得到結(jié)果,寬依賴指子 RDD 的各個分片會依賴于父RDD 的多個分片,所以會造成父 RDD 的各個分片在集群中重新分片, 看如下兩個示例:
// Map: "cat" -> c, cat val rdd1 = rdd.Map(x => (x.charAt(0), x)) // groupby same key and count val rdd2 = rdd1.groupBy(x => x._1).Map(x => (x._1, x._2.toList.length))第一個 Map 操作將 RDD 里的各個元素進行映射, RDD 的各個數(shù)據(jù)元素之間不存在依賴,可以在集群的各個內(nèi)存中獨立計算,也就是并行化
第二個 groupby 之后的 Map 操作,為了計算相同 key 下的元素個數(shù),需要把相同 key 的元素聚集到同一個 partition 下,所以造成了數(shù)據(jù)在內(nèi)存中的重新分布,即 shuffle 操作.
shuffle 操作是 spark 中最耗時的操作,應(yīng)盡量避免不必要的 shuffle.
寬依賴主要有兩個過程: shuffle write 和 shuffle fetch.
類似 Hadoop 的 Map 和 Reduce 階段.
shuffle write 將 ShuffleMapTask 任務(wù)產(chǎn)生的中間結(jié)果緩存到內(nèi)存中, shuffle fetch 獲得 ShuffleMapTask 緩存的中間結(jié)果進行 ShuffleReduceTask 計算,這個過程容易造成OutOfMemory.
shuffle 過程內(nèi)存分配使用 ShuffleMemoryManager 類管理,會針對每個 Task 分配內(nèi)存,Task 任務(wù)完成后通過 Executor 釋放空間.
這里可以把 Task 理解成不同 key 的數(shù)據(jù)對應(yīng)一個 Task.
早期的內(nèi)存分配機制使用公平分配,即不同 Task 分配的內(nèi)存是一樣的,但是這樣容易造成內(nèi)存需求過多的 Task 的 OutOfMemory, 從而造成多余的 磁盤 IO 過程,影響整體的效率.
(例:某一個 key 下的數(shù)據(jù)明顯偏多,但因為大家內(nèi)存都一樣,這一個 key 的數(shù)據(jù)就容易 OutOfMemory).
1.5版以后 Task 共用一個內(nèi)存池,內(nèi)存池的大小默認為 JVM 最大運行時內(nèi)存容量的16%
分配機制如下:
假如有 N 個 Task,ShuffleMemoryManager 保證每個 Task 溢出之前至少可以申請到1/2N 內(nèi)存,且至多申請到1/N
N 為當前活動的 shuffle Task 數(shù)
因為N 是一直變化的,所以 manager 會一直追蹤 Task 數(shù)的變化,重新計算隊列中的1/N 和1/2N.
但是這樣仍然容易造成內(nèi)存需要多的 Task 任務(wù)溢出,所以最近有很多相關(guān)的研究是針對 shuffle 過程內(nèi)存優(yōu)化的.
如下 DAG 流程圖中,分別讀取數(shù)據(jù),經(jīng)過處理后 join 2個 RDD 得到結(jié)果
在這個圖中,根據(jù)是否發(fā)生 shuffle 操作能夠?qū)⑵浞殖扇缦碌?stage 類型:
(join 需要針對同一個 key 合并,所以需要 shuffle)
運行到每個 stage 的邊界時,數(shù)據(jù)在父 stage 中按照 Task 寫到磁盤上,而在子 stage 中通過網(wǎng)絡(luò)按照 Task 去讀取數(shù)據(jù)。這些操作會導(dǎo)致很重的網(wǎng)絡(luò)以及磁盤的I/O,所以 stage 的邊界是非常占資源的,在編寫 Spark 程序的時候需要盡量避免的 。父 stage 中 partition 個數(shù)與子 stage 的 partition 個數(shù)可能不同,所以那些產(chǎn)生 stage 邊界的 Transformation 常常需要接受一個 numPartition 的參數(shù)來覺得子 stage 中的數(shù)據(jù)將被切分為多少個 partition[^demoa]。
PS:shuffle 操作的時候可以用 combiner 壓縮數(shù)據(jù),減少 IO 的消耗
Spark原生框架處理數(shù)據(jù)流程
1、Client提交應(yīng)用。
2、Master找到一個Worker啟動Driver
3、Driver向Master或者資源管理器申請資源,之后將應(yīng)用轉(zhuǎn)化為RDD Graph
4、再由DAGSchedule將RDD Graph轉(zhuǎn)化為Stage的有向無環(huán)圖提交給TaskSchedule。
5、再由TaskSchedule提交任務(wù)給Executor執(zhí)行。
6、其它組件協(xié)同工作,確保整個應(yīng)用順利執(zhí)行。
Executor執(zhí)行任務(wù)原理
Executor完成一個任務(wù)需要做兩部分工具,一部分就是加載數(shù)據(jù)源,也就是Spark的基礎(chǔ)數(shù)據(jù)單元RDD。
RDD的數(shù)據(jù)來源可以是多種多樣的,我們這里以HDFS為例。
Spark支持兩種RDD操作:transformation和action。
transformation操作
transformation操作會針對已有的RDD創(chuàng)建一個新的RDD。
transformation具有l(wèi)azy特性,即transformation不會觸發(fā)spark程序的執(zhí)行,它們只是記錄了對RDD所做的操作,不會自發(fā)的執(zhí)行。
只有執(zhí)行了一個action,之前的所有transformation才會執(zhí)行。
常用的transformation介紹:
map :將RDD中的每個元素傳人自定義函數(shù),獲取一個新的元素,然后用新的元素組成新的RDD。
filter:對RDD中每個元素進行判斷,如果返回true則保留,返回false則剔除。
flatMap:與map類似,但是對每個元素都可以返回一個或多個元素。
groupByKey:根據(jù)key進行分組,每個key對應(yīng)一個Iterable。
reduceByKey:對每個key對應(yīng)的value進行reduce操作。
sortByKey:對每個key對應(yīng)的value進行排序操作。
join:對兩個包含<key,value>對的RDD進行join操作,每個keyjoin上的pair,都會傳入自定義函數(shù)進行處理。
cogroup:同join,但是每個key對應(yīng)的Iterable都會傳入自定義函數(shù)進行處理。
action操作
action操作主要對RDD進行最后的操作,比如遍歷,reduce,保存到文件等,并可以返回結(jié)果給Driver程序。
action操作執(zhí)行,會觸發(fā)一個spark job的運行,從而觸發(fā)這個action之前所有的transformation的執(zhí)行,這是action的特性。
常用的action介紹:
reduce:將RDD中的所有元素進行聚合操作。第一個和第二個元素聚合,值與第三個元素聚合,值與第四個元素聚合,以此類推。
collect:將RDD中所有元素獲取到本地客戶端(一般不建議使用)。
count:獲取RDD元素總數(shù)。
take(n):獲取RDD中前n個元素。
saveAsTextFile:將RDD元素保存到文件中,對每個元素調(diào)用toString方法。
countByKey:對每個key對應(yīng)的值進行count計數(shù)。
foreach:遍歷RDD中的每個元素。
Spark on yarn 框架處理數(shù)據(jù)流程
1、基于YARN的Spark作業(yè)首先由客戶端生成作業(yè)信息,提交給ResourceManager。
2、ResourceManager在某一NodeManager匯報時把AppMaster分配給NodeManager。
3、NodeManager啟動SparkAppMaster。
4、SparkAppMastere啟動后初始化然后向ResourceManager申請資源。
5、申請到資源后,SparkAppMaster通過RPC讓NodeManager啟動相應(yīng)的SparkExecutor。
6、SparkExecutor向SparkAppMaster匯報并完成相應(yīng)的任務(wù)。
7、SparkClient會通過AppMaster獲取作業(yè)運行狀態(tài)。
如何運行Spark程序
在實際編程中,我們不需要關(guān)心調(diào)度細節(jié).
只需使用 Spark 提供的指定語言的編程接口調(diào)用相應(yīng)的 API 即可.
在 Spark API 中, 一個 應(yīng)用(Application) 對應(yīng)一個 SparkContext 的實例。
一個 應(yīng)用 可以用于單個 Job,或者分開的多個 Job 的 session,或者響應(yīng)請求的長時間生存的服務(wù)器。
與 MapReduce 不同的是,一個 應(yīng)用 的進程(我們稱之為 Executor),會一直在集群上運行,即使當時沒有 Job 在上面運行。
而調(diào)用一個Spark內(nèi)部的 Action 會產(chǎn)生一個 Spark job 來完成它。
為了確定這些job實際的內(nèi)容,Spark 檢查 RDD 的DAG再計算出執(zhí)行 plan 。
這個 plan 以最遠端的 RDD 為起點(最遠端指的是對外沒有依賴的 RDD 或者 數(shù)據(jù)已經(jīng)緩存下來的 RDD),產(chǎn)生結(jié)果 RDD 的 Action 為結(jié)束 。
并根據(jù)是否發(fā)生 shuffle 劃分 DAG 的 stage.
參考鏈接:
https://www.aboutyun.com/forum.php?mod=viewthread&tid=24883
https://www.cnblogs.com/cxxjohnson/p/8909578.html
總結(jié)
以上是生活随笔為你收集整理的hadoop组件---spark----全面了解spark以及与hadoop的区别的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: taro Button按钮组件
- 下一篇: 【我的开源】股票软件简介+源码(蜗牛股票