大数据之Spark简介及RDD说明
- 目錄
- 前言:
- 1、Spark概述
- 1.1、什么是Spark(官網(wǎng):http://spark.apache.org)
- 1.2、為什么要學(xué)Spark
- 1.3、Spark特點(diǎn)
- 2、RDD概述
- 2.1、什么是RDD
- 2.2、RDD的屬性
- 2.3、創(chuàng)建RDD的兩種方式
- 2.4、RDD編程API
- 2.5、RDD的依賴關(guān)系
- 2.6、RDD的緩存
- 2.7、DAG的生成
- 總結(jié):
目錄
前言:
本篇文章只是簡單介紹下Spark,然后對(duì)Spark的RDD在做一個(gè)全面的介紹。由于博主知識(shí)有限,這里只是做一個(gè)簡單的介紹。若有些地方有問題,請(qǐng)大家及時(shí)指出。后續(xù)隨著深入的學(xué)習(xí),會(huì)再進(jìn)一步總結(jié)自己的學(xué)習(xí)成果。
1、Spark概述
1.1、什么是Spark(官網(wǎng):http://spark.apache.org)
Spark是一種快速、通用、可擴(kuò)展的大數(shù)據(jù)分析引擎,2009年誕生于加州大學(xué)伯克利分校AMPLab,2010年開源,2013年6月成為Apache孵化項(xiàng)目,2014年2月成為Apache頂級(jí)項(xiàng)目。目前,Spark生態(tài)系統(tǒng)已經(jīng)發(fā)展成為一個(gè)包含多個(gè)子項(xiàng)目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib等子項(xiàng)目,Spark是基于內(nèi)存計(jì)算的大數(shù)據(jù)并行計(jì)算框架。Spark基于內(nèi)存計(jì)算,提高了在大數(shù)據(jù)環(huán)境下數(shù)據(jù)處理的實(shí)時(shí)性,同時(shí)保證了高容錯(cuò)性和高可伸縮性,允許用戶將Spark部署在大量廉價(jià)硬件之上,形成集群。Spark得到了眾多大數(shù)據(jù)公司的支持,這些公司包括Hortonworks、IBM、Intel、Cloudera、MapR、Pivotal、百度、阿里、騰訊、京東、攜程、優(yōu)酷土豆。當(dāng)前百度的Spark已應(yīng)用于鳳巢、大搜索、直達(dá)號(hào)、百度大數(shù)據(jù)等業(yè)務(wù);阿里利用GraphX構(gòu)建了大規(guī)模的圖計(jì)算和圖挖掘系統(tǒng),實(shí)現(xiàn)了很多生產(chǎn)系統(tǒng)的推薦算法;騰訊Spark集群達(dá)到8000臺(tái)的規(guī)模,是當(dāng)前已知的世界上最大的Spark集群。
1.2、為什么要學(xué)Spark
中間結(jié)果輸出:基于MapReduce的計(jì)算引擎通常會(huì)將中間結(jié)果輸出到磁盤上,進(jìn)行存儲(chǔ)和容錯(cuò)。出于任務(wù)管道承接的,考慮,當(dāng)一些查詢翻譯到MapReduce任務(wù)時(shí),往往會(huì)產(chǎn)生多個(gè)Stage,而這些串聯(lián)的Stage又依賴于底層文件系統(tǒng)(如HDFS)來存儲(chǔ)每一個(gè)Stage的輸出結(jié)果
Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生態(tài)系統(tǒng),以彌補(bǔ)MapReduce的不足。
1.3、Spark特點(diǎn)
1.3.1、快
與Hadoop的MapReduce相比,Spark基于內(nèi)存的運(yùn)算要快100倍以上,基于硬盤的運(yùn)算也要快10倍以上。Spark實(shí)現(xiàn)了高效的DAG執(zhí)行引擎,可以通過基于內(nèi)存來高效處理數(shù)據(jù)流。
1.3.2、易用
Spark支持Java、Python和Scala的API,還支持超過80種高級(jí)算法,使用戶可以快速構(gòu)建不同的應(yīng)用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在這些shell中使用Spark集群來驗(yàn)證解決問題的方法。
1.3.3、通用
Spark提供了統(tǒng)一的解決方案。Spark可以用于批處理、交互式查詢(Spark SQL)、實(shí)時(shí)流處理(Spark Streaming)、機(jī)器學(xué)習(xí)(Spark MLlib)和圖計(jì)算(GraphX)。這些不同類型的處理都可以在同一個(gè)應(yīng)用中無縫使用。Spark統(tǒng)一的解決方案非常具有吸引力,畢竟任何公司都想用統(tǒng)一的平臺(tái)去處理遇到的問題,減少開發(fā)和維護(hù)的人力成本和部署平臺(tái)的物力成本。
1.3.4、兼容性
Spark可以非常方便地與其他的開源產(chǎn)品進(jìn)行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作為它的資源管理和調(diào)度器,并且可以處理所有Hadoop支持的數(shù)據(jù),包括HDFS、HBase和Cassandra等。這對(duì)于已經(jīng)部署Hadoop集群的用戶特別重要,因?yàn)椴恍枰鋈魏螖?shù)據(jù)遷移就可以使用Spark的強(qiáng)大處理能力。Spark也可以不依賴于第三方的資源管理和調(diào)度器,它實(shí)現(xiàn)了Standalone作為其內(nèi)置的資源管理和調(diào)度框架,這樣進(jìn)一步降低了Spark的使用門檻,使得所有人都可以非常容易地部署和使用Spark。此外,Spark還提供了在EC2上部署Standalone的Spark集群的工具。
2、RDD概述
2.1、什么是RDD
RDD(Resilient Distributed Dataset)叫做分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。RDD具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中,后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度。
2.2、RDD的屬性
一組分片(Partition),即數(shù)據(jù)集的基本組成單位。對(duì)于RDD來說,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。
一個(gè)計(jì)算每個(gè)分區(qū)的函數(shù)。Spark中RDD的計(jì)算是以分片為單位的,每個(gè)RDD都會(huì)實(shí)現(xiàn)compute函數(shù)以達(dá)到這個(gè)目的。compute函數(shù)會(huì)對(duì)迭代器進(jìn)行復(fù)合,不需要保存每次計(jì)算的結(jié)果。
RDD之間的依賴關(guān)系。*RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。*
一個(gè)Partitioner,即RDD的分片函數(shù)。當(dāng)前Spark中實(shí)現(xiàn)了兩種類型的分片函數(shù),一個(gè)是基于哈希的HashPartitioner,另外一個(gè)是基于范圍的RangePartitioner。只有對(duì)于于key-value的RDD,才會(huì)有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函數(shù)不但決定了RDD本身的分片數(shù)量,也決定了parent RDD Shuffle輸出時(shí)的分片數(shù)量。
一個(gè)列表,存儲(chǔ)存取每個(gè)Partition的優(yōu)先位置(preferred location)。對(duì)于一個(gè)HDFS文件來說,這個(gè)列表保存的就是每個(gè)Partition所在的塊的位置。按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”的理念,Spark在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能地將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。
2.3、創(chuàng)建RDD的兩種方式
由一個(gè)已經(jīng)存在的Scala集合創(chuàng)建。
val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有Hadoop支持的數(shù)據(jù)集,比如HDFS、Cassandra、HBase等
val rdd2 = sc.textFile(“hdfs://node1.itcast.cn:9000/words.txt”)
2.4、RDD編程API
2.4.1、Transformation
RDD中的所有轉(zhuǎn)換都是延遲加載的,也就是說,它們并不會(huì)直接計(jì)算結(jié)果。相反的,它們只是記住這些應(yīng)用到基礎(chǔ)數(shù)據(jù)集(例如一個(gè)文件)上的轉(zhuǎn)換動(dòng)作。只有當(dāng)發(fā)生一個(gè)要求返回結(jié)果給Driver的動(dòng)作時(shí),這些轉(zhuǎn)換才會(huì)真正運(yùn)行。這種設(shè)計(jì)讓Spark更加有效率地運(yùn)行。
常用的Transformation:
轉(zhuǎn)換 含義 map(func) 返回一個(gè)新的RDD,該RDD由每一個(gè)輸入元素經(jīng)過func函數(shù)轉(zhuǎn)換后組成 filter(func) 返回一個(gè)新的RDD,該RDD由經(jīng)過func函數(shù)計(jì)算后返回值為true的輸入元素組成 flatMap(func) 類似于map,但是每一個(gè)輸入元素可以被映射為0或多個(gè)輸出元素(所以func應(yīng)該返回一個(gè)序列,而不是單一元素) mapPartitions(func) 類似于map,但獨(dú)立地在RDD的每一個(gè)分片上運(yùn)行,因此在類型為T的RDD上運(yùn)行時(shí),func的函數(shù)類型必須是Iterator[T] => Iterator[U] mapPartitionsWithIndex(func) 類似于mapPartitions,但func帶有一個(gè)整數(shù)參數(shù)表示分片的索引值,因此在類型為T的RDD上運(yùn)行時(shí),func的函數(shù)類型必須是(Int, Interator[T]) => Iterator[U] sample(withReplacement, fraction, seed) 根據(jù)fraction指定的比例對(duì)數(shù)據(jù)進(jìn)行采樣,可以選擇是否使用隨機(jī)數(shù)進(jìn)行替換,seed用于指定隨機(jī)數(shù)生成器種子 union(otherDataset) 對(duì)源RDD和參數(shù)RDD求并集后返回一個(gè)新的RDD intersection(otherDataset) 對(duì)源RDD和參數(shù)RDD求交集后返回一個(gè)新的RDD distinct([numTasks])) 對(duì)源RDD進(jìn)行去重后返回一個(gè)新的RDD groupByKey([numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K, Iterator[V])的RDD reduceByKey(func, [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,返回一個(gè)(K,V)的RDD,使用指定的reduce函數(shù),將相同key的值聚合到一起,與groupByKey類似,reduce任務(wù)的個(gè)數(shù)可以通過第二個(gè)可選的參數(shù)來設(shè)置 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) sortByKey([ascending], [numTasks]) 在一個(gè)(K,V)的RDD上調(diào)用,K必須實(shí)現(xiàn)Ordered接口,返回一個(gè)按照key進(jìn)行排序的(K,V)的RDD sortBy(func,[ascending], [numTasks]) 與sortByKey類似,但是更靈活 join(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)相同key對(duì)應(yīng)的所有元素對(duì)在一起的(K,(V,W))的RDD cogroup(otherDataset, [numTasks]) 在類型為(K,V)和(K,W)的RDD上調(diào)用,返回一個(gè)(K,(Iterable<V>,Iterable<W>))類型的RDD cartesian(otherDataset) 笛卡爾積 pipe(command, [envVars]) coalesce(numPartitions) repartition(numPartitions) repartitionAndSortWithinPartitions(partitioner)2.4.2、 Action
動(dòng)作 含義 reduce(func) 通過func函數(shù)聚集RDD中的所有元素,這個(gè)功能必須是課交換且可并聯(lián)的 collect() 在驅(qū)動(dòng)程序中,以數(shù)組的形式返回?cái)?shù)據(jù)集的所有元素 count() 返回RDD的元素個(gè)數(shù) first() 返回RDD的第一個(gè)元素(類似于take(1)) take(n) 返回一個(gè)由數(shù)據(jù)集的前n個(gè)元素組成的數(shù)組 takeSample(withReplacement,num, [seed]) 返回一個(gè)數(shù)組,該數(shù)組由從數(shù)據(jù)集中隨機(jī)采樣的num個(gè)元素組成,可以選擇是否用隨機(jī)數(shù)替換不足的部分,seed用于指定隨機(jī)數(shù)生成器種子 takeOrdered(n, [ordering]) saveAsTextFile(path) 將數(shù)據(jù)集的元素以textfile的形式保存到HDFS文件系統(tǒng)或者其他支持的文件系統(tǒng),對(duì)于每個(gè)元素,Spark將會(huì)調(diào)用toString方法,將它裝換為文件中的文本 saveAsSequenceFile(path) 將數(shù)據(jù)集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統(tǒng)。 saveAsObjectFile(path) countByKey() 針對(duì)(K,V)類型的RDD,返回一個(gè)(K,Int)的map,表示每一個(gè)key對(duì)應(yīng)的元素個(gè)數(shù)。 foreach(func) 在數(shù)據(jù)集的每一個(gè)元素上,運(yùn)行函數(shù)func進(jìn)行更新。2.5、RDD的依賴關(guān)系
RDD和它依賴的父RDD(s)的關(guān)系有兩種不同的類型,即窄依賴(narrow dependency)和寬依賴(wide dependency)。
2.5.1、窄依賴
窄依賴指的是每一個(gè)父RDD的Partition最多被子RDD的一個(gè)Partition使用
總結(jié):窄依賴我們形象的比喻為獨(dú)生子女
2.5.2、寬依賴
寬依賴指的是多個(gè)子RDD的Partition會(huì)依賴同一個(gè)父RDD的Partition
總結(jié):寬依賴我們形象的比喻為超生
2.6、RDD的緩存
Spark速度非??斓脑蛑?#xff0c;就是在不同操作中可以在內(nèi)存中持久化或緩存?zhèn)€數(shù)據(jù)集。當(dāng)持久化某個(gè)RDD后,每一個(gè)節(jié)點(diǎn)都將把計(jì)算的分片結(jié)果保存在內(nèi)存中,并在對(duì)此RDD或衍生出的RDD進(jìn)行的其他動(dòng)作中重用。這使得后續(xù)的動(dòng)作變得更加迅速。RDD相關(guān)的持久化和緩存,是Spark最重要的特征之一。可以說,緩存是Spark構(gòu)建迭代式算法和快速交互式查詢的關(guān)鍵。
2.6.1、RDD緩存方式
RDD通過persist方法或cache方法可以將前面的計(jì)算結(jié)果緩存,但是并不是這兩個(gè)方法被調(diào)用時(shí)立即緩存,而是觸發(fā)后面的action時(shí),該RDD將會(huì)被緩存在計(jì)算節(jié)點(diǎn)的內(nèi)存中,并供后面重用。
通過查看源碼發(fā)現(xiàn)cache最終也是調(diào)用了persist方法,默認(rèn)的存儲(chǔ)級(jí)別都是僅在內(nèi)存存儲(chǔ)一份,Spark的存儲(chǔ)級(jí)別還有好多種,存儲(chǔ)級(jí)別在object StorageLevel中定義的。
緩存有可能丟失,或者存儲(chǔ)存儲(chǔ)于內(nèi)存的數(shù)據(jù)由于內(nèi)存不足而被刪除,RDD的緩存容錯(cuò)機(jī)制保證了即使緩存丟失也能保證計(jì)算的正確執(zhí)行。通過基于RDD的一系列轉(zhuǎn)換,丟失的數(shù)據(jù)會(huì)被重算,由于RDD的各個(gè)Partition是相對(duì)獨(dú)立的,因此只需要計(jì)算丟失的部分即可,并不需要重算全部Partition。
2.7、DAG的生成
DAG(Directed Acyclic Graph)叫做有向無環(huán)圖,原始的RDD通過一系列的轉(zhuǎn)換就就形成了DAG,根據(jù)RDD之間的依賴關(guān)系的不同將DAG劃分成不同的Stage,對(duì)于窄依賴,partition的轉(zhuǎn)換處理在Stage中完成計(jì)算。對(duì)于寬依賴,由于有Shuffle的存在,只能在parent RDD處理完成后,才能開始接下來的計(jì)算,因此寬依賴是劃分Stage的依據(jù)。
總結(jié):
關(guān)于本篇文章,你需要了解以下幾個(gè)知識(shí)點(diǎn):
1、Spark是什么、用來干什么的、有什么特點(diǎn)。
2、RDD是什么、常用的RDD有哪些對(duì)應(yīng)的作用是什么、RDD的依賴關(guān)系有哪些。
3、DAG是什么。
總結(jié)
以上是生活随笔為你收集整理的大数据之Spark简介及RDD说明的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 肌电信号 聚类 Matlab
- 下一篇: 3┃音视频直播系统之浏览器中通过 Web