分布式实时计算—Spark—Spark Core
原文作者:bingoabin
原文地址:Spark Core
目錄
一、Spark Core
1. 主要功能
2. Spark Core子框架
3. Spark架構
4. Spark計算模型
二、組件
1. 介紹
2. RDD
3. DataFrame
4. DataSet
6. RDD和DataSet比較
7. DataFrame和DataSet比較
8. 應用場景
一、Spark Core
Apache Spark 是加州大學伯克利分校的 AMP Labs 開發(fā)的開源分布式輕量級通用計算框架。由于 Spark 基于內(nèi)存設計,使得它擁有比 Hadoop 更高的性能(極端情況下可以達到 100x),并且對多語言(Scala、Java、Python)提供支持。其一棧式的設計特點使得我們的學習和維護成本大大地減少,而且其提供了很好的容錯解決方案。
1. 主要功能
Spark Core提供Spark最基礎與最核心的功能,主要包括以下功能:
2. Spark Core子框架
(1)、Spark SQL:首先使用SQL語句解析器(SqlParser)將SQL轉換為語法樹(Tree),并且使用規(guī)則執(zhí)行器(RuleExecutor)將一系列規(guī)則(Rule)應用到語法樹,最終生成物理執(zhí)行計劃并執(zhí)行。其中,規(guī)則執(zhí)行器包括語法分析器(Analyzer)和優(yōu)化器(Optimizer)。?
(2)、Spark Streaming:用于流式計算。Spark Streaming支持Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis和簡單的TCP套接字等多種數(shù)據(jù)輸入源。輸入流接收器(Receiver)負責接入數(shù)據(jù),是接入數(shù)據(jù)流的接口規(guī)范。Dstream是Spark Streaming中所有數(shù)據(jù)流的抽象,Dstream可以被組織為Dstream Graph。Dstream本質上由一系列連續(xù)的RDD組成。?
(3)、GraphX:Spark提供的分布式圖計算框架。GraphX主要遵循整體同步并行(bulk Synchronous parallel,BSP)計算模式下的Pregel模型實現(xiàn)。GraphX提供了對圖的抽象Graph,Graph由頂點(Vertex),邊(Edge)及繼承了Edge的EdgeTriplet三種結構組成。GraphX目前已經(jīng)封裝了最短路徑,網(wǎng)頁排名,連接組件,三角關系統(tǒng)計等算法的實現(xiàn),用戶可以選擇使用。?
(4)、MLlib:Spark提供的機器學習框架。機器學習是一門設計概率論、統(tǒng)計學、逼近論、凸分析、算法復雜度理論等多領域的交叉學科。MLlib目前已經(jīng)提供了基礎統(tǒng)計、分析、回歸、決策樹、隨機森林、樸素貝葉斯、保序回歸、協(xié)同過濾、聚類、維數(shù)縮減、特征提取與轉型、頻繁模式挖掘、預言模型標記語言、管道等多種數(shù)理統(tǒng)計、概率論、數(shù)據(jù)挖掘方面的數(shù)學算法。
3. Spark架構
Spark采用了分布式計算中的Master-Slave模型。Master作為整個集群的控制器,負責整個集群的正常運行;Worker是計算節(jié)點,接受主節(jié)點命令以及進行狀態(tài)匯報;Executor負責任務(Tast)的調度和執(zhí)行;Client作為用戶的客戶端負責提交應用;Driver負責控制一個應用的執(zhí)行。
?
Spark集群啟動時,需要從主節(jié)點和從節(jié)點分別啟動Master進程和Worker進程,對整個集群進行控制。在一個Spark應用的執(zhí)行過程中,Driver是應用的邏輯執(zhí)行起點,運行Application的main函數(shù)并創(chuàng)建SparkContext,DAGScheduler把對Job中的RDD有向無環(huán)圖根據(jù)依賴關系劃分為多個Stage,每一個Stage是一個TaskSet, TaskScheduler把Task分發(fā)給Worker中的Executor;Worker啟動Executor,Executor啟動線程池用于執(zhí)行Task。
4. Spark計算模型
RDD:彈性分布式數(shù)據(jù)集,是一種內(nèi)存抽象,可以理解為一個大數(shù)組,數(shù)組的元素是RDD的分區(qū)Partition,分布在集群上;在物理數(shù)據(jù)存儲上,RDD的每一個Partition對應的就是一個數(shù)據(jù)塊Block,Block可以存儲在內(nèi)存中,當內(nèi)存不夠時可以存儲在磁盤上。
RDD邏輯物理結構
Hadoop將Mapreduce計算的結果寫入磁盤,在機器學習、圖計算、PageRank等迭代計算下,重用中間結果導致的反復I/O耗時過長,成為了計算性能的瓶頸。為了提高迭代計算的性能和分布式并行計算下共享數(shù)據(jù)的容錯性,伯克利的設計者依據(jù)兩個特性而設計了RDD:
Operations:算子
算子是RDD中定義的函數(shù),可以對RDD中的數(shù)據(jù)進行轉換和操作。如下圖,Spark從外部空間(HDFS)讀取數(shù)據(jù)形成RDD_0,Tranformation算子對數(shù)據(jù)進行操作(如fliter)并轉化為新的RDD_1、RDD_2,通過Action算子(如collect/count)觸發(fā)Spark提交作業(yè)。如上的分析過程可以看出,Tranformation算子并不會觸發(fā)Spark提交作業(yè),直至Action算子才提交作業(yè),這是一個延遲計算的設計技巧,可以避免內(nèi)存過快被中間計算占滿,從而提高內(nèi)存的利用率。
下圖是算子的列表,分三大類:Value數(shù)據(jù)類型的Tranformation算子;Key-Value數(shù)據(jù)類型的Tranformation算子;Action算子。
Lineage Graph:血統(tǒng)關系圖
下圖的第一階段生成RDD的有向無環(huán)圖,即是血統(tǒng)關系圖,記錄了RDD的更新過程,當這個RDD的部分分區(qū)數(shù)據(jù)丟失時,它可以通過Lineage獲取足夠的信息來重新運算和恢復丟失的數(shù)據(jù)分區(qū)。DAGScheduler依據(jù)RDD的依賴關系將有向無環(huán)圖劃分為多個Stage,一個Stage對應著一系列的Task,由TashScheduler分發(fā)給Worker計算。
二、組件
1. 介紹
spark生態(tài)系統(tǒng)中,Spark Core,包括各種Spark的各種核心組件,它們能夠對內(nèi)存和硬盤進行操作,或者調用CPU進行計算。spark core定義了RDD、DataFrame和DataSet
spark最初只有RDD,DataFrame在Spark 1.3中被首次發(fā)布,DataSet在Spark1.6版本中被加入。
2. RDD
RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一個只讀的,可分區(qū)的分布式數(shù)據(jù)集,這個數(shù)據(jù)集的全部或部分可以緩存在內(nèi)存中,在多次計算間重用。
優(yōu)點:
- 編譯時類型安全?
- 編譯時就能檢查出類型錯誤?
- 面向對象的編程風格?
- 直接通過類名點的方式來操作數(shù)據(jù)
缺點:
- 序列化和反序列化的性能開銷?
- 無論是集群間的通信, 還是IO操作都需要對對象的結構和數(shù)據(jù)進行序列化和反序列化.?
- GC的性能開銷?
- 頻繁的創(chuàng)建和銷毀對象, 勢必會增加GC
3. DataFrame
在Spark中,DataFrame是一種以RDD為基礎的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結構信息,從而對藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進行了針對性的優(yōu)化,最終達到大幅提升運行時效率的目標。反觀RDD,由于無從得知所存數(shù)據(jù)元素的具體內(nèi)部結構,Spark Core只能在stage層面進行簡單、通用的流水線優(yōu)化。
DataFrame引入了schema和off-heap
schema : RDD每一行的數(shù)據(jù), 結構都是一樣的.
這個結構就存儲在schema中。 Spark通過schame就能夠讀懂數(shù)據(jù), 因此在通信和IO時就只需要序列化和反序列化數(shù)據(jù),而結構的部分就可以省略了。 off-heap : 意味著JVM堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是JVM)。Spark能夠以二進制的形式序列化數(shù)據(jù)(不包括結構)到off-heap中,當要操作數(shù)據(jù)時,就直接操作off-heap內(nèi)存。由于Spark理解schema,所以知道該如何操作。
off-heap就像地盤,schema就像地圖, Spark有地圖又有自己地盤了, 就可以自己說了算了, 不再受JVM的限制,也就不再收GC的困擾了。通過schema和off-heap,DataFrame解決了RDD的缺點,但是卻丟了RDD的優(yōu)點。 DataFrame不是類型安全的, API也不是面向對象風格的。
<span style="color:#000000"><code>import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext}object Run {def main(args: Array[String]) {val conf = new SparkConf().setAppName("test").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("WARN")val sqlContext = new SQLContext(sc)/*** id age* 1 30* 2 29* 3 21*/val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)// API不是面向對象的idAgeDF.filter(idAgeDF.col("age") > 25) // 不會報錯, DataFrame不是編譯時類型安全的idAgeDF.filter(idAgeDF.col("age") > "") } } </code></span>4. DataSet
Dataset是一個強類型的特定領域的對象,這種對象可以函數(shù)式或者關系操作并行地轉換。每個Dataset也有一個被稱為一個DataFrame的類型化視圖,這種DataFrame是Row類型的Dataset,即Dataset[Row]Dataset是“懶惰”的,只在執(zhí)行行動操作時觸發(fā)計算。本質上,數(shù)據(jù)集表示一個邏輯計劃,該計劃描述了產(chǎn)生數(shù)據(jù)所需的計算。當執(zhí)行行動操作時,Spark的查詢優(yōu)化程序優(yōu)化邏輯計劃,并生成一個高效的并行和分布式物理計劃。DataSet結合了RDD和DataFrame的優(yōu)點,,并帶來的一個新的概念Encoder 當序列化數(shù)據(jù)時,Encoder產(chǎn)生字節(jié)碼與off-heap進行交互,能夠達到按需訪問數(shù)據(jù)的效果, 而不用反序列化整個對象。 Spark還沒有提供自定義Encoder的API,但是未來會加入。下面看DataFrame和DataSet在2.0.0-preview中的實現(xiàn)
<span style="color:#000000"><code>下面這段代碼, 在1.6.x中創(chuàng)建的是DataFrame // 上文DataFrame示例中提取出來的 val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema) </code></span> <span style="color:#000000"><code>但是同樣的代碼在2.0.0-preview中, 創(chuàng)建的雖然還叫DataFrame// sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的實現(xiàn), 返回值依然是DataFrame def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = { sparkSession.createDataFrame(rowRDD, schema) } </code></span> <span style="color:#000000"><code>但是其實卻是DataSet, 因為DataFrame被聲明為Dataset[Row]package object sql {// ...省略了不相關的代碼type DataFrame = Dataset[Row] } </code></span> <span style="color:#000000"><code>因此當我們從1.6.x遷移到2.0.0的時候, 無需任何修改就直接用上了DataSet.下面是一段DataSet的示例代碼import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]) {val conf = new SparkConf().setAppName("test").setMaster("local") // 調試的時候一定不要用local[*]val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)import sqlContext.implicits._val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))// 在2.0.0-preview中這行代碼創(chuàng)建出的DataFrame, 其實是DataSet[Row]val idAgeDS = sqlContext.createDataFrame(idAgeRDDRow, schema)// 在2.0.0-preview中, 還不支持自定的Encoder, Row類型不行, 自定義的bean也不行// 官方文檔也有寫通過bean創(chuàng)建Dataset的例子,但是我運行時并不能成功// 所以目前需要用創(chuàng)建DataFrame的方法, 來創(chuàng)建DataSet[Row]// sqlContext.createDataset(idAgeRDDRow)// 目前支持String, Integer, Long等類型直接創(chuàng)建DatasetSeq(1, 2, 3).toDS().show()sqlContext.createDataset(sc.parallelize(Array(1, 2, 3))).show()} } </code></span>5. RDD和DataFrame比較
DataFrame與RDD相同之處,都是不可變分布式彈性數(shù)據(jù)集。不同之處在于,DataFrame的數(shù)據(jù)集都是按指定列存儲,即結構化數(shù)據(jù)。類似于傳統(tǒng)數(shù)據(jù)庫中的表。DataFrame的設計是為了讓大數(shù)據(jù)處理起來更容易。DataFrame允許開發(fā)者把結構化數(shù)據(jù)集導入DataFrame,并做了higher-level的抽象; DataFrame提供特定領域的語言(DSL)API來操作你的數(shù)據(jù)集。上圖直觀地體現(xiàn)了DataFrame和RDD的區(qū)別。左側的RDD[Person]雖然以Person為類型參數(shù),但Spark框架本身不了解Person類的內(nèi)部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark SQL可以清楚地知道該數(shù)據(jù)集中包含哪些列,每列的名稱和類型各是什么。DataFrame多了數(shù)據(jù)的結構信息,即schema。RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執(zhí)行效率、減少數(shù)據(jù)讀取以及執(zhí)行計劃的優(yōu)化,比如filter下推、裁剪等。
6. RDD和DataSet比較
DataSet以Catalyst邏輯執(zhí)行計劃表示,并且數(shù)據(jù)以編碼的二進制形式被存儲,不需要反序列化就可以執(zhí)行sorting、shuffle等操作。
DataSet創(chuàng)立需要一個顯式的Encoder,把對象序列化為二進制,可以把對象的scheme映射為Spark SQl類型,然而RDD依賴于運行時反射機制。
通過上面兩點,DataSet的性能比RDD的要好很多
7. DataFrame和DataSet比較
Dataset可以認為是DataFrame的一個特例,主要區(qū)別是Dataset每一個record存儲的是一個強類型值而不是一個Row。因此具有如下三個特點:
DataFrame和DataSet可以相互轉化,?df.as[ElementType]這樣可以把DataFrame轉化為DataSet,ds.toDF()這樣可以把DataSet轉化為DataFrame。
<span style="color:#000000"><code>//DataFrame// Load a text file and interpret each line as a java.lang.String val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String] val result = ds.flatMap(_.split(" ")) // Split on whitespace.filter(_ != "") // Filter empty words.toDF() // Convert to DataFrame to perform aggregation / sorting.groupBy($"value") // Count number of occurences of each word.agg(count("*") as "numOccurances").orderBy($"numOccurances" desc) // Show most common words first//DataSet,完全使用scala編程,不要切換到DataFrameval wordCount =ds.flatMap(_.split(" ")).filter(_ != "").groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function.count()</code></span>8. 應用場景
什么時候用RDD?使用RDD的一般場景:
- 你需要使用low-level的transformation和action來控制你的數(shù)據(jù)集;?
- 你得數(shù)據(jù)集非結構化,比如,流媒體或者文本流;?
- 你想使用函數(shù)式編程來操作你得數(shù)據(jù),而不是用特定領域語言(DSL)表達;?
- 你不在乎schema,比如,當通過名字或者列處理(或訪問)數(shù)據(jù)屬性不在意列式存儲格式;?
- 你放棄使用DataFrame和Dataset來優(yōu)化結構化和半結構化數(shù)據(jù)集?
RDD在Apache Spark 2.0中慘遭拋棄??答案當然是 NO !?通過后面的描述你會得知:Spark用戶可以在RDD,DataFrame和Dataset三種數(shù)據(jù)集之間無縫轉換,而是只需使用超級簡單的API方法。
什么時候使用DataFrame或者Dataset?
- 你想使用豐富的語義,high-level抽象,和特定領域語言API,那你可DataFrame或者Dataset;?
- 你處理的半結構化數(shù)據(jù)集需要high-level表達, filter,map,aggregation,average,sum ,SQL 查詢,列式訪問和使用lambda函數(shù),那你可DataFrame或者Dataset;?
- 你想利用編譯時高度的type-safety,Catalyst優(yōu)化和Tungsten的code生成,那你可DataFrame或者Dataset;?
- 你想統(tǒng)一和簡化API使用跨Spark的Library,那你可DataFrame或者Dataset;?
如果你是一個R使用者,那你可DataFrame或者Dataset;?如果你是一個Python使用者,那你可DataFrame或者Dataset;
<span style="color:#000000"><code>你可以無縫的把DataFrame或者Dataset轉化成一個RDD,只需簡單的調用 .rdd:// select specific fields from the Dataset, apply a predicate // using the where() method, convert to an RDD, and show first 10 // RDD rowsval deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300) // convert to RDDs and take the first 10 rowsval eventsRDD = deviceEventsDS.rdd.take(10)</code></span>總結
以上是生活随笔為你收集整理的分布式实时计算—Spark—Spark Core的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 分布式离线计算—Spark—基础介绍
- 下一篇: 分布式离线计算—Spark—SparkS