RDD:基于内存的集群计算容错抽象
轉(zhuǎn)載自:http://shiyanjun.cn/archives/744.html
摘要
本文提出了分布式內(nèi)存抽象的概念——彈性分布式數(shù)據(jù)集(RDD,Resilient Distributed Datasets),它具備像MapReduce等數(shù)據(jù)流模型的容錯(cuò)特性,并且允許開發(fā)人員在大型集群上執(zhí)行基于內(nèi)存的計(jì)算。現(xiàn)有的數(shù)據(jù)流系統(tǒng)對(duì)兩種應(yīng)用的處理并不高效:一是迭代式算法,這在圖應(yīng)用和機(jī)器學(xué)習(xí)領(lǐng)域很常見;二是交互式數(shù)據(jù)挖掘工具。這兩種情況下,將數(shù)據(jù)保存在內(nèi)存中能夠極大地提高性能。為了有效地實(shí)現(xiàn)容錯(cuò),RDD提供了一種高度受限的共享內(nèi)存,即RDD是只讀的,并且只能通過其他RDD上的批量操作來創(chuàng)建。盡管如此,RDD仍然足以表示很多類型的計(jì)算,包括MapReduce和專用的迭代編程模型(如Pregel)等。我們實(shí)現(xiàn)的RDD在迭代計(jì)算方面比Hadoop快20多倍,同時(shí)還可以在5-7秒內(nèi)交互式地查詢1TB數(shù)據(jù)集。
1.引言
無論是工業(yè)界還是學(xué)術(shù)界,都已經(jīng)廣泛使用高級(jí)集群編程模型來處理日益增長(zhǎng)的數(shù)據(jù),如MapReduce和Dryad。這些系統(tǒng)將分布式編程簡(jiǎn)化為自動(dòng)提供位置感知性調(diào)度、容錯(cuò)以及負(fù)載均衡,使得大量用戶能夠在商用集群上分析超大數(shù)據(jù)集。
大多數(shù)現(xiàn)有的集群計(jì)算系統(tǒng)都是基于非循環(huán)的數(shù)據(jù)流模型。從穩(wěn)定的物理存儲(chǔ)(如分布式文件系統(tǒng))中加載記錄,記錄被傳入由一組確定性操作構(gòu)成的DAG,然后寫回穩(wěn)定存儲(chǔ)。DAG數(shù)據(jù)流圖能夠在運(yùn)行時(shí)自動(dòng)實(shí)現(xiàn)任務(wù)調(diào)度和故障恢復(fù)。
盡管非循環(huán)數(shù)據(jù)流是一種很強(qiáng)大的抽象方法,但仍然有些應(yīng)用無法使用這種方式描述。我們就是針對(duì)這些不太適合非循環(huán)模型的應(yīng)用,它們的特點(diǎn)是在多個(gè)并行操作之間重用工作數(shù)據(jù)集。這類應(yīng)用包括:(1)機(jī)器學(xué)習(xí)和圖應(yīng)用中常用的迭代算法(每一步對(duì)數(shù)據(jù)執(zhí)行相似的函數(shù));(2)交互式數(shù)據(jù)挖掘工具(用戶反復(fù)查詢一個(gè)數(shù)據(jù)子集)。基于數(shù)據(jù)流的框架并不明確支持工作集,所以需要將數(shù)據(jù)輸出到磁盤,然后在每次查詢時(shí)重新加載,這帶來較大的開銷。
我們提出了一種分布式的內(nèi)存抽象,稱為彈性分布式數(shù)據(jù)集(RDD,Resilient Distributed Datasets)。它支持基于工作集的應(yīng)用,同時(shí)具有數(shù)據(jù)流模型的特點(diǎn):自動(dòng)容錯(cuò)、位置感知調(diào)度和可伸縮性。RDD允許用戶在執(zhí)行多個(gè)查詢時(shí)顯式地將工作集緩存在內(nèi)存中,后續(xù)的查詢能夠重用工作集,這極大地提升了查詢速度。
RDD提供了一種高度受限的共享內(nèi)存模型,即RDD是只讀的記錄分區(qū)的集合,只能通過在其他RDD執(zhí)行確定的轉(zhuǎn)換操作(如map、join和group by)而創(chuàng)建,然而這些限制使得實(shí)現(xiàn)容錯(cuò)的開銷很低。與分布式共享內(nèi)存系統(tǒng)需要付出高昂代價(jià)的檢查點(diǎn)和回滾機(jī)制不同,RDD通過Lineage來重建丟失的分區(qū):一個(gè)RDD中包含了如何從其他RDD衍生所必需的相關(guān)信息,從而不需要檢查點(diǎn)操作就可以重構(gòu)丟失的數(shù)據(jù)分區(qū)。盡管RDD不是一個(gè)通用的共享內(nèi)存抽象,但卻具備了良好的描述能力、可伸縮性和可靠性,但卻能夠廣泛適用于數(shù)據(jù)并行類應(yīng)用。
第一個(gè)指出非循環(huán)數(shù)據(jù)流存在不足的并非是我們,例如,Google的Pregel[21],是一種專門用于迭代式圖算法的編程模型;Twister[13]和HaLoop[8],是兩種典型的迭代式MapReduce模型。但是,對(duì)于一些特定類型的應(yīng)用,這些系統(tǒng)提供了一個(gè)受限的通信模型。相比之下,RDD則為基于工作集的應(yīng)用提供了更為通用的抽象,用戶可以對(duì)中間結(jié)果進(jìn)行顯式的命名和物化,控制其分區(qū),還能執(zhí)行用戶選擇的特定操作(而不是在運(yùn)行時(shí)去循環(huán)執(zhí)行一系列MapReduce步驟)。RDD可以用來描述Pregel、迭代式MapReduce,以及這兩種模型無法描述的其他應(yīng)用,如交互式數(shù)據(jù)挖掘工具(用戶將數(shù)據(jù)集裝入內(nèi)存,然后執(zhí)行ad-hoc查詢)。
Spark是我們實(shí)現(xiàn)的RDD系統(tǒng),在我們內(nèi)部能夠被用于開發(fā)多種并行應(yīng)用。Spark采用Scala語言[5]實(shí)現(xiàn),提供類似于DryadLINQ的集成語言編程接口[34],使用戶可以非常容易地編寫并行任務(wù)。此外,隨著Scala新版本解釋器的完善,Spark還能夠用于交互式查詢大數(shù)據(jù)集。我們相信Spark會(huì)是第一個(gè)能夠使用有效、通用編程語言,并在集群上對(duì)大數(shù)據(jù)集進(jìn)行交互式分析的系統(tǒng)。
我們通過微基準(zhǔn)和用戶應(yīng)用程序來評(píng)估RDD。實(shí)驗(yàn)表明,在處理迭代式應(yīng)用上Spark比Hadoop快高達(dá)20多倍,計(jì)算數(shù)據(jù)分析類報(bào)表的性能提高了40多倍,同時(shí)能夠在5-7秒的延時(shí)內(nèi)交互式掃描1TB數(shù)據(jù)集。此外,我們還在Spark之上實(shí)現(xiàn)了Pregel和HaLoop編程模型(包括其位置優(yōu)化策略),以庫的形式實(shí)現(xiàn)(分別使用了100和200行Scala代碼)。最后,利用RDD內(nèi)在的確定性特性,我們還創(chuàng)建了一種Spark調(diào)試工具rddbg,允許用戶在任務(wù)期間利用Lineage重建RDD,然后像傳統(tǒng)調(diào)試器那樣重新執(zhí)行任務(wù)。
本文首先在第2部分介紹了RDD的概念,然后第3部分描述Spark API,第4部分解釋如何使用RDD表示幾種并行應(yīng)用(包括Pregel和HaLoop),第5部分討論Spark中RDD的表示方法以及任務(wù)調(diào)度器,第6部分描述具體實(shí)現(xiàn)和rddbg,第7部分對(duì)RDD進(jìn)行評(píng)估,第8部分給出了相關(guān)研究工作,最后第9部分總結(jié)。
2.彈性分布式數(shù)據(jù)集(RDD)
本部分描述RDD和編程模型。首先討論設(shè)計(jì)目標(biāo)(2.1),然后定義RDD(2.2),討論Spark的編程模型(2.3),并給出一個(gè)示例(2.4),最后對(duì)比RDD與分布式共享內(nèi)存(2.5)。
2.1 目標(biāo)和概述
我們的目標(biāo)是為基于工作集的應(yīng)用(即多個(gè)并行操作重用中間結(jié)果的這類應(yīng)用)提供抽象,同時(shí)保持MapReduce及其相關(guān)模型的優(yōu)勢(shì)特性:即自動(dòng)容錯(cuò)、位置感知性調(diào)度和可伸縮性。RDD比數(shù)據(jù)流模型更易于編程,同時(shí)基于工作集的計(jì)算也具有良好的描述能力。
在這些特性中,最難實(shí)現(xiàn)的是容錯(cuò)性。一般來說,分布式數(shù)據(jù)集的容錯(cuò)性有兩種方式:即數(shù)據(jù)檢查點(diǎn)和記錄數(shù)據(jù)的更新。我們面向的是大規(guī)模數(shù)據(jù)分析,數(shù)據(jù)檢查點(diǎn)操作成本很高:需要通過數(shù)據(jù)中心的網(wǎng)絡(luò)連接在機(jī)器之間復(fù)制龐大的數(shù)據(jù)集,而網(wǎng)絡(luò)帶寬往往比內(nèi)存帶寬低得多,同時(shí)還需要消耗更多的存儲(chǔ)資源(在內(nèi)存中復(fù)制數(shù)據(jù)可以減少需要緩存的數(shù)據(jù)量,而存儲(chǔ)到磁盤則會(huì)拖慢應(yīng)用程序)。所以,我們選擇記錄更新的方式。但是,如果更新太多,那么記錄更新成本也不低。因此,RDD只支持粗粒度轉(zhuǎn)換,即在大量記錄上執(zhí)行的單個(gè)操作。將創(chuàng)建RDD的一系列轉(zhuǎn)換記錄下來(即Lineage),以便恢復(fù)丟失的分區(qū)。
雖然只支持粗粒度轉(zhuǎn)換限制了編程模型,但我們發(fā)現(xiàn)RDD仍然可以很好地適用于很多應(yīng)用,特別是支持?jǐn)?shù)據(jù)并行的批量分析應(yīng)用,包括數(shù)據(jù)挖掘、機(jī)器學(xué)習(xí)、圖算法等,因?yàn)檫@些程序通常都會(huì)在很多記錄上執(zhí)行相同的操作。RDD不太適合那些異步更新共享狀態(tài)的應(yīng)用,例如并行web爬行器。因此,我們的目標(biāo)是為大多數(shù)分析型應(yīng)用提供有效的編程模型,而其他類型的應(yīng)用交給專門的系統(tǒng)。
2.2 RDD抽象
RDD是只讀的、分區(qū)記錄的集合。RDD只能基于在穩(wěn)定物理存儲(chǔ)中的數(shù)據(jù)集和其他已有的RDD上執(zhí)行確定性操作來創(chuàng)建。這些確定性操作稱之為轉(zhuǎn)換,如map、filter、groupBy、join(轉(zhuǎn)換不是程開發(fā)人員在RDD上執(zhí)行的操作)。
RDD不需要物化。RDD含有如何從其他RDD衍生(即計(jì)算)出本RDD的相關(guān)信息(即Lineage),據(jù)此可以從物理存儲(chǔ)的數(shù)據(jù)計(jì)算出相應(yīng)的RDD分區(qū)。
2.3 編程模型
在Spark中,RDD被表示為對(duì)象,通過這些對(duì)象上的方法(或函數(shù))調(diào)用轉(zhuǎn)換。
定義RDD之后,程序員就可以在動(dòng)作中使用RDD了。動(dòng)作是向應(yīng)用程序返回值,或向存儲(chǔ)系統(tǒng)導(dǎo)出數(shù)據(jù)的那些操作,例如,count(返回RDD中的元素個(gè)數(shù)),collect(返回元素本身),save(將RDD輸出到存儲(chǔ)系統(tǒng))。在Spark中,只有在動(dòng)作第一次使用RDD時(shí),才會(huì)計(jì)算RDD(即延遲計(jì)算)。這樣在構(gòu)建RDD的時(shí)候,運(yùn)行時(shí)通過管道的方式傳輸多個(gè)轉(zhuǎn)換。
程序員還可以從兩個(gè)方面控制RDD,即緩存和分區(qū)。用戶可以請(qǐng)求將RDD緩存,這樣運(yùn)行時(shí)將已經(jīng)計(jì)算好的RDD分區(qū)存儲(chǔ)起來,以加速后期的重用。緩存的RDD一般存儲(chǔ)在內(nèi)存中,但如果內(nèi)存不夠,可以寫到磁盤上。
另一方面,RDD還允許用戶根據(jù)關(guān)鍵字(key)指定分區(qū)順序,這是一個(gè)可選的功能。目前支持哈希分區(qū)和范圍分區(qū)。例如,應(yīng)用程序請(qǐng)求將兩個(gè)RDD按照同樣的哈希分區(qū)方式進(jìn)行分區(qū)(將同一機(jī)器上具有相同關(guān)鍵字的記錄放在一個(gè)分區(qū)),以加速它們之間的join操作。在Pregel和HaLoop中,多次迭代之間采用一致性的分區(qū)置換策略進(jìn)行優(yōu)化,我們同樣也允許用戶指定這種優(yōu)化。
2.4 示例:控制臺(tái)日志挖掘
本部分我們通過一個(gè)具體示例來闡述RDD。假定有一個(gè)大型網(wǎng)站出錯(cuò),操作員想要檢查Hadoop文件系統(tǒng)(HDFS)中的日志文件(TB級(jí)大小)來找出原因。通過使用Spark,操作員只需將日志中的錯(cuò)誤信息裝載到一組節(jié)點(diǎn)的內(nèi)存中,然后執(zhí)行交互式查詢。首先,需要在Spark解釋器中輸入如下Scala命令:
| 1 | lines?=?spark.textFile("hdfs://...") |
| 2 | errors?=?lines.filter(_.startsWith("ERROR")) |
| 3 | errors.cache() |
第1行從HDFS文件定義了一個(gè)RDD(即一個(gè)文本行集合),第2行獲得一個(gè)過濾后的RDD,第3行請(qǐng)求將errors緩存起來。注意在Scala語法中filter的參數(shù)是一個(gè)閉包。
這時(shí)集群還沒有開始執(zhí)行任何任務(wù)。但是,用戶已經(jīng)可以在這個(gè)RDD上執(zhí)行對(duì)應(yīng)的動(dòng)作,例如統(tǒng)計(jì)錯(cuò)誤消息的數(shù)目:
| 1 | errors.count() |
用戶還可以在RDD上執(zhí)行更多的轉(zhuǎn)換操作,并使用轉(zhuǎn)換結(jié)果,如:
| 1 | // Count errors mentioning MySQL: |
| 2 | errors.filter(_.contains("MySQL")).count() |
| 3 | // Return the time fields of errors mentioning |
| 4 | // HDFS as an array (assuming time is field |
| 5 | // number 3 in a tab-separated format): |
| 6 | errors.filter(_.contains("HDFS")) |
| 7 | ????.map(_.split('\t')(3)) |
| 8 | ????.collect() |
使用errors的第一個(gè)action運(yùn)行以后,Spark會(huì)把errors的分區(qū)緩存在內(nèi)存中,極大地加快了后續(xù)計(jì)算速度。注意,最初的RDD lines不會(huì)被緩存。因?yàn)殄e(cuò)誤信息可能只占原數(shù)據(jù)集的很小一部分(小到足以放入內(nèi)存)。
最后,為了說明模型的容錯(cuò)性,圖1給出了第3個(gè)查詢的Lineage圖。在lines RDD上執(zhí)行filter操作,得到errors,然后再filter、map后得到新的RDD,在這個(gè)RDD上執(zhí)行collect操作。Spark調(diào)度器以流水線的方式執(zhí)行后兩個(gè)轉(zhuǎn)換,向擁有errors分區(qū)緩存的節(jié)點(diǎn)發(fā)送一組任務(wù)。此外,如果某個(gè)errors分區(qū)丟失,Spark只在相應(yīng)的lines分區(qū)上執(zhí)行filter操作來重建該errors分區(qū)。
圖1?示例中第三個(gè)查詢的Lineage圖。(方框表示RDD,箭頭表示轉(zhuǎn)換)
2.5 RDD與分布式共享內(nèi)存
為了進(jìn)一步理解RDD是一種分布式的內(nèi)存抽象,表1列出了RDD與分布式共享內(nèi)存(DSM,Distributed Shared Memory)[24]的對(duì)比。在DSM系統(tǒng)中,應(yīng)用可以向全局地址空間的任意位置進(jìn)行讀寫操作。(注意這里的DSM,不僅指?jìng)鹘y(tǒng)的共享內(nèi)存系統(tǒng),還包括那些通過分布式哈希表或分布式文件系統(tǒng)進(jìn)行數(shù)據(jù)共享的系統(tǒng),比如Piccolo[28])DSM是一種通用的抽象,但這種通用性同時(shí)也使得在商用集群上實(shí)現(xiàn)有效的容錯(cuò)性更加困難。
RDD與DSM主要區(qū)別在于,不僅可以通過批量轉(zhuǎn)換創(chuàng)建(即“寫”)RDD,還可以對(duì)任意內(nèi)存位置讀寫。也就是說,RDD限制應(yīng)用執(zhí)行批量寫操作,這樣有利于實(shí)現(xiàn)有效的容錯(cuò)。特別地,RDD沒有檢查點(diǎn)開銷,因?yàn)榭梢允褂肔ineage來恢復(fù)RDD。而且,失效時(shí)只需要重新計(jì)算丟失的那些RDD分區(qū),可以在不同節(jié)點(diǎn)上并行執(zhí)行,而不需要回滾整個(gè)程序。
| 對(duì)比項(xiàng)目 | RDD | 分布式共享內(nèi)存(DSM) |
| 讀 | 批量或細(xì)粒度操作 | 細(xì)粒度操作 |
| 寫 | 批量轉(zhuǎn)換操作 | 細(xì)粒度操作 |
| 一致性 | 不重要(RDD是不可更改的) | 取決于應(yīng)用程序或運(yùn)行時(shí) |
| 容錯(cuò)性 | 細(xì)粒度,低開銷(使用Lineage) | 需要檢查點(diǎn)操作和程序回滾 |
| 落后任務(wù)的處理 | 任務(wù)備份 | 很難處理 |
| 任務(wù)安排 | 基于數(shù)據(jù)存放的位置自動(dòng)實(shí)現(xiàn) | 取決于應(yīng)用程序(通過運(yùn)行時(shí)實(shí)現(xiàn)透明性) |
| 如果內(nèi)存不夠 | 與已有的數(shù)據(jù)流系統(tǒng)類似 | 性能較差(交換?) |
注意,通過備份任務(wù)的拷貝,RDD還可以處理落后任務(wù)(即運(yùn)行很慢的節(jié)點(diǎn)),這點(diǎn)與MapReduce[12]類似。而DSM則難以實(shí)現(xiàn)備份任務(wù),因?yàn)槿蝿?wù)及其副本都需要讀寫同一個(gè)內(nèi)存位置。
與DSM相比,RDD模型有兩個(gè)好處。第一,對(duì)于RDD中的批量操作,運(yùn)行時(shí)將根據(jù)數(shù)據(jù)存放的位置來調(diào)度任務(wù),從而提高性能。第二,對(duì)于基于掃描的操作,如果內(nèi)存不足以緩存整個(gè)RDD,就進(jìn)行部分緩存。把內(nèi)存放不下的分區(qū)存儲(chǔ)到磁盤上,此時(shí)性能與現(xiàn)有的數(shù)據(jù)流系統(tǒng)差不多。
最后看一下讀操作的粒度。RDD上的很多動(dòng)作(如count和collect)都是批量讀操作,即掃描整個(gè)數(shù)據(jù)集,可以將任務(wù)分配到距離數(shù)據(jù)最近的節(jié)點(diǎn)上。同時(shí),RDD也支持細(xì)粒度操作,即在哈希或范圍分區(qū)的RDD上執(zhí)行關(guān)鍵字查找。
3. Spark編程接口
Spark用Scala[5]語言實(shí)現(xiàn)了RDD的API。Scala是一種基于JVM的靜態(tài)類型、函數(shù)式、面向?qū)ο蟮恼Z言。我們選擇Scala是因?yàn)樗?jiǎn)潔(特別適合交互式使用)、有效(因?yàn)槭庆o態(tài)類型)。但是,RDD抽象并不局限于函數(shù)式語言,也可以使用其他語言來實(shí)現(xiàn)RDD,比如像Hadoop[2]那樣用類表示用戶函數(shù)。
要使用Spark,開發(fā)者需要編寫一個(gè)driver程序,連接到集群以運(yùn)行Worker,如圖2所示。Driver定義了一個(gè)或多個(gè)RDD,并調(diào)用RDD上的動(dòng)作。Worker是長(zhǎng)時(shí)間運(yùn)行的進(jìn)程,將RDD分區(qū)以Java對(duì)象的形式緩存在內(nèi)存中。
圖2?Spark的運(yùn)行時(shí)。用戶的driver程序啟動(dòng)多個(gè)worker,worker從分布式文件系統(tǒng)中讀取數(shù)據(jù)塊,并將計(jì)算后的RDD分區(qū)緩存在內(nèi)存中。
再看看2.4中的例子,用戶執(zhí)行RDD操作時(shí)會(huì)提供參數(shù),比如map傳遞一個(gè)閉包(closure,函數(shù)式編程中的概念)。Scala將閉包表示為Java對(duì)象,如果傳遞的參數(shù)是閉包,則這些對(duì)象被序列化,通過網(wǎng)絡(luò)傳輸?shù)狡渌?jié)點(diǎn)上進(jìn)行裝載。Scala將閉包內(nèi)的變量保存為Java對(duì)象的字段。例如,var x = 5; rdd.map(_ + x) 這段代碼將RDD中的每個(gè)元素加5。總的來說,Spark的語言集成類似于DryadLINQ。
RDD本身是靜態(tài)類型對(duì)象,由參數(shù)指定其元素類型。例如,RDD[int]是一個(gè)整型RDD。不過,我們舉的例子幾乎都省略了這個(gè)類型參數(shù),因?yàn)镾cala支持類型推斷。
雖然在概念上使用Scala實(shí)現(xiàn)RDD很簡(jiǎn)單,但還是要處理一些Scala閉包對(duì)象的反射問題。如何通過Scala解釋器來使用Spark還需要更多工作,這點(diǎn)我們將在第6部分討論。不管怎樣,我們都不需要修改Scala編譯器。
3.1 Spark中的RDD操作
表2列出了Spark中的RDD轉(zhuǎn)換和動(dòng)作。每個(gè)操作都給出了標(biāo)識(shí),其中方括號(hào)表示類型參數(shù)。前面說過轉(zhuǎn)換是延遲操作,用于定義新的RDD;而動(dòng)作啟動(dòng)計(jì)算操作,并向用戶程序返回值或向外部存儲(chǔ)寫數(shù)據(jù)。
| 轉(zhuǎn)換 | map(f : T ) U) : RDD[T] ) RDD[U] filter(f : T ) Bool) : RDD[T] ) RDD[T] flatMap(f : T ) Seq[U]) : RDD[T] ) RDD[U] sample(fraction : Float) : RDD[T] ) RDD[T] (Deterministic sampling) groupByKey() : RDD[(K, V)] ) RDD[(K, Seq[V])] reduceByKey(f : (V; V) ) V) : RDD[(K, V)] ) RDD[(K, V)] union() : (RDD[T]; RDD[T]) ) RDD[T] join() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (V, W))] cogroup() : (RDD[(K, V)]; RDD[(K, W)]) ) RDD[(K, (Seq[V], Seq[W]))] crossProduct() : (RDD[T]; RDD[U]) ) RDD[(T, U)] mapValues(f : V ) W) : RDD[(K, V)] ) RDD[(K, W)] (Preserves partitioning) sort(c : Comparator[K]) : RDD[(K, V)] ) RDD[(K, V)] partitionBy(p : Partitioner[K]) : RDD[(K, V)] ) RDD[(K, V)] |
| 動(dòng)作 | count() : RDD[T] ) Long collect() : RDD[T] ) Seq[T] reduce(f : (T; T) ) T) : RDD[T] ) T lookup(k : K) : RDD[(K, V)] ) Seq[V] (On hash/range partitioned RDDs) save(path : String) : Outputs RDD to a storage system, e.g., HDFS |
注意,有些操作只對(duì)鍵值對(duì)可用,比如join。另外,函數(shù)名與Scala及其他函數(shù)式語言中的API匹配,例如map是一對(duì)一的映射,而flatMap是將每個(gè)輸入映射為一個(gè)或多個(gè)輸出(與MapReduce中的map類似)。
除了這些操作以外,用戶還可以請(qǐng)求將RDD緩存起來。而且,用戶還可以通過Partitioner類獲取RDD的分區(qū)順序,然后將另一個(gè)RDD按照同樣的方式分區(qū)。有些操作會(huì)自動(dòng)產(chǎn)生一個(gè)哈希或范圍分區(qū)的RDD,像groupByKey,reduceByKey和sort等。
4. 應(yīng)用程序示例
現(xiàn)在我們講述如何使用RDD表示幾種基于數(shù)據(jù)并行的應(yīng)用。首先討論一些迭代式機(jī)器學(xué)習(xí)應(yīng)用(4.1),然后看看如何使用RDD描述幾種已有的集群編程模型,即MapReduce(4.2),Pregel(4.3),和Hadoop(4.4)。最后討論一下RDD不適合哪些應(yīng)用(4.5)。
4.1 迭代式機(jī)器學(xué)習(xí)
很多機(jī)器學(xué)習(xí)算法都具有迭代特性,運(yùn)行迭代優(yōu)化方法來優(yōu)化某個(gè)目標(biāo)函數(shù),例如梯度下降方法。如果這些算法的工作集能夠放入內(nèi)存,將極大地加速程序運(yùn)行。而且,這些算法通常采用批量操作,例如映射和求和,這樣更容易使用RDD來表示。
例如下面的程序是邏輯回歸[15]的實(shí)現(xiàn)。邏輯回歸是一種常見的分類算法,即尋找一個(gè)最佳分割兩組點(diǎn)(即垃圾郵件和非垃圾郵件)的超平面w。算法采用梯度下降的方法:開始時(shí)w為隨機(jī)值,在每一次迭代的過程中,對(duì)w的函數(shù)求和,然后朝著優(yōu)化的方向移動(dòng)w。
| 1 | val?points?=?spark.textFile(...) |
| 2 | ?????.map(parsePoint).persist() |
| 3 | var?w?=?// random initial vector |
| 4 | for?(i <-?1?to ITERATIONS) { |
| 5 | ?????val?gradient?=?points.map{ p?=> |
| 6 | ??????????p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y |
| 7 | ?????}.reduce((a,b)?=> a+b) |
| 8 | ?????w -=?gradient |
| 9 | } |
首先定義一個(gè)名為points的緩存RDD,這是在文本文件上執(zhí)行map轉(zhuǎn)換之后得到的,即將每個(gè)文本行解析為一個(gè)Point對(duì)象。然后在points上反復(fù)執(zhí)行map和reduce操作,每次迭代時(shí)通過對(duì)當(dāng)前w的函數(shù)進(jìn)行求和來計(jì)算梯度。7.1小節(jié)我們將看到這種在內(nèi)存中緩存points的方式,比每次迭代都從磁盤文件裝載數(shù)據(jù)并進(jìn)行解析要快得多。
已經(jīng)在Spark中實(shí)現(xiàn)的迭代式機(jī)器學(xué)習(xí)算法還有:kmeans(像邏輯回歸一樣每次迭代時(shí)執(zhí)行一對(duì)map和reduce操作),期望最大化算法(EM,兩個(gè)不同的map/reduce步驟交替執(zhí)行),交替最小二乘矩陣分解和協(xié)同過濾算法。Chu等人提出迭代式MapReduce也可以用來實(shí)現(xiàn)常用的學(xué)習(xí)算法[11]。
4.2 使用RDD實(shí)現(xiàn)MapReduce
MapReduce模型[12]很容易使用RDD進(jìn)行描述。假設(shè)有一個(gè)輸入數(shù)據(jù)集(其元素類型為T),和兩個(gè)函數(shù)myMap: T => List[(Ki, Vi)] 和 myReduce: (Ki; List[Vi]) ) List[R],代碼如下:
| 1 | data.flatMap(myMap) |
| 2 | ????.groupByKey() |
| 3 | ????.map((k, vs)?=> myReduce(k, vs)) |
如果任務(wù)包含combiner,則相應(yīng)的代碼為:
| 1 | data.flatMap(myMap) |
| 2 | ????.reduceByKey(myCombiner) |
| 3 | ????.map((k, v)?=> myReduce(k, v)) |
ReduceByKey操作在mapper節(jié)點(diǎn)上執(zhí)行部分聚集,與MapReduce的combiner類似。
4.3 使用RDD實(shí)現(xiàn)Pregel
Pregel[21]是面向圖算法的基于BSP范式[32]的編程模型。程序由一系列超步(Superstep)協(xié)調(diào)迭代運(yùn)行。在每個(gè)超步中,各個(gè)頂點(diǎn)執(zhí)行用戶函數(shù),并更新相應(yīng)的頂點(diǎn)狀態(tài),變異圖拓?fù)?#xff0c;然后向下一個(gè)超步的頂點(diǎn)集發(fā)送消息。這種模型能夠描述很多圖算法,包括最短路徑,雙邊匹配和PageRank等。
以PageRank為例介紹一下Pregel的實(shí)現(xiàn)。當(dāng)前PageRank[7]記為r,頂點(diǎn)表示狀態(tài)。在每個(gè)超步中,各個(gè)頂點(diǎn)向其所有鄰居發(fā)送貢獻(xiàn)值r/n,這里n是鄰居的數(shù)目。下一個(gè)超步開始時(shí),每個(gè)頂點(diǎn)將其分值(rank)更新為 α/N + (1 - α) * Σci,這里的求和是各個(gè)頂點(diǎn)收到的所有貢獻(xiàn)值的和,N是頂點(diǎn)的總數(shù)。
Pregel將輸入的圖劃分到各個(gè)worker上,并存儲(chǔ)在其內(nèi)存中。在每個(gè)超步中,各個(gè)worker通過一種類似MapReduce的Shuffle操作交換消息。
Pregel的通信模式可以用RDD來描述,如圖3。主要思想是:將每個(gè)超步中的頂點(diǎn)狀態(tài)和要發(fā)送的消息存儲(chǔ)為RDD,然后根據(jù)頂點(diǎn)ID分組,進(jìn)行Shuffle通信(即cogroup操作)。然后對(duì)每個(gè)頂點(diǎn)ID上的狀態(tài)和消息應(yīng)用用戶函數(shù)(即mapValues操作),產(chǎn)生一個(gè)新的RDD,即(VertexID, (NewState, OutgoingMessages))。然后執(zhí)行map操作分離出下一次迭代的頂點(diǎn)狀態(tài)和消息(即mapValues和flatMap操作)。代碼如下:
| 1 | val?vertices?=?// RDD of (ID, State) pairs |
| 2 | val?messages?=?// RDD of (ID, Message) pairs |
| 3 | val?grouped?=?vertices.cogroup(messages) |
| 4 | val?newData?=?grouped.mapValues { |
| 5 | ????(vert, msgs)?=> userFunc(vert, msgs) |
| 6 | ????// returns (newState, outgoingMsgs) |
| 7 | }.cache() |
| 8 | val?newVerts?=?newData.mapValues((v,ms)?=> v) |
| 9 | val?newMsgs?=?newData.flatMap((id,(v,ms))?=> ms) |
圖3?使用RDD實(shí)現(xiàn)Pregel時(shí),一步迭代的數(shù)據(jù)流。(方框表示RDD,箭頭表示轉(zhuǎn)換)
需要注意的是,這種實(shí)現(xiàn)方法中,RDD grouped,newData和newVerts的分區(qū)方法與輸入RDD vertices一樣。所以,頂點(diǎn)狀態(tài)一直存在于它們開始執(zhí)行的機(jī)器上,這跟原Pregel一樣,這樣就減少了通信成本。因?yàn)閏ogroup和mapValues保持了與輸入RDD相同的分區(qū)方法,所以分區(qū)是自動(dòng)進(jìn)行的。
完整的Pregel編程模型還包括其他工具,比如combiner,附錄A討論了它們的實(shí)現(xiàn)。下面將討論P(yáng)regel的容錯(cuò)性,以及如何在實(shí)現(xiàn)相同容錯(cuò)性的同時(shí)減少需要執(zhí)行檢查點(diǎn)操作的數(shù)據(jù)量。
我們差不多用了100行Scala代碼在Spark上實(shí)現(xiàn)了一個(gè)類Pregel的API。7.2小節(jié)將使用PageRank算法評(píng)估它的性能。
4.3.1 Pregel容錯(cuò)
當(dāng)前,Pregel基于檢查點(diǎn)機(jī)制來為頂點(diǎn)狀態(tài)及其消息實(shí)現(xiàn)容錯(cuò)[21]。然而作者是這樣描述的:通過在其它的節(jié)點(diǎn)上記錄已發(fā)消息日志,然后單獨(dú)重建丟失的分區(qū),只需要恢復(fù)局部數(shù)據(jù)即可。上面提到這兩種方式,RDD都能夠很好地支持。
通過4.3小節(jié)的實(shí)現(xiàn),Spark總是能夠基于Lineage實(shí)現(xiàn)頂點(diǎn)和消息RDD的重建,但是由于過長(zhǎng)的Lineage鏈,恢復(fù)可能會(huì)付出高昂的代價(jià)。因?yàn)榈鶵DD依賴于上一個(gè)RDD,對(duì)于部分分區(qū)來說,節(jié)點(diǎn)故障可能會(huì)導(dǎo)致這些分區(qū)狀態(tài)的所有迭代版本丟失,這就要求使用一種“級(jí)聯(lián)-重新執(zhí)行”[20]的方式去依次重建每一個(gè)丟失的分區(qū)。為了避免這個(gè)問題,用戶可以周期性地在頂點(diǎn)和消息RDD上執(zhí)行save操作,將狀態(tài)信息保存到持久存儲(chǔ)中。然后,Spark能夠在失敗的時(shí)候自動(dòng)地重新計(jì)算這些丟失的分區(qū)(而不是回滾整個(gè)程序)。
最后,我們意識(shí)到,RDD也能夠?qū)崿F(xiàn)檢查點(diǎn)數(shù)據(jù)的reduce操作,這要求通過一種高效的檢查點(diǎn)方案來表達(dá)檢查點(diǎn)數(shù)據(jù)。在很多Pregel作業(yè)中,頂點(diǎn)狀態(tài)都包括可變與不可變的組件,例如,在PageRank中,與一個(gè)頂點(diǎn)相鄰的頂點(diǎn)列表是不可變的,但是它們的排名是可變的,在這種情況下,我們可以使用一個(gè)來自可變數(shù)據(jù)的單獨(dú)RDD來替換不可變RDD,基于這樣一個(gè)較短的Lineage鏈,檢查點(diǎn)僅僅是可變狀態(tài),圖4解釋了這種方式。
圖4?經(jīng)過優(yōu)化的Pregel使用RDD的數(shù)據(jù)流。可變狀態(tài)RDD必須設(shè)置檢查點(diǎn),不可變狀態(tài)才可被快速重建。
在PageRank中,不可變狀態(tài)(相鄰頂點(diǎn)列表)遠(yuǎn)大于可變狀態(tài)(浮點(diǎn)值),所以這種方式能夠極大地降低開銷。
4.4 使用RDD實(shí)現(xiàn)HaLoop
HaLoop[8]是Hadoop的一個(gè)擴(kuò)展版本,它能夠改善具有迭代特性的MapReduce程序的性能。基于HaLoop編程模型的應(yīng)用,使用reduce階段的輸出作為map階段下一輪迭代的輸入。它的循環(huán)感知任務(wù)調(diào)度器能夠保證,在每一輪迭代中處理同一個(gè)分區(qū)數(shù)據(jù)的連續(xù)map和reduce任務(wù),一定能夠在同一臺(tái)物理機(jī)上執(zhí)行。確保迭代間locality特性,reduce數(shù)據(jù)在物理節(jié)點(diǎn)之間傳輸,并且允許數(shù)據(jù)緩存在本地磁盤而能夠被后續(xù)迭代重用。
使用RDD來優(yōu)化HaLoop,我們?cè)赟park上實(shí)現(xiàn)了一個(gè)類似HaLoop的API,這個(gè)庫只使用了200行Scala代碼。通過partitionBy能夠保證跨迭代的分區(qū)的一致性,每一個(gè)階段的輸入和輸出被緩存以用于后續(xù)迭代。
4.5 不適合使用RDD的應(yīng)用
在2.1節(jié)我們討論過,RDD適用于具有批量轉(zhuǎn)換需求的應(yīng)用,并且相同的操作作用于數(shù)據(jù)集的每一個(gè)元素上。在這種情況下,RDD能夠記住每個(gè)轉(zhuǎn)換操作,對(duì)應(yīng)于Lineage圖中的一個(gè)步驟,恢復(fù)丟失分區(qū)數(shù)據(jù)時(shí)不需要寫日志記錄大量數(shù)據(jù)。RDD不適合那些通過異步細(xì)粒度地更新來共享狀態(tài)的應(yīng)用,例如Web應(yīng)用中的存儲(chǔ)系統(tǒng),或者增量抓取和索引Web數(shù)據(jù)的系統(tǒng),這樣的應(yīng)用更適合使用一些傳統(tǒng)的方法,例如數(shù)據(jù)庫、RAMCloud[26]、Percolator[27]和Piccolo[28]。我們的目標(biāo)是,面向批量分析應(yīng)用的這類特定系統(tǒng),提供一種高效的編程模型,而不是一些異步應(yīng)用程序。
5. RDD的描述及作業(yè)調(diào)度
我們希望在不修改調(diào)度器的前提下,支持RDD上的各種轉(zhuǎn)換操作,同時(shí)能夠從這些轉(zhuǎn)換獲取Lineage信息。為此,我們?yōu)镽DD設(shè)計(jì)了一組小型通用的內(nèi)部接口。
簡(jiǎn)單地說,每個(gè)RDD都包含:(1)一組RDD分區(qū)(partition,即數(shù)據(jù)集的原子組成部分);(2)對(duì)父RDD的一組依賴,這些依賴描述了RDD的Lineage;(3)一個(gè)函數(shù),即在父RDD上執(zhí)行何種計(jì)算;(4)元數(shù)據(jù),描述分區(qū)模式和數(shù)據(jù)存放的位置。例如,一個(gè)表示HDFS文件的RDD包含:各個(gè)數(shù)據(jù)塊的一個(gè)分區(qū),并知道各個(gè)數(shù)據(jù)塊放在哪些節(jié)點(diǎn)上。而且這個(gè)RDD上的map操作結(jié)果也具有同樣的分區(qū),map函數(shù)是在父數(shù)據(jù)上執(zhí)行的。表3總結(jié)了RDD的內(nèi)部接口。
| 操作 | 含義 |
| partitions() | 返回一組Partition對(duì)象 |
| preferredLocations(p) | 根據(jù)數(shù)據(jù)存放的位置,返回分區(qū)p在哪些節(jié)點(diǎn)訪問更快 |
| dependencies() | 返回一組依賴 |
| iterator(p, parentIters) | 按照父分區(qū)的迭代器,逐個(gè)計(jì)算分區(qū)p的元素 |
| partitioner() | 返回RDD是否hash/range分區(qū)的元數(shù)據(jù)信息 |
設(shè)計(jì)接口的一個(gè)關(guān)鍵問題就是,如何表示RDD之間的依賴。我們發(fā)現(xiàn)RDD之間的依賴關(guān)系可以分為兩類,即:(1)窄依賴(narrow dependencies):子RDD的每個(gè)分區(qū)依賴于常數(shù)個(gè)父分區(qū)(即與數(shù)據(jù)規(guī)模無關(guān));(2)寬依賴(wide dependencies):子RDD的每個(gè)分區(qū)依賴于所有父RDD分區(qū)。例如,map產(chǎn)生窄依賴,而join則是寬依賴(除非父RDD被哈希分區(qū))。另一個(gè)例子見圖5。
圖5?窄依賴和寬依賴的例子。(方框表示RDD,實(shí)心矩形表示分區(qū))
區(qū)分這兩種依賴很有用。首先,窄依賴允許在一個(gè)集群節(jié)點(diǎn)上以流水線的方式(pipeline)計(jì)算所有父分區(qū)。例如,逐個(gè)元素地執(zhí)行map、然后filter操作;而寬依賴則需要首先計(jì)算好所有父分區(qū)數(shù)據(jù),然后在節(jié)點(diǎn)之間進(jìn)行Shuffle,這與MapReduce類似。第二,窄依賴能夠更有效地進(jìn)行失效節(jié)點(diǎn)的恢復(fù),即只需重新計(jì)算丟失RDD分區(qū)的父分區(qū),而且不同節(jié)點(diǎn)之間可以并行計(jì)算;而對(duì)于一個(gè)寬依賴關(guān)系的Lineage圖,單個(gè)節(jié)點(diǎn)失效可能導(dǎo)致這個(gè)RDD的所有祖先丟失部分分區(qū),因而需要整體重新計(jì)算。
通過RDD接口,Spark只需要不超過20行代碼實(shí)現(xiàn)便可以實(shí)現(xiàn)大多數(shù)轉(zhuǎn)換。5.1小節(jié)給出了例子,然后我們討論了怎樣使用RDD接口進(jìn)行調(diào)度(5.2),最后討論一下基于RDD的程序何時(shí)需要數(shù)據(jù)檢查點(diǎn)操作(5.3)。
5.1 RDD實(shí)現(xiàn)舉例
HDFS文件:目前為止我們給的例子中輸入RDD都是HDFS文件,對(duì)這些RDD可以執(zhí)行:partitions操作返回各個(gè)數(shù)據(jù)塊的一個(gè)分區(qū)(每個(gè)Partition對(duì)象中保存數(shù)據(jù)塊的偏移),preferredLocations操作返回?cái)?shù)據(jù)塊所在的節(jié)點(diǎn)列表,iterator操作對(duì)數(shù)據(jù)塊進(jìn)行讀取。
map:任何RDD上都可以執(zhí)行map操作,返回一個(gè)MappedRDD對(duì)象。該操作傳遞一個(gè)函數(shù)參數(shù)給map,對(duì)父RDD上的記錄按照iterator的方式執(zhí)行這個(gè)函數(shù),并返回一組符合條件的父RDD分區(qū)及其位置。
union:在兩個(gè)RDD上執(zhí)行union操作,返回兩個(gè)父RDD分區(qū)的并集。通過相應(yīng)父RDD上的窄依賴關(guān)系計(jì)算每個(gè)子RDD分區(qū)(注意union操作不會(huì)過濾重復(fù)值,相當(dāng)于SQL中的UNION ALL)。
sample:抽樣與映射類似,但是sample操作中,RDD需要存儲(chǔ)一個(gè)隨機(jī)數(shù)產(chǎn)生器的種子,這樣每個(gè)分區(qū)能夠確定哪些父RDD記錄被抽樣。
join:對(duì)兩個(gè)RDD執(zhí)行join操作可能產(chǎn)生窄依賴(如果這兩個(gè)RDD擁有相同的哈希分區(qū)或范圍分區(qū)),可能是寬依賴,也可能兩種依賴都有(比如一個(gè)父RDD有分區(qū),而另一父RDD沒有)。
5.2 Spark任務(wù)調(diào)度器
調(diào)度器根據(jù)RDD的結(jié)構(gòu)信息為每個(gè)動(dòng)作確定有效的執(zhí)行計(jì)劃。調(diào)度器的接口是runJob函數(shù),參數(shù)為RDD及其分區(qū)集,和一個(gè)RDD分區(qū)上的函數(shù)。該接口足以表示Spark中的所有動(dòng)作(即count、collect、save等)。
總的來說,我們的調(diào)度器跟Dryad類似,但我們還考慮了哪些RDD分區(qū)是緩存在內(nèi)存中的。調(diào)度器根據(jù)目標(biāo)RDD的Lineage圖創(chuàng)建一個(gè)由stage構(gòu)成的無回路有向圖(DAG)。每個(gè)stage內(nèi)部盡可能多地包含一組具有窄依賴關(guān)系的轉(zhuǎn)換,并將它們流水線并行化(pipeline)。stage的邊界有兩種情況:一是寬依賴上的Shuffle操作;二是已緩存分區(qū),它可以縮短父RDD的計(jì)算過程。例如圖6。父RDD完成計(jì)算后,可以在stage內(nèi)啟動(dòng)一組任務(wù)計(jì)算丟失的分區(qū)。
圖6?Spark怎樣劃分任務(wù)階段(stage)的例子。實(shí)線方框表示RDD,實(shí)心矩形表示分區(qū)(黑色表示該分區(qū)被緩存)。要在RDD G上執(zhí)行一個(gè)動(dòng)作,調(diào)度器根據(jù)寬依賴創(chuàng)建一組stage,并在每個(gè)stage內(nèi)部將具有窄依賴的轉(zhuǎn)換流水線化(pipeline)。 本例不用再執(zhí)行stage 1,因?yàn)锽已經(jīng)存在于緩存中了,所以只需要運(yùn)行2和3。
調(diào)度器根據(jù)數(shù)據(jù)存放的位置分配任務(wù),以最小化通信開銷。如果某個(gè)任務(wù)需要處理一個(gè)已緩存分區(qū),則直接將任務(wù)分配給擁有這個(gè)分區(qū)的節(jié)點(diǎn)。否則,如果需要處理的分區(qū)位于多個(gè)可能的位置(例如,由HDFS的數(shù)據(jù)存放位置決定),則將任務(wù)分配給這一組節(jié)點(diǎn)。
對(duì)于寬依賴(例如需要Shuffle的依賴),目前的實(shí)現(xiàn)方式是,在擁有父分區(qū)的節(jié)點(diǎn)上將中間結(jié)果物化,簡(jiǎn)化容錯(cuò)處理,這跟MapReduce中物化map輸出很像。
如果某個(gè)任務(wù)失效,只要stage中的父RDD分區(qū)可用,則只需在另一個(gè)節(jié)點(diǎn)上重新運(yùn)行這個(gè)任務(wù)即可。如果某些stage不可用(例如,Shuffle時(shí)某個(gè)map輸出丟失),則需要重新提交這個(gè)stage中的所有任務(wù)來計(jì)算丟失的分區(qū)。
最后,lookup動(dòng)作允許用戶從一個(gè)哈希或范圍分區(qū)的RDD上,根據(jù)關(guān)鍵字讀取一個(gè)數(shù)據(jù)元素。這里有一個(gè)設(shè)計(jì)問題。Driver程序調(diào)用lookup時(shí),只需要使用當(dāng)前調(diào)度器接口計(jì)算關(guān)鍵字所在的那個(gè)分區(qū)。當(dāng)然任務(wù)也可以在集群上調(diào)用lookup,這時(shí)可以將RDD視為一個(gè)大的分布式哈希表。這種情況下,任務(wù)和被查詢的RDD之間的并沒有明確的依賴關(guān)系(因?yàn)閣orker執(zhí)行的是lookup),如果所有節(jié)點(diǎn)上都沒有相應(yīng)的緩存分區(qū),那么任務(wù)需要告訴調(diào)度器計(jì)算哪些RDD來完成查找操作。
5.3 檢查點(diǎn)
盡管RDD中的Lineage信息可以用來故障恢復(fù),但對(duì)于那些Lineage鏈較長(zhǎng)的RDD來說,這種恢復(fù)可能很耗時(shí)。例如4.3小節(jié)中的Pregel任務(wù),每次迭代的頂點(diǎn)狀態(tài)和消息都跟前一次迭代有關(guān),所以Lineage鏈很長(zhǎng)。如果將Lineage鏈存到物理存儲(chǔ)中,再定期對(duì)RDD執(zhí)行檢查點(diǎn)操作就很有效。
一般來說,Lineage鏈較長(zhǎng)、寬依賴的RDD需要采用檢查點(diǎn)機(jī)制。這種情況下,集群的節(jié)點(diǎn)故障可能導(dǎo)致每個(gè)父RDD的數(shù)據(jù)塊丟失,因此需要全部重新計(jì)算[20]。將窄依賴的RDD數(shù)據(jù)存到物理存儲(chǔ)中可以實(shí)現(xiàn)優(yōu)化,例如前面4.1小節(jié)邏輯回歸的例子,將數(shù)據(jù)點(diǎn)和不變的頂點(diǎn)狀態(tài)存儲(chǔ)起來,就不再需要檢查點(diǎn)操作。
當(dāng)前Spark版本提供檢查點(diǎn)API,但由用戶決定是否需要執(zhí)行檢查點(diǎn)操作。今后我們將實(shí)現(xiàn)自動(dòng)檢查點(diǎn),根據(jù)成本效益分析確定RDD Lineage圖中的最佳檢查點(diǎn)位置。
值得注意的是,因?yàn)镽DD是只讀的,所以不需要任何一致性維護(hù)(例如寫復(fù)制策略,分布式快照或者程序暫停等)帶來的開銷,后臺(tái)執(zhí)行檢查點(diǎn)操作。
我們使用10000行Scala代碼實(shí)現(xiàn)了Spark。系統(tǒng)可以使用任何Hadoop數(shù)據(jù)源(如HDFS,Hbase)作為輸入,這樣很容易與Hadoop環(huán)境集成。Spark以庫的形式實(shí)現(xiàn),不需要修改Scala編譯器。
這里討論關(guān)于實(shí)現(xiàn)的三方面問題:(1)修改Scala解釋器,允許交互模式使用Spark(6.1);(2)緩存管理(6.2);(3)調(diào)試工具rddbg(6.3)。
6. 實(shí)現(xiàn)
6.1 解釋器的集成
像Ruby和Python一樣,Scala也有一個(gè)交互式shell。基于內(nèi)存的數(shù)據(jù)可以實(shí)現(xiàn)低延時(shí),我們希望允許用戶從解釋器交互式地運(yùn)行Spark,從而在大數(shù)據(jù)集上實(shí)現(xiàn)大規(guī)模并行數(shù)據(jù)挖掘。
Scala解釋器通常根據(jù)將用戶輸入的代碼行,來對(duì)類進(jìn)行編譯,接著裝載到JVM中,然后調(diào)用類的函數(shù)。這個(gè)類是一個(gè)包含輸入行變量或函數(shù)的單例對(duì)象,并在一個(gè)初始化函數(shù)中運(yùn)行這行代碼。例如,如果用戶輸入代碼var x = 5,接著又輸入println(x),則解釋器會(huì)定義一個(gè)包含x的Line1類,并將第2行編譯為println(Line1.getInstance().x)。
在Spark中我們對(duì)解釋器做了兩點(diǎn)改動(dòng):
圖7?Spark解釋器如何將用戶輸入的兩行代碼解釋為Java對(duì)象
Spark解釋器便于跟蹤處理大量對(duì)象關(guān)系引用,并且便利了HDFS數(shù)據(jù)集的研究。我們計(jì)劃以Spark解釋器為基礎(chǔ),開發(fā)提供高級(jí)數(shù)據(jù)分析語言支持的交互式工具,比如類似SQL和Matlab。
6.2 緩存管理
Worker節(jié)點(diǎn)將RDD分區(qū)以Java對(duì)象的形式緩存在內(nèi)存中。由于大部分操作是基于掃描的,采取RDD級(jí)的LRU(最近最少使用)替換策略(即不會(huì)為了裝載一個(gè)RDD分區(qū)而將同一RDD的其他分區(qū)替換出去)。目前這種簡(jiǎn)單的策略適合大多數(shù)用戶應(yīng)用。另外,使用帶參數(shù)的cache操作可以設(shè)定RDD的緩存優(yōu)先級(jí)。
6.3 rddbg:RDD程序的調(diào)試工具
RDD的初衷是為了實(shí)現(xiàn)容錯(cuò)以能夠再計(jì)算(re-computation),這個(gè)特性使得調(diào)試更容易。我們創(chuàng)建了一個(gè)名為rddbg的調(diào)試工具,它是通過基于程序記錄的Lineage信息來實(shí)現(xiàn)的,允許用戶:(1)重建任何由程序創(chuàng)建的RDD,并執(zhí)行交互式查詢;(2)使用一個(gè)單進(jìn)程Java調(diào)試器(如jdb)傳入計(jì)算好的RDD分區(qū),能夠重新運(yùn)行作業(yè)中的任何任務(wù)。
我們強(qiáng)調(diào)一下,rddbg不是一個(gè)完全重放的調(diào)試器:特別是不對(duì)非確定性的代碼或動(dòng)作進(jìn)行重放。但如果某個(gè)任務(wù)一直運(yùn)行很慢(比如由于數(shù)據(jù)分布不均勻或者異常輸入等原因),仍然可以用它來幫助找到其中的邏輯錯(cuò)誤和性能問題。
舉個(gè)例子,我們使用rddbg去解決用戶Spam分類作業(yè)中的一個(gè)bug,這個(gè)作業(yè)中的每次迭代都產(chǎn)生0值。在調(diào)試器中重新執(zhí)行reduce任務(wù),很快就能發(fā)現(xiàn),輸入的權(quán)重向量(存儲(chǔ)在一個(gè)用戶自定義的向量類中)竟然是空值。由于從一個(gè)未初始化的稀疏向量中讀取總是返回0,運(yùn)行時(shí)也不會(huì)拋出異常。在這個(gè)向量類中設(shè)置一個(gè)斷點(diǎn),然后運(yùn)行這個(gè)任務(wù),引導(dǎo)程序很快就運(yùn)行到設(shè)置的斷點(diǎn)處,我們發(fā)現(xiàn)向量類的一個(gè)數(shù)組字段的值為空,我們?cè)\斷出了這個(gè)bug:稀疏向量類中的數(shù)據(jù)字段被錯(cuò)誤地使用transient來修飾,導(dǎo)致序列化時(shí)忽略了該字段的數(shù)據(jù)。
rddbg給程序執(zhí)行帶來的開銷很小。程序本來就需要將各個(gè)RDD中的所有閉包序列化并通過網(wǎng)絡(luò)傳送,只不過使用rddbg同時(shí)還要將這些閉集記錄到磁盤。
7. 評(píng)估
我們?cè)贏mazon EC2[1]上進(jìn)行了一系列實(shí)驗(yàn)來評(píng)估Spark及RDD的性能,并與Hadoop及其他應(yīng)用程序的基準(zhǔn)進(jìn)行了對(duì)比。總的說來,結(jié)果如下:
(1)對(duì)于迭代式機(jī)器學(xué)習(xí)應(yīng)用,Spark比Hadoop快20多倍。這種加速比是因?yàn)?#xff1a;數(shù)據(jù)存儲(chǔ)在內(nèi)存中,同時(shí)Java對(duì)象緩存避免了反序列化操作。
(2)用戶編寫的應(yīng)用程序執(zhí)行結(jié)果很好。例如,Spark分析報(bào)表比Hadoop快40多倍。
(3)如果節(jié)點(diǎn)發(fā)生失效,通過重建那些丟失的RDD分區(qū),Spark能夠?qū)崿F(xiàn)快速恢復(fù)。
(4)Spark能夠在5-7s延時(shí)范圍內(nèi),交互式地查詢1TB大小的數(shù)據(jù)集。
我們基準(zhǔn)測(cè)試首先從一個(gè)運(yùn)行在Hadoop上的具有迭代特征的機(jī)器學(xué)習(xí)應(yīng)用(7.1)和PageRank(7.2)開始,然后評(píng)估在Spark中當(dāng)工作集不能適應(yīng)緩存(7.4)時(shí)系統(tǒng)容錯(cuò)恢復(fù)能力(7.3),最后討論用戶應(yīng)用程序(7.5)和交互式數(shù)據(jù)挖掘(7.6)的結(jié)果。
除非特殊說明,我們的實(shí)驗(yàn)使用m1.xlarge EC2 節(jié)點(diǎn),4核15GB內(nèi)存,使用HDFS作為持久存儲(chǔ),塊大小為256M。在每個(gè)作業(yè)運(yùn)行執(zhí)行時(shí),為了保證磁盤讀時(shí)間更加精確,我們清理了集群中每個(gè)節(jié)點(diǎn)的操作系統(tǒng)緩存。
7.1 可迭代的機(jī)器學(xué)習(xí)應(yīng)用
我們實(shí)現(xiàn)了2個(gè)迭代式機(jī)器學(xué)習(xí)(ML)應(yīng)用,Logistic回歸和K-means算法,與如下系統(tǒng)進(jìn)行性能對(duì)比:
- Hadoop:Hadoop 0.20.0穩(wěn)定版。
- HadoopBinMem:在首輪迭代中執(zhí)行預(yù)處理,通過將輸入數(shù)據(jù)轉(zhuǎn)換成為開銷較低的二進(jìn)制格式來減少后續(xù)迭代過程中文本解析的開銷,在HDFS中加載到內(nèi)存。
- Spark:基于RDD的系統(tǒng),在首輪迭代中緩存Java對(duì)象以減少后續(xù)迭代過程中解析、反序列化的開銷。
我們使用同一數(shù)據(jù)集在相同條件下運(yùn)行Logistic回歸和K-means算法:使用400個(gè)任務(wù)(每個(gè)任務(wù)處理的輸入數(shù)據(jù)塊大小為256M),在25-100臺(tái)機(jī)器,執(zhí)行10次迭代處理100G輸入數(shù)據(jù)集(表4)。兩個(gè)作業(yè)的關(guān)鍵區(qū)別在于每輪迭代單個(gè)字節(jié)的計(jì)算量不同。K-means的迭代時(shí)間取決于更新聚類坐標(biāo)耗時(shí),Logistic回歸是非計(jì)算密集型的,但是在序列化和解析過程中非常耗時(shí)。
由于典型的機(jī)器學(xué)習(xí)算法需要數(shù)10輪迭代,然后再合并,我們分別統(tǒng)計(jì)了首輪迭代和后續(xù)迭代計(jì)算的耗時(shí),并從中發(fā)現(xiàn),在內(nèi)存中緩存RDD極大地加快了后續(xù)迭代的速度。
| 應(yīng)用 | 數(shù)據(jù)描述 | 大小 |
| Logistic回歸 | 10億9維點(diǎn)數(shù)據(jù) | 100G |
| K-means | 10億10維點(diǎn)數(shù)據(jù)(k=10) | 100G |
| PageRank | 400萬Wikipedia文章超鏈接圖 | 49G |
| 交互式數(shù)據(jù)挖掘 | Wikipedia瀏覽日志(2008-10~2009-4) | 1TB |
首輪迭代。在首輪迭代過程中,三個(gè)系統(tǒng)都是從HDFS中讀取文本數(shù)據(jù)作為輸入。圖9中“First Iteration”顯示了首輪迭代的柱狀圖,實(shí)驗(yàn)中Spark快于Hadoop,主要是因?yàn)镠adoop中的各個(gè)分布式組件基于心跳協(xié)議來發(fā)送信號(hào)帶來了開銷。HadoopBinMem是最慢的,因?yàn)樗ㄟ^一個(gè)額外的MapReduce作業(yè)將數(shù)據(jù)轉(zhuǎn)換成二進(jìn)制格式。
圖8?首輪迭代后Hadoop、HadoopBinMen、Spark運(yùn)行時(shí)間對(duì)比
后續(xù)迭代。圖9顯示了后續(xù)迭代的平均耗時(shí),圖8對(duì)比了不同聚類大小條件下耗時(shí)情況,我們發(fā)現(xiàn)在100個(gè)節(jié)點(diǎn)上運(yùn)行Logistic回歸程序,Spark比Hadoop、HadoopBinMem分別快25.3、20.7倍。從圖8(b)可以看到,Spark僅僅比Hadoop、HadoopBinMem分別快1.9、3.2倍,這是因?yàn)镵-means程序的開銷取決于計(jì)算(用更多的節(jié)點(diǎn)有助于提高計(jì)算速度的倍數(shù))。
后續(xù)迭代中,Hadoop仍然從HDFS讀取文本數(shù)據(jù)作為輸入,所以從首輪迭代開始Hadoop的迭代時(shí)間并沒有明顯的改善。使用預(yù)先轉(zhuǎn)換的SequenceFile文件(Hadoop內(nèi)建的二進(jìn)制文件格式),HadoopBinMem在后續(xù)迭代中節(jié)省了解析的代價(jià),但是仍然帶來的其他的開銷,如從HDFS讀SequenceFile文件并轉(zhuǎn)換成Java對(duì)象。因?yàn)镾park直接讀取緩存于RDD中的Java對(duì)象,隨著聚類尺寸的線性增長(zhǎng),迭代時(shí)間大幅下降。
圖9:首輪及其后續(xù)迭代平均時(shí)間對(duì)比
理解速度提升。我們非常驚奇地發(fā)現(xiàn),Spark甚至勝過了基于內(nèi)存存儲(chǔ)二進(jìn)制數(shù)據(jù)的Hadoop(HadoopBinMem),幅度高達(dá)20倍之多,Hadoop運(yùn)行慢是由于如下幾個(gè)原因:
為了估測(cè)1,我們運(yùn)行空的Hadoop作業(yè),僅僅執(zhí)行作業(yè)的初始化、啟動(dòng)任務(wù)、清理工作就至少耗時(shí)25秒。對(duì)于2,我們發(fā)現(xiàn)為了服務(wù)每一個(gè)HDFS數(shù)據(jù)塊,HDFS進(jìn)行了多次復(fù)制以及計(jì)算校驗(yàn)和操作。
為了估測(cè)3,我們?cè)趩蝹€(gè)節(jié)點(diǎn)上運(yùn)行了微基準(zhǔn)程序,在輸入的256M數(shù)據(jù)上計(jì)算Logistic回歸,結(jié)果如表5所示。首先,在內(nèi)存中的HDFS文件和本地文件的不同導(dǎo)致通過HDFS接口讀取耗時(shí)2秒,甚至數(shù)據(jù)就在本地內(nèi)存中。其次,文本和二進(jìn)制格式輸入的不同造成了解析耗時(shí)7秒的開銷。最后,預(yù)解析的二進(jìn)制文件轉(zhuǎn)換為內(nèi)存中的Java對(duì)象,耗時(shí)3秒。每個(gè)節(jié)點(diǎn)處理多個(gè)塊時(shí)這些開銷都會(huì)累積起來,然而通過緩存RDD作為內(nèi)存中的Java對(duì)象,Spark只需要耗時(shí)3秒。
| 內(nèi)存中的HDFS文件 | 內(nèi)存中的本地文件 | 緩存的RDD | |
| 文本輸入 二進(jìn)制輸入 | 15.38 (0.26) 8.38 (0.10) | 13.13 (0.26) 6.86 (0.02) | 2.93 (0.31) 2.93 (0.31) |
7.2 PageRank
通過使用存儲(chǔ)在HDFS上的49G Wikipedia導(dǎo)出數(shù)據(jù),我們比較了使用RDD實(shí)現(xiàn)的Pregel與使用Hadoop計(jì)算PageRank的性能。PageRank算法通過10輪迭代處理了大約400萬文章的鏈接圖數(shù)據(jù),圖10顯示了在30個(gè)節(jié)點(diǎn)上,Spark處理速度是Hadoop的2倍多,改進(jìn)后對(duì)輸入進(jìn)行Hash分區(qū)速度提升到2.6倍,使用Combiner后提升到3.6倍,這些結(jié)果數(shù)據(jù)也隨著節(jié)點(diǎn)擴(kuò)展到60個(gè)時(shí)同步放大。
圖10?迭代時(shí)間對(duì)比
7.3 容錯(cuò)恢復(fù)
基于K-means算法應(yīng)用程序,我們?cè)u(píng)估了在單點(diǎn)故障(SPOF)時(shí)使用Lneage信息創(chuàng)建RDD分區(qū)的開銷。圖11顯示了,K-means應(yīng)用程序運(yùn)行在75個(gè)節(jié)點(diǎn)的集群中進(jìn)行了10輪迭代,我們?cè)谡2僮骱瓦M(jìn)行第6輪迭代開始時(shí)一個(gè)節(jié)點(diǎn)發(fā)生故障的情況下對(duì)耗時(shí)進(jìn)行了對(duì)比。沒有任何失敗,每輪迭代啟動(dòng)了400個(gè)任務(wù)處理100G數(shù)據(jù)。
圖11?SPOF時(shí)K-means應(yīng)用程序迭代時(shí)間
第5輪迭代結(jié)束時(shí)大約耗時(shí)58秒,第6輪迭代時(shí)Kill掉一個(gè)節(jié)點(diǎn),該節(jié)點(diǎn)上的任務(wù)都被終止(包括緩存的分區(qū)數(shù)據(jù))。Spark調(diào)度器調(diào)度這些任務(wù)在其他節(jié)點(diǎn)上重新并行運(yùn)行,并且重新讀取基于Lineage信息重建的RDD輸入數(shù)據(jù)并進(jìn)行緩存,這使得迭代計(jì)算耗時(shí)增加到80秒。一旦丟失的RDD分區(qū)被重建,平均迭代時(shí)間又回落到58秒。
7.4 內(nèi)存不足時(shí)表現(xiàn)
到現(xiàn)在為止,我們能保證集群中的每個(gè)節(jié)點(diǎn)都有足夠的內(nèi)存去緩存迭代過程中使用的RDD,如果沒有足夠的內(nèi)存來緩存一個(gè)作業(yè)的工作集,Spark又是如何運(yùn)行的呢?在實(shí)驗(yàn)中,我們通過在每個(gè)節(jié)點(diǎn)上限制緩存RDD所需要的內(nèi)存資源來配置Spark,在不同的緩存配置條件下執(zhí)行Logistic回歸,結(jié)果如圖12。我們可以看出,隨著緩存的減小,性能平緩地下降。
圖12?Spark上運(yùn)行Logistic回歸的性能表現(xiàn)
7.5 基于Spark構(gòu)建的用戶應(yīng)用程序
In-Memory分析。視頻分發(fā)公司Conviva使用Spark極大地提升了為客戶處理分析報(bào)告的速度,以前基于Hadoop使用大約20個(gè)Hive[3]查詢來完成,這些查詢作用在相同的數(shù)據(jù)子集上(滿足用戶提供的條件),但是在不同分組的字段上執(zhí)行聚合操作(SUM、AVG、COUNT DISTINCT等)需要使用單獨(dú)的MapReduce作業(yè)。該公司使用Spark只需要將相關(guān)數(shù)據(jù)加載到內(nèi)存中一次,然后運(yùn)行上述聚合操作,在Hadoop集群上處理200G壓縮數(shù)據(jù)并生成報(bào)耗時(shí)20小時(shí),而使用Spark基于96G內(nèi)存的2個(gè)節(jié)點(diǎn)耗時(shí)30分鐘即可完成,速度提升40倍,主要是因?yàn)椴恍枰賹?duì)每個(gè)作業(yè)重復(fù)地執(zhí)行解壓縮和過濾操作。
城市交通建模。在Berkeley的Mobile Millennium項(xiàng)目[17]中,基于一系列分散的汽車GPS監(jiān)測(cè)數(shù)據(jù),研究人員使用并行化機(jī)器學(xué)習(xí)算法來推算公路交通擁堵狀況。數(shù)據(jù)來自市區(qū)10000個(gè)互聯(lián)的公路線路網(wǎng),還有600000個(gè)由汽車GPS裝置采集到的樣本數(shù)據(jù),這些數(shù)據(jù)記錄了汽車在兩個(gè)地點(diǎn)之間行駛的時(shí)間(每一條路線的行駛時(shí)間可能跨多個(gè)公路線路網(wǎng))。使用一個(gè)交通模型,通過推算跨多個(gè)公路網(wǎng)行駛耗時(shí)預(yù)期,系統(tǒng)能夠估算擁堵狀況。研究人員使用Spark實(shí)現(xiàn)了一個(gè)可迭代的EM算法,其中包括向Worker節(jié)點(diǎn)廣播路線網(wǎng)絡(luò)信息,在E和M階段之間執(zhí)行reduceByKey操作,應(yīng)用從20個(gè)節(jié)點(diǎn)擴(kuò)展到80個(gè)節(jié)點(diǎn)(每個(gè)節(jié)點(diǎn)4核),如圖13(a)所示:
圖13?每輪迭代運(yùn)行時(shí)間(a)交通建模應(yīng)用程序(b)基于Spark的社交網(wǎng)絡(luò)的Spam分類
社交網(wǎng)絡(luò)Spam分類。Berkeley的Monarch項(xiàng)目[31]使用Spark識(shí)別Twitter消息上的Spam鏈接。他們?cè)赟park上實(shí)現(xiàn)了一個(gè)類似7.1小節(jié)中示例的Logistic回歸分類器,不同的是使用分布式的reduceByKey操作并行對(duì)梯度向量求和。圖13(b)顯示了基于50G數(shù)據(jù)子集訓(xùn)練訓(xùn)練分類器的結(jié)果,整個(gè)數(shù)據(jù)集是250000的URL、至少10^7個(gè)與網(wǎng)絡(luò)相關(guān)的特征/維度,內(nèi)容、詞性與訪問一個(gè)URL的頁面相關(guān)。隨著節(jié)點(diǎn)的增加,這并不像交通應(yīng)用程序那樣近似線性,主要是因?yàn)槊枯喌墓潭ㄍㄐ糯鷥r(jià)較高。
7.6 交互式數(shù)據(jù)挖掘
為了展示Spark交互式處理大數(shù)據(jù)集的能力,我們?cè)?00個(gè)m2.4xlarge EC2實(shí)例(8核68G內(nèi)存)上使用Spark分析1TB從2008-10到2009-4這段時(shí)間的Wikipedia頁面瀏覽日志數(shù)據(jù),在整個(gè)輸入數(shù)據(jù)集上簡(jiǎn)單地查詢?nèi)缦聝?nèi)容以獲取頁面瀏覽總數(shù):(1)全部頁面;(2)頁面的標(biāo)題能精確匹配給定的關(guān)鍵詞;(3)頁面的標(biāo)題能部分匹配給定的關(guān)鍵詞。
圖14?顯示了分別在整個(gè)、1/2、1/10的數(shù)據(jù)上查詢的響應(yīng)時(shí)間,甚至1TB數(shù)據(jù)在Spark上查詢僅耗時(shí)5-7秒,這比直接操作磁盤數(shù)據(jù)快幾個(gè)數(shù)量級(jí)。例如,從磁盤上查詢1TB數(shù)據(jù)耗時(shí)170秒,這表明了RDD緩存使得Spark成為一個(gè)交互式數(shù)據(jù)挖掘的強(qiáng)大工具。
8. 相關(guān)工作
分布式共享內(nèi)存(DSM)。RDD可以看成是一個(gè)基于DSM研究[24]得到的抽象。在2.5節(jié)我們討論過,RDD提供了一個(gè)比DSM限制更嚴(yán)格的編程模型,并能在節(jié)點(diǎn)失效時(shí)高效地重建數(shù)據(jù)集。DSM通過檢查點(diǎn)[19]實(shí)現(xiàn)容錯(cuò),而Spark使用Lineage重建RDD分區(qū),這些分區(qū)可以在不同的節(jié)點(diǎn)上重新并行處理,而不需要將整個(gè)程序回退到檢查點(diǎn)再重新運(yùn)行。RDD能夠像MapReduce一樣將計(jì)算推向數(shù)據(jù)[12],并通過推測(cè)執(zhí)行來解決某些任務(wù)計(jì)算進(jìn)度落后的問題,推測(cè)執(zhí)行在一般的DSM系統(tǒng)上是很難實(shí)現(xiàn)的。
In-Memory集群計(jì)算。Piccolo[28]是一個(gè)基于可變的、In-Memory的分布式表的集群編程模型。因?yàn)镻iccolo允許讀寫表中的記錄,它具有與DSM類似的恢復(fù)機(jī)制,需要檢查點(diǎn)和回滾,但是不能推測(cè)執(zhí)行,也沒有提供類似groupBy、sort等更高級(jí)別的數(shù)據(jù)流算子,用戶只能直接讀取表單元數(shù)據(jù)來實(shí)現(xiàn)。可見,Piccolo是比Spark更低級(jí)別的編程模型,但是比DSM要高級(jí)。
RAMClouds[26]適合作為Web應(yīng)用的存儲(chǔ)系統(tǒng),它同樣提供了細(xì)粒度讀寫操作,所以需要通過記錄日志來實(shí)現(xiàn)容錯(cuò)。
數(shù)據(jù)流系統(tǒng)。RDD借鑒了DryadLINQ[34]、Pig[25]和FlumeJava[9]的“并行收集”編程模型,通過允許用戶顯式地將未序列化的對(duì)象保存在內(nèi)存中,以此來控制分區(qū)和基于key隨機(jī)查找,從而有效地支持基于工作集的應(yīng)用。RDD保留了那些數(shù)據(jù)流系統(tǒng)更高級(jí)別的編程特性,這對(duì)那些開發(fā)人員來說也比較熟悉,而且,RDD也能夠支持更多類型的應(yīng)用。RDD新增的擴(kuò)展,從概念上看很簡(jiǎn)單,其中Spark是第一個(gè)使用了這些特性的系統(tǒng),類似DryadLINQ編程模型,能夠有效地支持基于工作集的應(yīng)用。
面向基于工作集的應(yīng)用,已經(jīng)開發(fā)了一些專用系統(tǒng),像Twister[13]、HaLoop[8]實(shí)現(xiàn)了一個(gè)支持迭代的MapReduce模型;Pregel[21],支持圖應(yīng)用的BSP計(jì)算模型。RDD是一個(gè)更通用的抽象,它能夠描述支持迭代的MapReduce、Pregel,還有現(xiàn)有一些系統(tǒng)未能處理的應(yīng)用,如交互式數(shù)據(jù)挖掘。特別地,它能夠讓開發(fā)人員動(dòng)態(tài)地選擇操作來運(yùn)行在RDD上(如查看查詢的結(jié)果以決定下一步運(yùn)行哪個(gè)查詢),而不是提供一系列固定的步驟去執(zhí)行迭代,RDD還支持更多類型的轉(zhuǎn)換。
最后,Dremel[22]是一個(gè)低延遲查詢引擎,它面向基于磁盤存儲(chǔ)的大數(shù)據(jù)集,這類數(shù)據(jù)集是把嵌套記錄數(shù)據(jù)生成基于列的格式。這種格式的數(shù)據(jù)也能夠保存為RDD并在Spark系統(tǒng)中使用,但Spark也具備將數(shù)據(jù)加載到內(nèi)存來實(shí)現(xiàn)快速查詢的能力。
Lineage。我們通過參考[6]到[10]做過調(diào)研,在科學(xué)計(jì)算和數(shù)據(jù)庫領(lǐng)域,對(duì)于一些應(yīng)用,如需要解釋結(jié)果以及允許被重新生成、工作流中發(fā)現(xiàn)了bug或者數(shù)據(jù)集丟失需要重新處理數(shù)據(jù),表示數(shù)據(jù)的Lineage和原始信息一直以來都是一個(gè)研究課題。RDD提供了一個(gè)受限的編程模型,在這個(gè)模型中使用細(xì)粒度的Lineage來表示是非常容易的,因此它可以被用于容錯(cuò)。
緩存系統(tǒng)。Nectar[14]能夠通過識(shí)別帶有程序分析的子表達(dá)式,跨DryadLINQ作業(yè)重用中間結(jié)果,如果將這種能力加入到基于RDD的系統(tǒng)會(huì)非常有趣。但是Nectar并沒有提供In-Memory緩存,也不能夠讓用戶顯式地控制應(yīng)該緩存那個(gè)數(shù)據(jù)集,以及如何對(duì)其進(jìn)行分區(qū)。Ciel[23]同樣能夠記住任務(wù)結(jié)果,但不能提供In-Memory緩存并顯式控制它。
語言迭代。DryadLINQ[34]能夠使用LINQ獲取到表達(dá)式樹然后在集群上運(yùn)行,Spark系統(tǒng)的語言集成與它很類似。不像DryadLINQ,Spark允許用戶顯式地跨查詢將RDD存儲(chǔ)到內(nèi)存中,并通過控制分區(qū)來優(yōu)化通信。Spark支持交互式處理,但DryadLINQ卻不支持。
關(guān)系數(shù)據(jù)庫。從概念上看,RDD類似于數(shù)據(jù)庫中的視圖,緩存RDD類似于物化視圖[29]。然而,數(shù)據(jù)庫像DSM系統(tǒng)一樣,允許典型地讀寫所有記錄,通過記錄操作和數(shù)據(jù)的日志來實(shí)現(xiàn)容錯(cuò),還需要花費(fèi)額外的開銷來維護(hù)一致性。RDD編程模型通過增加更多限制來避免這些開銷。
9. 總結(jié)
我們提出的RDD是一個(gè)面向,運(yùn)行在普通商用機(jī)集群之上并行數(shù)據(jù)處理應(yīng)用的分布式內(nèi)存抽象。RDD廣泛支持基于工作集的應(yīng)用,包括迭代式機(jī)器學(xué)習(xí)和圖算法,還有交互式數(shù)據(jù)挖掘,然而它保留了數(shù)據(jù)流模型中引人注目的特點(diǎn),如自動(dòng)容錯(cuò)恢復(fù),處理執(zhí)行進(jìn)度落后的任務(wù),以及感知調(diào)度。它是通過限制編程模型,進(jìn)而允許高效地重建RDD分區(qū)來實(shí)現(xiàn)的。RDD實(shí)現(xiàn)處理迭代式作業(yè)的速度超過Hadoop大約20倍,而且還能夠交互式查詢數(shù)百G數(shù)據(jù)。
轉(zhuǎn)載于:https://www.cnblogs.com/wuwuwu/p/6162615.html
總結(jié)
以上是生活随笔為你收集整理的RDD:基于内存的集群计算容错抽象的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: WEB前端笔试题(4)
- 下一篇: Android ContentProvi